YWW
YWW
发布于 2025-09-21 / 7 阅读
0
0

RAG系统关键点

一个完善的RAG流程 需要做到以下几点

(1)文档准备【文档收集,切分,向量化】

(2)文档检索【多路召回,粗排,重排rerank】

(3)后处理,置信度评估,结果过滤

(4)大模型生成结果

架构图【待补充】

技术方案

1.文本分块模型 Embedding-v4

- Embedding模型:使用外部传入embedding_model(类型EmbeddingModel)来获取文本的向量表示

- 相似度计算:使用余弦相似度(cosine similarity)计算句子/文本块之间的语义相似度

- 语言支持:支持中英文,参数控制(默认为中Language.ZH

2. 分块算法及执行逻辑

核心算法

1. 基于语义相似度的分块

- 使用滑动窗口计算相邻文本片段的语义相似度

- 当相似度低于阈值时,在该位置进行分块

  • 列举两种分组模式:

- window模式:使用固定大小的窗口计算相似度

 def _group_sentences_window(self, sentences: list[SemanticSentence]) -> list[list[SemanticSentence]]:
        """
        基于窗口模式将句子分组
        
        Args:
            sentences: 句子列表
            
        Returns:
            句子组列表
        """
        similarities = self._compute_window_similarities(sentences)
        split_indices = self._get_split_indices(similarities, self.similarity_threshold)
        groups = [
            sentences[split_indices[i]:split_indices[i+1]]
            for i in range(len(split_indices) - 1)
        ]
        return groups

- cumulative模式:累积计算相似度【基于累计的模式将句子分组】

def _group_sentences_cumulative(self, sentences: list[SemanticSentence]) -> list[list[SemanticSentence]]:
        """
        基于累积模式将句子分组
        
        Args:
            sentences: 句子列表
            
        Returns:
            句子组列表
        """
        groups = []
        current_group = sentences[:self.min_sentences]
        current_embedding = self._compute_group_embedding(current_group)

        for sentence in sentences[self.min_sentences:]:
            # 将新句子与当前组的平均embedding比较
            similarity = self._get_semantic_similarity(current_embedding, sentence.embedding)

            if similarity >= self.similarity_threshold:
                # 添加到当前组
                current_group.append(sentence)
                # 更新平均embedding
                current_embedding = self._compute_group_embedding(current_group)
            else:
                # 开始新的组
                if current_group:
                    groups.append(current_group)
                current_group = [sentence]
                current_embedding = sentence.embedding

        # 添加最后一组
        if current_group:
            groups.append(current_group)

        return groups

2. 阈值计算

  • 列举三种方式确定相似度阈值:

- 直接指定阈值(0-1之间)

- 基于百分位数自动计算

 def _calculate_threshold_via_percentile(self, sentences: list[SemanticSentence]) -> float:
        """
        通过百分位数计算相似度阈值
        
        Args:
            sentences: 句子列表
            
        Returns:
            相似度阈值
        """
        # 计算所有相似度
        all_similarities = self._compute_window_similarities(sentences)
        return float(np.percentile(all_similarities, 100 - self.similarity_percentile))

- 通过二分搜索自动寻找最优阈值

def _calculate_threshold_via_binary_search(self, sentences: list[SemanticSentence]) -> float:
        """
        通过二分搜索计算最优相似度阈值
        
        Args:
            sentences: 句子列表
            
        Returns:
            最优相似度阈值
        """
        # 获取token计数和累计token计数
        token_counts = [sent.token_count for sent in sentences]
        cumulative_token_counts = np.cumsum([0] + token_counts)

        # 计算所有相似度
        similarities = self._compute_window_similarities(sentences)

        # 计算相似度的中位数和标准差
        median = np.median(similarities)
        std = np.std(similarities)

        # 设置阈值的搜索范围
        low = max(median - 1 * std, 0.0)
        high = min(median + 1 * std, 1.0)

        # 初始化计数器和阈值
        iterations = 0
        threshold = (low + high) / 2

        # 二分搜索最优阈值
        while abs(high - low) > self.threshold_step:
            threshold = (low + high) / 2
            
            # 获取分割点
            split_indices = self._get_split_indices(similarities, threshold)
            
            # 获取分割点处的累计token数
            split_token_counts = np.diff(cumulative_token_counts[split_indices])
            
            # 检查所有分块是否满足大小要求
            if all(self.min_chunk_size <= count <= self.chunk_size for count in split_token_counts):
                break
            # 检查是否有分块超出最大大小
            elif any(count > self.chunk_size for count in split_token_counts):
                low = threshold + self.threshold_step
            # 检查是否有分块小于最小大小
            else:
                high = threshold - self.threshold_step
                
            iterations += 1
            if iterations > 10:
                g_warning("阈值计算迭代次数过多,停止...")
                break

        return threshold

### 执行流程

1. 文本预处理

  • 使用标点符号(如".", "!", "?"等)将文本分割成句子

  • 处理过长的句子,确保每个句子不超过最大token限制
    示范:

def _split_sentences(self, text: str) -> list[str]:
        """
        快速分割句子的方法,同时考虑token数量限制

        Args:
            text: 输入文本

        Returns:
            分割后的句子列表
        """
        t = text
        for c in self.delim:
            if self.include_delim == "prev":
                t = t.replace(c, c + self.sep)
            elif self.include_delim == "next":
                t = t.replace(c, self.sep + c)
            else:
                t = t.replace(c, self.sep)

        # 初始分割
        splits = [s for s in t.split(self.sep) if s != ""]

        # 将短句与前一句合并,同时考虑token数量限制
        current = ""
        sentences = []
        current_tokens = 0

        for s in splits:
            # 估算当前句子的token数
            s_tokens = int(len(s) * self.get_lang_token())  # 使用与 _count_tokens 相同的估算方法
            
            # 如果单个句子就超过chunk_size,先进行切割
            if s_tokens > self.chunk_size:
                # 如果当前有累积的句子,先保存
                if current:
                    sentences.append(current)
                    current = ""
                    current_tokens = 0
                
                # 切割长句子
                long_sentence_parts = self._split_long_sentence(s, self.chunk_size)
                sentences.extend(long_sentence_parts)
                continue
            
            # 如果当前累积的token数加上新句子的token数超过chunk_size,且当前累积不为空
            if current and current_tokens + s_tokens > self.chunk_size:
                sentences.append(current)
                current = s
                current_tokens = s_tokens
                continue

            # 处理短句合并
            if len(s) < self.min_characters_per_sentence:
                if current_tokens + s_tokens <= self.chunk_size:
                    current += s
                    current_tokens += s_tokens
                else:
                    if current:
                        sentences.append(current)
                    current = s
                    current_tokens = s_tokens
            else:
                if current:
                    current += s
                    current_tokens += s_tokens
                else:
                    current = s
                    current_tokens = s_tokens

            # 如果当前累积的token数已经达到chunk_size,保存当前句子
            if current_tokens >= self.chunk_size:
                sentences.append(current)
                current = ""
                current_tokens = 0

        # 如果还有未处理的句子,添加到句子列表
        if current:
            sentences.append(current)

        return sentences

2. 句子向量化

- 为每个句子生成embedding

- 对于窗口模式,会为每个句子及其上下文生成组合embedding

3. 相似度计算

  • 计算相邻句子/文本块之间的余弦相似度

  • 根据选择的模式(window/cumulative)计算相似度

4. 分块决策

- 根据相似度阈值确定分块点

- 确保每个分块满足最小/最大token数限制

- 合并过小的分块

## 3. 文档分块的方法调用链

1. 主入口方法

- 输入:原始文本

- 输出:分块后的文本列表

2. 主要调用链

chunk()

├── preparesentences() # 准备句子,生成embedding

│ ├── splitsentences() # 分割文本为句子

│ └── counttokens() # 计算token数量

├── calculatesimilarity_threshold() # 计算相似度阈值

│ ├── calculatethreshold_via_binary_search() # 二分搜索法

│ └── calculatethreshold_via_percentile() # 百分位法

├── groupsentences() # 根据相似度将句子分组

│ ├── groupsentences_window() # 窗口模式分组

│ └── groupsentences_cumulative()# 累积模式分组

│ └── computegroup_embedding() # 计算组embedding

└── splitchunks() # 将句子组分割为最终分块

└── createchunk() # 从句子列表创建分块

3. 关键参数配置

- chunk_size:每个分块的最大token数(默认512)

- min_chunk_size:每个分块的最小token数(默认50)

- min_sentences:每个分块的最小句子数(默认5)

- similarity_window:计算相似度时考虑的句子数量(窗口大小)

- threshold:相似度阈值或百分位(默认0.7)

- mode:分组模式,'window'或'cumulative'(默认'window')

未完结...


评论