前言

在ES中我们经常使用的数据格式json,ES也支持常见的CRUD操作,这里我们主要介绍写入相关的操作(创建,更新,删除),ES的底层存储引擎是Lucene,Lucene也有相关创建更新删除的操作,但是Lucene是没有显示的根据主键更新文档的api的,本文主要介绍的是在ES有_id的情况下,ES是如何基于Lucene实现增删改的操作的,其中的数据模型又是如何映射的.

ps: 本文不考虑ES中的数据类型到Lucene中的数据类型的映射(Field),所有的代码片段都是基于以下给定的类型映射

Name ES type Lucene Field
item_id keyword StringField
name keyword StringField
color keyword StringField

lucene 如何基于id(主键)部分更新文档 (基于lucene 9.7.0)

1
2
3
4
5
 <dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>9.7.0</version>
</dependency>

1. 创建文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

public static void createDocuments(IndexWriter indexWriter) throws IOException {
Document doc1 = new Document();
doc1.add(new StringField("item_id", "1", Field.Store.YES ));
doc1.add(new StringField("name", "item1", Field.Store.YES));
doc1.add(new StringField("color", "red", Field.Store.YES));

Document doc2 = new Document();
doc2.add(new StringField("item_id", "2", Field.Store.YES));
doc2.add(new StringField("name", "item2", Field.Store.YES));
doc2.add(new StringField("color", "blue", Field.Store.YES));

indexWriter.addDocument(doc1);
indexWriter.addDocument(doc2);

indexWriter.commit();
indexWriter.flush();
}

2. 读取文档验证写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void getDocumentById(IndexReader reader, IndexSearcher searcher, String itemId) throws IOException {
Query query = new TermQuery(new Term("item_id", itemId));
TopDocs topdocs = searcher.search(query, 10);
assert topdocs.totalHits.value == 1;

for (ScoreDoc doc: topdocs.scoreDocs) {
int docId = doc.doc;
Document fullDoc = getDocumentByDocId(reader, docId);

fullDoc.getFields().forEach(field -> {
System.out.println(" " + field.name() + ": " + field.stringValue());
});
}
}
1
2
3
4
5
6
7
8
9
// item_id = "1"
getDocumentById(reader, searcher, "1");
// item_id: 1
// name: item1
// color: red
getDocumentById(reader, searcher, "2");
// item_id: 2
// name: item2
// color: blue

3. 部分更新文档

1
2
3
4
5
6
7
8
9
public static void partialUpdateDocumentNameByItemId(IndexWriter writer, String itemId, String newName) throws IOException {
Document doc = new Document();
doc.add(new StringField("name", newName, Field.Store.YES));
// important: 这个作为主键的Term一定要带上
doc.add(new StringField("item_id", itemId, Field.Store.YES));
Term mainTerm = new Term("item_id", itemId);
writer.updateDocument(mainTerm, doc);
}

4. 验证更新文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
partialUpdateDocumentNameByItemId(indexWriter, "1", "item1_updated");
partialUpdateDocumentNameByItemId(indexWriter, "2", "item2_updated");
indexWriter.commit();
indexWriter.flush();

indexReader = DirectoryReader.open(readDirectory);
indexSearcher = new IndexSearcher(indexReader);
getDocumentById(indexSearcher, "1");

indexReader = DirectoryReader.open(readDirectory);
indexSearcher = new IndexSearcher(indexReader);
getDocumentById(indexSearcher, "2");
//
// name: item1_updated
// item_id: 1
// name: item2_updated
// item_id: 2

结论: 由于在partialUpdateDocumentNameByItemId 中只写了item_id和name属性(符合update的直觉),但是可以看出Lucene把updateDocument中的doc对象当成了最新且完整的mainTerm对应的doc,这就导致了虽然我们目的是部分更新,但是会丢失没有写入(没有变化)的那些属性,这个例子也可以看出Lucene本质上是用新文档覆盖旧文档的形式,用一个可以代表主键的Term做关联,来实现部分更新字段的目的,这个和一般RDBMS的存储模型有点区别.

5. 完全更新所有字段(不变的field也要加入将要更新的document中)

1
2
3
4
5
6
7
8
9
public static void fullUpdateDocumentNameByItemId(IndexWriter writer, String itemId, String newName, String oldColor) throws IOException {
Document doc = new Document();
doc.add(new StringField("name", newName, Field.Store.YES));
doc.add(new StringField("item_id", itemId, Field.Store.YES));
doc.add(new StringField("color", oldColor, Field.Store.YES));

Term mainTerm = new Term("item_id", itemId);
writer.updateDocument(mainTerm, doc);
}

6. 验证完全更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fullUpdateDocumentNameByItemId(indexWriter, "1", "item1_updated", "red");
fullUpdateDocumentNameByItemId(indexWriter, "2", "item2_updated", "blue");

indexWriter.commit();
indexWriter.flush();

