This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b9999a050 [spark] Make RowIdIndexFieldsExtractor Serializable
5b9999a050 is described below

commit 5b9999a050565206f69de1462901c6b844b2a29c
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 7 15:47:50 2026 +0800

    [spark] Make RowIdIndexFieldsExtractor Serializable
---
 .../globalindex/DefaultGlobalIndexBuilder.java     |  2 ++
 .../globalindex/RowIdIndexFieldsExtractor.java     | 42 ++++++++++++++++------
 .../globalindex/btree/BTreeGlobalIndexBuilder.java |  2 ++
 3 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
index d8b5240faa..e0a6c0e353 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
@@ -45,6 +45,8 @@ import static org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.toInde
 /** Default global index builder. */
 public class DefaultGlobalIndexBuilder implements Serializable {
 
+    private static final long serialVersionUID = 1L;
+
     private final FileStoreTable table;
     private final BinaryRow partition;
     private final RowType readType;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java
index 9ed265b654..d3ca25a2cb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java
@@ -22,37 +22,59 @@ import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.List;
 
 /** The extractor to get partition, index field and row id from records. */
-public class RowIdIndexFieldsExtractor {
+public class RowIdIndexFieldsExtractor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
-    private final Projection partitionProjection;
-    private final InternalRow.FieldGetter indexFieldGetter;
     private final int rowIdPos;
+    private final RowType readType;
+    private final List<String> partitionKeys;
+    private final String indexField;
+
+    private transient Projection lazyPartitionProjection;
+    private transient FieldGetter lazyIndexFieldGetter;
 
     public RowIdIndexFieldsExtractor(
             RowType readType, List<String> partitionKeys, String indexField) {
-        this.partitionProjection = CodeGenUtils.newProjection(readType, partitionKeys);
-        int indexFieldPos = readType.getFieldIndex(indexField);
-        this.indexFieldGetter =
-                InternalRow.createFieldGetter(readType.getTypeAt(indexFieldPos), indexFieldPos);
+        this.readType = readType;
+        this.partitionKeys = partitionKeys;
+        this.indexField = indexField;
         this.rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
     }
 
+    private Projection partitionProjection() {
+        if (lazyPartitionProjection == null) {
+            lazyPartitionProjection = CodeGenUtils.newProjection(readType, partitionKeys);
+        }
+        return lazyPartitionProjection;
+    }
+
+    private FieldGetter indexFieldGetter() {
+        if (lazyIndexFieldGetter == null) {
+            int indexFieldPos = readType.getFieldIndex(indexField);
+            lazyIndexFieldGetter =
+                    InternalRow.createFieldGetter(readType.getTypeAt(indexFieldPos), indexFieldPos);
+        }
+        return lazyIndexFieldGetter;
+    }
+
     public BinaryRow extractPartition(InternalRow record) {
-        // projection will reuse returning record, copy is necessary
-        return partitionProjection.apply(record).copy();
+        return partitionProjection().apply(record).copy();
     }
 
     @Nullable
     public Object extractIndexField(InternalRow record) {
-        return indexFieldGetter.getFieldOrNull(record);
+        return indexFieldGetter().getFieldOrNull(record);
     }
 
     public Long extractRowId(InternalRow record) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
index 1a76d8f63f..c6a69bd898 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -54,6 +54,8 @@ import static org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.toInde
  */
 public class BTreeGlobalIndexBuilder implements Serializable {
 
+    private static final long serialVersionUID = 1L;
+
     private static final double FLOATING = 1.2;
 
     private final FileStoreTable table;

Reply via email to