nsivabalan commented on code in PR #8879:
URL: https://github.com/apache/hudi/pull/8879#discussion_r1222275056


##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -233,15 +238,25 @@ public static HoodieWriteResult 
doDeletePartitionsOperation(SparkRDDWriteClient
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable 
orderingVal, HoodieKey hKey,
-      String payloadClass) throws IOException {
+      String payloadClass, HoodieRecordLocation recordLocation) throws 
IOException {
     HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, 
gr, orderingVal);
-    return new HoodieAvroRecord<>(hKey, payload);
+
+    HoodieAvroRecord record = new HoodieAvroRecord<>(hKey, payload);
+    if (recordLocation != null) {
+      record.setCurrentLocation(recordLocation);
+    }
+    return record;
   }
 
+  // AKL_TODO: check if this change is needed. Also validate change if needed.
   public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey 
hKey,
-                                                String payloadClass) throws 
IOException {
+                                                String payloadClass, 
HoodieRecordLocation recordLocation) throws IOException {

Review Comment:
   same here



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -144,20 +144,25 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
-    val dfWithoutMetaCols = 
df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
+    val dfPrepped = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, 
"false")

Review Comment:
   dfPrepped -> processedDf



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1160,21 +1171,29 @@ object HoodieSparkSqlWriter {
 
           // handle dropping partition columns
           it.map { avroRec =>
-            val processedRecord = if (shouldDropPartitionColumns) {
-              HoodieAvroUtils.rewriteRecord(avroRec, dataFileSchema)
+            val (hoodieKey: HoodieKey, recordLocation: 
Option[HoodieRecordLocation]) = getKeyAndLocatorFromAvroRecord(keyGenerator, 
avroRec,
+              isPrepped)
+
+            val avroRecWithoutMeta: GenericRecord = if (isPrepped) {
+              HoodieAvroUtils.rewriteRecord(avroRec, 
HoodieAvroUtils.removeMetadataFields(dataFileSchema))
             } else {
               avroRec
             }
 
-            val hoodieKey = new HoodieKey(keyGenerator.getRecordKey(avroRec), 
keyGenerator.getPartitionPath(avroRec))
+            val processedRecord = if (shouldDropPartitionColumns) {
+              HoodieAvroUtils.rewriteRecord(avroRecWithoutMeta, dataFileSchema)
+            } else {
+              avroRecWithoutMeta
+            }
+
             val hoodieRecord = if (shouldCombine) {
               val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, 
config.getString(PRECOMBINE_FIELD),
                 false, 
consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
               DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, 
hoodieKey,
-                config.getString(PAYLOAD_CLASS_NAME))
+                config.getString(PAYLOAD_CLASS_NAME), 
recordLocation.getOrElse(null))
             } else {
-              DataSourceUtils.createHoodieRecord(processedRecord, hoodieKey,
-                config.getString(PAYLOAD_CLASS_NAME))
+              // AKL_TODO: check if this change is needed.

Review Comment:
   fix or remove the AKL_TODO comments



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
+            StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
+          } else {
+            targetStructType
+          }
           // NOTE: To make sure we properly transform records
-          val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+          val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
           it.map { sourceRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
-            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-            val targetRow = targetStructTypeRowWriter(sourceRow)
+            val (hoodieKey: HoodieKey, recordLocation: 
Option[HoodieRecordLocation]) = 
getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, 
isPrepped)
 
-            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
+            val finalRow = finalStructTypeRowWriter(sourceRow)
+            var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, 
dataFileStructType, false)
+            if (recordLocation.getOrElse(null) != null) {

Review Comment:
   let's use Option idioms here instead of the getOrElse(null) null check
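   A minimal sketch of the Option-based version (it reuses the variables from this hunk and lets the record stay a val):

   ```scala
   // Sketch only: foreach replaces the getOrElse(null) null check and the var.
   val hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, dataFileStructType, false)
   recordLocation.foreach(loc => hoodieSparkRecord.setCurrentLocation(loc))
   hoodieSparkRecord
   ```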



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
+            StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
+          } else {
+            targetStructType
+          }
           // NOTE: To make sure we properly transform records
-          val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+          val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
           it.map { sourceRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
-            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-            val targetRow = targetStructTypeRowWriter(sourceRow)
+            val (hoodieKey: HoodieKey, recordLocation: 
Option[HoodieRecordLocation]) = 
getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, 
isPrepped)
 
-            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
+            val finalRow = finalStructTypeRowWriter(sourceRow)
+            var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, 
dataFileStructType, false)
+            if (recordLocation.getOrElse(null) != null) {
+              hoodieSparkRecord.setCurrentLocation(recordLocation.get)
+            }
+            hoodieSparkRecord
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
   }
+
+  private def getKeyAndLocatorFromAvroRecord(keyGenerator: BaseKeyGenerator, 
avroRec: GenericRecord,

Review Comment:
   rename to getHoodieKeyAndMayBeLocation?



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java:
##########
@@ -84,7 +84,7 @@ public JavaRDD<HoodieRecord> generateInputRecords(String 
tableName, String sourc
                   
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));
           try {
             return DataSourceUtils.createHoodieRecord(gr, orderingVal, 
keyGenerator.getKey(gr),
-                props.getString("hoodie.datasource.write.payload.class"));
+                props.getString("hoodie.datasource.write.payload.class"), 
null);

Review Comment:
   why is this change needed?



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -233,15 +238,25 @@ public static HoodieWriteResult 
doDeletePartitionsOperation(SparkRDDWriteClient
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable 
orderingVal, HoodieKey hKey,
-      String payloadClass) throws IOException {
+      String payloadClass, HoodieRecordLocation recordLocation) throws 
IOException {

Review Comment:
   Let's take Option<HoodieRecordLocation> here instead of a nullable parameter
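   If the signature takes Hudi's Option, the Scala call site in HoodieSparkSqlWriter could look roughly like the sketch below; this only illustrates how the caller would adapt (it assumes the signature change and reuses the variables from that hunk).

   ```scala
   import org.apache.hudi.common.util.{Option => JOption}

   // Sketch: convert the caller's Scala Option into Hudi's Java Option.
   val javaLocation: JOption[HoodieRecordLocation] = JOption.ofNullable(recordLocation.orNull)

   DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey,
     config.getString(PAYLOAD_CLASS_NAME), javaLocation)
   ```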



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -144,20 +144,25 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
-    val dfWithoutMetaCols = 
df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
+    val dfPrepped = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, 
"false")

Review Comment:
   Or we can rename the argument (L146) to unProcessedDf and rename dfPrepped -> df, so the rest of the code stays untouched.
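   A rough sketch of that alternative (the enclosing method signature is abbreviated and partly assumed here):

   ```scala
   // Illustrative only: rename the parameter and rebind `df` after the meta-column handling,
   // so the rest of the method body keeps referring to `df` unchanged.
   def createRelation(sqlContext: SQLContext, mode: SaveMode,
                      optParams: Map[String, String], unProcessedDf: DataFrame): BaseRelation = {
     val df = if (optParams.getOrElse(DATASOURCE_WRITE_PREPPED_KEY, "false").equalsIgnoreCase("true")) {
       unProcessedDf // prepped write: keep the meta columns
     } else {
       unProcessedDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
     }
     // ... rest of the method unchanged ...
     ???
   }
   ```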



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()

Review Comment:
   let's ensure we drop all 6 meta fields across both the Avro and Spark record-type code paths
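   Something along these lines could be shared by both paths (a sketch; the helper names are made up and would live in HoodieSparkSqlWriter):

   ```scala
   import scala.collection.JavaConverters._
   import org.apache.hudi.common.model.HoodieRecord
   import org.apache.spark.sql.types.StructType

   // Sketch: one exclusion set built from the constant that includes _hoodie_operation,
   // reused by the Spark row-writer schema and by the Avro rewrite schema.
   val metaFieldNames: Set[String] = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.asScala.toSet

   def stripMetaFields(structType: StructType): StructType =
     StructType(structType.fields.filterNot(f => metaFieldNames.contains(f.name)))
   ```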



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
+            StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
+          } else {
+            targetStructType
+          }
           // NOTE: To make sure we properly transform records
-          val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+          val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
           it.map { sourceRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
-            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-            val targetRow = targetStructTypeRowWriter(sourceRow)
+            val (hoodieKey: HoodieKey, recordLocation: 
Option[HoodieRecordLocation]) = 
getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, 
isPrepped)
 
-            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
+            val finalRow = finalStructTypeRowWriter(sourceRow)
+            var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, 
dataFileStructType, false)
+            if (recordLocation.getOrElse(null) != null) {
+              hoodieSparkRecord.setCurrentLocation(recordLocation.get)
+            }
+            hoodieSparkRecord
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
   }
+
+  private def getKeyAndLocatorFromAvroRecord(keyGenerator: BaseKeyGenerator, 
avroRec: GenericRecord,
+                                             isPrepped: Boolean): (HoodieKey, 
Option[HoodieRecordLocation]) = {
+    val recordKey = if (isPrepped) {
+      avroRec.get("_hoodie_record_key").toString
+    } else {
+      keyGenerator.getRecordKey(avroRec)
+    };
+
+    val partitionPath = if (isPrepped) {
+      avroRec.get("_hoodie_partition_path").toString
+    } else {
+      keyGenerator.getPartitionPath(avroRec)
+    };
+
+    val hoodieKey = new HoodieKey(recordKey, partitionPath)
+    val instantTime: Option[String] = if (isPrepped) {
+      Option(avroRec.get("_hoodie_commit_time")).map(_.toString) }
+    else {
+      None
+    }
+    val fileName: Option[String] = if (isPrepped) {
+      Option(avroRec.get("_hoodie_file_name")).map(_.toString) }
+    else {
+      None
+    }
+    val recordLocation: Option[HoodieRecordLocation] = if 
(instantTime.isDefined && fileName.isDefined) {
+      val fileId = FSUtils.getFileId(fileName.get)
+      Some(new HoodieRecordLocation(instantTime.get, fileId))
+    } else {
+      None
+    }
+    (hoodieKey, recordLocation)
+  }
+
+  private def getKeyAndLocatorFromSparkRecord(sparkKeyGenerator: 
SparkKeyGeneratorInterface, sourceRow: InternalRow,
+                                              schema: StructType, isPrepped: 
Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+    def getFieldIndex(fieldName: String): Int = {
+      if (schema.fieldNames.contains(fieldName)) {

Review Comment:
   HoodieRecord already has the name-to-position mapping HOODIE_META_COLUMNS_NAME_TO_POS; let's reuse it here.
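   Roughly (a sketch; it assumes the prepped input keeps the meta columns in their standard leading positions, which is what the map encodes):

   ```scala
   import org.apache.hudi.common.model.HoodieRecord

   // Sketch: reuse the shared name -> position map instead of scanning the StructType.
   val recordKeyPos = HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)
   val partitionPathPos = HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
   ```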
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
+            StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
+          } else {
+            targetStructType
+          }
           // NOTE: To make sure we properly transform records
-          val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+          val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
           it.map { sourceRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
-            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-            val targetRow = targetStructTypeRowWriter(sourceRow)
+            val (hoodieKey: HoodieKey, recordLocation: 
Option[HoodieRecordLocation]) = 
getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, 
isPrepped)
 
-            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
+            val finalRow = finalStructTypeRowWriter(sourceRow)
+            var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, 
dataFileStructType, false)
+            if (recordLocation.getOrElse(null) != null) {
+              hoodieSparkRecord.setCurrentLocation(recordLocation.get)
+            }
+            hoodieSparkRecord
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
   }
+
+  private def getKeyAndLocatorFromAvroRecord(keyGenerator: BaseKeyGenerator, 
avroRec: GenericRecord,
+                                             isPrepped: Boolean): (HoodieKey, 
Option[HoodieRecordLocation]) = {
+    val recordKey = if (isPrepped) {
+      avroRec.get("_hoodie_record_key").toString
+    } else {
+      keyGenerator.getRecordKey(avroRec)
+    };
+
+    val partitionPath = if (isPrepped) {
+      avroRec.get("_hoodie_partition_path").toString
+    } else {
+      keyGenerator.getPartitionPath(avroRec)
+    };
+
+    val hoodieKey = new HoodieKey(recordKey, partitionPath)
+    val instantTime: Option[String] = if (isPrepped) {
+      Option(avroRec.get("_hoodie_commit_time")).map(_.toString) }
+    else {
+      None
+    }
+    val fileName: Option[String] = if (isPrepped) {
+      Option(avroRec.get("_hoodie_file_name")).map(_.toString) }
+    else {
+      None
+    }
+    val recordLocation: Option[HoodieRecordLocation] = if 
(instantTime.isDefined && fileName.isDefined) {
+      val fileId = FSUtils.getFileId(fileName.get)
+      Some(new HoodieRecordLocation(instantTime.get, fileId))
+    } else {
+      None
+    }
+    (hoodieKey, recordLocation)
+  }
+
+  private def getKeyAndLocatorFromSparkRecord(sparkKeyGenerator: 
SparkKeyGeneratorInterface, sourceRow: InternalRow,
+                                              schema: StructType, isPrepped: 
Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
+    def getFieldIndex(fieldName: String): Int = {
+      if (schema.fieldNames.contains(fieldName)) {
+        schema.fieldIndex(fieldName)
+      } else {
+        -1
+      }
+    }
+
+    val hoodieRecordKeyIndex = getFieldIndex("_hoodie_record_key")

Review Comment:
   let's reuse the constants and avoid hard-coding the meta field names
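   For example (a sketch; metaFieldIndex is an illustrative helper, not existing code):

   ```scala
   import org.apache.hudi.common.model.HoodieRecord
   import org.apache.spark.sql.types.StructType

   // Sketch: resolve meta-field positions via the shared constants rather than string literals.
   def metaFieldIndex(schema: StructType, fieldName: String): Int =
     schema.getFieldIndex(fieldName).getOrElse(-1)

   // usage: metaFieldIndex(schema, HoodieRecord.RECORD_KEY_METADATA_FIELD)
   //        metaFieldIndex(schema, HoodieRecord.PARTITION_PATH_METADATA_FIELD)
   ```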



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter {
           }
           val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
           val targetStructType = if (shouldDropPartitionColumns) 
dataFileStructType else writerStructType
+          val finalStructType = if (isPrepped) {
+            val fieldsToExclude = 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray()
+            StructType(targetStructType.fields.filterNot(field => 
fieldsToExclude.contains(field.name)))
+          } else {
+            targetStructType
+          }
           // NOTE: To make sure we properly transform records
-          val targetStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, targetStructType)
+          val finalStructTypeRowWriter = 
getCachedUnsafeRowWriter(sourceStructType, finalStructType)
 
           it.map { sourceRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, 
sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, 
sourceStructType)
-            val key = new HoodieKey(recordKey.toString, partitionPath.toString)
-            val targetRow = targetStructTypeRowWriter(sourceRow)
+            val (hoodieKey: HoodieKey, recordLocation: 
Option[HoodieRecordLocation]) = 
getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, 
isPrepped)
 
-            new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
+            val finalRow = finalStructTypeRowWriter(sourceRow)
+            var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, 
dataFileStructType, false)
+            if (recordLocation.getOrElse(null) != null) {
+              hoodieSparkRecord.setCurrentLocation(recordLocation.get)
+            }
+            hoodieSparkRecord
           }
         }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
     }
   }
+
+  private def getKeyAndLocatorFromAvroRecord(keyGenerator: BaseKeyGenerator, 
avroRec: GenericRecord,
+                                             isPrepped: Boolean): (HoodieKey, 
Option[HoodieRecordLocation]) = {
+    val recordKey = if (isPrepped) {
+      avroRec.get("_hoodie_record_key").toString

Review Comment:
   can we reference the static variables instead, e.g. HoodieRecord.RECORD_KEY_METADATA_FIELD?
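   e.g. (a sketch; metaField is an illustrative helper):

   ```scala
   import org.apache.avro.generic.GenericRecord
   import org.apache.hudi.common.model.HoodieRecord

   // Sketch: read the meta fields through the shared constants instead of string literals.
   def metaField(avroRec: GenericRecord, fieldName: String): Option[String] =
     Option(avroRec.get(fieldName)).map(_.toString)

   // usage: metaField(avroRec, HoodieRecord.RECORD_KEY_METADATA_FIELD)
   //        metaField(avroRec, HoodieRecord.COMMIT_TIME_METADATA_FIELD)
   ```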



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -144,20 +144,25 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
-    val dfWithoutMetaCols = 
df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
+    val dfPrepped = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, 
"false")
+      .equalsIgnoreCase("true")) {
+      df // Don't remove meta columns for prepped write.
+    } else {
+      df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)

Review Comment:
   do you know what happens if we drop a non-existent column? We have another optional meta field, _hoodie_operation; see HOODIE_META_COLUMNS_WITH_OPERATION.
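   (Spark's Dataset.drop is documented as a no-op for columns the schema doesn't contain, so dropping the wider list should be safe.) A sketch, with an illustrative helper name:

   ```scala
   import scala.collection.JavaConverters._
   import org.apache.hudi.common.model.HoodieRecord
   import org.apache.spark.sql.DataFrame

   // Sketch: drop every meta column, including the optional _hoodie_operation;
   // names missing from the schema are simply ignored by drop.
   def dropMetaColumns(df: DataFrame): DataFrame =
     df.drop(HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.asScala: _*)
   ```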
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -55,8 +55,8 @@ trait ProvidesHoodieConfig extends Logging {
     // TODO(HUDI-3456) clean up
     val preCombineField = Option(tableConfig.getPreCombineField).getOrElse("")
 
-    require(hoodieCatalogTable.primaryKeys.nonEmpty,
-      s"There are no primary key in table 
${hoodieCatalogTable.table.identifier}, cannot execute update operator")
+//    require(hoodieCatalogTable.primaryKeys.nonEmpty,

Review Comment:
   let's fix unintended changes like this, or remove the commented-out code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
