主要内容

  • 梳理IndexWriter.addDocuments 和 IndexWriter.commit的调用栈
  • 梳理向量索引在Lucene中存储的主要调用路径,包括原始向量如何保存到内存和文件、HNSW索引如何构建以及如何持久化
  • 梳理Lucene在Segment Merge的过程中是如何处理HNSW索引的

调用栈

（调用栈示意图）

调用栈对应的示例代码

对应的Lucene版本: releases/lucene/9.9.0

// generateFVector(dim) 为构造 float[dim] 测试向量的辅助方法
Path docPath = Paths.get("data/lucene_knn_demo");

// 清理旧的索引目录
if (Files.exists(docPath)) {
    String absPath = docPath.toAbsolutePath().toString();
    File indexDir = new File(absPath);
    FileUtils.deleteDirectory(indexDir);
}

Directory directory = FSDirectory.open(docPath);
IndexWriterConfig config = new IndexWriterConfig();

// 将 IndexWriter 的内部日志输出到文件,方便观察 flush/merge 过程
FileOutputStream output = new FileOutputStream("lucene_knn.log");
InfoStream customInfoStream = new PrintStreamInfoStream(new PrintStream(output));
System.out.println("IW Component is enabled: " + customInfoStream.isEnabled("IW"));
config.setInfoStream(customInfoStream);

// 不使用复合文件,便于直接观察 .vec/.vemf/.vex/.vem 等索引文件
config.setUseCompoundFile(false);

IndexWriter indexWriter = new IndexWriter(directory, config);

int count = 10000;
int dim = 128;
List<Document> docs = new ArrayList<>();
for (int i = 0; i < count; i++) {
    Document doc = new Document();
    doc.add(new KeywordField("id", Integer.toString(i), Field.Store.YES));
    doc.add(new KnnFloatVectorField("fvecs", generateFVector(dim)));
    docs.add(doc);
}
long start = System.currentTimeMillis();
indexWriter.addDocuments(docs);
indexWriter.commit();

System.out.printf("%d vectors consumes: %d ms\n", count, System.currentTimeMillis() - start);
start = System.currentTimeMillis();

// indexWriter.forceMerge(1);
// System.out.printf("forceMerge consumes: %d ms\n", System.currentTimeMillis() - start);

indexWriter.close();
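
写入并 commit 之后,可以用类似下面的代码做一次 KNN 检索,验证向量和索引确实可以被读取。这段代码不在原文示例中,仅作示意,沿用上面示例中的 directory、dim 与字段名 fvecs:

// 示意:打开索引并执行一次KNN查询
try (DirectoryReader reader = DirectoryReader.open(directory)) {
    IndexSearcher searcher = new IndexSearcher(reader);
    float[] queryVector = generateFVector(dim);
    KnnFloatVectorQuery query = new KnnFloatVectorQuery("fvecs", queryVector, 10);
    TopDocs topDocs = searcher.search(query, 10);
    for (ScoreDoc sd : topDocs.scoreDocs) {
        System.out.println(searcher.storedFields().document(sd.doc).get("id") + " -> " + sd.score);
    }
}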

Lucene是如何存储向量的

向量相关的存储分为两个部分:一个是原始向量,一个是向量索引。

内存中向量的存储

原始向量

从调用链路上看,indexChain.indexVectorValue 涉及向量相关的存储。根据向量类型(Float32或者Byte)会走不同的实现,本文例子中使用的是Float32数组,最终会调用Lucene99FlatVectorsWriter.FieldWriter.addValue,下面详细分析一下源码。

public void addValue(int docID, T vectorValue) throws IOException {
  if (docID == lastDocID) {
    throw new IllegalArgumentException(
        "VectorValuesField \""
            + fieldInfo.name
            + "\" appears more than once in this document (only one value is allowed per field)");
  }
  assert docID > lastDocID;
  // 首先从IndexableField里的VectorValues复制一份原始向量
  T copy = copyValue(vectorValue);
  docsWithField.add(docID);
  // vectors 是List<T> 对象,说明原始向量在内存中直接存入到一个List中
  vectors.add(copy);
  lastDocID = docID;
  if (indexingDelegate != null) {
    // 这里就是索引相关的计算
    indexingDelegate.addValue(docID, copy);
  }
}
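
可以看到,在 flush 之前所有原始向量都会以 List 的形式驻留在 JVM 堆中。以本文示例(10000 条、128 维 float 向量)粗略估算一下这部分占用(示意计算,忽略对象头和 List 自身的开销):

// 粗略估算:flush 之前原始向量在堆内的占用
int count = 10_000;
int dim = 128;
long rawBytes = (long) count * dim * Float.BYTES;   // 10000 * 128 * 4 字节
System.out.printf("raw vectors in heap ≈ %.1f MB%n", rawBytes / 1024.0 / 1024.0);   // 约 4.9 MB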

索引

内存中HNSW索引构建的入口是indexingDelegate.addValue,在本文例子中indexingDelegate就是Lucene99HnswVectorsWriter.FieldWriter,下面简要分析一下其源码。

public void addValue(int docID, T vectorValue) throws IOException {
  if (docID == lastDocID) {
    throw new IllegalArgumentException(
        "VectorValuesField \""
            + fieldInfo.name
            + "\" appears more than once in this document (only one value is allowed per field)");
  }
  assert docID > lastDocID;
  // 原始向量(的引用)又被保存了一次,供构图时计算相似度使用
  vectors.add(vectorValue);
  docsWithField.add(docID);
  // 构建索引:把当前节点插入HNSW图
  hnswGraphBuilder.addGraphNode(node);
  node++;
  lastDocID = docID;
}

那么HNSW索引在内存中是如何表示的呢?Lucene使用OnHeapHnswGraph来存储索引相关的信息(主要是节点之间的连接信息)。

// 省略了其他一些字段
public final class OnHeapHnswGraph extends HnswGraph implements Accountable {

  // ...
  private final AtomicReference<EntryNode> entryNode; // 表示了HNSW中整个索引的entryPoint

  // the internal graph representation where the first dimension is node id and second dimension is
  // level
  // e.g. graph[1][2] is all the neighbours of node 1 at level 2
  // 索引中节点的邻接信息
  private NeighborArray[][] graph;

  OnHeapHnswGraph(int M, int numNodes) {
    this.entryNode = new AtomicReference<>(new EntryNode(-1, 1));
    // Neighbours' size on upper levels (nsize) and level 0 (nsize0)
    // We allocate extra space for neighbours, but then prune them to keep allowed maximum
    this.nsize = M + 1;
    this.nsize0 = (M * 2 + 1);
    noGrowth = numNodes != -1;
    if (noGrowth == false) {
      numNodes = INIT_SIZE;
    }
    this.graph = new NeighborArray[numNodes][];
  }
}
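
构造函数中的 M 决定了每个节点允许保留的最大邻居数(上层为 M,第 0 层为 2M)。M 和构图时的 beamWidth 都来自 Lucene99HnswVectorsFormat,其默认值分别是 16 和 100。如果想调整这两个参数,可以用类似下面的方式替换某个字段的向量格式(示意代码,并非原文示例中的配置):

IndexWriterConfig config = new IndexWriterConfig();
config.setCodec(new Lucene99Codec() {
    @Override
    public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
        // maxConn 即上文的 M,beamWidth 控制构图时候选队列的大小
        return new Lucene99HnswVectorsFormat(16, 100);
    }
});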

文件中向量的存储

向量相关的索引文件涉及四种

  • *.vec 存储原始向量
  • *.vemf 原始向量的meta文件
  • *.vex 向量索引文件, Lucene中目前用的HNSW索引
  • *.vem 向量索引文件的meta文件
  1. 向量相关的文件写入(flush)是由Lucene99HnswVectorsWriter.flush实现的
public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
  // 写入原始向量
  flatVectorWriter.flush(maxDoc, sortMap);
  for (FieldWriter<?> field : fields) {
    if (sortMap == null) {
      writeField(field);
    } else {
      writeSortingField(field, sortMap);
    }
  }
}


  2. 原始向量的写入:Lucene99FlatVectorsWriter.flush

public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
  for (FieldWriter<?> field : fields) {
    if (sortMap == null) {
      writeField(field, maxDoc);
    } else {
      writeSortingField(field, maxDoc, sortMap);
    }
  }
}

private void writeField(FieldWriter<?> fieldData, int maxDoc) throws IOException {
  // write vector values
  long vectorDataOffset = vectorData.alignFilePointer(Float.BYTES);
  switch (fieldData.fieldInfo.getVectorEncoding()) {
    case BYTE:
      writeByteVectors(fieldData);
      break;
    case FLOAT32:
      // 写入到原始向量文件里
      writeFloat32Vectors(fieldData);
      break;
  }
  long vectorDataLength = vectorData.getFilePointer() - vectorDataOffset;

  writeMeta(
      fieldData.fieldInfo, maxDoc, vectorDataOffset, vectorDataLength, fieldData.docsWithField);
}

private void writeFloat32Vectors(FieldWriter<?> fieldData) throws IOException {
  final ByteBuffer buffer =
      ByteBuffer.allocate(fieldData.dim * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN);

  // 将之前存在内存中的vector list写入到vectorData对应的文件里
  for (Object v : fieldData.vectors) {
    buffer.asFloatBuffer().put((float[]) v);
    vectorData.writeBytes(buffer.array(), buffer.array().length);
  }
}
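
从 writeFloat32Vectors 可以看出,.vec 文件就是把所有向量按写入顺序、以 little-endian 的 float 依次平铺,每个向量占 dim * 4 字节。因此只要知道 meta 中记录的 vectorDataOffset 和维度 dim,第 i 个向量的位置就可以直接算出来。下面是一个示意性的读取片段(并非 Lucene 的实际实现,真实的读取走 OffHeapFloatVectorValues,方法名为本文假设):

// 示意:按照扁平布局从 .vec 文件中读出第 i 个向量
static float[] readVector(IndexInput vecIn, long vectorDataOffset, int dim, int i)
        throws IOException {
    long start = vectorDataOffset + (long) i * dim * Float.BYTES;  // 每个向量固定占 dim * 4 字节
    vecIn.seek(start);
    float[] v = new float[dim];
    vecIn.readFloats(v, 0, dim);  // Lucene 9.x 的 IndexInput 默认按 little-endian 解码
    return v;
}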

  3. 向量索引相关的写入

// 每个字段对应向量索引的写入
private void writeField(FieldWriter<?> fieldData) throws IOException {
// write graph
long vectorIndexOffset = vectorIndex.getFilePointer();
OnHeapHnswGraph graph = fieldData.getGraph();
int[][] graphLevelNodeOffsets = writeGraph(graph);
// 向量索引的长度
long vectorIndexLength = vectorIndex.getFilePointer() - vectorIndexOffset;

// graphLevelNodeOffsets 经过编码后会写入到vectorIndex(.vex)文件中,而不是meta文件中
writeMeta(
fieldData.fieldInfo,
vectorIndexOffset,
vectorIndexLength,
fieldData.docsWithField.cardinality(),
graph,
graphLevelNodeOffsets);
}

// hnsw节点连接信息的写入
private int[][] writeGraph(OnHeapHnswGraph graph) throws IOException {
if (graph == null) return new int[0][0];
// write vectors' neighbours on each level into the vectorIndex file
int countOnLevel0 = graph.size();
int[][] offsets = new int[graph.numLevels()][];
for (int level = 0; level < graph.numLevels(); level++) {
// graph.getNodesOnLevel获取到每个level出现的node数组
int[] sortedNodes = NodesIterator.getSortedNodes(graph.getNodesOnLevel(level));
offsets[level] = new int[sortedNodes.length];
int nodeOffsetId = 0;
for (int node : sortedNodes) {
// 根据node大小获取该node在level层级的节点连接信息
NeighborArray neighbors = graph.getNeighbors(level, node);
int size = neighbors.size();
// Write size in VInt as the neighbors list is typically small
long offsetStart = vectorIndex.getFilePointer();
// 写入这个节点对应的连接数
vectorIndex.writeVInt(size);
// Destructively modify; it's ok we are discarding it after this
int[] nnodes = neighbors.node();
// 根据node大小做升序排序,方便后面做差值压缩编码
Arrays.sort(nnodes, 0, size);
// Now that we have sorted, do delta encoding to minimize the required bits to store the
// information
for (int i = size - 1; i > 0; --i) {
assert nnodes[i] < countOnLevel0 : "node too large: " + nnodes[i] + ">=" + countOnLevel0;
nnodes[i] -= nnodes[i - 1];
}
// 顺序写入经过差值压缩后的编码,nnodes[0]是原值
for (int i = 0; i < size; i++) {
vectorIndex.writeVInt(nnodes[i]);
}
// 将这个节点的连接信息的长度记录到offset表中
offsets[level][nodeOffsetId++] =
Math.toIntExact(vectorIndex.getFilePointer() - offsetStart);
}
}
return offsets;
}

// 图索引meta信息的写入
private void writeMeta(
FieldInfo field,
long vectorIndexOffset,
long vectorIndexLength,
int count,
HnswGraph graph,
int[][] graphLevelNodeOffsets)
throws IOException {
meta.writeInt(field.number);
// 向量的编码FLOAT or BYTES
meta.writeInt(field.getVectorEncoding().ordinal());
// 向量距离的计算方式
meta.writeInt(field.getVectorSimilarityFunction().ordinal());
// 向量索引的开始位置
meta.writeVLong(vectorIndexOffset);
// 向量索引的总长度
meta.writeVLong(vectorIndexLength);
// 向量的维度
meta.writeVInt(field.getVectorDimension());
// 向量的数量
meta.writeInt(count);
meta.writeVInt(M);
// write graph nodes on each level
if (graph == null) {
meta.writeVInt(0);
} else {
// HNSW索引的层级
meta.writeVInt(graph.numLevels());
long valueCount = 0;
for (int level = 0; level < graph.numLevels(); level++) {
// 获取每个层级的节点
NodesIterator nodesOnLevel = graph.getNodesOnLevel(level);
valueCount += nodesOnLevel.size();
if (level > 0) {
// 非level0 的才需要写入meta
int[] nol = new int[nodesOnLevel.size()];
int numberConsumed = nodesOnLevel.consume(nol);
// 方便后面差值编码
Arrays.sort(nol);
assert numberConsumed == nodesOnLevel.size();
// 写入该层级的节点数
meta.writeVInt(nol.length); // number of nodes on a level
for (int i = nodesOnLevel.size() - 1; i > 0; --i) {
nol[i] -= nol[i - 1];
}
for (int n : nol) {
assert n >= 0 : "delta encoding for nodes failed; expected nodes to be sorted";
// 写入该层级的节点压缩编码
meta.writeVInt(n);
}
} else {
assert nodesOnLevel.size() == count : "Level 0 expects to have all nodes";
}
}
long start = vectorIndex.getFilePointer();
// 记录vectorIndex当前的文件位置(已经写完graph的连接信息的位置)
meta.writeLong(start);
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);
// graphLevelNodeOffsets的压缩编码, valueCount是所有level上节点的总数
// graphLevelNodeOffsets实际写入的也是vectorIndex文件
final DirectMonotonicWriter memoryOffsetsWriter =
DirectMonotonicWriter.getInstance(
meta, vectorIndex, valueCount, DIRECT_MONOTONIC_BLOCK_SHIFT);
long cumulativeOffsetSum = 0;
for (int[] levelOffsets : graphLevelNodeOffsets) {
for (int v : levelOffsets) {
memoryOffsetsWriter.add(cumulativeOffsetSum);
cumulativeOffsetSum += v;
}
}
memoryOffsetsWriter.finish();
meta.writeLong(vectorIndex.getFilePointer() - start);
}
}
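
writeGraph 中的差值编码可以用一个很小的例子说明(示例数据为假设):某个节点在某一层排好序的邻居是 {3, 7, 12, 40},实际写入文件的是邻居个数 4 和差值序列 3, 4, 5, 28(均为 VInt),读取时做一次前缀和即可还原:

// 差值编码示意(非Lucene源码)
int[] neighbors = {3, 7, 12, 40};            // 假设的邻居列表,已升序排序
int[] encoded = neighbors.clone();
for (int i = encoded.length - 1; i > 0; --i) {
    encoded[i] -= encoded[i - 1];            // 实际写入文件的内容: 3, 4, 5, 28
}
int[] decoded = encoded.clone();
for (int i = 1; i < decoded.length; i++) {
    decoded[i] += decoded[i - 1];            // 读取时前缀和还原: 3, 7, 12, 40
}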

小结

  1. Lucene中向量相关的存储整体分成两部分:原始向量和HNSW向量索引。
  2. 向量索引的存储整体分成两个步骤
    • 第一步存储各个层级节点的连接信息(graph):通过对连接节点排序后存储差值,对连接信息进行了压缩;写入的过程中会生成offset表,记录每个层级中各节点连接信息的偏移量
    • 第二步存储第一步生成的offset表,利用DirectMonotonicWriter进行压缩
  3. 整体可以看到,Lucene并没有原样存储索引结构,而是经过一系列精巧的压缩处理来减少磁盘占用,后面的文章中会深入探讨这种压缩处理的利弊

Lucene在Merge的过程中如何处理HNSW索引的

上文介绍的都是Lucene在生成单个Segment的过程中对向量及其索引的计算和存储。在实际系统(比如Elasticsearch)中通常会产生多个Lucene Segment,Lucene本身也会对多个小Segment进行Merge,因此我们需要了解在merge的过程中Lucene是如何处理向量索引的。

本文使用IndexWriter.forceMerge(1)触发merge操作,下面是merge调用路径上与HNSW索引相关的主要源码。

 public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOException {
CloseableRandomVectorScorerSupplier scorerSupplier =
flatVectorWriter.mergeOneFieldToIndex(fieldInfo, mergeState);
boolean success = false;
try {
long vectorIndexOffset = vectorIndex.getFilePointer();
// build the graph using the temporary vector data
// we use Lucene99HnswVectorsReader.DenseOffHeapVectorValues for the graph construction
// doesn't need to know docIds
// TODO: separate random access vector values from DocIdSetIterator?
OnHeapHnswGraph graph = null;
int[][] vectorIndexNodeOffsets = null;
if (scorerSupplier.totalVectorCount() > 0) {
// build graph
// Merger是IncrementalHnswGraphMerger
HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier);
for (int i = 0; i < mergeState.liveDocs.length; i++) {
// 这里会选出节点数最大的graph,作为merge过程中初始的graph以减少图的构建
merger.addReader(
mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]);
}
final DocIdSetIterator mergedVectorIterator;
switch (fieldInfo.getVectorEncoding()) {
case BYTE:
mergedVectorIterator =
KnnVectorsWriter.MergedVectorValues.mergeByteVectorValues(fieldInfo, mergeState);
break;
case FLOAT32:
// 获取每个Segment对应的原始向量,使用 OffHeapFloatVectorValues.load 获取每个原始向量
mergedVectorIterator =
KnnVectorsWriter.MergedVectorValues.mergeFloatVectorValues(fieldInfo, mergeState);
break;
default:
throw new IllegalStateException(
"Unsupported vector encoding: " + fieldInfo.getVectorEncoding());
}
// 多个Segment中的HNSW graph合并成一个OnHeapHnswGraph
graph =
merger.merge(
mergedVectorIterator,
segmentWriteState.infoStream,
scorerSupplier.totalVectorCount());
// 这里的写入和上文中的处理是一致的
vectorIndexNodeOffsets = writeGraph(graph);
}
// 这里同上文也是一致的
long vectorIndexLength = vectorIndex.getFilePointer() - vectorIndexOffset;
writeMeta(
fieldInfo,
vectorIndexOffset,
vectorIndexLength,
scorerSupplier.totalVectorCount(),
graph,
vectorIndexNodeOffsets);
success = true;
} finally {
if (success) {
IOUtils.close(scorerSupplier);
} else {
IOUtils.closeWhileHandlingException(scorerSupplier);
}
}
}

// 选出最大的graph 作为初始graph
public IncrementalHnswGraphMerger addReader(
KnnVectorsReader reader, MergeState.DocMap docMap, Bits liveDocs) throws IOException {
KnnVectorsReader currKnnVectorsReader = reader;
if (reader instanceof PerFieldKnnVectorsFormat.FieldsReader) {
currKnnVectorsReader =
((PerFieldKnnVectorsFormat.FieldsReader) reader).getFieldReader(fieldInfo.name);
}

if (!(currKnnVectorsReader instanceof HnswGraphProvider) || !noDeletes(liveDocs)) {
return this;
}

int candidateVectorCount = 0;
switch (fieldInfo.getVectorEncoding()) {
case BYTE:
ByteVectorValues byteVectorValues =
currKnnVectorsReader.getByteVectorValues(fieldInfo.name);
if (byteVectorValues == null) {
return this;
}
candidateVectorCount = byteVectorValues.size();
break;
case FLOAT32:
FloatVectorValues vectorValues = currKnnVectorsReader.getFloatVectorValues(fieldInfo.name);
if (vectorValues == null) {
return this;
}
candidateVectorCount = vectorValues.size();
break;
default:
throw new IllegalStateException(
"Unexpected vector encoding: " + fieldInfo.getVectorEncoding());
}
// 选取最大节点数(向量数)的graph作为初始化的graph,这里使用initReader代替,后续使用initReader读取graph的时候就以这个graph作为初始graph
if (candidateVectorCount > initGraphSize) {
initReader = currKnnVectorsReader;
initDocMap = docMap;
initGraphSize = candidateVectorCount;
}
return this;
}

// 处理merge的入口
@Override
public OnHeapHnswGraph merge(
DocIdSetIterator mergedVectorIterator, InfoStream infoStream, int maxOrd) throws IOException {
// 创建 InitializedHnswGraphBuilder 对象, 同时会将最大的初始化graph load进内存变成`OnHeapHnswGraph`
HnswBuilder builder = createBuilder(mergedVectorIterator, maxOrd);
builder.setInfoStream(infoStream);
return builder.build(maxOrd);
}

// 初始化graph的 init过程
public static OnHeapHnswGraph initGraph(
int M, HnswGraph initializerGraph, int[] newOrdMap, int totalNumberOfVectors)
throws IOException {
// 用合并后新的总向量数量代替之前的节点数
OnHeapHnswGraph hnsw = new OnHeapHnswGraph(M, totalNumberOfVectors);
for (int level = initializerGraph.numLevels() - 1; level >= 0; level--) {
HnswGraph.NodesIterator it = initializerGraph.getNodesOnLevel(level);
while (it.hasNext()) {
int oldOrd = it.nextInt();
int newOrd = newOrdMap[oldOrd];
hnsw.addNode(level, newOrd);
hnsw.trySetNewEntryNode(newOrd, level);
NeighborArray newNeighbors = hnsw.getNeighbors(level, newOrd);
initializerGraph.seek(level, oldOrd);
// 直接复制之前的连接信息
for (int oldNeighbor = initializerGraph.nextNeighbor();
oldNeighbor != NO_MORE_DOCS;
oldNeighbor = initializerGraph.nextNeighbor()) {
int newNeighbor = newOrdMap[oldNeighbor];
// we will compute these scores later when we need to pop out the non-diverse nodes
newNeighbors.addOutOfOrder(newNeighbor, Float.NaN);
}
}
}
return hnsw;
}
// merge过程中处理Vector相关的代码
protected void addVectors(int minOrd, int maxOrd) throws IOException {
long start = System.nanoTime(), t = start;
if (infoStream.isEnabled(HNSW_COMPONENT)) {
infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")");
}
for (int node = minOrd; node < maxOrd; node++) {
// 添加新的node到graph中
addGraphNode(node);
if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
t = printGraphBuildStatus(node, start, t);
}
}
}
// 实际的addGraphNode 调用,`InitializedHnswGraphBuilder.addGraphNode`
public void addGraphNode(int node) throws IOException {
// merge的时候会选出节点数最多的索引作为初始的graph,初始graph中的节点连接信息已经构建好,所以不需要再添加
if (initializedNodes.get(node)) {
return;
}
// 向当前的graph中添加节点,执行标准hnsw构建流程
super.addGraphNode(node);
}
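
其中 initGraph 的核心是"换编号、搬连接":初始 graph 中每个节点的旧编号(ordinal)要映射成合并后的新编号,它的邻居也一并换成新编号。下面用一个极简片段示意这一映射(newOrdMap 与其中的数据均为假设):

// 示意(非Lucene源码):旧graph中 ord=1 的节点及其邻居映射到合并后的新编号
int[] newOrdMap = {0, 3, 4, 7};      // 假设:初始graph第k个向量在合并后的新编号为 newOrdMap[k]
int oldOrd = 1;
int newOrd = newOrdMap[oldOrd];      // 1 -> 3,对应 initGraph 中的 hnsw.addNode(level, newOrd)
int[] oldNeighbors = {0, 2, 3};      // 假设:该节点在某一层的旧邻居
int[] newNeighbors = new int[oldNeighbors.length];
for (int i = 0; i < oldNeighbors.length; i++) {
    // 邻居同样换成新编号;initGraph 中通过 addOutOfOrder 先记录,距离分数之后需要时再计算
    newNeighbors[i] = newOrdMap[oldNeighbors[i]];   // 结果: {0, 4, 7}
}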

小结

  • merge过程中,核心点是选出节点数最大的graph作为初始graph,直接复用它的连接信息(省去这部分构图计算);其他segment中的节点仍然需要重新走一遍HNSW插入流程,所以merge的CPU成本依然比较高
  • merge过程中,合并后整个graph的连接信息都保存在OnHeapHnswGraph(堆内存)中,所以内存压力也比较高
  • 对于HNSW索引来说,merge是一个成本高昂的操作