This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6428cb258366 perf(spark): Resolve drop-partition-columns projection once per writer instead of per row (#18972)
6428cb258366 is described below

commit 6428cb258366df5fdba79dad35c7c8b9933aa8a2
Author: voonhous <[email protected]>
AuthorDate: Sat Jun 13 12:04:56 2026 +0800

    perf(spark): Resolve drop-partition-columns projection once per writer instead of per row (#18972)
    
    BulkInsertDataInternalWriterHelper#write redid constant work for every
    row when hoodie.datasource.write.drop.partition.columns is enabled:
    resolving the config flag, instantiating a key generator via
    constructor reflection through getPartitionPathCols, recomputing the
    partition-column ordinals into a fresh HashSet, and round-tripping the
    whole row through toSeq/fromSeq (boxing every column).
    
    The flag is now resolved once in the constructor, and the retained
    (non-partition) field ordinals and types are computed once on the
    first write(). The lazy initialization keeps the partition-column
    resolution unreachable for the bucket-index subclasses, which override
    write() and never drop columns, and for tasks that write no rows,
    matching the previous reachability exactly. write() copies the
    retained fields into a fresh GenericInternalRow, which is
    value-identical to the previous toSeq/filter/fromSeq output.
---
 .../commit/BulkInsertDataInternalWriterHelper.java | 64 ++++++++++++++++------
 1 file changed, 46 insertions(+), 18 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 57847faedb82..712a27f81833 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -32,6 +32,7 @@ import org.apache.hudi.util.JavaScalaConverters;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -69,6 +70,13 @@ public class BulkInsertDataInternalWriterHelper {
   protected final boolean simpleKeyGen;
   protected final int simplePartitionFieldIndex;
   protected final DataType simplePartitionFieldDataType;
+  protected final boolean shouldDropPartitionColumns;
+  // Ordinals and types of the non-partition fields, computed once on the first write() instead of
+  // in the constructor: bucket-index subclasses override write() and never drop columns, and the
+  // partition-column resolution must stay unreachable for them (and for tasks that write no rows)
+  // exactly as before. The helper is confined to a single task thread, so plain lazy init is safe.
+  private int[] retainedOrdinals;
+  private DataType[] retainedTypes;
   /**
    * NOTE: This is stored as Catalyst's internal {@link UTF8String} to avoid
    *       conversion (deserialization) b/w {@link UTF8String} and {@link String}
@@ -114,6 +122,36 @@ public class BulkInsertDataInternalWriterHelper {
       this.simplePartitionFieldIndex = -1;
       this.simplePartitionFieldDataType = null;
     }
+
+    this.shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
+  }
+
+  /**
+   * Resolves the ordinals and types of the non-partition fields. The partition columns are a pure
+   * function of the write config and schema, both immutable for the helper's lifetime, so this
+   * runs once per helper instead of once per row (getPartitionPathCols instantiates a key
+   * generator reflectively).
+   */
+  private void initRetainedFields() {
+    List<String> partitionCols = 
JavaScalaConverters.convertScalaListToJavaList(
+        HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
+    Set<Integer> partitionIdx = new HashSet<>();
+    for (String col : partitionCols) {
+      partitionIdx.add(this.structType.fieldIndex(col));
+    }
+    int numRetained = structType.fields().length - partitionIdx.size();
+    int[] ordinals = new int[numRetained];
+    DataType[] types = new DataType[numRetained];
+    int retained = 0;
+    for (int i = 0; i < structType.fields().length; i++) {
+      if (!partitionIdx.contains(i)) {
+        ordinals[retained] = i;
+        types[retained] = structType.fields()[i].dataType();
+        retained++;
+      }
+    }
+    this.retainedOrdinals = ordinals;
+    this.retainedTypes = types;
   }
 
   public void write(InternalRow row) throws IOException {
@@ -126,27 +164,17 @@ public class BulkInsertDataInternalWriterHelper {
         lastKnownPartitionPath = partitionPath.clone();
       }
 
-      boolean shouldDropPartitionColumns = 
writeConfig.shouldDropPartitionColumns();
       if (shouldDropPartitionColumns) {
-        // Drop the partition columns from the row
-        List<String> partitionCols = 
JavaScalaConverters.convertScalaListToJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
-        Set<Integer> partitionIdx = new HashSet<>();
-        for (String col : partitionCols) {
-          partitionIdx.add(this.structType.fieldIndex(col));
+        if (retainedOrdinals == null) {
+          initRetainedFields();
         }
-
-        // Relies on InternalRow::toSeq(...) preserving the column ordering based on the supplied schema
-        List<Object> cols = 
JavaScalaConverters.convertScalaListToJavaList(row.toSeq(structType));
-        int idx = 0;
-        List<Object> newCols = new ArrayList<>();
-        for (Object o : cols) {
-          if (!partitionIdx.contains(idx)) {
-            newCols.add(o);
-          }
-          idx += 1;
+        // Drop the partition columns from the row by copying the retained fields; a fresh row is
+        // allocated per record so values keep the same aliasing behavior as InternalRow.fromSeq
+        Object[] values = new Object[retainedOrdinals.length];
+        for (int i = 0; i < retainedOrdinals.length; i++) {
+          values[i] = row.get(retainedOrdinals[i], retainedTypes[i]);
         }
-        InternalRow newRow = 
InternalRow.fromSeq(JavaScalaConverters.convertJavaListToScalaSeq(newCols));
-        handle.write(newRow);
+        handle.write(new GenericInternalRow(values));
       } else {
         handle.write(row);
       }

Reply via email to