This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c9bd80cc8a [HUDI-6315] Feature flag for disabling optimized update/delete code path. (#9131)
2c9bd80cc8a is described below

commit 2c9bd80cc8a56a3e50e9bb71d6737a62924a3886
Author: Amrish Lal <amrish.k....@gmail.com>
AuthorDate: Wed Jul 12 22:32:14 2023 -0700

    [HUDI-6315] Feature flag for disabling optimized update/delete code path. (#9131)
    
    * Feature flag to guard optimized update/delete code path.
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   7 +
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   1 +
 .../hudi/command/DeleteHoodieTableCommand.scala    |  22 ++-
 .../hudi/command/UpdateHoodieTableCommand.scala    |  21 ++-
 .../hudi-spark/src/test/java/HoodieJavaApp.java    |   2 +
 .../apache/spark/sql/hudi/TestDeleteTable.scala    | 155 +++++++++++----------
 .../apache/spark/sql/hudi/TestUpdateTable.scala    | 133 ++++++++++--------
 7 files changed, 203 insertions(+), 138 deletions(-)
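
For orientation: the patch introduces a session-level feature flag, hoodie.spark.sql.writes.optimized.enable (default "true", since 0.14.0), that gates the optimized, prepped Spark SQL update/delete path. A minimal toggle sketch, assuming a spark-shell session and an existing Hudi table t1 (session and table names are illustrative, not from the patch):

    // Fall back to the pre-existing (non-prepped) write path for this session.
    spark.sql("set hoodie.spark.sql.writes.optimized.enable=false")
    spark.sql("update t1 set price = 20 where id = 1")

    // Re-enable the optimized path (the default) at any point.
    spark.sql("set hoodie.spark.sql.writes.optimized.enable=true")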

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index a8ed29307d6..6a6a1faf413 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -608,6 +608,13 @@ object DataSourceWriteOptions {
 
   val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS
 
+  val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.spark.sql.writes.optimized.enable")
+    .defaultValue("true")
+    .markAdvanced()
+    .sinceVersion("0.14.0")
+    .withDocumentation("Controls whether spark sql optimized update is enabled.")
+
   /** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
   @Deprecated
   val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.key()
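
The new ConfigProperty above can also be supplied per write as a datasource option (the Java test app further down sets it exactly this way); HoodieWriterUtils, next, back-fills the default when it is absent. A hedged Scala sketch of the per-write form (table name, fields, and path are illustrative):

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("flag-demo").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "a1", 10.0, 1000L)).toDF("id", "name", "price", "ts")

    df.write.format("hudi")
      .option("hoodie.table.name", "t1")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts")
      .option(DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES.key(), "true")
      .mode("overwrite")
      .save("/tmp/t1")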
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 405e761635a..cf8d85f704e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -82,6 +82,7 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
     hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
+    hoodieConfig.setDefaultValue(ENABLE_OPTIMIZED_SQL_WRITES)
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index d9ed523def4..d10b3d529f5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.DataSourceWriteOptions.DATASOURCE_WRITE_PREPPED_KEY
+import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES}
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -39,13 +39,27 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn
     logInfo(s"Executing 'DELETE FROM' command for $tableId")
 
     val condition = sparkAdapter.extractDeleteCondition(dft)
+
+    val targetLogicalPlan = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
+      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+      dft.table
+    } else {
+      stripMetaFieldAttributes(dft.table)
+    }
+
     val filteredPlan = if (condition != null) {
-      Filter(condition, dft.table)
+      Filter(condition, targetLogicalPlan)
     } else {
-      dft.table
+      targetLogicalPlan
+    }
+
+    val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
+      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+      buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
+    } else {
+      buildHoodieDeleteTableConfig(catalogTable, sparkSession)
     }
 
-    val config = buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
     val df = Dataset.ofRows(sparkSession, filteredPlan)
 
     df.write.format("hudi")
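
Note that the command above resolves the flag twice, once to pick the logical plan and once to build the write config. A possible consolidation, sketched under the assumption that a private helper would be acceptable here (the helper name is hypothetical, not part of the patch):

    // Hypothetical helper: resolve the feature flag once, with its default.
    private def optimizedSqlWritesEnabled(sparkSession: SparkSession): Boolean =
      sparkSession.sqlContext.conf.getConfString(
        ENABLE_OPTIMIZED_SQL_WRITES.key(), ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true"

    // run() could then read:
    //   val targetLogicalPlan =
    //     if (optimizedSqlWritesEnabled(sparkSession)) dft.table
    //     else stripMetaFieldAttributes(dft.table)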
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 18e2599d909..7d6d5f39bb1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.DataSourceWriteOptions.DATASOURCE_WRITE_PREPPED_KEY
+import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES}
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
 import org.apache.spark.sql._
@@ -44,7 +44,14 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
       case Assignment(attr: AttributeReference, value) => attr -> value
     }
 
-    val targetExprs = ut.table.output.map { targetAttr =>
+    val filteredOutput = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
+      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+      ut.table.output
+    } else {
+      removeMetaFields(ut.table.output)
+    }
+
+    val targetExprs = filteredOutput.map { targetAttr =>
       // NOTE: [[UpdateTable]] permits partial updates and therefore here we correlate assigned
       //       attributes to the ones of the target table. Ones not being assigned
       //       will simply be carried over (from the old record)
@@ -56,8 +63,14 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
     val condition = ut.condition.getOrElse(TrueLiteral)
     val filteredPlan = Filter(condition, Project(targetExprs, ut.table))
 
-    // Set config to show that this is a prepped write.
-    val config = buildHoodieConfig(catalogTable) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
+    val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
+      , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
+      // Set config to show that this is a prepped write.
+      buildHoodieConfig(catalogTable) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
+    } else {
+      buildHoodieConfig(catalogTable)
+    }
+
     val df = Dataset.ofRows(sparkSession, filteredPlan)
 
     df.write.format("hudi")
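
For readers new to the two branches: with the flag enabled the projection keeps Hudi's meta-field columns, so the write can be marked as already prepped; with it disabled, removeMetaFields drops them and the writer re-derives record keys and partition paths. A rough, illustrative sketch of that drop (not the actual removeMetaFields implementation; it assumes all Hudi meta columns share the "_hoodie_" prefix, e.g. _hoodie_commit_time, _hoodie_record_key):

    import org.apache.spark.sql.catalyst.expressions.Attribute

    def removeMetaFieldsSketch(output: Seq[Attribute]): Seq[Attribute] =
      output.filterNot(_.name.startsWith("_hoodie_"))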
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index aed507bc700..448215a9b0a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -160,6 +160,8 @@ public class HoodieJavaApp {
         .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
+        .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
+        .option(DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES().key(), "true")
         // This will remove any existing data at path below, and create a
         .mode(SaveMode.Overwrite);
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
index 4fa9f1083d0..6233884a63e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
@@ -26,46 +26,52 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
 
   test("Test Delete Table") {
     withTempDir { tmp =>
-      Seq("cow", "mor").foreach {tableType =>
-        val tableName = generateTableName
-        // create table
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '${tmp.getCanonicalPath}/$tableName'
-             | tblproperties (
-             |  type = '$tableType',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
+      Seq(true, false).foreach { optimizedSqlEnabled =>
+        Seq("cow", "mor").foreach { tableType =>
+          val tableName = generateTableName
+          // create table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | location '${tmp.getCanonicalPath}/$tableName'
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
        """.stripMargin)
-        // insert data to table
-        spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 10.0, 1000)
-        )
 
-        // delete data from table
-        spark.sql(s"delete from $tableName where id = 1")
-        checkAnswer(s"select count(1) from $tableName") (
-          Seq(0)
-        )
+          // test with optimized sql writes enabled / disabled.
+          spark.sql(s"set 
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
 
-        spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000")
-        spark.sql(s"delete from $tableName where id = 1")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(2, "a2", 10.0, 1000)
-        )
+          // insert data to table
+          spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 10.0, 1000)
+          )
 
-        spark.sql(s"delete from $tableName")
-        checkAnswer(s"select count(1) from $tableName")(
-          Seq(0)
-        )
+          // delete data from table
+          spark.sql(s"delete from $tableName where id = 1")
+          checkAnswer(s"select count(1) from $tableName")(
+            Seq(0)
+          )
+
+          spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000")
+          spark.sql(s"delete from $tableName where id = 1")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(2, "a2", 10.0, 1000)
+          )
+
+          spark.sql(s"delete from $tableName")
+          checkAnswer(s"select count(1) from $tableName")(
+            Seq(0)
+          )
+        }
       }
     }
   }
@@ -88,7 +94,11 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
-     """.stripMargin)
+   """.stripMargin)
+
+        // test with optimized sql writes enabled.
+        spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true")
+
         // insert data to table
         spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
         checkAnswer(s"select id, name, price, ts from $tableName")(
@@ -269,41 +279,46 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
 
   Seq(false, true).foreach { urlencode =>
     test(s"Test Delete single-partition table' partitions, urlencode: 
$urlencode") {
-      withTempDir { tmp =>
-        val tableName = generateTableName
-        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-
-        import spark.implicits._
-        val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02"))
-          .toDF("id", "name", "ts", "dt")
-
-        df.write.format("hudi")
-          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
-          .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
-          .option(RECORDKEY_FIELD.key, "id")
-          .option(PRECOMBINE_FIELD.key, "ts")
-          .option(PARTITIONPATH_FIELD.key, "dt")
-          .option(URL_ENCODE_PARTITIONING.key(), urlencode)
-          .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
-          .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
-          .mode(SaveMode.Overwrite)
-          .save(tablePath)
-
-        // register meta to spark catalog by creating table
-        spark.sql(
-          s"""
-             |create table $tableName using hudi
-             |location '$tablePath'
-             |""".stripMargin)
+      Seq(true, false).foreach { optimizedSqlEnabled =>
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
 
-        // delete 2021-10-01 partition
-        if (urlencode) {
-          spark.sql(s"""delete from $tableName where dt="2021/10/01"""")
-        } else {
-          spark.sql(s"delete from $tableName where dt='2021/10/01'")
-        }
+          import spark.implicits._
+          val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02"))
+            .toDF("id", "name", "ts", "dt")
+
+          df.write.format("hudi")
+            .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+            .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
+            .option(RECORDKEY_FIELD.key, "id")
+            .option(PRECOMBINE_FIELD.key, "ts")
+            .option(PARTITIONPATH_FIELD.key, "dt")
+            .option(URL_ENCODE_PARTITIONING.key(), urlencode)
+            .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+            .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+            .mode(SaveMode.Overwrite)
+            .save(tablePath)
 
-        checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
+          // register meta to spark catalog by creating table
+          spark.sql(
+            s"""
+               |create table $tableName using hudi
+               |location '$tablePath'
+               |""".stripMargin)
+
+          // test with optimized sql writes enabled / disabled.
+          spark.sql(s"set 
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
+
+          // delete 2021-10-01 partition
+          if (urlencode) {
+            spark.sql(s"""delete from $tableName where dt="2021/10/01"""")
+          } else {
+            spark.sql(s"delete from $tableName where dt='2021/10/01'")
+          }
+
+          checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
+        }
       }
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index 4245f2f7aaf..23d192ba099 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -23,42 +23,47 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
 
   test("Test Update Table") {
     withRecordType()(withTempDir { tmp =>
-      Seq("cow", "mor").foreach { tableType =>
-        val tableName = generateTableName
-        // create table
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '${tmp.getCanonicalPath}/$tableName'
-             | tblproperties (
-             |  type = '$tableType',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
-       """.stripMargin)
+      Seq(true, false).foreach { optimizedSqlEnabled =>
+        Seq("cow", "mor").foreach { tableType =>
+          val tableName = generateTableName
+          // create table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | location '${tmp.getCanonicalPath}/$tableName'
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+         """.stripMargin)
 
-        // insert data to table
-        spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 10.0, 1000)
-        )
+          // insert data to table
+          spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 10.0, 1000)
+          )
 
-        // update data
-        spark.sql(s"update $tableName set price = 20 where id = 1")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 20.0, 1000)
-        )
+          // test with optimized sql writes enabled / disabled.
+          spark.sql(s"set 
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
 
-        // update data
-        spark.sql(s"update $tableName set price = price * 2 where id = 1")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 40.0, 1000)
-        )
+          // update data
+          spark.sql(s"update $tableName set price = 20 where id = 1")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 20.0, 1000)
+          )
+
+          // update data
+          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 40.0, 1000)
+          )
+        }
       }
     })
   }
@@ -81,7 +86,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
-     """.stripMargin)
+   """.stripMargin)
 
         // insert data to table
         spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
@@ -89,6 +94,9 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
           Seq(1, "a1", 10.0, 1000)
         )
 
+        // test with optimized sql writes enabled.
+        spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true")
+
         // update data
         spark.sql(s"update $tableName set price = 20 where id = 1")
         checkAnswer(s"select id, name, price, ts from $tableName")(
@@ -248,35 +256,40 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
 
   test("Test decimal type") {
     withTempDir { tmp =>
-      val tableName = generateTableName
-      // create table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long,
-           |  ff decimal(38, 10)
-           |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName'
-           | tblproperties (
-           |  type = 'mor',
-           |  primaryKey = 'id',
-           |  preCombineField = 'ts'
-           | )
+      Seq(true, false).foreach { optimizedSqlEnabled =>
+        val tableName = generateTableName
+        // create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  ff decimal(38, 10)
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
      """.stripMargin)
 
-      // insert data to table
-      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 10.0")
-      checkAnswer(s"select id, name, price, ts from $tableName")(
-        Seq(1, "a1", 10.0, 1000)
-      )
+        // insert data to table
+        spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000, 10.0")
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 10.0, 1000)
+        )
+
+        // test with optimized sql writes enabled / disabled.
+        spark.sql(s"set 
hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
 
-      spark.sql(s"update $tableName set price = 22 where id = 1")
-      checkAnswer(s"select id, name, price, ts from $tableName")(
-        Seq(1, "a1", 22.0, 1000)
-      )
+        spark.sql(s"update $tableName set price = 22 where id = 1")
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 22.0, 1000)
+        )
+      }
     }
   }
 }
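
Taken together, the tests run the same SQL surface with the flag flipped both ways. A condensed end-to-end sketch for a spark-shell session (table name and location are illustrative):

    spark.sql("set hoodie.spark.sql.writes.optimized.enable=false") // exercise the legacy path
    spark.sql(
      """create table t1 (id int, name string, price double, ts long) using hudi
        | location '/tmp/t1'
        | tblproperties (type = 'mor', primaryKey = 'id', preCombineField = 'ts')
        |""".stripMargin)
    spark.sql("insert into t1 select 1, 'a1', 10, 1000")
    spark.sql("update t1 set price = 20 where id = 1") // non-prepped update
    spark.sql("delete from t1 where id = 1")           // non-prepped delete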