indexReader = DirectoryReader.open(readDirectory);
indexSearcher = new IndexSearcher(indexReader);
getDocumentById(indexSearcher, "1");

indexReader = DirectoryReader.open(readDirectory);
indexSearcher = new IndexSearcher(indexReader);
getDocumentById(indexSearcher, "2");

// output
// name: item1_updated
// item_id: 1
// color: red
// name: item2_updated
// item_id: 2
// color: blue

7. 删除文档

1
2
3
4
public static void deleteDocument(IndexWriter writer, String itemId) throws IOException {
Term mainTerm = new Term("item_id", itemId);
writer.deleteDocuments(mainTerm);
}

8. 验证删除文档

1
2
3
4
5
getDocumentById(indexSearcher, "1");
getDocumentById(indexSearcher, "2");

// cannot found 1
// cannot found 2

结论:

  1. 必须将原始文档的所有字段全部获取到再用updateDocument的方式更新,才能实现我们预期中部分更新字段的目的,可以看到成本还是比较高的
  2. 由于Lucene的这种机制也导致了,ES的CRUD模型中需要实现一些额外的机制才能使用到Lucene的能力

ES中的操作

ES mapping

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
PUT /my_index
{
"mappings": {
"properties": {
"item_id": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"color": {
"type": "keyword"
}
}
}
}

1. 创建文档

1
2
3
4
5
6
7
8
9
10
11
12
13
PUT my_index/_doc/1
{
"item_id": "1",
"name":"item1",
"color": "red"
}

PUT my_index/_doc/2
{
"item_id": "2",
"name":"item2",
"color": "blue"
}

2. 获取文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
GET my_index/_search
// output
{
...
"hits": [
{
"_index": "my_index",
"_id": "1",
"_score": 1,
"_source": {
"item_id": "1",
"name": "item1",
"color": "red"
}
},
{
"_index": "my_index",
"_id": "2",
"_score": 1,
"_source": {
"item_id": "2",
"name": "item2",
"color": "blue"
}
}
]
}

3.部分更新

1
2
3
4
5
6
7
8
9
10
11
12
13
POST my_index/_update/1
{
"doc": {
"name": "item1_updated"
}
}

POST my_index/_update/2
{
"doc": {
"name": "item2_updated"
}
}

4.获取部分更新后的文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
GET my_index/_search
// output
{
...
"hits": [
{
"_index": "my_index",
"_id": "1",
"_score": 1,
"_source": {
"item_id": "1",
"name": "item1_updated",
"color": "red"
}
},
{
"_index": "my_index",
"_id": "2",
"_score": 1,
"_source": {
"item_id": "2",
"name": "item2_updated",
"color": "blue"
}
}
]
}

说明: 可以看ES中部分更新是可以正常的工作的, 原因就在于ES在处理update的时候会自动拉取原始文档的所有字段和新的更新的字段组合成一份完整的新的全量字段的文档,再去更新Lucene

5. 完整更新所有字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 方式一
POST my_index/_update/1
{
"doc": {
"name": "item1_updated",
"item_id": "1",
"color": "red"
}
}

// 方式二
PUT my_index/_doc/1
{
"item_id": "1",
"name":"item1_update_by_put",
"color": "red"
}

6. 验证完全更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
GET my_index/_search
// output
{
...
"hits": [
{
"_index": "my_index",
"_id": "1",
"_score": 1,
"_source": {
"item_id": "1",
"name": "item1_update_by_put",
"color": "red"
}
},
{
"_index": "my_index",
"_id": "2",
"_score": 1,
"_source": {
"item_id": "2",
"name": "item2_updated",
"color": "blue"
}
}
]
}

7. 删除文档

1
2
DELETE my_index/_doc/1
DELETE my_index/_doc/2

8. 验证删除文档

1
2
3
4
5
6
7
8
GET my_index/_search

// output
{
...
"hits": []
}

ES如何在Lucene的基础上实现基于主键(_id)的删改

  1. ES 使用_id的内部字段作为文档的主键,这个_id可以用户指定,上面的例子中item_id就是对应了_id,Lucene中没有主键的概念,所以需要使用_id 作为一个独特的Term的维持文档的唯一性, 后续的更新, 删除也是和_id对应的Term绑定. Lucene的创建操作不具备幂等性(addDocument)指定了Term之后,这个Term下可以关联N个document,不具备唯一性.
  2. ES 在部分字段的更新中,自己封装了一层获取原始文档的操作,之后使用update的方式更新Lucene.
  3. ES 在删除文档操作中,使用_id对应的Term 去调用Lucene的API.
  4. ES 在创建文档操作中,PUT 相同_id的文档 同样能保持唯一性, 通常情况下ES也是用Lucene update的方式实现创建的请求.

总结

  1. Lucene整体是一个Append Only的存储引擎,且没有主键的概念.
  2. ES 本身封装了一系列的操作使得整个CRUD操作更加方便使用,这也不可避免的带了一些额外的开销,通过理解这些操作的底层原理,有助于我们做出一些最佳实践的选择.