nsivabalan commented on code in PR #8879: URL: https://github.com/apache/hudi/pull/8879#discussion_r1222275056
########## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java: ########## @@ -233,15 +238,25 @@ public static HoodieWriteResult doDeletePartitionsOperation(SparkRDDWriteClient } public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey, - String payloadClass) throws IOException { + String payloadClass, HoodieRecordLocation recordLocation) throws IOException { HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal); - return new HoodieAvroRecord<>(hKey, payload); + + HoodieAvroRecord record = new HoodieAvroRecord<>(hKey, payload); + if (recordLocation != null) { + record.setCurrentLocation(recordLocation); + } + return record; } + // AKL_TODO: check if this change is needed. Also validate change if needed. public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey, - String payloadClass) throws IOException { + String payloadClass, HoodieRecordLocation recordLocation) throws IOException { Review Comment: same here ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala: ########## @@ -144,20 +144,25 @@ class DefaultSource extends RelationProvider mode: SaveMode, optParams: Map[String, String], df: DataFrame): BaseRelation = { - val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*) + val dfPrepped = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, "false") Review Comment: dfPrepped -> processedDf ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -1160,21 +1171,29 @@ object HoodieSparkSqlWriter { // handle dropping partition columns it.map { avroRec => - val processedRecord = if (shouldDropPartitionColumns) { - HoodieAvroUtils.rewriteRecord(avroRec, dataFileSchema) + val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = 
getKeyAndLocatorFromAvroRecord(keyGenerator, avroRec, + isPrepped) + + val avroRecWithoutMeta: GenericRecord = if (isPrepped) { + HoodieAvroUtils.rewriteRecord(avroRec, HoodieAvroUtils.removeMetadataFields(dataFileSchema)) } else { avroRec } - val hoodieKey = new HoodieKey(keyGenerator.getRecordKey(avroRec), keyGenerator.getPartitionPath(avroRec)) + val processedRecord = if (shouldDropPartitionColumns) { + HoodieAvroUtils.rewriteRecord(avroRecWithoutMeta, dataFileSchema) + } else { + avroRecWithoutMeta + } + val hoodieRecord = if (shouldCombine) { val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, config.getString(PRECOMBINE_FIELD), false, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]] DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey, - config.getString(PAYLOAD_CLASS_NAME)) + config.getString(PAYLOAD_CLASS_NAME), recordLocation.getOrElse(null)) } else { - DataSourceUtils.createHoodieRecord(processedRecord, hoodieKey, - config.getString(PAYLOAD_CLASS_NAME)) + // AKL_TODO: check if this change is needed. 
Review Comment: fix the comments ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter { } val sparkKeyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface] val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType + val finalStructType = if (isPrepped) { + val fieldsToExclude = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray() + StructType(targetStructType.fields.filterNot(field => fieldsToExclude.contains(field.name))) + } else { + targetStructType + } // NOTE: To make sure we properly transform records - val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType) + val finalStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, finalStructType) it.map { sourceRow => - val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, sourceStructType) - val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, sourceStructType) - val key = new HoodieKey(recordKey.toString, partitionPath.toString) - val targetRow = targetStructTypeRowWriter(sourceRow) + val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped) - new HoodieSparkRecord(key, targetRow, dataFileStructType, false) + val finalRow = finalStructTypeRowWriter(sourceRow) + var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, dataFileStructType, false) + if (recordLocation.getOrElse(null) != null) { Review Comment: lets use Option ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter { } val sparkKeyGenerator = 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface] val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType + val finalStructType = if (isPrepped) { + val fieldsToExclude = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray() + StructType(targetStructType.fields.filterNot(field => fieldsToExclude.contains(field.name))) + } else { + targetStructType + } // NOTE: To make sure we properly transform records - val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType) + val finalStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, finalStructType) it.map { sourceRow => - val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, sourceStructType) - val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, sourceStructType) - val key = new HoodieKey(recordKey.toString, partitionPath.toString) - val targetRow = targetStructTypeRowWriter(sourceRow) + val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped) - new HoodieSparkRecord(key, targetRow, dataFileStructType, false) + val finalRow = finalStructTypeRowWriter(sourceRow) + var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, dataFileStructType, false) + if (recordLocation.getOrElse(null) != null) { + hoodieSparkRecord.setCurrentLocation(recordLocation.get) + } + hoodieSparkRecord } }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]] } } + + private def getKeyAndLocatorFromAvroRecord(keyGenerator: BaseKeyGenerator, avroRec: GenericRecord, Review Comment: getHoodieKeyAndMayBeLocation ########## hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java: ########## @@ -84,7 +84,7 @@ public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourc 
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))); try { return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), - props.getString("hoodie.datasource.write.payload.class")); + props.getString("hoodie.datasource.write.payload.class"), null); Review Comment: why is this change ? ########## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java: ########## @@ -233,15 +238,25 @@ public static HoodieWriteResult doDeletePartitionsOperation(SparkRDDWriteClient } public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey, - String payloadClass) throws IOException { + String payloadClass, HoodieRecordLocation recordLocation) throws IOException { Review Comment: Lets do Option<HoodieRecordLocation> ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala: ########## @@ -144,20 +144,25 @@ class DefaultSource extends RelationProvider mode: SaveMode, optParams: Map[String, String], df: DataFrame): BaseRelation = { - val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*) + val dfPrepped = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, "false") Review Comment: or we can rename the argument (L146) to unProcessedDf and so we can rename dfPrepped -> df and so rest of the code is untouched. 
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter { } val sparkKeyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface] val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType + val finalStructType = if (isPrepped) { + val fieldsToExclude = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray() Review Comment: lets ensure we drop 6 meta fields across both avro and spark record type code paths ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter { } val sparkKeyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface] val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType + val finalStructType = if (isPrepped) { + val fieldsToExclude = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray() + StructType(targetStructType.fields.filterNot(field => fieldsToExclude.contains(field.name))) + } else { + targetStructType + } // NOTE: To make sure we properly transform records - val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType) + val finalStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, finalStructType) it.map { sourceRow => - val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, sourceStructType) - val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, sourceStructType) - val key = new HoodieKey(recordKey.toString, partitionPath.toString) - val targetRow = targetStructTypeRowWriter(sourceRow) + val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped) - 
new HoodieSparkRecord(key, targetRow, dataFileStructType, false) + val finalRow = finalStructTypeRowWriter(sourceRow) + var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, dataFileStructType, false) + if (recordLocation.getOrElse(null) != null) { + hoodieSparkRecord.setCurrentLocation(recordLocation.get) + } + hoodieSparkRecord } }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]] } } + + private def getKeyAndLocatorFromAvroRecord(keyGenerator: BaseKeyGenerator, avroRec: GenericRecord, + isPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { + val recordKey = if (isPrepped) { + avroRec.get("_hoodie_record_key").toString + } else { + keyGenerator.getRecordKey(avroRec) + }; + + val partitionPath = if (isPrepped) { + avroRec.get("_hoodie_partition_path").toString + } else { + keyGenerator.getPartitionPath(avroRec) + }; + + val hoodieKey = new HoodieKey(recordKey, partitionPath) + val instantTime: Option[String] = if (isPrepped) { + Option(avroRec.get("_hoodie_commit_time")).map(_.toString) } + else { + None + } + val fileName: Option[String] = if (isPrepped) { + Option(avroRec.get("_hoodie_file_name")).map(_.toString) } + else { + None + } + val recordLocation: Option[HoodieRecordLocation] = if (instantTime.isDefined && fileName.isDefined) { + val fileId = FSUtils.getFileId(fileName.get) + Some(new HoodieRecordLocation(instantTime.get, fileId)) + } else { + None + } + (hoodieKey, recordLocation) + } + + private def getKeyAndLocatorFromSparkRecord(sparkKeyGenerator: SparkKeyGeneratorInterface, sourceRow: InternalRow, + schema: StructType, isPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { + def getFieldIndex(fieldName: String): Int = { + if (schema.fieldNames.contains(fieldName)) { Review Comment: HoodieRecord already has the name to position mapping HOODIE_META_COLUMNS_NAME_TO_POS ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -1195,18 +1214,108 @@ 
object HoodieSparkSqlWriter { } val sparkKeyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface] val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType + val finalStructType = if (isPrepped) { + val fieldsToExclude = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray() + StructType(targetStructType.fields.filterNot(field => fieldsToExclude.contains(field.name))) + } else { + targetStructType + } // NOTE: To make sure we properly transform records - val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType) + val finalStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, finalStructType) it.map { sourceRow => - val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, sourceStructType) - val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, sourceStructType) - val key = new HoodieKey(recordKey.toString, partitionPath.toString) - val targetRow = targetStructTypeRowWriter(sourceRow) + val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped) - new HoodieSparkRecord(key, targetRow, dataFileStructType, false) + val finalRow = finalStructTypeRowWriter(sourceRow) + var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, dataFileStructType, false) + if (recordLocation.getOrElse(null) != null) { + hoodieSparkRecord.setCurrentLocation(recordLocation.get) + } + hoodieSparkRecord } }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]] } } + + private def getKeyAndLocatorFromAvroRecord(keyGenerator: BaseKeyGenerator, avroRec: GenericRecord, + isPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { + val recordKey = if (isPrepped) { + avroRec.get("_hoodie_record_key").toString + } else { + keyGenerator.getRecordKey(avroRec) + }; + + val partitionPath = if (isPrepped) { + 
avroRec.get("_hoodie_partition_path").toString + } else { + keyGenerator.getPartitionPath(avroRec) + }; + + val hoodieKey = new HoodieKey(recordKey, partitionPath) + val instantTime: Option[String] = if (isPrepped) { + Option(avroRec.get("_hoodie_commit_time")).map(_.toString) } + else { + None + } + val fileName: Option[String] = if (isPrepped) { + Option(avroRec.get("_hoodie_file_name")).map(_.toString) } + else { + None + } + val recordLocation: Option[HoodieRecordLocation] = if (instantTime.isDefined && fileName.isDefined) { + val fileId = FSUtils.getFileId(fileName.get) + Some(new HoodieRecordLocation(instantTime.get, fileId)) + } else { + None + } + (hoodieKey, recordLocation) + } + + private def getKeyAndLocatorFromSparkRecord(sparkKeyGenerator: SparkKeyGeneratorInterface, sourceRow: InternalRow, + schema: StructType, isPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { + def getFieldIndex(fieldName: String): Int = { + if (schema.fieldNames.contains(fieldName)) { + schema.fieldIndex(fieldName) + } else { + -1 + } + } + + val hoodieRecordKeyIndex = getFieldIndex("_hoodie_record_key") Review Comment: lets re-use the const variable and avoid hard coding ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -1195,18 +1214,108 @@ object HoodieSparkSqlWriter { } val sparkKeyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface] val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType + val finalStructType = if (isPrepped) { + val fieldsToExclude = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.toArray() + StructType(targetStructType.fields.filterNot(field => fieldsToExclude.contains(field.name))) + } else { + targetStructType + } // NOTE: To make sure we properly transform records - val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType) + val 
finalStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, finalStructType) it.map { sourceRow => - val recordKey = sparkKeyGenerator.getRecordKey(sourceRow, sourceStructType) - val partitionPath = sparkKeyGenerator.getPartitionPath(sourceRow, sourceStructType) - val key = new HoodieKey(recordKey.toString, partitionPath.toString) - val targetRow = targetStructTypeRowWriter(sourceRow) + val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = getKeyAndLocatorFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped) - new HoodieSparkRecord(key, targetRow, dataFileStructType, false) + val finalRow = finalStructTypeRowWriter(sourceRow) + var hoodieSparkRecord = new HoodieSparkRecord(hoodieKey, finalRow, dataFileStructType, false) + if (recordLocation.getOrElse(null) != null) { + hoodieSparkRecord.setCurrentLocation(recordLocation.get) + } + hoodieSparkRecord } }.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]] } } + + private def getKeyAndLocatorFromAvroRecord(keyGenerator: BaseKeyGenerator, avroRec: GenericRecord, + isPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { + val recordKey = if (isPrepped) { + avroRec.get("_hoodie_record_key").toString Review Comment: can we reference the static variables. HoodieRecord.RECORD_KEY_METADATA_FIELD ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala: ########## @@ -144,20 +144,25 @@ class DefaultSource extends RelationProvider mode: SaveMode, optParams: Map[String, String], df: DataFrame): BaseRelation = { - val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*) + val dfPrepped = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, "false") + .equalsIgnoreCase("true")) { + df // Don't remove meta columns for prepped write. + } else { + df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*) Review Comment: do you know what happens if we drop a non existant column? 
we have another optional meta field named operation(_hoodie_operation). HOODIE_META_COLUMNS_WITH_OPERATION ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala: ########## @@ -55,8 +55,8 @@ trait ProvidesHoodieConfig extends Logging { // TODO(HUDI-3456) clean up val preCombineField = Option(tableConfig.getPreCombineField).getOrElse("") - require(hoodieCatalogTable.primaryKeys.nonEmpty, - s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator") +// require(hoodieCatalogTable.primaryKeys.nonEmpty, Review Comment: let's fix unintended changes or remove the commented-out code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org