一个完善的RAG流程 需要做到以下几点
(1)文档准备【文档收集,切分,向量化】
(2)文档检索【多路召回,粗排,重排rerank】
(3)后处理,置信度评估,结果过滤
(4)大模型生成结果
架构图【待补充】
技术方案
1.文本分块模型 Embedding-v4
- Embedding模型:使用外部传入embedding_model
(类型EmbeddingModel
)来获取文本的向量表示
- 相似度计算:使用余弦相似度(cosine similarity)计算句子/文本块之间的语义相似度
- 语言支持:支持中英文,参数控制(默认为中Language.ZH
)
2. 分块算法及执行逻辑
核心算法
1. 基于语义相似度的分块:
- 使用滑动窗口计算相邻文本片段的语义相似度
- 当相似度低于阈值时,在该位置进行分块
列举两种分组模式:
- window
模式:使用固定大小的窗口计算相似度
def _group_sentences_window(self, sentences: list[SemanticSentence]) -> list[list[SemanticSentence]]:
"""
基于窗口模式将句子分组
Args:
sentences: 句子列表
Returns:
句子组列表
"""
similarities = self._compute_window_similarities(sentences)
split_indices = self._get_split_indices(similarities, self.similarity_threshold)
groups = [
sentences[split_indices[i]:split_indices[i+1]]
for i in range(len(split_indices) - 1)
]
return groups
- cumulative
模式:累积计算相似度【基于累计的模式将句子分组】
def _group_sentences_cumulative(self, sentences: list[SemanticSentence]) -> list[list[SemanticSentence]]:
"""
基于累积模式将句子分组
Args:
sentences: 句子列表
Returns:
句子组列表
"""
groups = []
current_group = sentences[:self.min_sentences]
current_embedding = self._compute_group_embedding(current_group)
for sentence in sentences[self.min_sentences:]:
# 将新句子与当前组的平均embedding比较
similarity = self._get_semantic_similarity(current_embedding, sentence.embedding)
if similarity >= self.similarity_threshold:
# 添加到当前组
current_group.append(sentence)
# 更新平均embedding
current_embedding = self._compute_group_embedding(current_group)
else:
# 开始新的组
if current_group:
groups.append(current_group)
current_group = [sentence]
current_embedding = sentence.embedding
# 添加最后一组
if current_group:
groups.append(current_group)
return groups
2. 阈值计算:
列举三种方式确定相似度阈值:
- 直接指定阈值(0-1之间)
- 基于百分位数自动计算
def _calculate_threshold_via_percentile(self, sentences: list[SemanticSentence]) -> float:
"""
通过百分位数计算相似度阈值
Args:
sentences: 句子列表
Returns:
相似度阈值
"""
# 计算所有相似度
all_similarities = self._compute_window_similarities(sentences)
return float(np.percentile(all_similarities, 100 - self.similarity_percentile))
- 通过二分搜索自动寻找最优阈值
def _calculate_threshold_via_binary_search(self, sentences: list[SemanticSentence]) -> float:
"""
通过二分搜索计算最优相似度阈值
Args:
sentences: 句子列表
Returns:
最优相似度阈值
"""
# 获取token计数和累计token计数
token_counts = [sent.token_count for sent in sentences]
cumulative_token_counts = np.cumsum([0] + token_counts)
# 计算所有相似度
similarities = self._compute_window_similarities(sentences)
# 计算相似度的中位数和标准差
median = np.median(similarities)
std = np.std(similarities)
# 设置阈值的搜索范围
low = max(median - 1 * std, 0.0)
high = min(median + 1 * std, 1.0)
# 初始化计数器和阈值
iterations = 0
threshold = (low + high) / 2
# 二分搜索最优阈值
while abs(high - low) > self.threshold_step:
threshold = (low + high) / 2
# 获取分割点
split_indices = self._get_split_indices(similarities, threshold)
# 获取分割点处的累计token数
split_token_counts = np.diff(cumulative_token_counts[split_indices])
# 检查所有分块是否满足大小要求
if all(self.min_chunk_size <= count <= self.chunk_size for count in split_token_counts):
break
# 检查是否有分块超出最大大小
elif any(count > self.chunk_size for count in split_token_counts):
low = threshold + self.threshold_step
# 检查是否有分块小于最小大小
else:
high = threshold - self.threshold_step
iterations += 1
if iterations > 10:
g_warning("阈值计算迭代次数过多,停止...")
break
return threshold
### 执行流程
1. 文本预处理:
使用标点符号(如".", "!", "?"等)将文本分割成句子
处理过长的句子,确保每个句子不超过最大token限制
示范:
def _split_sentences(self, text: str) -> list[str]:
"""
快速分割句子的方法,同时考虑token数量限制
Args:
text: 输入文本
Returns:
分割后的句子列表
"""
t = text
for c in self.delim:
if self.include_delim == "prev":
t = t.replace(c, c + self.sep)
elif self.include_delim == "next":
t = t.replace(c, self.sep + c)
else:
t = t.replace(c, self.sep)
# 初始分割
splits = [s for s in t.split(self.sep) if s != ""]
# 将短句与前一句合并,同时考虑token数量限制
current = ""
sentences = []
current_tokens = 0
for s in splits:
# 估算当前句子的token数
s_tokens = int(len(s) * self.get_lang_token()) # 使用与 _count_tokens 相同的估算方法
# 如果单个句子就超过chunk_size,先进行切割
if s_tokens > self.chunk_size:
# 如果当前有累积的句子,先保存
if current:
sentences.append(current)
current = ""
current_tokens = 0
# 切割长句子
long_sentence_parts = self._split_long_sentence(s, self.chunk_size)
sentences.extend(long_sentence_parts)
continue
# 如果当前累积的token数加上新句子的token数超过chunk_size,且当前累积不为空
if current and current_tokens + s_tokens > self.chunk_size:
sentences.append(current)
current = s
current_tokens = s_tokens
continue
# 处理短句合并
if len(s) < self.min_characters_per_sentence:
if current_tokens + s_tokens <= self.chunk_size:
current += s
current_tokens += s_tokens
else:
if current:
sentences.append(current)
current = s
current_tokens = s_tokens
else:
if current:
current += s
current_tokens += s_tokens
else:
current = s
current_tokens = s_tokens
# 如果当前累积的token数已经达到chunk_size,保存当前句子
if current_tokens >= self.chunk_size:
sentences.append(current)
current = ""
current_tokens = 0
# 如果还有未处理的句子,添加到句子列表
if current:
sentences.append(current)
return sentences
2. 句子向量化:
- 为每个句子生成embedding
- 对于窗口模式,会为每个句子及其上下文生成组合embedding
3. 相似度计算:
计算相邻句子/文本块之间的余弦相似度
根据选择的模式(window/cumulative)计算相似度
4. 分块决策:
- 根据相似度阈值确定分块点
- 确保每个分块满足最小/最大token数限制
- 合并过小的分块
## 3. 文档分块的方法调用链
1. 主入口方法:
- 输入:原始文本
- 输出:分块后的文本列表
2. 主要调用链:
chunk()
├── preparesentences() # 准备句子,生成embedding
│ ├── splitsentences() # 分割文本为句子
│ └── counttokens() # 计算token数量
│
├── calculatesimilarity_threshold() # 计算相似度阈值
│ ├── calculatethreshold_via_binary_search() # 二分搜索法
│ └── calculatethreshold_via_percentile() # 百分位法
│
├── groupsentences() # 根据相似度将句子分组
│ ├── groupsentences_window() # 窗口模式分组
│ └── groupsentences_cumulative()# 累积模式分组
│ └── computegroup_embedding() # 计算组embedding
│
└── splitchunks() # 将句子组分割为最终分块
└── createchunk() # 从句子列表创建分块
3. 关键参数配置:
- chunk_size
:每个分块的最大token数(默认512)
- min_chunk_size
:每个分块的最小token数(默认50)
- min_sentences
:每个分块的最小句子数(默认5)
- similarity_window
:计算相似度时考虑的句子数量(窗口大小)
- threshold
:相似度阈值或百分位(默认0.7)
- mode
:分组模式,'window'或'cumulative'(默认'window')
未完结...