This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit be3a7004cf8c46595b49291b2b643848eb29424c
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Aug 8 17:13:38 2023 -0500

    [HUDI-6587] Check incomplete commit for time travel query (#9280)
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |   5 +
 .../hudi/common/table/timeline/TimelineUtils.java  |  30 +++-
 .../hudi/exception/HoodieTimeTravelException.java  |  29 ++++
 .../hudi/hadoop/HoodieROTablePathFilter.java       |  14 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |   5 +-
 .../hudi/functional/TestTimeTravelQuery.scala      | 182 +++++++++++----------
 6 files changed, 173 insertions(+), 92 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 3a24ef4dd2f..7ba20795790 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -61,6 +61,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
+import static org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
 import static org.apache.hudi.common.util.CollectionUtils.combine;
 import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
 
@@ -243,6 +244,10 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
       return Collections.emptyMap();
     }
 
+    if (specifiedQueryInstant.isPresent() && !shouldIncludePendingCommits) {
+      validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
+    }
+
     FileStatus[] allFiles = listPartitionPathFiles(partitions);
 
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
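The guard added above runs before any file listing: when a query specifies an as-of instant and pending commits are excluded, the instant is validated against the timeline first. A minimal sketch of that call pattern (the class and method names below are illustrative, not part of this commit):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.util.Option;

    import static org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;

    // Hypothetical caller mirroring the new guard above: validation happens
    // only for completed-commit (non-pending) time travel reads.
    class TimeTravelGuardSketch {
      static void guard(HoodieTableMetaClient metaClient,
                        Option<String> specifiedQueryInstant,
                        boolean shouldIncludePendingCommits) {
        if (specifiedQueryInstant.isPresent() && !shouldIncludePendingCommits) {
          // Throws HoodieTimeTravelException when the instant is not strictly
          // earlier than the first incomplete commit on the timeline.
          validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
        }
      }
    }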
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 14a03ce60ef..a763f4d9053 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieTimeTravelException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,9 +48,11 @@ import static org.apache.hudi.common.config.HoodieCommonConfig.INCREMENTAL_READ_
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * TimelineUtils provides a common way to query incremental meta-data changes for a hoodie table.
@@ -244,8 +247,8 @@ public class TimelineUtils {
     if (lastMaxCompletionTime.isPresent()) {
       // Get 'hollow' instants whose instant time is earlier than exclusiveStartInstantTime but whose commit completion time is later
       HoodieDefaultTimeline hollowInstantsTimeline = (HoodieDefaultTimeline) timeline.getCommitsTimeline()
-          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, exclusiveStartInstantTime))
-          .filter(s -> HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, lastMaxCompletionTime.get()));
+          .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, exclusiveStartInstantTime))
+          .filter(s -> compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, lastMaxCompletionTime.get()));
       if (!hollowInstantsTimeline.empty()) {
         return timelineSinceLastSync.mergeTimeline(hollowInstantsTimeline);
       }
@@ -315,6 +318,29 @@ public class TimelineUtils {
     }
   }
 
+  /**
+   * Validates the user-specified timestamp of a time travel query against the timestamp of the first incomplete commit.
+   *
+   * @throws HoodieTimeTravelException when the time travel query's timestamp >= the first incomplete commit's timestamp
+   */
+  public static void validateTimestampAsOf(HoodieTableMetaClient metaClient, String timestampAsOf) {
+    Option<HoodieInstant> firstIncompleteCommit = metaClient.getCommitsTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant ->
+            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+                || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
+        .firstInstant();
+
+    if (firstIncompleteCommit.isPresent()) {
+      String incompleteCommitTime = firstIncompleteCommit.get().getTimestamp();
+      if (compareTimestamps(timestampAsOf, GREATER_THAN_OR_EQUALS, incompleteCommitTime)) {
+        throw new HoodieTimeTravelException(String.format(
+            "Time travel's timestamp '%s' must be earlier than the first incomplete commit timestamp '%s'.",
+            timestampAsOf, incompleteCommitTime));
+      }
+    }
+  }
+
   /**
    * Handles hollow commit as per {@link HoodieCommonConfig#INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT}
    * and return filtered or non-filtered timeline for incremental query to run against.
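For readers tracing the new validateTimestampAsOf: it scans the commits timeline for the earliest inflight or requested instant (ignoring pending replacecommits that carry a clustering plan) and rejects any as-of timestamp at or after it. A hedged usage sketch — the surrounding class and the boolean wrapper are illustrative only:

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.exception.HoodieTimeTravelException;

    import static org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;

    class ValidateAsOfSketch {
      // Returns whether a time travel read as of 'timestampAsOf' would be served.
      static boolean canServe(HoodieTableMetaClient metaClient, String timestampAsOf) {
        try {
          validateTimestampAsOf(metaClient, timestampAsOf);
          return true;
        } catch (HoodieTimeTravelException e) {
          // e.getMessage() names both the queried and the incomplete instant
          return false;
        }
      }
    }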
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java
new file mode 100644
index 00000000000..c0f703fc95a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieTimeTravelException extends HoodieException {
+  public HoodieTimeTravelException(String msg) {
+    super(msg);
+  }
+
+  public HoodieTimeTravelException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index b38cea1ffe6..5e89ed804a8 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+import static org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
@@ -185,16 +187,20 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
           metaClientCache.put(baseDir.toString(), metaClient);
         }
 
-        if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
+        final Configuration conf = getConf();
+        final String timestampAsOf = conf.get(TIMESTAMP_AS_OF.key());
+        if (nonEmpty(timestampAsOf)) {
+          validateTimestampAsOf(metaClient, timestampAsOf);
+
           // Build FileSystemViewManager with the specified time; this config must be set when old version
           // files may be accessed. For example, on the Spark side, "hoodie.datasource.read.paths" may
           // contain old version files; if this value is not set, those files will be filtered out.
           fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
-              metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()),
-              metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
+              metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf),
+              metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(timestampAsOf));
         } else {
           fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
-              metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()));
+              metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf));
         }
         String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder);
         List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index fea7781f84d..0f7eb27fd04 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -32,7 +32,8 @@ import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, Seri
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineUtils}
+import org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, validateTimestampAsOf, handleHollowCommitIfNeeded}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
@@ -413,6 +414,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
     queryTimestamp match {
       case Some(ts) =>
+        specifiedQueryTimestamp.foreach(t => validateTimestampAsOf(metaClient, t))
+
         val partitionDirs = if (globPaths.isEmpty) {
           fileIndex.listFiles(partitionFilters, dataFilters)
         } else {
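All of the checks above lean on the fact that Hudi instant times are fixed-width timestamp strings, so HoodieTimeline.compareTimestamps is a lexicographic comparison that matches chronological order. A small sketch with made-up instant values:

    import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
    import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;

    class InstantOrderSketch {
      public static void main(String[] args) {
        String incomplete = "20230808171338000"; // hypothetical inflight instant
        String queried = "20230808171338001";    // hypothetical as-of timestamp
        // Prints true: the queried instant is not earlier than the incomplete
        // commit, so validateTimestampAsOf would throw for this pair.
        System.out.println(compareTimestamps(queried, GREATER_THAN_OR_EQUALS, incomplete));
      }
    }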
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 66f905abc47..cdb94907158 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -17,23 +17,27 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.testutils.HoodieTestTable
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieTimeTravelException
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.SaveMode.{Append, Overwrite}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
+import org.scalatest.Assertions.assertThrows
 
 import java.text.SimpleDateFormat
 
 class TestTimeTravelQuery extends HoodieSparkClientTestBase {
 
-  var spark: SparkSession =_
+  var spark: SparkSession = _
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -44,7 +48,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
 
-  @BeforeEach override def setUp() {
+  @BeforeEach override def setUp(): Unit = {
     setTableName("hoodie_test")
     initPath()
     initSparkContexts()
@@ -53,7 +57,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     initFileSystem()
   }
 
-  @AfterEach override def tearDown() = {
+  @AfterEach override def tearDown(): Unit = {
     cleanupSparkContexts()
     cleanupTestDataGenerator()
     cleanupFileSystem()
@@ -66,38 +70,22 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     val _spark = spark
     import _spark.implicits._
 
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> ""
+    )
+
     // First write
     val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
-    df1.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val firstCommit = writeBatch(df1, opts, Overwrite)
 
     // Second write
     val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version")
-    df2.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val secondCommit = writeBatch(df2, opts)
 
     // Third write
     val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
-    df3.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val thirdCommit = writeBatch(df3, opts)
 
     // Query as of firstCommitTime
     val result1 = spark.read.format("hudi")
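The refactored test above reads the table as of each commit via DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT. In Java (for consistency with the other sketches here; the tests themselves are Scala), the equivalent read looks roughly like:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class TimeTravelReadSketch {
      // 'spark' and 'basePath' are assumed to exist in the caller's context.
      static Dataset<Row> readAsOf(SparkSession spark, String basePath, String instant) {
        return spark.read().format("hudi")
            // string key of DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT
            .option("as.of.instant", instant)
            .load(basePath);
      }
    }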
@@ -124,6 +112,59 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     assertEquals(Row(1, "a1", 13, 1002), result3)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryWithIncompleteCommit(tableType: HoodieTableType): Unit = {
+    initMetaClient(tableType)
+    val _spark = spark
+    import _spark.implicits._
+
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> ""
+    )
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+    val firstCommit = writeBatch(df1, opts, Overwrite)
+
+    // Second write
+    val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version")
+    val secondCommit = writeBatch(df2, opts)
+
+    // Third write
+    val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
+    val thirdCommit = writeBatch(df3, opts)
+
+    // Add an incomplete commit between the 1st and 2nd commits.
+    // It is 1 ms after the 1st commit, so it won't clash with the 2nd commit's timestamp.
+    val incompleteCommit = (firstCommit.toLong + 1).toString
+    tableType match {
+      case COPY_ON_WRITE => HoodieTestTable.of(metaClient).addInflightCommit(incompleteCommit)
+      case MERGE_ON_READ => HoodieTestTable.of(metaClient).addInflightDeltaCommit(incompleteCommit)
+    }
+
+    // Query as of firstCommitTime
+    val result1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 10, 1000), result1)
+
+    // Querying as of the incomplete commit, or any later commit, should throw
+    List(incompleteCommit, secondCommit, thirdCommit)
+      .foreach(commitTime => {
+        assertThrows[HoodieTimeTravelException] {
+          spark.read.format("hudi")
+            .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commitTime)
+            .load(basePath)
+            .select("id", "name", "value", "version")
+            .take(1)(0)
+        }
+      })
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieTableType])
   def testTimeTravelQueryForPartitionedTable(tableType: HoodieTableType): Unit = {
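The fixture trick in the new test — planting an inflight instant one millisecond after a completed commit — is what gives the validation something to trip on. A Java rendering of that setup (sketch only; the helper name and the broad throws clause are my assumptions):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.testutils.HoodieTestTable;

    class InflightFixtureSketch {
      static String addInflightAfter(HoodieTableMetaClient metaClient, String firstCommit) throws Exception {
        // One millisecond later keeps the instant unique while still sorting
        // before the second commit on the timeline.
        String incompleteCommit = String.valueOf(Long.parseLong(firstCommit) + 1);
        HoodieTestTable.of(metaClient).addInflightCommit(incompleteCommit);
        return incompleteCommit;
      }
    }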
@@ -131,44 +172,24 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     val _spark = spark
     import _spark.implicits._
 
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "version",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+    )
+
     // First write
     val df1 = Seq((1, "a1", 10, 1000, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
-    df1.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
-      .option(PARTITIONPATH_FIELD.key, "dt")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val firstCommit = writeBatch(df1, opts, Overwrite)
 
     // Second write
     val df2 = Seq((1, "a1", 12, 1001, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
-    df2.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
-      .option(PARTITIONPATH_FIELD.key, "dt")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val secondCommit = writeBatch(df2, opts)
 
     // Third write
     val df3 = Seq((1, "a1", 13, 1002, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
-    df3.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
-      .option(PARTITIONPATH_FIELD.key, "dt")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val thirdCommit = writeBatch(df3, opts)
 
     // query as of firstCommitTime (using 'yyyy-MM-dd HH:mm:ss' format)
     val result1 = spark.read.format("hudi")
@@ -204,6 +225,12 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     assertTrue(result4.isEmpty)
   }
 
+  private def writeBatch(df: DataFrame, options: Map[String, String], mode: SaveMode = Append): String = {
+    df.write.format("hudi").options(options).mode(mode).save(basePath)
+    metaClient.reloadActiveTimeline()
+    metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+  }
+
   private def defaultDateTimeFormat(queryInstant: String): String = {
     val date = HoodieActiveTimeline.parseDateFromInstantTime(queryInstant)
     val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
@@ -223,42 +250,27 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     val _spark = spark
     import _spark.implicits._
 
-    // First write
-    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
-    df1.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "name")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
     metaClient = HoodieTableMetaClient.builder()
       .setBasePath(basePath)
      .setConf(spark.sessionState.newHadoopConf)
       .build()
-    val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name"
+    )
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+    val firstCommit = writeBatch(df1, opts, Overwrite)
 
     // Second write
     val df2 = Seq((1, "a1", 12, 1001, "2022")).toDF("id", "name", "value", "version", "year")
-    df2.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "name")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val secondCommit = writeBatch(df2, opts)
 
     // Third write
     val df3 = Seq((1, "a1", 13, 1002, "2022", "08")).toDF("id", "name", "value", "version", "year", "month")
-    df3.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "name")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val thirdCommit = writeBatch(df3, opts)
 
     val tableSchemaResolver = new TableSchemaResolver(metaClient)