This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 47151f653d8  [HUDI-7487] Fixed test with in-memory index by proper heap clearing (#10910)
47151f653d8 is described below

commit 47151f653d85202ab5b28c0e770779f05b5a59f7
Author: Geser Dugarov <geserduga...@gmail.com>
AuthorDate: Sat Mar 23 07:45:09 2024 +0700

    [HUDI-7487] Fixed test with in-memory index by proper heap clearing (#10910)
---
 .../lock/TestInProcessLockProvider.java            | 16 +++++
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  | 78 ++++++++++++----------
 .../hudi/dml/TestPartialUpdateForMergeInto.scala   | 48 +++++++------
 3 files changed, 82 insertions(+), 60 deletions(-)

diff --git a/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java b/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java
index 60f74bb7996..6e7dcd7e3fa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java
+++ b/hudi-common/src/test/java/org/apache/hudi/client/transaction/lock/TestInProcessLockProvider.java
@@ -166,6 +166,9 @@ public class TestInProcessLockProvider {
     Assertions.assertTrue(writer3Completed.get());
     Assertions.assertEquals(lockProviderList.get(0).getLock(), lockProviderList.get(1).getLock());
     Assertions.assertEquals(lockProviderList.get(1).getLock(), lockProviderList.get(2).getLock());
+
+    writer2.interrupt();
+    writer3.interrupt();
   }
 
   @Test
@@ -254,6 +257,8 @@ public class TestInProcessLockProvider {
     // }
     Assertions.assertTrue(writer2Completed.get());
+
+    writer2.interrupt();
   }
 
   @Test
@@ -317,6 +322,9 @@ public class TestInProcessLockProvider {
     }
     Assertions.assertTrue(writer2Stream1Completed.get());
     Assertions.assertTrue(writer2Stream2Completed.get());
+
+    writer2Stream1.interrupt();
+    writer2Stream2.interrupt();
   }
 
   @Test
@@ -373,6 +381,8 @@ public class TestInProcessLockProvider {
     assertDoesNotThrow(() -> {
       inProcessLockProvider.unlock();
     });
+
+    writer2.interrupt();
   }
 
   @Test
@@ -414,6 +424,9 @@ public class TestInProcessLockProvider {
     // unlock by main thread should succeed.
       inProcessLockProvider.unlock();
     });
+
+    writer2.interrupt();
+    writer3.interrupt();
   }
 
   @Test
@@ -472,6 +485,9 @@ public class TestInProcessLockProvider {
     // Make sure both writers actually completed good
     Assertions.assertTrue(writer1Completed.get());
     Assertions.assertTrue(writer2Completed.get());
+
+    writer1.interrupt();
+    writer2.interrupt();
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 8ac8e766e56..9f23494ae79 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -18,16 +18,15 @@ package org.apache.spark.sql.hudi.ddl
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY, SPARK_SQL_INSERT_INTO_OPERATION, TABLE_NAME}
-import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, RawTripTestPayload}
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
 import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, HoodieSparkUtils}
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, HoodieSparkUtils, QuickstartUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -77,7 +76,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       val tableName = generateTableName
       val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
       if (HoodieSparkUtils.gteqSpark3_1) {
-        spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
+        spark.sql("set " + DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
         spark.sql("set hoodie.schema.on.read.enable=true")
         // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
         // and are disallowed now by default in Spark 3.x
@@ -138,7 +137,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         )
         spark.sessionState.catalog.dropTable(TableIdentifier(tableName), true, true)
         spark.sessionState.catalog.refreshTable(TableIdentifier(tableName))
-        spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+        spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
       }
     }
   })
@@ -244,7 +243,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
       if (HoodieSparkUtils.gteqSpark3_1) {
         spark.sql("set hoodie.schema.on.read.enable=true")
-        spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
+        spark.sql("set " + DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
         // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x
         // and are disallowed now by default in Spark 3.x
         spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
@@ -337,7 +336,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           spark.sql(s"select id, col1_new, col2 from $tableName where id = 1 or id = 6 or id = 2 or id = 11 order by id").show(false)
         }
       }
-      spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+      spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
     }
   }
 
