This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d4c3ba278d [spark] spark: lucene vector index write (#6876)
d4c3ba278d is described below

commit d4c3ba278db48a7ebc571ce97c5e9175a31fc021
Author: jerry <[email protected]>
AuthorDate: Thu Dec 25 10:41:32 2025 +0800

    [spark] spark: lucene vector index write (#6876)
---
 .github/workflows/utitcase-spark-4.x.yml           |   4 +-
 .../lucene/index/LuceneVectorIndexFactory.java     |  27 ++++-
 paimon-spark/paimon-spark-4.0/pom.xml              |   7 ++
 .../CreateGlobalVectorIndexProcedureTest.scala     | 124 +++++++++++++++++++++
 .../spark/globalindex/GlobalIndexBuilder.java      |   3 +-
 .../lucene/LuceneGlobalIndexBuilder.java           |  35 ++++++
 .../lucene/LuceneGlobalIndexBuilderFactory.java    |  43 +++++++
 ...mon.spark.globalindex.GlobalIndexBuilderFactory |   1 +
 8 files changed, 236 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/utitcase-spark-4.x.yml 
b/.github/workflows/utitcase-spark-4.x.yml
index 5b87780306..71938fd581 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -51,7 +51,7 @@ jobs:
           distribution: 'temurin'
 
       - name: Build Spark
-        run:  mvn -T 2C -B clean install -DskipTests -Pspark4,flink1
+        run:  mvn -T 2C -B clean install -DskipTests 
-Pspark4,flink1,paimon-lucene
 
       - name: Test Spark
         timeout-minutes: 60
@@ -65,6 +65,6 @@ jobs:
           test_modules+="org.apache.paimon:paimon-spark-${suffix}_2.13,"
           done
           test_modules="${test_modules%,}"
-          mvn -T 2C -B verify -pl "${test_modules}" 
-Duser.timezone=$jvm_timezone -Pspark4,flink1
+          mvn -T 2C -B verify -pl "${test_modules}" 
-Duser.timezone=$jvm_timezone -Pspark4,flink1,paimon-lucene
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git 
a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java
 
b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java
index 7215410aff..6a643f6a62 100644
--- 
a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java
+++ 
b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorIndexFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.lucene.index;
 
+import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.FloatType;
@@ -38,20 +39,54 @@ public abstract class LuceneVectorIndexFactory {
         }
     }
 
-    public abstract LuceneVectorIndex<?> create(long rowId, Object vector);
+    public abstract LuceneVectorIndex create(long rowId, Object vector);
 
     /** Factory for creating LuceneFloatVectorIndex instances. */
     public static class LuceneFloatVectorIndexFactory extends LuceneVectorIndexFactory {
+        /**
+         * Creates a {@link LuceneFloatVectorIndex} entry.
+         *
+         * @param rowId row id passed through to the created index entry
+         * @param fieldData a {@code float[]}, or an {@link InternalArray} read via toFloatArray()
+         * @throws IllegalArgumentException if {@code fieldData} is null or of any other type
+         */
         @Override
-        public LuceneVectorIndex<?> create(long rowId, Object vector) {
-            return new LuceneFloatVectorIndex(rowId, (float[]) vector);
+        public LuceneVectorIndex create(long rowId, Object fieldData) {
+            float[] vector;
+            if (fieldData instanceof float[]) {
+                vector = (float[]) fieldData;
+            } else if (fieldData instanceof InternalArray) {
+                vector = ((InternalArray) fieldData).toFloatArray();
+            } else {
+                throw new IllegalArgumentException(
+                        "Unsupported vector type: "
+                                + (fieldData == null ? "null" : fieldData.getClass().getName()));
+            }
+            return new LuceneFloatVectorIndex(rowId, vector);
         }
     }
 
     /** Factory for creating LuceneByteVectorIndex instances. */
     public static class LuceneByteVectorIndexFactory extends LuceneVectorIndexFactory {
+        /**
+         * Creates a {@link LuceneByteVectorIndex} entry.
+         *
+         * @param rowId row id passed through to the created index entry
+         * @param fieldData a {@code byte[]}, or an {@link InternalArray} read via toByteArray()
+         * @throws IllegalArgumentException if {@code fieldData} is null or of any other type
+         */
         @Override
-        public LuceneVectorIndex<?> create(long rowId, Object vector) {
-            return new LuceneByteVectorIndex(rowId, (byte[]) vector);
+        public LuceneVectorIndex create(long rowId, Object fieldData) {
+            byte[] vector;
+            if (fieldData instanceof byte[]) {
+                vector = (byte[]) fieldData;
+            } else if (fieldData instanceof InternalArray) {
+                vector = ((InternalArray) fieldData).toByteArray();
+            } else {
+                throw new IllegalArgumentException(
+                        "Unsupported vector type: "
+                                + (fieldData == null ? "null" : fieldData.getClass().getName()));
+            }
+            return new LuceneByteVectorIndex(rowId, vector);
         }
     }
diff --git a/paimon-spark/paimon-spark-4.0/pom.xml 
b/paimon-spark/paimon-spark-4.0/pom.xml
index 27b8b2439d..6765796468 100644
--- a/paimon-spark/paimon-spark-4.0/pom.xml
+++ b/paimon-spark/paimon-spark-4.0/pom.xml
@@ -116,6 +116,13 @@ under the License.
             <classifier>tests</classifier>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-lucene</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalVectorIndexProcedureTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalVectorIndexProcedureTest.scala
new file mode 100644
index 0000000000..b79c5ce9ba
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalVectorIndexProcedureTest.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.utils.Range
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+/**
+ * Tests for building `lucene-vector-knn` global indexes through the `sys.create_global_index`
+ * procedure.
+ */
+class CreateGlobalVectorIndexProcedureTest extends CreateGlobalIndexProcedureTest {
+
+  private val luceneVectorIndexType = "lucene-vector-knn"
+
+  /** SQL literal for the 3-dimensional float vector `[i, i + 1, i + 2]`. */
+  private def vectorSqlLiteral(i: Int): String =
+    s"array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float))"
+
+  /** Inserts rows `(i, vector(i), pt)` for `i` in `0 until count` into table T. */
+  private def insertVectorRows(count: Int, pt: String): Unit = {
+    val values = (0 until count).map(i => s"($i, ${vectorSqlLiteral(i)}, '$pt')").mkString(",")
+    spark.sql(s"INSERT INTO T VALUES $values")
+  }
+
+  /** Calls sys.create_global_index on column `v` of T; asserts the first output column is true. */
+  private def createLuceneVectorIndex(): Unit = {
+    val output = spark
+      .sql(
+        "CALL sys.create_global_index(table => 'test.T', index_column => 'v', " +
+          s"index_type => '$luceneVectorIndexType', options => 'vector.dim=3')")
+      .collect()
+      .head
+    assert(output.getBoolean(0))
+  }
+
+  /** Sums the row counts of all lucene vector index files of table T; requires at least one. */
+  private def luceneVectorIndexRowCount(): Long = {
+    val indexEntries = loadTable("T")
+      .store()
+      .newIndexFileHandler()
+      .scanEntries()
+      .asScala
+      .filter(_.indexFile().indexType() == luceneVectorIndexType)
+    assert(indexEntries.nonEmpty)
+    indexEntries.map(_.indexFile().rowCount()).sum
+  }
+
+  test("create lucene-vector-knn global index") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 100).map(i => s"($i, ${vectorSqlLiteral(i)})").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      createLuceneVectorIndex()
+      assert(luceneVectorIndexRowCount() == 100L)
+    }
+  }
+
+  test("create lucene-vector-knn global index with partition") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      insertVectorRows(65000, "p0")
+      insertVectorRows(35000, "p1")
+      insertVectorRows(22222, "p0")
+
+      createLuceneVectorIndex()
+      assert(luceneVectorIndexRowCount() == 122222L)
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
index d8d3d8471a..e66db5b8b5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
@@ -23,7 +23,6 @@ import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
 import org.apache.paimon.globalindex.GlobalIndexWriter;
 import org.apache.paimon.globalindex.GlobalIndexer;
 import org.apache.paimon.globalindex.IndexedSplit;
-import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
 import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
@@ -72,7 +71,7 @@ public abstract class GlobalIndexBuilder {
                             range.from, range.to, context.indexField().id(), 
null, entry.meta());
             IndexFileMeta indexFileMeta =
                     new IndexFileMeta(
-                            BitmapGlobalIndexerFactory.IDENTIFIER,
+                            context.indexType(),
                             fileName,
                             fileSize,
                             range.to - range.from + 1,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilder.java
new file mode 100644
index 0000000000..78ec1606e1
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.lucene;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+
+/**
+ * Spark-side {@link GlobalIndexBuilder} registered for the {@code lucene-vector-knn} index type.
+ *
+ * <p>It adds nothing on top of {@link GlobalIndexBuilder}: the input dataset is used as-is for
+ * lucene index creation, without any custom transformation.
+ */
+public class LuceneGlobalIndexBuilder extends GlobalIndexBuilder {
+
+    protected LuceneGlobalIndexBuilder(GlobalIndexBuilderContext builderContext) {
+        super(builderContext);
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilderFactory.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilderFactory.java
new file mode 100644
index 0000000000..a7374be9b3
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/lucene/LuceneGlobalIndexBuilderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.lucene;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
+
+/**
+ * {@link GlobalIndexBuilderFactory} producing {@link LuceneGlobalIndexBuilder} instances.
+ *
+ * <p>Discovered at runtime via {@link java.util.ServiceLoader} (see {@code META-INF/services}).
+ */
+public class LuceneGlobalIndexBuilderFactory implements GlobalIndexBuilderFactory {
+
+    private static final String IDENTIFIER = "lucene-vector-knn";
+
+    @Override
+    public GlobalIndexBuilder create(GlobalIndexBuilderContext builderContext) {
+        return new LuceneGlobalIndexBuilder(builderContext);
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
 
b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
index 704354af89..ea4394add9 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
+++ 
b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
@@ -16,3 +16,4 @@
 # limitations under the License.
 
 org.apache.paimon.spark.globalindex.bitmap.BitmapGlobalIndexBuilderFactory
+org.apache.paimon.spark.globalindex.lucene.LuceneGlobalIndexBuilderFactory

Reply via email to