This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47151f653d8 [HUDI-7487] Fixed test with in-memory index by proper heap clearing (#10910)
47151f653d8 is described below

commit 47151f653d85202ab5b28c0e770779f05b5a59f7
Author: Geser Dugarov <geserduga...@gmail.com>
AuthorDate: Sat Mar 23 07:45:09 2024 +0700

    [HUDI-7487] Fixed test with in-memory index by proper heap clearing (#10910)
---
 .../lock/TestInProcessLockProvider.java            | 16 +++++
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  | 78 ++++++++++++----------
 .../hudi/dml/TestPartialUpdateForMergeInto.scala   | 48 +++++++------
 3 files changed, 82 insertions(+), 60 deletions(-)
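
Note (not part of the commit): a minimal sketch, under stated assumptions, of the cleanup pattern the Scala test changes below apply. Tests that set hoodie.index.type=INMEMORY leave cached record-location state in HoodieInMemoryHashIndex, so clearing it after the test keeps that state (and the heap it occupies) from leaking into later tests in the same JVM; the Java lock-provider test additionally interrupts its helper writer threads for the same reason. The helper name here is hypothetical; the real tests inline the call.

    import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex

    // Illustrative helper (hypothetical name); the real cleanup calls are
    // inlined in TestSpark3DDL.scala and TestPartialUpdateForMergeInto.scala.
    object InMemoryIndexTestCleanup {
      def withInMemoryIndex(testBody: => Unit): Unit = {
        try {
          // the test body writes to a Hudi table with hoodie.index.type = INMEMORY
          testBody
        } finally {
          // drop record locations cached on the heap by the in-memory index so
          // the next test running in the same JVM starts from a clean state
          HoodieInMemoryHashIndex.clear()
        }
      }
    }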

diff --git a/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java b/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java
index 60f74bb7996..6e7dcd7e3fa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java
+++ b/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java
@@ -166,6 +166,9 @@ public class TestInProcessLockProvider {
     Assertions.assertTrue(writer3Completed.get());
     Assertions.assertEquals(lockProviderList.get(0).getLock(), lockProviderList.get(1).getLock());
     Assertions.assertEquals(lockProviderList.get(1).getLock(), lockProviderList.get(2).getLock());
+
+    writer2.interrupt();
+    writer3.interrupt();
   }
 
   @Test
@@ -254,6 +257,8 @@ public class TestInProcessLockProvider {
       //
     }
     Assertions.assertTrue(writer2Completed.get());
+
+    writer2.interrupt();
   }
 
   @Test
@@ -317,6 +322,9 @@ public class TestInProcessLockProvider {
     }
     Assertions.assertTrue(writer2Stream1Completed.get());
     Assertions.assertTrue(writer2Stream2Completed.get());
+
+    writer2Stream1.interrupt();
+    writer2Stream2.interrupt();
   }
 
   @Test
@@ -373,6 +381,8 @@ public class TestInProcessLockProvider {
     assertDoesNotThrow(() -> {
       inProcessLockProvider.unlock();
     });
+
+    writer2.interrupt();
   }
 
   @Test
@@ -414,6 +424,9 @@ public class TestInProcessLockProvider {
       // unlock by main thread should succeed.
       inProcessLockProvider.unlock();
     });
+
+    writer2.interrupt();
+    writer3.interrupt();
   }
 
   @Test
@@ -472,6 +485,9 @@ public class TestInProcessLockProvider {
     // Make sure both writers actually completed good
     Assertions.assertTrue(writer1Completed.get());
     Assertions.assertTrue(writer2Completed.get());
+
+    writer1.interrupt();
+    writer2.interrupt();
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 8ac8e766e56..9f23494ae79 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -18,16 +18,15 @@
 package org.apache.spark.sql.hudi.ddl
 
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY, SPARK_SQL_INSERT_INTO_OPERATION, TABLE_NAME}
-import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, RawTripTestPayload}
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
 import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, HoodieSparkUtils}
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, HoodieSparkUtils, QuickstartUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -77,7 +76,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         val tableName = generateTableName
         val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
         if (HoodieSparkUtils.gteqSpark3_1) {
-          spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
+          spark.sql("set " + DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
           spark.sql("set hoodie.schema.on.read.enable=true")
           // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
           //       and are disallowed now by default in Spark 3.x
@@ -138,7 +137,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           )
           spark.sessionState.catalog.dropTable(TableIdentifier(tableName), true, true)
           spark.sessionState.catalog.refreshTable(TableIdentifier(tableName))
-          spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+          spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
         }
       }
     })
@@ -244,7 +243,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
         if (HoodieSparkUtils.gteqSpark3_1) {
           spark.sql("set hoodie.schema.on.read.enable=true")
-          spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
+          spark.sql("set " + DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
           // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
           //       and are disallowed now by default in Spark 3.x
           spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
@@ -337,7 +336,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           spark.sql(s"select id, col1_new, col2 from $tableName where id = 1 or id = 6 or id = 2 or id = 11 order by id").show(false)
         }
       }
-      spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+      spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
     }
   }
 
@@ -348,7 +347,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
         if (HoodieSparkUtils.gteqSpark3_1) {
           spark.sql("set hoodie.schema.on.read.enable=true")
-          spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
+          spark.sql("set " + DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
           spark.sql(
             s"""
                |create table $tableName (
@@ -389,7 +388,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           )
         }
       }
-      spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+      spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
     })
   }
 
@@ -546,7 +545,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
   test("Test alter column with complex schema") {
     withTempDir { tmp =>
-      withSQLConf(s"$SPARK_SQL_INSERT_INTO_OPERATION" -> "upsert",
+      withSQLConf(s"${DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION}" -> "upsert",
         "hoodie.schema.on.read.enable" -> "true",
         "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
         val tableName = generateTableName
@@ -713,36 +712,36 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         val tableName = generateTableName
         val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
         if (HoodieSparkUtils.gteqSpark3_1) {
-          val dataGen = new DataGenerator
-          val inserts = convertToStringList(dataGen.generateInserts(10))
+          val dataGen = new QuickstartUtils.DataGenerator
+          val inserts = QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
           val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
           df.write.format("hudi").
-            options(getQuickstartWriteConfigs).
+            options(QuickstartUtils.getQuickstartWriteConfigs).
             option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
-            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
             option("hoodie.schema.on.read.enable","true").
-            option(TABLE_NAME.key(), tableName).
+            option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
             option("hoodie.table.name", tableName).
             mode("overwrite").
             save(tablePath)
 
-          val updates = convertToStringList(dataGen.generateUpdates(10))
+          val updates = QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
           // type change: fare (double -> String)
           // add new column and drop a column
           val dfUpdate = spark.read.json(spark.sparkContext.parallelize(updates, 2))
             .withColumn("fare", expr("cast(fare as string)"))
             .withColumn("addColumn", lit("new"))
           dfUpdate.drop("begin_lat").write.format("hudi").
-            options(getQuickstartWriteConfigs).
+            options(QuickstartUtils.getQuickstartWriteConfigs).
             option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
-            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
             option("hoodie.schema.on.read.enable","true").
             option("hoodie.datasource.write.reconcile.schema","true").
-            option(TABLE_NAME.key(), tableName).
+            option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
             option("hoodie.table.name", tableName).
             mode("append").
             save(tablePath)
@@ -760,35 +759,35 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
           spark.sql(s"select * from  hudi_trips_snapshot").show(false)
           //  test insert_over_write  + update again
-          val overwrite = convertToStringList(dataGen.generateInserts(10))
+          val overwrite = QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
           val dfOverWrite = spark.
             read.json(spark.sparkContext.parallelize(overwrite, 2)).
             filter("partitionpath = 'americas/united_states/san_francisco'")
             .withColumn("fare", expr("cast(fare as string)")) // fare now in table is string type, we forbid convert string to double.
           dfOverWrite.write.format("hudi").
-            options(getQuickstartWriteConfigs).
+            options(QuickstartUtils.getQuickstartWriteConfigs).
             option("hoodie.datasource.write.operation","insert_overwrite").
-            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
             option("hoodie.schema.on.read.enable","true").
             option("hoodie.datasource.write.reconcile.schema","true").
-            option(TABLE_NAME.key(), tableName).
+            option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
             option("hoodie.table.name", tableName).
             mode("append").
             save(tablePath)
           spark.read.format("hudi").load(tablePath).show(false)
 
-          val updatesAgain = convertToStringList(dataGen.generateUpdates(10))
+          val updatesAgain = QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
           val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)).withColumn("fare", expr("cast(fare as string)"))
           dfAgain.write.format("hudi").
-            options(getQuickstartWriteConfigs).
-            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            options(QuickstartUtils.getQuickstartWriteConfigs).
+            option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
             option("hoodie.schema.on.read.enable","true").
             option("hoodie.datasource.write.reconcile.schema","true").
-            option(TABLE_NAME.key(), tableName).
+            option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
             option("hoodie.table.name", tableName).
             mode("append").
             save(tablePath)
@@ -882,6 +881,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
           // Not checking answer as this is an unsafe casting operation, just need to make sure that error is not thrown
           spark.sql(s"select id, name, cast(price as string), ts from $tableName")
+
+          // clear after using INMEMORY index
+          HoodieInMemoryHashIndex.clear()
         }
       }
     }
@@ -947,6 +949,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
             Seq(11, "a11", "-10.04", 1000),
             Seq(12, "a12", "-10.04", 1000)
           )
+
+          // clear after using INMEMORY index
+          HoodieInMemoryHashIndex.clear()
         }
       }
     }
@@ -1012,6 +1017,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
             Seq(11, "a11", "-10.04", 1000),
             Seq(12, "a12", "-10.04", 1000)
           )
+
+          // clear after using INMEMORY index
+          HoodieInMemoryHashIndex.clear()
         }
       }
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index 439dde99aee..e4151bd7e95 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -18,11 +18,11 @@
 package org.apache.spark.sql.hudi.dml
 
 import org.apache.avro.Schema
-import org.apache.hudi.DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
-import org.apache.hudi.common.config.HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT
+import org.apache.hudi.common.config.HoodieReaderConfig
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.function.SerializableFunctionUnchecked
@@ -31,8 +31,8 @@ import org.apache.hudi.common.table.log.HoodieLogFileReader
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
 import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewStorageConfig, SyncableFileSystemView}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.testutils.HoodieTestUtils.{getDefaultHadoopConf, getLogFileListFromFileSlice}
-import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
+import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -71,9 +71,9 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = tmp.getCanonicalPath + "/" + tableName
-      spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
-      spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
-      spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+      spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+      spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+      spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true")
 
       // Create a table with five data fields
       spark.sql(
@@ -115,16 +115,15 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     }
   }
 
-  /* HUDI-7487: disabled due to flakiness
   test("Test MERGE INTO with inserts only on MOR table when partial updates are enabled") {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = tmp.getCanonicalPath + "/" + tableName
-      spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
-      spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
-      spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+      spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+      spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+      spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true")
       // Write inserts to log block
-      spark.sql(s"set ${INDEX_TYPE.key} = INMEMORY")
+      spark.sql(s"set ${HoodieIndexConfig.INDEX_TYPE.key} = INMEMORY")
 
       // Create a table with five data fields
       spark.sql(
@@ -171,17 +170,16 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
         false)
     }
   }
-  */
 
   def testPartialUpdate(tableType: String,
                         logDataBlockFormat: String): Unit = {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = tmp.getCanonicalPath + "/" + tableName
-      spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
-      spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
-      spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
-      spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+      spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+      spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+      spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
+      spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true")
 
       // Create a table with five data fields
       spark.sql(
@@ -281,10 +279,10 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = tmp.getCanonicalPath + "/" + tableName
-      spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
-      spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
-      spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
-      spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+      spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+      spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+      spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
+      spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true")
 
       // Create a table with five data fields
       spark.sql(
@@ -381,7 +379,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
                        expectedNumLogFile: Int,
                        changedFields: Seq[Seq[String]],
                        isPartial: Boolean): Unit = {
-    val hadoopConf = getDefaultHadoopConf
+    val hadoopConf = HoodieTestUtils.getDefaultHadoopConf
     val metaClient: HoodieTableMetaClient =
       HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
     val metadataConfig = HoodieMetadataConfig.newBuilder.build
@@ -400,12 +398,12 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     val fileSlice: Optional[FileSlice] = fsView.getAllFileSlices("")
       .filter(new Predicate[FileSlice] {
         override def test(fileSlice: FileSlice): Boolean = {
-          getLogFileListFromFileSlice(fileSlice).size() == expectedNumLogFile
+          HoodieTestUtils.getLogFileListFromFileSlice(fileSlice).size() == expectedNumLogFile
         }
       })
       .findFirst()
     assertTrue(fileSlice.isPresent)
-    val logFilePathList: List[String] = getLogFileListFromFileSlice(fileSlice.get)
+    val logFilePathList: List[String] = HoodieTestUtils.getLogFileListFromFileSlice(fileSlice.get)
     Collections.sort(logFilePathList)
 
     val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
