alexeykudinkin commented on code in PR #5470:
URL: https://github.com/apache/hudi/pull/5470#discussion_r924811553


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -24,31 +24,66 @@
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.util.Arrays;
+
 /**
- * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} 
and keeps meta columns locally. But the {@link InternalRow}
- * does include the meta columns as well just that {@link HoodieInternalRow} 
will intercept queries for meta columns and serve from its
- * copy rather than fetching from {@link InternalRow}.
+ * Hudi internal implementation of the {@link InternalRow} allowing to extend 
arbitrary
+ * {@link InternalRow} overlaying Hudi-internal meta-fields on top of it.
+ *
+ * Capable of overlaying meta-fields in both cases: whether original {@link 
#row} contains
+ * meta columns or not. This allows to handle following use-cases allowing to 
avoid any
+ * manipulation (reshuffling) of the source row, by simply creating new 
instance
+ * of {@link HoodieInternalRow} with all the meta-values provided
+ *
+ * <ul>
+ *   <li>When meta-fields need to be prepended to the source {@link 
InternalRow}</li>
+ *   <li>When meta-fields need to be updated w/in the source {@link 
InternalRow}
+ *   ({@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} currently 
does not
+ *   allow in-place updates due to its memory layout)</li>
+ * </ul>
  */
 public class HoodieInternalRow extends InternalRow {
 
-  private String commitTime;
-  private String commitSeqNumber;
-  private String recordKey;
-  private String partitionPath;
-  private String fileName;
-  private InternalRow row;
-
-  public HoodieInternalRow(String commitTime, String commitSeqNumber, String 
recordKey, String partitionPath,
-      String fileName, InternalRow row) {
-    this.commitTime = commitTime;
-    this.commitSeqNumber = commitSeqNumber;
-    this.recordKey = recordKey;
-    this.partitionPath = partitionPath;
-    this.fileName = fileName;
+  /**
+   * Collection of meta-fields as defined by {@link 
HoodieRecord#HOODIE_META_COLUMNS}
+   */
+  private final UTF8String[] metaFields;
+  private final InternalRow row;
+
+  /**
+   * Specifies whether source {@link #row} contains meta-fields
+   */
+  private final boolean containsMetaFields;
+
+  public HoodieInternalRow(UTF8String commitTime,
+                           UTF8String commitSeqNumber,
+                           UTF8String recordKey,
+                           UTF8String partitionPath,
+                           UTF8String fileName,
+                           InternalRow row,
+                           boolean containsMetaFields) {
+    this.metaFields = new UTF8String[] {
+        commitTime,
+        commitSeqNumber,
+        recordKey,
+        partitionPath,
+        fileName
+    };
+
     this.row = row;
+    this.containsMetaFields = containsMetaFields;

Review Comment:
   I'm going to update the docs to make this clearer.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -24,31 +24,66 @@
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.util.Arrays;
+
 /**
- * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} 
and keeps meta columns locally. But the {@link InternalRow}
- * does include the meta columns as well just that {@link HoodieInternalRow} 
will intercept queries for meta columns and serve from its
- * copy rather than fetching from {@link InternalRow}.
+ * Hudi internal implementation of the {@link InternalRow} allowing to extend 
arbitrary
+ * {@link InternalRow} overlaying Hudi-internal meta-fields on top of it.
+ *
+ * Capable of overlaying meta-fields in both cases: whether original {@link 
#row} contains
+ * meta columns or not. This allows to handle following use-cases allowing to 
avoid any
+ * manipulation (reshuffling) of the source row, by simply creating new 
instance
+ * of {@link HoodieInternalRow} with all the meta-values provided
+ *
+ * <ul>
+ *   <li>When meta-fields need to be prepended to the source {@link 
InternalRow}</li>
+ *   <li>When meta-fields need to be updated w/in the source {@link 
InternalRow}
+ *   ({@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} currently 
does not
+ *   allow in-place updates due to its memory layout)</li>
+ * </ul>
  */
 public class HoodieInternalRow extends InternalRow {
 
-  private String commitTime;
-  private String commitSeqNumber;
-  private String recordKey;
-  private String partitionPath;
-  private String fileName;
-  private InternalRow row;
-
-  public HoodieInternalRow(String commitTime, String commitSeqNumber, String 
recordKey, String partitionPath,
-      String fileName, InternalRow row) {
-    this.commitTime = commitTime;
-    this.commitSeqNumber = commitSeqNumber;
-    this.recordKey = recordKey;
-    this.partitionPath = partitionPath;
-    this.fileName = fileName;
+  /**
+   * Collection of meta-fields as defined by {@link 
HoodieRecord#HOODIE_META_COLUMNS}
+   */
+  private final UTF8String[] metaFields;
+  private final InternalRow row;
+
+  /**
+   * Specifies whether source {@link #row} contains meta-fields
+   */
+  private final boolean containsMetaFields;
+
+  public HoodieInternalRow(UTF8String commitTime,
+                           UTF8String commitSeqNumber,
+                           UTF8String recordKey,
+                           UTF8String partitionPath,
+                           UTF8String fileName,
+                           InternalRow row,
+                           boolean containsMetaFields) {
+    this.metaFields = new UTF8String[] {
+        commitTime,
+        commitSeqNumber,
+        recordKey,
+        partitionPath,
+        fileName
+    };
+
     this.row = row;
+    this.containsMetaFields = containsMetaFields;

Review Comment:
   There's some confusion: `containsMetaFields` only indicates whether the 
inner (source) row itself already contains the meta-fields. Regardless of 
that, `HoodieInternalRow` (HIR) will always override the meta-fields by 
overlaying its own values on top of whatever the source row contains (this is 
necessary b/c `UnsafeRow` can't be updated in place).



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, 
getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {

Review Comment:
   Correct. It's a simplified version converted into Scala (to handle RDDs)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, 
getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {
+
+  /**
+   * Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following 
steps:
+   *
+   * <ol>
+   *   <li>Invoking configured [[KeyGenerator]] to produce record key, alas 
partition-path value</li>
+   *   <li>Prepends Hudi meta-fields to every row in the dataset</li>
+   *   <li>Dedupes rows (if necessary)</li>
+   *   <li>Partitions dataset using provided [[partitioner]]</li>
+   * </ol>
+   */
+  def prepareForBulkInsert(df: DataFrame,
+                           config: HoodieWriteConfig,
+                           partitioner: BulkInsertPartitioner[Dataset[Row]],
+                           isGlobalIndex: Boolean,
+                           dropPartitionColumns: Boolean): Dataset[Row] = {
+    val populateMetaFields = config.populateMetaFields()
+    val schema = df.schema
+
+    val keyGeneratorClassName = 
config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
+      "Key-generator class name is required")
+
+    val prependedRdd: RDD[InternalRow] =
+      df.queryExecution.toRdd.mapPartitions { iter =>
+        val keyGenerator =
+          ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
+            .asInstanceOf[BuiltinKeyGenerator]
+
+        iter.map { row =>
+          val (recordKey, partitionPath) =
+            if (populateMetaFields) {
+              (UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
+                UTF8String.fromString(keyGenerator.getPartitionPath(row, 
schema)))
+            } else {
+              (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
+            }
+          val commitTimestamp = UTF8String.EMPTY_UTF8
+          val commitSeqNo = UTF8String.EMPTY_UTF8
+          val filename = UTF8String.EMPTY_UTF8
+
+          // TODO use mutable row, avoid re-allocating
+          new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, 
partitionPath, filename, row, false)
+        }
+      }
+
+    val metaFields = Seq(

Review Comment:
   Good call. The reason I didn't do it in the first place was b/c the order 
is critical here, and even though we're using a list, I didn't want this 
constraint to be obscured by living in another class (where the order might 
not actually matter at all).



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, 
getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {
+
+  /**
+   * Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following 
steps:
+   *
+   * <ol>
+   *   <li>Invoking configured [[KeyGenerator]] to produce record key, alas 
partition-path value</li>
+   *   <li>Prepends Hudi meta-fields to every row in the dataset</li>
+   *   <li>Dedupes rows (if necessary)</li>
+   *   <li>Partitions dataset using provided [[partitioner]]</li>
+   * </ol>
+   */
+  def prepareForBulkInsert(df: DataFrame,
+                           config: HoodieWriteConfig,
+                           partitioner: BulkInsertPartitioner[Dataset[Row]],
+                           isGlobalIndex: Boolean,
+                           dropPartitionColumns: Boolean): Dataset[Row] = {
+    val populateMetaFields = config.populateMetaFields()
+    val schema = df.schema
+
+    val keyGeneratorClassName = 
config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
+      "Key-generator class name is required")
+
+    val prependedRdd: RDD[InternalRow] =
+      df.queryExecution.toRdd.mapPartitions { iter =>
+        val keyGenerator =
+          ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps))
+            .asInstanceOf[BuiltinKeyGenerator]
+
+        iter.map { row =>
+          val (recordKey, partitionPath) =
+            if (populateMetaFields) {
+              (UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
+                UTF8String.fromString(keyGenerator.getPartitionPath(row, 
schema)))
+            } else {
+              (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
+            }
+          val commitTimestamp = UTF8String.EMPTY_UTF8
+          val commitSeqNo = UTF8String.EMPTY_UTF8
+          val filename = UTF8String.EMPTY_UTF8
+
+          // TODO use mutable row, avoid re-allocating
+          new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, 
partitionPath, filename, row, false)
+        }
+      }
+
+    val metaFields = Seq(
+      StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
+
+    val updatedSchema = StructType(metaFields ++ schema.fields)
+    val updatedDF = HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, 
prependedRdd, updatedSchema)
+
+    if (!populateMetaFields) {
+      updatedDF
+    } else {
+      val trimmedDF = if (dropPartitionColumns) {
+        val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, 
new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
+        val partitionPathFields = keyGenerator.getPartitionPathFields.asScala
+        val nestedPartitionPathFields = partitionPathFields.filter(f => 
f.contains('.'))
+        if (nestedPartitionPathFields.nonEmpty) {
+          logWarning(s"Can not drop nested partition path fields: 
$nestedPartitionPathFields")
+        }
+
+        val partitionPathCols = partitionPathFields -- 
nestedPartitionPathFields
+        updatedDF.drop(partitionPathCols: _*)
+      } else {
+        updatedDF
+      }
+
+      val dedupedDF = if (config.shouldCombineBeforeInsert) {
+        dedupeRows(trimmedDF, config.getPreCombineField, isGlobalIndex)
+      } else {
+        trimmedDF
+      }
+
+      partitioner.repartitionRecords(dedupedDF, 
config.getBulkInsertShuffleParallelism)
+    }
+  }
+
+  private def dedupeRows(df: DataFrame, preCombineFieldRef: String, 
isGlobalIndex: Boolean): DataFrame = {
+    val recordKeyMetaFieldOrd = 
df.schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+    val partitionPathMetaFieldOrd = 
df.schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+    // NOTE: Pre-combine field could be a nested field
+    val preCombineFieldPath = composeNestedFieldPath(df.schema, 
preCombineFieldRef)
+
+    val dedupedRdd = df.queryExecution.toRdd
+      .map { row =>
+        val rowKey = if (isGlobalIndex) {
+          row.getString(recordKeyMetaFieldOrd)
+        } else {
+          val partitionPath = row.getString(partitionPathMetaFieldOrd)
+          val recordKey = row.getString(recordKeyMetaFieldOrd)
+          s"$partitionPath:$recordKey"
+        }
+        // NOTE: It's critical whenever we keep the reference to the row, to 
make a copy
+        //       since Spark might be providing us with a mutable copy 
(updated during the iteration)
+        (rowKey, row.copy())

Review Comment:
   We can only get away w/o copying when we do one-pass (streaming-like) 
processing. If at any point we need to hold a reference to the row, we have 
to make a copy (it's going to fail otherwise).



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -73,18 +75,15 @@ public WriteSupport.FinalizedWriteContext finalizeWrite() {
     return new WriteSupport.FinalizedWriteContext(extraMetaData);
   }
 
-  public void add(String recordKey) {
-    this.bloomFilter.add(recordKey);
-    if (minRecordKey != null) {
-      minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : 
recordKey;
-    } else {
-      minRecordKey = recordKey;
+  public void add(UTF8String recordKey) {
+    this.bloomFilter.add(recordKey.getBytes());

Review Comment:
   The Bloom filter always ingests UTF-8 bytes (Java by default encodes 
strings in UTF-16).



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriter.java:
##########
@@ -37,7 +38,7 @@ public interface HoodieInternalRowFileWriter {
    *
    * @throws IOException on any exception while writing.
    */
-  void writeRow(String key, InternalRow row) throws IOException;
+  void writeRow(UTF8String key, InternalRow row) throws IOException;

Review Comment:
   Correct -- to avoid conversion b/w String and UTF8String



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -73,18 +75,15 @@ public WriteSupport.FinalizedWriteContext finalizeWrite() {
     return new WriteSupport.FinalizedWriteContext(extraMetaData);
   }
 
-  public void add(String recordKey) {
-    this.bloomFilter.add(recordKey);
-    if (minRecordKey != null) {
-      minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : 
recordKey;
-    } else {
-      minRecordKey = recordKey;
+  public void add(UTF8String recordKey) {
+    this.bloomFilter.add(recordKey.getBytes());
+
+    if (minRecordKey == null || minRecordKey.compareTo(recordKey) < 0) {
+      minRecordKey =  recordKey.copy();
     }
 
-    if (maxRecordKey != null) {
-      maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : 
recordKey;
-    } else {
-      maxRecordKey = recordKey;
+    if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) > 0) {
+      maxRecordKey = recordKey.copy();

Review Comment:
   Good catch! It should have been `clone` instead of `copy`.
   
   We need to `clone` here b/c a `UTF8String` doesn't copy its bytes by 
default -- instead it points into the buffer of the record holding it, and 
since that buffer could be mutable, we have to make a copy of it in that case.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {

Review Comment:
   Great eye! We don't need to check it here: we only use `containsMetaFields` 
to understand whether the source row *already* contains meta-fields, which 
affects how we index within the `HoodieInternalRow` (meta-fields are always 
prepended, and we always read meta-fields from HIR and never from the source 
row).



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java:
##########
@@ -30,7 +30,17 @@
 public class HoodieTimer {
 
   // Ordered stack of TimeInfo's to make sure stopping the timer returns the 
correct elapsed time
-  Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>();
+  private final Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>();
+
+  public HoodieTimer() {
+    this(false);
+  }
+
+  public HoodieTimer(boolean shouldStart) {
+    if (shouldStart) {
+      startTimer();
+    }
+  }

Review Comment:
   The old semantics are still preserved: it works as it did before, and this 
just adds a new way to use it when you don't need to invoke `startTimer` 
explicitly.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {

Review Comment:
   This is our code. We're now testing across all the major versions we're 
running against so this is pretty well-tested. Also, the API we're using here 
is fairly stable and doesn't change much b/w Spark versions



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -88,13 +91,21 @@ public BulkInsertDataInternalWriterHelper(HoodieTable 
hoodieTable, HoodieWriteCo
     this.populateMetaFields = populateMetaFields;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
     this.fileIdPrefix = UUID.randomUUID().toString();
+
     if (!populateMetaFields) {
       this.keyGeneratorOpt = getKeyGenerator(writeConfig.getProps());
-      if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof 
SimpleKeyGenerator) {
-        simpleKeyGen = true;
-        simplePartitionFieldIndex = (Integer) 
structType.getFieldIndex((keyGeneratorOpt.get()).getPartitionPathFields().get(0)).get();
-        simplePartitionFieldDataType = 
structType.fields()[simplePartitionFieldIndex].dataType();
-      }
+    } else {
+      this.keyGeneratorOpt = Option.empty();
+    }
+
+    if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof 
SimpleKeyGenerator) {
+      this.simpleKeyGen = true;
+      this.simplePartitionFieldIndex = (Integer) 
structType.getFieldIndex(keyGeneratorOpt.get().getPartitionPathFields().get(0)).get();
+      this.simplePartitionFieldDataType = 
structType.fields()[simplePartitionFieldIndex].dataType();
+    } else {
+      this.simpleKeyGen = false;
+      this.simplePartitionFieldIndex = -1;
+      this.simplePartitionFieldDataType = null;

Review Comment:
   We actually do not (it was done for perf reasons before). This is addressed 
in #5523



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -102,20 +114,20 @@ public String getPartitionPath(InternalRow internalRow, 
StructType structType) {
       return 
RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, 
getPartitionPathFields(),
           hiveStylePartitioning, partitionPathSchemaInfo);
     } catch (Exception e) {
-      throw new HoodieIOException("Conversion of InternalRow to Row failed 
with exception " + e);
+      throw new HoodieException("Conversion of InternalRow to Row failed with 
exception", e);
     }
   }
 
   void buildFieldSchemaInfoIfNeeded(StructType structType) {
     if (this.structType == null) {
+      this.structType = structType;

Review Comment:
   Nope



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {
+      if (value instanceof UTF8String) {
+        metaFields[ordinal] = (UTF8String) value;
+      } else if (value instanceof String) {
+        metaFields[ordinal] = UTF8String.fromString((String) value);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Could not update the row at (%d) with value of type 
(%s), either UTF8String or String are expected", ordinal, 
value.getClass().getSimpleName()));
       }
     } else {
-      row.update(i, value);
+      row.update(rebaseOrdinal(ordinal), value);
     }
   }
 
-  private String getMetaColumnVal(int ordinal) {
-    switch (ordinal) {
-      case 0: {
-        return commitTime;
-      }
-      case 1: {
-        return commitSeqNumber;
-      }
-      case 2: {
-        return recordKey;
-      }
-      case 3: {
-        return partitionPath;
-      }
-      case 4: {
-        return fileName;
-      }
-      default: throw new IllegalArgumentException("Not expected");
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      return metaFields[ordinal] == null;
     }
+    return row.isNullAt(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public boolean isNullAt(int ordinal) {
+  public UTF8String getUTF8String(int ordinal) {
+    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      return metaFields[ordinal];
+    }
+    return row.getUTF8String(rebaseOrdinal(ordinal));
+  }
+
+  @Override
+  public Object get(int ordinal, DataType dataType) {
     if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return null == getMetaColumnVal(ordinal);
+      validateMetaFieldDataType(dataType);
+      return metaFields[ordinal];
     }
-    return row.isNullAt(ordinal);
+    return row.get(rebaseOrdinal(ordinal), dataType);
   }
 
   @Override
   public boolean getBoolean(int ordinal) {
-    return row.getBoolean(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Boolean.class);
+    return row.getBoolean(rebaseOrdinal(ordinal));
   }
 
   @Override
   public byte getByte(int ordinal) {
-    return row.getByte(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte.class);
+    return row.getByte(rebaseOrdinal(ordinal));
   }
 
   @Override
   public short getShort(int ordinal) {
-    return row.getShort(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Short.class);
+    return row.getShort(rebaseOrdinal(ordinal));
   }
 
   @Override
   public int getInt(int ordinal) {
-    return row.getInt(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Integer.class);
+    return row.getInt(rebaseOrdinal(ordinal));
   }
 
   @Override
   public long getLong(int ordinal) {
-    return row.getLong(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Long.class);
+    return row.getLong(rebaseOrdinal(ordinal));
   }
 
   @Override
   public float getFloat(int ordinal) {
-    return row.getFloat(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Float.class);
+    return row.getFloat(rebaseOrdinal(ordinal));
   }
 
   @Override
   public double getDouble(int ordinal) {
-    return row.getDouble(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Double.class);
+    return row.getDouble(rebaseOrdinal(ordinal));
   }
 
   @Override
   public Decimal getDecimal(int ordinal, int precision, int scale) {
-    return row.getDecimal(ordinal, precision, scale);
-  }
-
-  @Override
-  public UTF8String getUTF8String(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getUTF8String(ordinal);
-  }
-
-  @Override
-  public String getString(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return new String(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getString(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Decimal.class);
+    return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
   }
 
   @Override
   public byte[] getBinary(int ordinal) {
-    return row.getBinary(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte[].class);
+    return row.getBinary(rebaseOrdinal(ordinal));
   }
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
-    return row.getInterval(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
+    return row.getInterval(rebaseOrdinal(ordinal));
   }
 
   @Override
   public InternalRow getStruct(int ordinal, int numFields) {
-    return row.getStruct(ordinal, numFields);
+    ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
+    return row.getStruct(rebaseOrdinal(ordinal), numFields);
   }
 
   @Override
   public ArrayData getArray(int ordinal) {
-    return row.getArray(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
+    return row.getArray(rebaseOrdinal(ordinal));
   }
 
   @Override
   public MapData getMap(int ordinal) {
-    return row.getMap(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, MapData.class);
+    return row.getMap(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public Object get(int ordinal, DataType dataType) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
+  public InternalRow copy() {
+    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), 
row.copy(), containsMetaFields);
+  }
+
+  private int rebaseOrdinal(int ordinal) {
+    // NOTE: In cases when source row does not contain meta fields, we will 
have to
+    //       rebase ordinal onto its indexes
+    return containsMetaFields ? ordinal : ordinal - metaFields.length;

Review Comment:
   Please check my comments above -- we always overlay meta-fields, since we 
need them to be mutable (they're being updated dynamically in the writer)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to