This is an automated email from the ASF dual-hosted git repository.
qingwzhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 45b196954 feat: support vector store. (#637)
45b196954 is described below
commit 45b19695464ac1f7fecaf99a4281409320513e99
Author: Haodong Tang <[email protected]>
AuthorDate: Wed Dec 31 15:31:29 2025 +0800
feat: support vector store. (#637)
Co-authored-by: haodong.thd <[email protected]>
---
.github/workflows/ci.yml | 3 +-
.../{ => geaflow-store-vector}/pom.xml | 32 +--
.../geaflow/store/lucene/GraphVectorIndex.java | 192 ++++++++++++++++
.../apache/geaflow/store/lucene/IVectorIndex.java | 30 +++
.../geaflow/store/lucene/GraphVectorIndexTest.java | 158 +++++++++++++
.../apache/geaflow/store/lucene/LuceneTest.java | 244 +++++++++++++++++++++
geaflow/geaflow-plugins/geaflow-store/pom.xml | 1 +
geaflow/pom.xml | 8 +
8 files changed, 640 insertions(+), 28 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0ef466df8..582cccde3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -78,4 +78,5 @@ jobs:
version: "21.7"
- name: Build and Test On JDK 8
- run: mvn -B -e clean test -Pjdk8 -Duser.timezone=Asia/Shanghai
-Dlog4j.configuration="log4j.rootLogger=WARN, stdout"
+ run: mvn -B -e clean test -Pjdk8 -pl
!geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector
+ -Duser.timezone=Asia/Shanghai -Dlog4j.configuration="log4j
.rootLogger=WARN, stdout"
diff --git a/geaflow/geaflow-plugins/geaflow-store/pom.xml
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/pom.xml
similarity index 60%
copy from geaflow/geaflow-plugins/geaflow-store/pom.xml
copy to geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/pom.xml
index b6979332a..0ad55b5f6 100644
--- a/geaflow/geaflow-plugins/geaflow-store/pom.xml
+++ b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/pom.xml
@@ -21,41 +21,19 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>geaflow-plugins</artifactId>
+ <artifactId>geaflow-store</artifactId>
<groupId>org.apache.geaflow</groupId>
<version>0.8.0-SNAPSHOT</version>
</parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>geaflow-store</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>geaflow-store-api</module>
- <module>geaflow-store-memory</module>
- <module>geaflow-store-rocksdb</module>
- <module>geaflow-store-redis</module>
- <module>geaflow-store-jdbc</module>
- <module>geaflow-store-paimon</module>
- </modules>
+ <artifactId>geaflow-store-vector</artifactId>
<dependencies>
<dependency>
- <groupId>org.apache.geaflow</groupId>
- <artifactId>geaflow-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.geaflow</groupId>
- <artifactId>geaflow-model</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.geaflow</groupId>
- <artifactId>geaflow-state-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.geaflow</groupId>
- <artifactId>geaflow-state-api</artifactId>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/main/java/org/apache/geaflow/store/lucene/GraphVectorIndex.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/main/java/org/apache/geaflow/store/lucene/GraphVectorIndex.java
new file mode 100644
index 000000000..ed49e667c
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/main/java/org/apache/geaflow/store/lucene/GraphVectorIndex.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.lucene;
+
+import java.io.IOException;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.KnnVectorField;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnVectorQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.ByteBuffersDirectory;
+import org.apache.lucene.store.Directory;
+
+/**
+ * Graph vector index implementation using Apache Lucene.
+ *
+ * <p><strong>Thread Safety Note:</strong> This class is <strong>not
thread-safe</strong>.
+ * All public methods in this class are <strong>not designed for concurrent
access</strong>.
+ * External synchronization is required if instances of this class may be
accessed
+ * from multiple threads.</p>
+ *
+ * <p>Specifically:</p>
+ * <ul>
+ * <li>{@link #addVectorIndex} uses a non-thread-safe {@link
IndexWriter}</li>
+ * <li>{@link #searchVectorIndex} may see inconsistent data during
concurrent writes</li>
+ * <li>The underlying {@link ByteBuffersDirectory} is not designed for
multi-threaded access</li>
+ * </ul>
+ *
+ * @param <K> the type of keys used to identify vectors
+ */
+public class GraphVectorIndex<K> implements IVectorIndex<K> {
+
+ private final Directory directory;
+ private final IndexWriter writer;
+ private final Class<K> keyClass;
+
+ private static final String KEY_FIELD_NAME = "key_field";
+
+ public GraphVectorIndex(Class<K> keyClas) {
+ try {
+ this.directory = new ByteBuffersDirectory();
+ StandardAnalyzer analyzer = new StandardAnalyzer();
+ IndexWriterConfig config = new IndexWriterConfig(analyzer);
+ this.writer = new IndexWriter(directory, config);
+ this.keyClass = keyClas;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to initialize
GraphVectorIndex", e);
+ }
+ }
+
+ /**
+ * Adds a vector to the index with the given key.
+ *
+ * <p><strong>Note:</strong> This method is <strong>not
thread-safe</strong>.
+ * The underlying {@link IndexWriter} is not designed for concurrent
access.
+ * External synchronization is required if this method may be called from
+ * multiple threads.</p>
+ *
+ * @param isVertex whether this is a vertex (unused in current
implementation)
+ * @param key the key associated with the vector
+ * @param fieldName the name of the vector field in the index
+ * @param vector the vector data to be indexed
+ * @throws RuntimeException if an I/O error occurs during indexing
+ */
+ @Override
+ public void addVectorIndex(boolean isVertex, K key, String fieldName,
float[] vector) {
+ try {
+ // Create document
+ Document doc = new Document();
+
+ // Select different Field based on the type of K
+ if (key instanceof Float) {
+ doc.add(new StoredField(KEY_FIELD_NAME, (float) key));
+ } else if (key instanceof Long) {
+ doc.add(new StoredField(KEY_FIELD_NAME, (long) key));
+ } else if (key instanceof Integer) {
+ doc.add(new StoredField(KEY_FIELD_NAME, (int) key));
+ } else if (key instanceof String) {
+ doc.add(new TextField(KEY_FIELD_NAME, (String) key,
Field.Store.YES));
+ } else {
+ throw new IllegalArgumentException("Unsupported key type: " +
key.getClass().getName());
+ }
+
+ // Add vector field
+ doc.add(new KnnVectorField(fieldName, vector));
+
+ // Add document to index
+ writer.addDocument(doc);
+
+ // Commit write operation
+ writer.commit();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to add vector index", e);
+ }
+ }
+
+ /**
+ * Searches for the closest vector in the index.
+ *
+ * <p><strong>Note:</strong> This method is <strong>not
thread-safe</strong>.
+ * The underlying index access is not synchronized with write operations.
+ * External synchronization is required if this method may be called
concurrently
+ * with {@link #addVectorIndex}.</p>
+ *
+ * @param isVertex whether this is a vertex (unused in current
implementation)
+ * @param fieldName the name of the vector field to search
+ * @param vector the query vector
+ * @param topK the number of top results to return
+ * @return the key associated with the closest vector
+ * @throws RuntimeException if an I/O error occurs during search
+ */
+ @Override
+ public K searchVectorIndex(boolean isVertex, String fieldName, float[]
vector, int topK) {
+ try {
+ // Open index reader
+ IndexReader reader = DirectoryReader.open(directory);
+ IndexSearcher searcher = new IndexSearcher(reader);
+
+ // Create KNN vector query
+ KnnVectorQuery knnQuery = new KnnVectorQuery(fieldName, vector,
topK);
+
+ // Execute search
+ TopDocs topDocs = searcher.search(knnQuery, topK);
+
+ Document firstDoc = searcher.doc(topDocs.scoreDocs[0].doc);
+
+ K result;
+ if (keyClass == String.class) {
+ String value = firstDoc.get(KEY_FIELD_NAME);
+ result = (K) value;
+ } else if (keyClass == Long.class) {
+ Number value =
firstDoc.getField(KEY_FIELD_NAME).numericValue();
+ result = (K) Long.valueOf(value.longValue());
+ } else if (keyClass == Integer.class) {
+ Number value =
firstDoc.getField(KEY_FIELD_NAME).numericValue();
+ result = (K) Integer.valueOf(value.intValue());
+ } else if (keyClass == Float.class) {
+ Number value =
firstDoc.getField(KEY_FIELD_NAME).numericValue();
+ result = (K) Float.valueOf(value.floatValue());
+ } else {
+ throw new IllegalArgumentException("Unsupported key type: " +
keyClass.getName());
+ }
+
+ reader.close();
+
+ return result;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to search vector index", e);
+ }
+ }
+
+ /**
+ * Close index writer and directory resources.
+ */
+ public void close() {
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ if (directory != null) {
+ directory.close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to close resources", e);
+ }
+ }
+}
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/main/java/org/apache/geaflow/store/lucene/IVectorIndex.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/main/java/org/apache/geaflow/store/lucene/IVectorIndex.java
new file mode 100644
index 000000000..333447de4
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/main/java/org/apache/geaflow/store/lucene/IVectorIndex.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.lucene;
+
+/**
+ * Vector index interface, defining basic operations of vector index.
+ *
+ * @param <K> the type of key passed in when a vector is added and returned by a search
+ */
+public interface IVectorIndex<K> {
+
+ /**
+ * Add vector index.
+ *
+ * @param isVertex caller-supplied flag; presumably true for a vertex and false for an
+ * edge (TODO confirm against implementations)
+ * @param key the key to associate with the vector
+ * @param fieldName the name of the vector field
+ * @param vector the vector data to index
+ */
+ void addVectorIndex(boolean isVertex, K key, String fieldName, float[]
vector);
+
+ /**
+ * Search vector index.
+ *
+ * @param isVertex caller-supplied flag; presumably true for a vertex and false for an
+ * edge (TODO confirm against implementations)
+ * @param fieldName the name of the vector field to search
+ * @param vector the query vector
+ * @param topK the number of candidates to request; note that only a single key is
+ * returned by this method
+ * @return a key of type K for the query; which match is chosen is up to the implementation
+ */
+ K searchVectorIndex(boolean isVertex, String fieldName, float[] vector,
int topK);
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/test/java/org/apache/geaflow/store/lucene/GraphVectorIndexTest.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/test/java/org/apache/geaflow/store/lucene/GraphVectorIndexTest.java
new file mode 100644
index 000000000..3231ebe7c
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/test/java/org/apache/geaflow/store/lucene/GraphVectorIndexTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.lucene;
+
+import static org.testng.Assert.assertEquals;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link GraphVectorIndex}: round trips for every supported key type, nearest
+ * neighbour selection, rejection of unsupported key types and per-field searches.
+ */
+public class GraphVectorIndexTest {
+
+    private GraphVectorIndex<String> stringKeyIndex;
+    private GraphVectorIndex<Long> longKeyIndex;
+    private GraphVectorIndex<Integer> intKeyIndex;
+    private GraphVectorIndex<Float> floatKeyIndex;
+
+    /** Creates a fresh index per key type before each test. */
+    @BeforeMethod
+    public void setUp() {
+        stringKeyIndex = new GraphVectorIndex<>(String.class);
+        longKeyIndex = new GraphVectorIndex<>(Long.class);
+        intKeyIndex = new GraphVectorIndex<>(Integer.class);
+        floatKeyIndex = new GraphVectorIndex<>(Float.class);
+    }
+
+    /** Releases the indexes created in {@link #setUp()}. */
+    @AfterMethod
+    public void tearDown() {
+        stringKeyIndex.close();
+        longKeyIndex.close();
+        intKeyIndex.close();
+        floatKeyIndex.close();
+    }
+
+    @Test
+    public void testAddAndSearchStringKey() {
+        assertRoundTrip(stringKeyIndex, true, "vertex_1", "embedding",
+            new float[]{0.1f, 0.2f, 0.3f, 0.4f});
+    }
+
+    @Test
+    public void testAddAndSearchLongKey() {
+        assertRoundTrip(longKeyIndex, false, 12345L, "embedding",
+            new float[]{0.5f, 0.6f, 0.7f, 0.8f});
+    }
+
+    @Test
+    public void testAddAndSearchIntegerKey() {
+        assertRoundTrip(intKeyIndex, true, 999, "features",
+            new float[]{0.9f, 0.8f, 0.7f, 0.6f});
+    }
+
+    @Test
+    public void testAddAndSearchFloatKey() {
+        assertRoundTrip(floatKeyIndex, false, 3.14f, "weights",
+            new float[]{0.2f, 0.4f, 0.6f, 0.8f});
+    }
+
+    @Test
+    public void testVectorSimilaritySearch() {
+        final String field = "similarity_test";
+
+        // Three vectors with decreasing similarity to the query below.
+        String[] docKeys = {"doc1", "doc2", "doc3"};
+        float[][] docVectors = {
+            {1.0f, 0.0f, 0.0f, 0.0f},
+            {0.8f, 0.2f, 0.0f, 0.0f},
+            {0.0f, 0.0f, 1.0f, 0.0f},
+        };
+        for (int i = 0; i < docKeys.length; i++) {
+            stringKeyIndex.addVectorIndex(true, docKeys[i], field, docVectors[i]);
+        }
+
+        // The query is identical to the vector stored for doc1.
+        String nearest = stringKeyIndex.searchVectorIndex(true, field,
+            new float[]{1.0f, 0.0f, 0.0f, 0.0f}, 1);
+
+        assertEquals(nearest, "doc1");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testUnsupportedKeyType() {
+        // Double is not among the key types the index accepts.
+        GraphVectorIndex<Double> doubleKeyIndex = new GraphVectorIndex<>(Double.class);
+        Double unsupportedKey = 1.23;
+
+        try {
+            doubleKeyIndex.addVectorIndex(true, unsupportedKey, "test_field", new float[]{0.1f, 0.2f});
+        } finally {
+            doubleKeyIndex.close();
+        }
+    }
+
+    @Test
+    public void testMultipleVectorsSameKeyDifferentFields() {
+        final String sharedKey = "multi_field_vertex";
+        float[] firstVector = {0.1f, 0.2f, 0.3f, 0.4f};
+        float[] secondVector = {0.5f, 0.6f, 0.7f, 0.8f};
+
+        // Same key, two different vector fields.
+        stringKeyIndex.addVectorIndex(true, sharedKey, "field1", firstVector);
+        stringKeyIndex.addVectorIndex(true, sharedKey, "field2", secondVector);
+
+        // Each field is searched on its own.
+        String fromField1 = stringKeyIndex.searchVectorIndex(true, "field1", firstVector, 1);
+        String fromField2 = stringKeyIndex.searchVectorIndex(true, "field2", secondVector, 1);
+
+        assertEquals(fromField1, sharedKey);
+        assertEquals(fromField2, sharedKey);
+    }
+
+    /** Indexes a single vector and checks that searching for it yields the same key. */
+    private static <K> void assertRoundTrip(GraphVectorIndex<K> index, boolean isVertex, K key,
+                                            String fieldName, float[] vector) {
+        index.addVectorIndex(isVertex, key, fieldName, vector);
+
+        K found = index.searchVectorIndex(isVertex, fieldName, vector, 1);
+
+        assertEquals(found, key);
+    }
+}
diff --git
a/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/test/java/org/apache/geaflow/store/lucene/LuceneTest.java
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/test/java/org/apache/geaflow/store/lucene/LuceneTest.java
new file mode 100644
index 000000000..67c5304da
--- /dev/null
+++
b/geaflow/geaflow-plugins/geaflow-store/geaflow-store-vector/src/test/java/org/apache/geaflow/store/lucene/LuceneTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.store.lucene;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.KnnVectorField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.KnnVectorQuery;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.ByteBuffersDirectory;
+import org.apache.lucene.store.Directory;
+import org.testng.annotations.Test;
+
+/**
+ * Smoke tests for the Lucene KNN vector API used by the vector store.
+ *
+ * <p>All Lucene writers, readers and directories are opened in try-with-resources blocks so
+ * that they are released even when an assertion fails.</p>
+ */
+public class LuceneTest {
+
+    @Test
+    public void test() throws IOException {
+        // Use ByteBuffersDirectory instead of RAMDirectory
+        try (Directory directory = new ByteBuffersDirectory()) {
+            try (IndexWriter writer = newWriter(directory)) {
+                // Add documents (including vector fields)
+                addDocuments(writer);
+                writer.commit();
+            }
+
+            // Search example
+            searchDocuments(directory);
+        }
+    }
+
+    /** Opens an index writer with a standard analyzer on the given directory. */
+    private static IndexWriter newWriter(Directory directory) throws IOException {
+        return new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()));
+    }
+
+    /** Builds a document with stored title/content text fields and one vector field. */
+    private static Document newArticle(String title, String content, float[] vector) {
+        Document doc = new Document();
+        doc.add(new TextField("title", title, Field.Store.YES));
+        doc.add(new TextField("content", content, Field.Store.YES));
+        doc.add(new KnnVectorField("vector_field", vector));
+        return doc;
+    }
+
+    /** Builds a document with a stored id text field and one vector field. */
+    private static Document newIdDocument(String id, float[] vector) {
+        Document doc = new Document();
+        doc.add(new TextField("id", id, Field.Store.YES));
+        doc.add(new KnnVectorField("vector_field", vector));
+        return doc;
+    }
+
+    /** Writes the three sample articles used by {@link #test()}. */
+    private static void addDocuments(IndexWriter writer) throws IOException {
+        writer.addDocument(newArticle("机器学习基础", "机器学习是人工智能的重要分支",
+            new float[]{0.1f, 0.2f, 0.3f, 0.4f}));
+        writer.addDocument(newArticle("深度学习入门", "深度学习使用神经网络进行学习",
+            new float[]{0.2f, 0.3f, 0.4f, 0.5f}));
+        writer.addDocument(newArticle("自然语言处理", "NLP是处理文本的技术",
+            new float[]{0.15f, 0.25f, 0.35f, 0.45f}));
+    }
+
+    /** Runs a KNN query against the sample articles and verifies the result set. */
+    private static void searchDocuments(Directory directory) throws IOException {
+        try (IndexReader reader = DirectoryReader.open(directory)) {
+            IndexSearcher searcher = new IndexSearcher(reader);
+
+            // Create query vector
+            float[] queryVector = {0.12f, 0.22f, 0.32f, 0.42f};
+
+            // Execute KNN search
+            KnnVectorQuery knnQuery = new KnnVectorQuery("vector_field", queryVector, 3);
+            TopDocs topDocs = searcher.search(knnQuery, 10);
+
+            // Verify search results
+            assertEquals(topDocs.scoreDocs.length, 3, "Should return 3 results");
+
+            // Verify results are sorted by relevance (scores from high to low)
+            for (int i = 0; i < topDocs.scoreDocs.length - 1; i++) {
+                assertTrue(topDocs.scoreDocs[i].score >= topDocs.scoreDocs[i + 1].score,
+                    "Results should be sorted by score from high to low");
+            }
+
+            // Verify each result has expected fields
+            for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
+                Document doc = searcher.doc(scoreDoc.doc);
+                assertNotNull(doc.get("title"), "Title field should not be null");
+                assertNotNull(doc.get("content"), "Content field should not be null");
+            }
+        }
+    }
+
+    // Add more test cases to test Lucene vector functionality
+
+    @Test
+    public void testVectorSearchAccuracy() throws IOException {
+        try (Directory directory = new ByteBuffersDirectory()) {
+            try (IndexWriter writer = newWriter(directory)) {
+                // Exactly the same vector as the query, should get the highest score
+                writer.addDocument(newIdDocument("1", new float[]{1.0f, 0.0f, 0.0f, 0.0f}));
+                // Partially similar vector
+                writer.addDocument(newIdDocument("2", new float[]{0.8f, 0.2f, 0.0f, 0.0f}));
+                // Completely different vector, should get the lowest score
+                writer.addDocument(newIdDocument("3", new float[]{0.0f, 0.0f, 1.0f, 0.0f}));
+                writer.commit();
+            }
+
+            try (IndexReader reader = DirectoryReader.open(directory)) {
+                IndexSearcher searcher = new IndexSearcher(reader);
+
+                // Query vector is exactly the same as the first document
+                float[] queryVector = {1.0f, 0.0f, 0.0f, 0.0f};
+                KnnVectorQuery knnQuery = new KnnVectorQuery("vector_field", queryVector, 3);
+                TopDocs topDocs = searcher.search(knnQuery, 10);
+
+                // Verify result count
+                assertEquals(topDocs.scoreDocs.length, 3);
+
+                // Verify the first result should be the document with id 1 (highest score)
+                Document firstDoc = searcher.doc(topDocs.scoreDocs[0].doc);
+                assertEquals(firstDoc.get("id"), "1");
+
+                // Verify score order (first score should be highest)
+                assertTrue(topDocs.scoreDocs[0].score >= topDocs.scoreDocs[1].score);
+                assertTrue(topDocs.scoreDocs[1].score >= topDocs.scoreDocs[2].score);
+            }
+        }
+    }
+
+    @Test
+    public void testVectorFieldProperties() {
+        // Test basic properties of vector fields
+        float[] vector = {0.1f, 0.2f, 0.3f, 0.4f};
+        KnnVectorField vectorField = new KnnVectorField("test_vector", vector);
+
+        // Verify field name
+        assertEquals(vectorField.name(), "test_vector");
+
+        // Verify field value
+        assertEquals(vectorField.vectorValue(), vector);
+    }
+
+    @Test
+    public void testMultipleVectorFields() throws IOException {
+        float[] vector1 = {0.1f, 0.2f, 0.3f, 0.4f};
+        float[] vector2 = {0.5f, 0.6f, 0.7f, 0.8f};
+
+        try (Directory directory = new ByteBuffersDirectory()) {
+            try (IndexWriter writer = newWriter(directory)) {
+                // Create document with multiple vector fields
+                Document doc = new Document();
+                doc.add(new TextField("title", "多向量测试", Field.Store.YES));
+                doc.add(new KnnVectorField("vector_field_1", vector1));
+                doc.add(new KnnVectorField("vector_field_2", vector2));
+
+                writer.addDocument(doc);
+                writer.commit();
+            }
+
+            // Verify document in index
+            try (IndexReader reader = DirectoryReader.open(directory)) {
+                IndexSearcher searcher = new IndexSearcher(reader);
+
+                assertEquals(reader.numDocs(), 1);
+
+                // Test search on first vector field
+                KnnVectorQuery query1 = new KnnVectorQuery("vector_field_1", vector1, 1);
+                TopDocs results1 = searcher.search(query1, 10);
+                assertEquals(results1.scoreDocs.length, 1);
+
+                // Test search on second vector field
+                KnnVectorQuery query2 = new KnnVectorQuery("vector_field_2", vector2, 1);
+                TopDocs results2 = searcher.search(query2, 10);
+                assertEquals(results2.scoreDocs.length, 1);
+            }
+        }
+    }
+
+}
diff --git a/geaflow/geaflow-plugins/geaflow-store/pom.xml
b/geaflow/geaflow-plugins/geaflow-store/pom.xml
index b6979332a..252d6196d 100644
--- a/geaflow/geaflow-plugins/geaflow-store/pom.xml
+++ b/geaflow/geaflow-plugins/geaflow-store/pom.xml
@@ -38,6 +38,7 @@
<module>geaflow-store-redis</module>
<module>geaflow-store-jdbc</module>
<module>geaflow-store-paimon</module>
+ <module>geaflow-store-vector</module>
</modules>
<dependencies>
diff --git a/geaflow/pom.xml b/geaflow/pom.xml
index 10113e3a3..bd2c74302 100644
--- a/geaflow/pom.xml
+++ b/geaflow/pom.xml
@@ -90,6 +90,7 @@
<aws-crt.version>0.28.10</aws-crt.version>
<org.jetbrains.annotations>13.0</org.jetbrains.annotations>
<javax-api.version>1.3.2</javax-api.version>
+ <lucene-core.version>9.8.0</lucene-core.version>
</properties>
<modules>
@@ -788,6 +789,13 @@
<version>${org.jetbrains.annotations}</version>
<scope>compile</scope>
</dependency>
+
+ <!-- lucene -->
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
+ <version>${lucene-core.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]