LangChain 中大文本分块的策略总结
1. 概览
1.1 整体介绍
由于 LLM 的上下文 Token 限制,所以对于大文本传给 LLM 进行总结之前,需要先拆分一下再总结,本文就是对这几种拆分方式的说明。 整体来说有以下几种方式:
-
基于长度拆分:
- 基于字符计算长度
- 基于 token 计算长度
-
基于文本结构拆分
-
基于语义拆分
1.2 安装依赖
# tiktoken 在基于 token 分块的时候会用到
uv add langchain-text-splitters tiktoken
1.3 数据准备
# load meeting text
with open("./data/alimeeting_content.txt", "r", encoding="utf-8") as f:
meeting_text = f.read()
print(meeting_text)
print(f"文本长度:{len(meeting_text)}")
# 输出内容
...
...
发言人 1
就能交稿了,再再定一下稿,定好,定好了以后,咱们就转给客户。
发言人 3
然后行。行行行。
文本长度:9463
2. 基于长度拆分
2.1 基于字符计算长度
主要用到的类是CharacterTextSplitter
关键参数解释:
- separator:文本分块的分隔符
- chunk_size: 每一块的长度
- chunk_overlap:块与块直接重叠的长度,加一些重叠会更连贯一些
- length_function:计算长度的函数
- is_separator_regex:是否是正则分割符
import os
from langchain_text_splitters import CharacterTextSplitter
text_splitter = CharacterTextSplitter(
separator="\n",
chunk_size=1000,
chunk_overlap=200,
length_function=len,
is_separator_regex=False,
)
texts = text_splitter.create_documents([meeting_text])
print(f"共拆分为 {len(texts)} 块")
dir_name = "split"
os.makedirs(dir_name, exist_ok=True)
file_path = f"{dir_name}/meeting_based_character.txt"
with open(file_path, "w", encoding="utf-8") as f:
for i, doc in enumerate(texts):
f.write(doc.page_content)
if i != len(texts) - 1:
f.write("\n=======================================\n")
print(f"拆分结果已存储到:{file_path}")
# output
共拆分为 12 块
拆分结果已存储到:split/meeting_based_character.txt
2.2 基于 token 计算长度
这个主要用到的是RecursiveCharacterTextSplitter这个类的from_tiktoken_encoder方法,通过这个方法创建一个基于tiktoken的文本分割器。
关键参数:
- encoding_name:token 编码器的名称
- model_name:也可以传入具体模型的名称
- chunk_size:每一块的大小
- chunk_overlap:块与块直接重叠的大小
from langchain_text_splitters import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
# encoding_name = "cl100k_base",
model_name="gpt-4",
chunk_size=1000,
chunk_overlap=200,
)
texts = text_splitter.split_text(meeting_text)
print(f"共拆分为 {len(texts)} 块")
print(texts[0])
file_path = f"{dir_name}/meeting_based_tiktoken.txt"
with open(file_path, "w", encoding="utf-8") as f:
for i, doc in enumerate(texts):
f.write(doc)
if i != len(texts) - 1:
f.write("\n=======================================\n")
print(f"拆分结果已存储到:{file_path}")
# output
共拆分为 13 块
...
...
发言人 3
那对,让他们自己定。主要的是咱们这边儿也不太清楚人家请的宾客,主要是让他这边出名单。
发言人 5
拆分结果已存储到:split/meeting_based_tiktoken.txt
3. 基于文本结构拆分
对于文章和小说一类的好一点,RecursiveCharacterTextSplitter 尝试保持较大的单位(例如,段落)不变。
如果一个单位超过块大小,它会移动到下一个级别(例如,句子)。
如有必要,此过程将继续到单词级别。
关键参数
- separators:会递归的使用这个列表里面的分隔符进行分割,直到找到一种可行的块,默认值是:
["\n\n", "\n", " ", ""],代入到文章里面来说就是先找段落,再找句子级别的,再找单词级别 - chunk_size:块大小
- chunk_overlap:块重叠大小
from langchain_text_splitters import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
separators=["\n"], chunk_size=1000, chunk_overlap=200
)
texts = text_splitter.split_text(meeting_text)
print(f"共拆分为 {len(texts)} 块")
print(texts[0])
file_path = f"{dir_name}/meeting_based_structure.txt"
with open(file_path, "w", encoding="utf-8") as f:
for i, doc in enumerate(texts):
f.write(doc)
if i != len(texts) - 1:
f.write("\n=======================================\n")
print(f"拆分结果已存储到:{file_path}")
# output
共拆分为 12 块
...
...
4. 基于语义拆分
整体是先按照句子拆分,长文本拆分成一句一句的,然后默认每 3 句合并为一个初始块,计算相邻句的 embedding 距离,判断“断点”,如果某两组句子超过阈值,就拆分这一块。 关键参数:
- breakpoint_threshold_type: 阈值类型(percentile, standard_deviation, interquartile, gradient)
- breakpoint_threshold_amount: 阈值具体数值(如百分位、标准差倍数等)
- min_chunk_size: 控制最小块大小
阈值类型及使用场景:
| 类型 | 描述 | 使用场景 |
|---|---|---|
| percentile(默认) | 找出距离分布中的高百分位值(默认 95%)作为割点。 | 文本整体风格均匀,但希望自动找出语义跳变点。 |
| standard_deviation | 阈值 = 平均距离 + k × 标准差(k 可调)。 | 适合语义距离稳定,但偶尔含有异常跳变的文本,强调异常识别。 |
| interquartile | 阈值 = 平均 + k × IQR(四分位间距),适合识别异常点。 | 当距离分布有异常值时,对异常点敏感的领域(比如法律、医疗文本)。 |
| gradient | 基于距离变化梯度来识别突变,用百分位如 95% 来提取剧烈变化的跳点。 | 特别适用于语义结构变化剧烈的文本,如章节切换或语调转折明显的文档 |
from dotenv import load_dotenv
from langchain_community.embeddings import OllamaEmbeddings
from langchain_experimental.text_splitter import SemanticChunker
load_dotenv()
embeddings = OllamaEmbeddings(
model="bge-m3:latest", base_url="http://localhost:11434"
)
text = "This is a test query."
query_result = embeddings.embed_query(text)
print(f"embedding len: {len(query_result)}")
text_splitter = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile",
min_chunk_size=500,
breakpoint_threshold_amount=90.0,
)
docs = text_splitter.create_documents([meeting_text])
print(f"chunks len: {len(docs)}")