This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 62a9279d666 [MINOR] Pass prepped boolean correctly in sql writer (#9320) 62a9279d666 is described below commit 62a9279d666646fd7abe1872857ea2f94fdedd46 Author: Sagar Sumit <sagarsumi...@gmail.com> AuthorDate: Thu Aug 3 08:22:59 2023 +0530 [MINOR] Pass prepped boolean correctly in sql writer (#9320) --- .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 3 +-- .../sql/hudi/command/MergeIntoHoodieTableCommand.scala | 16 ++++++++-------- .../hudi/TestMergeIntoTableWithNonRecordKeyField.scala | 3 --- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index fcee3fdab49..07b16e1e47d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -404,8 +404,7 @@ object HoodieSparkSqlWriter { hoodieRecords } client.startCommitWithTime(instantTime, commitActionType) - val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation, - isPrepped) + val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation, isPrepped) (writeResult, client) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index eba75c95452..f830c552bc8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -24,8 +24,8 @@ import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.model.HoodieAvroRecordMerger import org.apache.hudi.common.util.StringUtils -import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME} import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sync.common.HoodieSyncConfig @@ -342,7 +342,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val tableMetaCols = mergeInto.targetTable.output.filter(a => isMetaField(a.name)) val joinData = sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE") val incomingDataCols = joinData.output.filterNot(mergeInto.targetTable.outputSet.contains) - val projectedJoinPlan = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") { + // for pkless table, we need to project the meta columns + val hasPrimaryKey = hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent + val projectedJoinPlan = if (!hasPrimaryKey || sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), "false") == "true") { Project(tableMetaCols ++ incomingDataCols, joinData) } else { Project(incomingDataCols, joinData) @@ -619,12 +621,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // default value ("ts") // TODO(HUDI-3456) clean up val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("") - val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) - - val enableOptimizedMerge = sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), - SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) - + // for pkless tables, we need to enable optimized merge + val hasPrimaryKey = tableConfig.getRecordKeyFields.isPresent + val enableOptimizedMerge = if (!hasPrimaryKey) "true" else sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), "false") val keyGeneratorClassName = if (enableOptimizedMerge == "true") { classOf[MergeIntoKeyGenerator].getCanonicalName } else { @@ -653,7 +653,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName, RECORD_MERGER_IMPLS.key -> classOf[HoodieAvroRecordMerger].getName, - // NOTE: We have to explicitly override following configs to make sure no schema validation is performed + // NOTE: We have to explicitly override following configs to make sure no schema validation is performed // as schema of the incoming dataset might be diverging from the table's schema (full schemas' // compatibility b/w table's schema and incoming one is not necessary in this case since we can // be cherry-picking only selected columns from the incoming dataset to be inserted/updated in the diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala index dd1d00580dc..48964b37323 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala @@ -172,7 +172,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit | (6, 'a6', 60, 100) |""".stripMargin) - // First merge with a extra input field 'flag' (insert a new record) spark.sql( s""" | merge into $tableName @@ -219,7 +218,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit for (withPrecombine <- Seq(true, false)) { withRecordType()(withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") - spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") val tableName = generateTableName val prekstr = if (withPrecombine) "tblproperties (preCombineField = 'ts')" else "" @@ -242,7 +240,6 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit | (1, 'a1', 10, 100) |""".stripMargin) - // First merge with a extra input field 'flag' (insert a new record) spark.sql( s""" | merge into $tableName