This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d4c3ba278d [spark] spark: lucene vector index write (#6876)
d4c3ba278d is described below
commit d4c3ba278db48a7ebc571ce97c5e9175a31fc021
Author: jerry <[email protected]>
AuthorDate: Thu Dec 25 10:41:32 2025 +0800
[spark] spark: lucene vector index write (#6876)
---
.github/workflows/utitcase-spark-4.x.yml | 4 +-
.../lucene/index/LuceneVectorIndexFactory.java | 27 ++++-
paimon-spark/paimon-spark-4.0/pom.xml | 7 ++
.../CreateGlobalVectorIndexProcedureTest.scala | 124 +++++++++++++++++++++
.../spark/globalindex/GlobalIndexBuilder.java | 3 +-
.../lucene/LuceneGlobalIndexBuilder.java | 35 ++++++
.../lucene/LuceneGlobalIndexBuilderFactory.java | 43 +++++++
...mon.spark.globalindex.GlobalIndexBuilderFactory | 1 +
8 files changed, 236 insertions(+), 8 deletions(-)
diff --git a/.github/workflows/utitcase-spark-4.x.yml
b/.github/workflows/utitcase-spark-4.x.yml
index 5b87780306..71938fd581 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -51,7 +51,7 @@ jobs:
distribution: 'temurin'
- name: Build Spark
- run: mvn -T 2C -B clean install -DskipTests -Pspark4,flink1
+ run: mvn -T 2C -B clean install -DskipTests
-Pspark4,flink1,paimon-lucene
- name: Test Spark
timeout-minutes: 60
@@ -65,6 +65,6 @@ jobs:
test_modules+="org.apache.paimon:paimon-spark-${suffix}_2.13,"
done
test_modules="${test_modules%,}"
- mvn -T 2C -B verify -pl "${test_modules}"
-Duser.timezone=$jvm_timezone -Pspark4,flink1
+ mvn -T 2C -B verify -pl "${test_modules}"
-Duser.timezone=$jvm_timezone -Pspark4,flink1,paimon-lucene
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git
a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java
b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java
index 7215410aff..6a643f6a62 100644
---
a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java
+++
b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java
@@ -18,6 +18,7 @@
package org.apache.paimon.lucene.index;
+import org.apache.paimon.data.InternalArray;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.FloatType;
@@ -38,20 +39,38 @@ public abstract class LuceneVectorIndexFactory {
}
}
- public abstract LuceneVectorIndex<?> create(long rowId, Object vector);
+ /**
+ * Creates the index entry for a single row's vector value.
+ *
+ * @param rowId id of the row the vector belongs to
+ * @param vector the raw field value; the concrete factories nested in this class accept either a
+ * primitive array ({@code float[]} / {@code byte[]}) or an {@link InternalArray}
+ * @return the index entry wrapping the (possibly converted) vector
+ */
+ public abstract LuceneVectorIndex create(long rowId, Object vector);
/** Factory for creating LuceneFloatVectorIndex instances. */
public static class LuceneFloatVectorIndexFactory extends LuceneVectorIndexFactory {
+    /**
+     * Creates a {@link LuceneFloatVectorIndex} for the given row.
+     *
+     * @param rowId id of the row the vector belongs to
+     * @param fieldData the vector, either a {@code float[]} or an {@link InternalArray} of floats
+     * @return the float vector index entry
+     * @throws IllegalArgumentException if {@code fieldData} is null or of an unsupported type
+     */
@Override
- public LuceneVectorIndex<?> create(long rowId, Object vector) {
- return new LuceneFloatVectorIndex(rowId, (float[]) vector);
+    public LuceneVectorIndex create(long rowId, Object fieldData) {
+        final float[] vector;
+        if (fieldData instanceof float[]) {
+            vector = (float[]) fieldData;
+        } else if (fieldData instanceof InternalArray) {
+            vector = ((InternalArray) fieldData).toFloatArray();
+        } else {
+            // Report null explicitly instead of failing with an NPE on getClass().
+            throw new IllegalArgumentException(
+                    "Unsupported vector type: "
+                            + (fieldData == null ? "null" : fieldData.getClass().getName()));
+        }
+        return new LuceneFloatVectorIndex(rowId, vector);
}
}
/** Factory for creating LuceneByteVectorIndex instances. */
public static class LuceneByteVectorIndexFactory extends LuceneVectorIndexFactory {
+    /**
+     * Creates a {@link LuceneByteVectorIndex} for the given row.
+     *
+     * @param rowId id of the row the vector belongs to
+     * @param fieldData the vector, either a {@code byte[]} or an {@link InternalArray} of bytes
+     * @return the byte vector index entry
+     * @throws IllegalArgumentException if {@code fieldData} is null or of an unsupported type
+     */
@Override
- public LuceneVectorIndex<?> create(long rowId, Object vector) {
+    public LuceneVectorIndex create(long rowId, Object fieldData) {
+        final byte[] vector;
+        if (fieldData instanceof byte[]) {
+            vector = (byte[]) fieldData;
+        } else if (fieldData instanceof InternalArray) {
+            vector = ((InternalArray) fieldData).toByteArray();
+        } else {
+            // Report null explicitly instead of failing with an NPE on getClass().
+            throw new IllegalArgumentException(
+                    "Unsupported vector type: "
+                            + (fieldData == null ? "null" : fieldData.getClass().getName()));
+        }
- return new LuceneByteVectorIndex(rowId, (byte[]) vector);
+        // `vector` is already a byte[]; the former cast was a leftover from the Object parameter.
+        return new LuceneByteVectorIndex(rowId, vector);
}
}
diff --git a/paimon-spark/paimon-spark-4.0/pom.xml
b/paimon-spark/paimon-spark-4.0/pom.xml
index 27b8b2439d..6765796468 100644
--- a/paimon-spark/paimon-spark-4.0/pom.xml
+++ b/paimon-spark/paimon-spark-4.0/pom.xml
@@ -116,6 +116,13 @@ under the License.
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-lucene</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalVectorIndexProcedureTest.scala
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalVectorIndexProcedureTest.scala
new file mode 100644
index 0000000000..b79c5ce9ba
--- /dev/null
+++
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalVectorIndexProcedureTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.utils.Range
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+class CreateGlobalVectorIndexProcedureTest extends CreateGlobalIndexProcedureTest {
+
+  /** Index type name the lucene global index builder factory is registered under. */
+  private val LuceneVectorIndexType = "lucene-vector-knn"
+
+  test("create lucene-vector-knn global index") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      insertVectorRows(100)
+      createLuceneVectorIndex()
+      assertIndexedRowCount(100L)
+    }
+  }
+
+  test("create lucene-vector-knn global index with partition") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      insertVectorRows(65000, Some("p0"))
+      insertVectorRows(35000, Some("p1"))
+      insertVectorRows(22222, Some("p0"))
+      createLuceneVectorIndex()
+      assertIndexedRowCount(122222L)
+    }
+  }
+
+  /**
+   * Inserts `count` rows into table T with ids `0 until count` and 3-dimensional vectors
+   * `(id, id + 1, id + 2)`, optionally tagging every row with the given partition value.
+   *
+   * Rows are generated with `range()` rather than a literal VALUES list, so large inserts do not
+   * build and parse a multi-megabyte SQL statement.
+   */
+  private def insertVectorRows(count: Int, partition: Option[String] = None): Unit = {
+    val partitionColumn = partition.map(p => s", '$p'").getOrElse("")
+    spark.sql(s"""
+                 |INSERT INTO T
+                 |SELECT CAST(id AS INT),
+                 |  array(CAST(id AS FLOAT), CAST(id + 1 AS FLOAT), CAST(id + 2 AS FLOAT))$partitionColumn
+                 |FROM range($count)
+                 |""".stripMargin)
+  }
+
+  /** Runs `sys.create_global_index` on column `v` of table T and asserts it reports success. */
+  private def createLuceneVectorIndex(): Unit = {
+    val output = spark
+      .sql(
+        "CALL sys.create_global_index(table => 'test.T', index_column => 'v', " +
+          s"index_type => '$LuceneVectorIndexType', options => 'vector.dim=3')")
+      .collect()
+      .head
+    assert(output.getBoolean(0))
+  }
+
+  /** Asserts lucene vector index files exist for T and that together they cover `expected` rows. */
+  private def assertIndexedRowCount(expected: Long): Unit = {
+    val indexEntries = loadTable("T")
+      .store()
+      .newIndexFileHandler()
+      .scanEntries()
+      .asScala
+      .filter(_.indexFile().indexType() == LuceneVectorIndexType)
+
+    assert(indexEntries.nonEmpty)
+    assert(indexEntries.map(_.indexFile().rowCount()).sum == expected)
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
index d8d3d8471a..e66db5b8b5 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
@@ -23,7 +23,6 @@ import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
import org.apache.paimon.globalindex.GlobalIndexWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
import org.apache.paimon.globalindex.IndexedSplit;
-import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
@@ -72,7 +71,7 @@ public abstract class GlobalIndexBuilder {
range.from, range.to, context.indexField().id(),
null, entry.meta());
IndexFileMeta indexFileMeta =
new IndexFileMeta(
- BitmapGlobalIndexerFactory.IDENTIFIER,
+ context.indexType(),
fileName,
fileSize,
range.to - range.from + 1,
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilder.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilder.java
new file mode 100644
index 0000000000..78ec1606e1
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.lucene;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+
+/**
+ * {@link GlobalIndexBuilder} for the {@code lucene-vector-knn} global index type.
+ *
+ * <p>Lucene vector indexing needs no custom transformation of the input dataset, so this class adds
+ * nothing beyond a constructor; all build behaviour is inherited from {@link GlobalIndexBuilder}.
+ */
+public class LuceneGlobalIndexBuilder extends GlobalIndexBuilder {
+
+    /**
+     * Creates a builder bound to the given context.
+     *
+     * @param context build context, forwarded unchanged to {@link GlobalIndexBuilder}
+     */
+    protected LuceneGlobalIndexBuilder(GlobalIndexBuilderContext context) {
+        super(context);
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilderFactory.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilderFactory.java
new file mode 100644
index 0000000000..a7374be9b3
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.lucene;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
+
+/**
+ * {@link GlobalIndexBuilderFactory} that produces {@link LuceneGlobalIndexBuilder}s for the
+ * {@code lucene-vector-knn} index type.
+ *
+ * <p>Located at runtime through {@link java.util.ServiceLoader}; the class is listed in {@code
+ * META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory}.
+ */
+public class LuceneGlobalIndexBuilderFactory implements GlobalIndexBuilderFactory {
+
+    /** Index type name this factory is registered under. */
+    private static final String IDENTIFIER = "lucene-vector-knn";
+
+    /** Returns a new {@link LuceneGlobalIndexBuilder} for the given context on every call. */
+    @Override
+    public GlobalIndexBuilder create(GlobalIndexBuilderContext context) {
+        return new LuceneGlobalIndexBuilder(context);
+    }
+
+    /** Returns {@code lucene-vector-knn}, the index type handled by this factory. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
index 704354af89..ea4394add9 100644
---
a/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
+++
b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
@@ -16,3 +16,4 @@
# limitations under the License.
org.apache.paimon.spark.globalindex.bitmap.BitmapGlobalIndexBuilderFactory
+org.apache.paimon.spark.globalindex.lucene.LuceneGlobalIndexBuilderFactory