from typing import List
import numpy as np
import pandas as pd
import torch
from datasets import load_dataset
from sentence_transformers import SentenceTransformer, util
# Load a pretrained sentence-embedding model
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
def load_dataset_quora(sample_size: int = 500) -> pd.DataFrame:
    """Load and preprocess the Quora question-pairs dataset."""
    raw_dataset = load_dataset("quora", split="train", trust_remote_code=True)
    data = {
        "question1": [x["questions"]["text"][0] for x in raw_dataset],
        "question2": [x["questions"]["text"][1] for x in raw_dataset],
        "is_duplicate": [x["is_duplicate"] for x in raw_dataset],
    }
    dataset = pd.DataFrame(data)
    return dataset.sample(sample_size, random_state=42)
def chunk_text(text: str, min_chunk: int = 18, max_chunk: int = 150) -> List[str]:
    """Dynamically split text into chunks using the optimal chunk-size parameters."""
    words = text.split()
    chunk_size = max(min_chunk, min(len(words) // 4, max_chunk))
    return [" ".join(words[i : i + chunk_size]) for i in range(0, len(words), chunk_size)]
def embed_texts(texts: List[str]) -> torch.Tensor:
    """Embed a list of texts."""
    return model.encode(texts, convert_to_tensor=True)
def compute_baseline_similarity(dataset: pd.DataFrame) -> pd.DataFrame:
    """Compute similarity scores from whole-text embeddings."""
    embeddings_q1 = embed_texts(dataset["question1"].tolist())
    embeddings_q2 = embed_texts(dataset["question2"].tolist())
    # The diagonal holds the similarity of each question with its paired question.
    dataset["similarity_baseline"] = util.pytorch_cos_sim(embeddings_q1, embeddings_q2).diagonal().cpu().numpy()
    return dataset
def compute_chunked_embeddings(dataset: pd.DataFrame) -> pd.DataFrame:
    """Compute embeddings for the chunked texts."""
    dataset = dataset.copy()
    dataset["q1_chunks"] = dataset["question1"].apply(lambda x: embed_texts(chunk_text(x)))
    dataset["q2_chunks"] = dataset["question2"].apply(lambda x: embed_texts(chunk_text(x)))
    return dataset
def compute_top_k_similarity(q1_chunks: torch.Tensor, q2_chunks: torch.Tensor, top_k: int = 10) -> float:
    """Average the top-k cosine similarities across all chunk pairs."""
    similarities = [float(util.pytorch_cos_sim(q1, q2).item()) for q1 in q1_chunks for q2 in q2_chunks]
    return float(np.mean(sorted(similarities, reverse=True)[:top_k])) if similarities else 0.0
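# Optional vectorized alternative (a sketch, not used in main below): util.cos_sim
# already returns the full pairwise similarity matrix, so the double loop above can
# be replaced by a single call plus torch.topk.
def compute_top_k_similarity_vectorized(q1_chunks: torch.Tensor, q2_chunks: torch.Tensor, top_k: int = 10) -> float:
    """Mean of the top-k cosine similarities between all chunk pairs (vectorized)."""
    if q1_chunks.numel() == 0 or q2_chunks.numel() == 0:
        return 0.0
    similarities = util.cos_sim(q1_chunks, q2_chunks).flatten()
    k = min(top_k, similarities.numel())
    return float(torch.topk(similarities, k).values.mean().item())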
def compute_chunked_similarity(dataset: pd.DataFrame, top_k: int = 3) -> pd.DataFrame:
    """Compute similarity scores from the chunked embeddings using the optimal top_k."""
    dataset["similarity_chunking"] = dataset.apply(lambda row: compute_top_k_similarity(row["q1_chunks"], row["q2_chunks"], top_k), axis=1)
    return dataset
def evaluate_accuracy(dataset: pd.DataFrame, similarity_column: str, threshold: float = 0.7) -> float:
    """Evaluate duplicate-detection accuracy at a similarity threshold."""
    predictions = (dataset[similarity_column] >= threshold).astype(int)
    accuracy = (predictions == dataset["is_duplicate"]).mean()
    print(f"Accuracy for {similarity_column} at threshold {threshold}: {accuracy:.4f}")
    return accuracy
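# Optional helper (a sketch, not called in main below): sweep thresholds to see how
# sensitive the accuracy is to the fixed 0.7 cut-off used above.
def sweep_thresholds(dataset: pd.DataFrame, similarity_column: str) -> pd.DataFrame:
    """Accuracy over a range of decision thresholds."""
    rows = []
    for threshold in np.arange(0.5, 0.95, 0.05):
        predictions = (dataset[similarity_column] >= threshold).astype(int)
        rows.append({
            "threshold": round(float(threshold), 2),
            "accuracy": float((predictions == dataset["is_duplicate"]).mean()),
        })
    return pd.DataFrame(rows)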
def main():
    # Load and preprocess the dataset
    dataset = load_dataset_quora()
    # Baseline similarity with whole-text embeddings (the traditional approach)
    dataset = compute_baseline_similarity(dataset)
    baseline_accuracy = evaluate_accuracy(dataset, "similarity_baseline")
    # Chunked similarity with the optimal parameters
    dataset = compute_chunked_embeddings(dataset)
    dataset = compute_chunked_similarity(dataset)
    chunking_accuracy = evaluate_accuracy(dataset, "similarity_chunking")
    print("\nFinal results:")
    print(f"Baseline accuracy: {baseline_accuracy:.4f}")
    print(f"Chunking accuracy: {chunking_accuracy:.4f}")
    print(f"Absolute improvement: {(chunking_accuracy - baseline_accuracy):.4f}")
    print(f"Relative improvement: {((chunking_accuracy - baseline_accuracy) / baseline_accuracy * 100):.2f}%")

if __name__ == "__main__":
    main()