@@ -348,7 +347,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
       if (HoodieSparkUtils.gteqSpark3_1) {
         spark.sql("set hoodie.schema.on.read.enable=true")
-        spark.sql("set " + SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
+        spark.sql("set " + DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
         spark.sql(
           s"""
              |create table $tableName (
@@ -389,7 +388,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
        )
      }
    }
-    spark.sessionState.conf.unsetConf(SPARK_SQL_INSERT_INTO_OPERATION.key)
+    spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
   })
 }
 
@@ -546,7 +545,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
   test("Test alter column with complex schema") {
     withTempDir { tmp =>
-      withSQLConf(s"$SPARK_SQL_INSERT_INTO_OPERATION" -> "upsert",
+      withSQLConf(s"${DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION}" -> "upsert",
         "hoodie.schema.on.read.enable" -> "true",
         "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
         val tableName = generateTableName
@@ -713,36 +712,36 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       val tableName = generateTableName
       val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
       if (HoodieSparkUtils.gteqSpark3_1) {
-        val dataGen = new DataGenerator
-        val inserts = convertToStringList(dataGen.generateInserts(10))
+        val dataGen = new QuickstartUtils.DataGenerator
+        val inserts = QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
         val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
         df.write.format("hudi").
-          options(getQuickstartWriteConfigs).
+          options(QuickstartUtils.getQuickstartWriteConfigs).
           option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
-          option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-          option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-          option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+          option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+          option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid").
+          option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
           option("hoodie.schema.on.read.enable","true").
-          option(TABLE_NAME.key(), tableName).
+          option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
           option("hoodie.table.name", tableName).
           mode("overwrite").
           save(tablePath)
-        val updates = convertToStringList(dataGen.generateUpdates(10))
+        val updates = QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
         // type change: fare (double -> String)
         // add new column and drop a column
         val dfUpdate = spark.read.json(spark.sparkContext.parallelize(updates, 2))
           .withColumn("fare", expr("cast(fare as string)"))
           .withColumn("addColumn", lit("new"))
         dfUpdate.drop("begin_lat").write.format("hudi").
-          options(getQuickstartWriteConfigs).
+          options(QuickstartUtils.getQuickstartWriteConfigs).
           option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
-          option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-          option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-          option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+          option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+          option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid").
+          option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
           option("hoodie.schema.on.read.enable","true").
           option("hoodie.datasource.write.reconcile.schema","true").
-          option(TABLE_NAME.key(), tableName).
+          option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
           option("hoodie.table.name", tableName).
           mode("append").
           save(tablePath)
@@ -760,35 +759,35 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         spark.sql(s"select * from hudi_trips_snapshot").show(false)
 
         // test insert_over_write + update again
-        val overwrite = convertToStringList(dataGen.generateInserts(10))
+        val overwrite = QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
         val dfOverWrite = spark.
           read.json(spark.sparkContext.parallelize(overwrite, 2)).
           filter("partitionpath = 'americas/united_states/san_francisco'")
           .withColumn("fare", expr("cast(fare as string)"))
         // fare now in table is string type, we forbid convert string to double.
         dfOverWrite.write.format("hudi").
-          options(getQuickstartWriteConfigs).
+          options(QuickstartUtils.getQuickstartWriteConfigs).
           option("hoodie.datasource.write.operation","insert_overwrite").
-          option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-          option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-          option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+          option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+          option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid").
+          option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
           option("hoodie.schema.on.read.enable","true").
           option("hoodie.datasource.write.reconcile.schema","true").
-          option(TABLE_NAME.key(), tableName).
+          option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
           option("hoodie.table.name", tableName).
           mode("append").
           save(tablePath)
         spark.read.format("hudi").load(tablePath).show(false)
-        val updatesAgain = convertToStringList(dataGen.generateUpdates(10))
+        val updatesAgain = QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
         val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)).withColumn("fare", expr("cast(fare as string)"))
         dfAgain.write.format("hudi").
-          options(getQuickstartWriteConfigs).
-          option(PRECOMBINE_FIELD_OPT_KEY, "ts").
-          option(RECORDKEY_FIELD_OPT_KEY, "uuid").
-          option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+          options(QuickstartUtils.getQuickstartWriteConfigs).
+          option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
+          option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid").
+          option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
           option("hoodie.schema.on.read.enable","true").
           option("hoodie.datasource.write.reconcile.schema","true").
-          option(TABLE_NAME.key(), tableName).
+          option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
           option("hoodie.table.name", tableName).
           mode("append").
           save(tablePath)
@@ -882,6 +881,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
         // Not checking answer as this is an unsafe casting operation, just need to make sure that error is not thrown
         spark.sql(s"select id, name, cast(price as string), ts from $tableName")
+
+        // clear after using INMEMORY index
+        HoodieInMemoryHashIndex.clear()
      }
    }
  }
@@ -947,6 +949,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
          Seq(11, "a11", "-10.04", 1000),
          Seq(12, "a12", "-10.04", 1000)
        )
+
+        // clear after using INMEMORY index
+        HoodieInMemoryHashIndex.clear()
      }
    }
  }
@@ -1012,6 +1017,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
          Seq(11, "a11", "-10.04", 1000),
          Seq(12, "a12", "-10.04", 1000)
        )
+
+        // clear after using INMEMORY index
+        HoodieInMemoryHashIndex.clear()
      }
    }
  }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index 439dde99aee..e4151bd7e95 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -18,11 +18,11 @@ package org.apache.spark.sql.hudi.dml
 import org.apache.avro.Schema
-import org.apache.hudi.DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
-import org.apache.hudi.common.config.HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT
+import org.apache.hudi.common.config.HoodieReaderConfig
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.function.SerializableFunctionUnchecked
@@ -31,8 +31,8 @@ import org.apache.hudi.common.table.log.HoodieLogFileReader
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
 import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewStorageConfig, SyncableFileSystemView}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.testutils.HoodieTestUtils.{getDefaultHadoopConf, getLogFileListFromFileSlice}
-import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
+import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -71,9 +71,9 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = tmp.getCanonicalPath + "/" + tableName
-      spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
-      spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
-      spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+      spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+      spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
true") + spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true") // Create a table with five data fields spark.sql( @@ -115,16 +115,15 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { } } - /* HUDI-7487: disabled due to flakiness test("Test MERGE INTO with inserts only on MOR table when partial updates are enabled") { withTempDir { tmp => val tableName = generateTableName val basePath = tmp.getCanonicalPath + "/" + tableName - spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0") - spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true") - spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true") + spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0") + spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true") + spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true") // Write inserts to log block - spark.sql(s"set ${INDEX_TYPE.key} = INMEMORY") + spark.sql(s"set ${HoodieIndexConfig.INDEX_TYPE.key} = INMEMORY") // Create a table with five data fields spark.sql( @@ -171,17 +170,16 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { false) } } - */ def testPartialUpdate(tableType: String, logDataBlockFormat: String): Unit = { withTempDir { tmp => val tableName = generateTableName val basePath = tmp.getCanonicalPath + "/" + tableName - spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0") - spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true") - spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat") - spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true") + spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0") + spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true") + spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat") + spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true") // Create a table with five data fields spark.sql( @@ -281,10 +279,10 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { withTempDir { tmp => val tableName = generateTableName val basePath = tmp.getCanonicalPath + "/" + tableName - spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0") - spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true") - spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat") - spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true") + spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0") + spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true") + spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat") + spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = true") // Create a table with five data fields spark.sql( @@ -381,7 +379,7 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { expectedNumLogFile: Int, changedFields: Seq[Seq[String]], isPartial: Boolean): Unit = { - val hadoopConf = getDefaultHadoopConf + val hadoopConf = HoodieTestUtils.getDefaultHadoopConf val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build val metadataConfig = HoodieMetadataConfig.newBuilder.build @@ -400,12 +398,12 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { val fileSlice: 
       .filter(new Predicate[FileSlice] {
         override def test(fileSlice: FileSlice): Boolean = {
-          getLogFileListFromFileSlice(fileSlice).size() == expectedNumLogFile
+          HoodieTestUtils.getLogFileListFromFileSlice(fileSlice).size() == expectedNumLogFile
         }
       })
       .findFirst()
     assertTrue(fileSlice.isPresent)
-    val logFilePathList: List[String] = getLogFileListFromFileSlice(fileSlice.get)
+    val logFilePathList: List[String] = HoodieTestUtils.getLogFileListFromFileSlice(fileSlice.get)
     Collections.sort(logFilePathList)
 
     val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema