This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 2c9bd80cc8a  [HUDI-6315] Feature flag for disabling optimized update/delete code path. (#9131)
2c9bd80cc8a is described below

commit 2c9bd80cc8a56a3e50e9bb71d6737a62924a3886
Author: Amrish Lal <amrish.k....@gmail.com>
AuthorDate: Wed Jul 12 22:32:14 2023 -0700

    [HUDI-6315] Feature flag for disabling optimized update/delete code path. (#9131)

    * Feature flag to guard the optimized update/delete code path.
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   7 +
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   1 +
 .../hudi/command/DeleteHoodieTableCommand.scala    |  22 ++-
 .../hudi/command/UpdateHoodieTableCommand.scala    |  21 ++-
 .../hudi-spark/src/test/java/HoodieJavaApp.java    |   2 +
 .../apache/spark/sql/hudi/TestDeleteTable.scala    | 155 +++++++++++----------
 .../apache/spark/sql/hudi/TestUpdateTable.scala    | 133 ++++++++++--------
 7 files changed, 203 insertions(+), 138 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index a8ed29307d6..6a6a1faf413 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -608,6 +608,13 @@ object DataSourceWriteOptions {
   val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] =
     HoodieTableConfig.DROP_PARTITION_COLUMNS
 
+  val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.spark.sql.writes.optimized.enable")
+    .defaultValue("true")
+    .markAdvanced()
+    .sinceVersion("0.14.0")
+    .withDocumentation("Controls whether the optimized Spark SQL update/delete code path is enabled.")
+
   /** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
   @Deprecated
   val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.key()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 405e761635a..cf8d85f704e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -82,6 +82,7 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
     hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
+    hoodieConfig.setDefaultValue(ENABLE_OPTIMIZED_SQL_WRITES)
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
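
Both commands below resolve the new property the same way, via the session SQLConf.
For reference, that lookup isolated into a small sketch (the helper name is
hypothetical; the key and default come from the ConfigProperty above):

    import org.apache.hudi.DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES
    import org.apache.spark.sql.SparkSession

    // Hypothetical helper: true when the optimized SQL write path is enabled.
    def optimizedSqlWritesEnabled(spark: SparkSession): Boolean =
      spark.sqlContext.conf.getConfString(
        ENABLE_OPTIMIZED_SQL_WRITES.key(), ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true"
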
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index d9ed523def4..d10b3d529f5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.DataSourceWriteOptions.DATASOURCE_WRITE_PREPPED_KEY
+import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES}
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -39,13 +39,27 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn
     logInfo(s"Executing 'DELETE FROM' command for $tableId")
 
     val condition = sparkAdapter.extractDeleteCondition(dft)
+
+    val targetLogicalPlan = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
+      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+      dft.table
+    } else {
+      stripMetaFieldAttributes(dft.table)
+    }
+
     val filteredPlan = if (condition != null) {
-      Filter(condition, dft.table)
+      Filter(condition, targetLogicalPlan)
     } else {
-      dft.table
+      targetLogicalPlan
+    }
+
+    val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
+      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+      buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
+    } else {
+      buildHoodieDeleteTableConfig(catalogTable, sparkSession)
     }
 
-    val config = buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
     val df = Dataset.ofRows(sparkSession, filteredPlan)
 
     df.write.format("hudi")
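
With the flag set to false, DELETE falls back to the pre-existing path: meta fields
are stripped from the plan and the write is not marked as prepped. A minimal session
sketch, assuming an existing Hudi table named hudi_tbl:

    // Hypothetical session: opt out of the optimized DELETE path.
    spark.sql("set hoodie.spark.sql.writes.optimized.enable=false")
    spark.sql("delete from hudi_tbl where id = 1")
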
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 18e2599d909..7d6d5f39bb1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.DataSourceWriteOptions.DATASOURCE_WRITE_PREPPED_KEY
+import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES}
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
 import org.apache.spark.sql._
@@ -44,7 +44,14 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
       case Assignment(attr: AttributeReference, value) => attr -> value
     }
 
-    val targetExprs = ut.table.output.map { targetAttr =>
+    val filteredOutput = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
+      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+      ut.table.output
+    } else {
+      removeMetaFields(ut.table.output)
+    }
+
+    val targetExprs = filteredOutput.map { targetAttr =>
       // NOTE: [[UpdateTable]] permits partial updates and therefore here we correlate
       //       assigned attributes to the ones of the target table. Ones not being assigned
       //       will simply be carried over (from the old record)
@@ -56,8 +63,14 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
 
     val condition = ut.condition.getOrElse(TrueLiteral)
     val filteredPlan = Filter(condition, Project(targetExprs, ut.table))
 
-    // Set config to show that this is a prepped write.
-    val config = buildHoodieConfig(catalogTable) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
+    val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
+      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+      // Set config to show that this is a prepped write.
+      buildHoodieConfig(catalogTable) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
+    } else {
+      buildHoodieConfig(catalogTable)
+    }
+
     val df = Dataset.ofRows(sparkSession, filteredPlan)
 
     df.write.format("hudi")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index aed507bc700..448215a9b0a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -160,6 +160,8 @@ public class HoodieJavaApp {
         .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
+        .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
+        .option(DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES().key(), "true")
         // This will remove any existing data at path below, and create a
         .mode(SaveMode.Overwrite);
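
As HoodieJavaApp shows above, the flag can also be supplied per write as a
datasource option. A rough Scala equivalent, where df, tableName and basePath
are placeholders:

    // Hypothetical datasource write with the flag set explicitly.
    df.write.format("hudi")
      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
      .option(DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES.key, "true")
      .mode(SaveMode.Overwrite)
      .save(basePath)
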
+ spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled") - spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000") - spark.sql(s"delete from $tableName where id = 1") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(2, "a2", 10.0, 1000) - ) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) - spark.sql(s"delete from $tableName") - checkAnswer(s"select count(1) from $tableName")( - Seq(0) - ) + // delete data from table + spark.sql(s"delete from $tableName where id = 1") + checkAnswer(s"select count(1) from $tableName")( + Seq(0) + ) + + spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000") + spark.sql(s"delete from $tableName where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(2, "a2", 10.0, 1000) + ) + + spark.sql(s"delete from $tableName") + checkAnswer(s"select count(1) from $tableName")( + Seq(0) + ) + } } } } @@ -88,7 +94,11 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { | type = '$tableType', | preCombineField = 'ts' | ) - """.stripMargin) + """.stripMargin) + + // test with optimized sql writes enabled. + spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true") + // insert data to table spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") checkAnswer(s"select id, name, price, ts from $tableName")( @@ -269,41 +279,46 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { Seq(false, true).foreach { urlencode => test(s"Test Delete single-partition table' partitions, urlencode: $urlencode") { - withTempDir { tmp => - val tableName = generateTableName - val tablePath = s"${tmp.getCanonicalPath}/$tableName" - - import spark.implicits._ - val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02")) - .toDF("id", "name", "ts", "dt") - - df.write.format("hudi") - .option(HoodieWriteConfig.TBL_NAME.key, tableName) - .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL) - .option(RECORDKEY_FIELD.key, "id") - .option(PRECOMBINE_FIELD.key, "ts") - .option(PARTITIONPATH_FIELD.key, "dt") - .option(URL_ENCODE_PARTITIONING.key(), urlencode) - .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") - .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") - .mode(SaveMode.Overwrite) - .save(tablePath) - - // register meta to spark catalog by creating table - spark.sql( - s""" - |create table $tableName using hudi - |location '$tablePath' - |""".stripMargin) + Seq(true, false).foreach { optimizedSqlEnabled => + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" - // delete 2021-10-01 partition - if (urlencode) { - spark.sql(s"""delete from $tableName where dt="2021/10/01"""") - } else { - spark.sql(s"delete from $tableName where dt='2021/10/01'") - } + import spark.implicits._ + val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02")) + .toDF("id", "name", "ts", "dt") + + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "dt") + .option(URL_ENCODE_PARTITIONING.key(), urlencode) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Overwrite) + .save(tablePath) - checkAnswer(s"select dt 
from $tableName")(Seq(s"2021/10/02")) + // register meta to spark catalog by creating table + spark.sql( + s""" + |create table $tableName using hudi + |location '$tablePath' + |""".stripMargin) + + // test with optimized sql writes enabled / disabled. + spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled") + + // delete 2021-10-01 partition + if (urlencode) { + spark.sql(s"""delete from $tableName where dt="2021/10/01"""") + } else { + spark.sql(s"delete from $tableName where dt='2021/10/01'") + } + + checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02")) + } } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala index 4245f2f7aaf..23d192ba099 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala @@ -23,42 +23,47 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { test("Test Update Table") { withRecordType()(withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - // create table - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | location '${tmp.getCanonicalPath}/$tableName' - | tblproperties ( - | type = '$tableType', - | primaryKey = 'id', - | preCombineField = 'ts' - | ) - """.stripMargin) + Seq(true, false).foreach { optimizedSqlEnabled => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) - // insert data to table - spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 10.0, 1000) - ) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) - // update data - spark.sql(s"update $tableName set price = 20 where id = 1") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 20.0, 1000) - ) + // test with optimized sql writes enabled / disabled. 
+ spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled") - // update data - spark.sql(s"update $tableName set price = price * 2 where id = 1") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 40.0, 1000) - ) + // update data + spark.sql(s"update $tableName set price = 20 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 20.0, 1000) + ) + + // update data + spark.sql(s"update $tableName set price = price * 2 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 40.0, 1000) + ) + } } }) } @@ -81,7 +86,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { | type = '$tableType', | preCombineField = 'ts' | ) - """.stripMargin) + """.stripMargin) // insert data to table spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") @@ -89,6 +94,9 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { Seq(1, "a1", 10.0, 1000) ) + // test with optimized sql writes enabled. + spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true") + // update data spark.sql(s"update $tableName set price = 20 where id = 1") checkAnswer(s"select id, name, price, ts from $tableName")( @@ -248,35 +256,40 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { test("Test decimal type") { withTempDir { tmp => - val tableName = generateTableName - // create table - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long, - | ff decimal(38, 10) - |) using hudi - | location '${tmp.getCanonicalPath}/$tableName' - | tblproperties ( - | type = 'mor', - | primaryKey = 'id', - | preCombineField = 'ts' - | ) + Seq(true, false).foreach { optimizedSqlEnabled => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | ff decimal(38, 10) + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'mor', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) """.stripMargin) - // insert data to table - spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 10.0") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 10.0, 1000) - ) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 10.0") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + // test with optimized sql writes enabled / disabled. + spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled") - spark.sql(s"update $tableName set price = 22 where id = 1") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 22.0, 1000) - ) + spark.sql(s"update $tableName set price = 22 where id = 1") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 22.0, 1000) + ) + } } } }