This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit be3a7004cf8c46595b49291b2b643848eb29424c
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Aug 8 17:13:38 2023 -0500

    [HUDI-6587] Check incomplete commit for time travel query (#9280)
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |   5 +
 .../hudi/common/table/timeline/TimelineUtils.java  |  30 +++-
 .../hudi/exception/HoodieTimeTravelException.java  |  29 ++++
 .../hudi/hadoop/HoodieROTablePathFilter.java       |  14 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |   5 +-
 .../hudi/functional/TestTimeTravelQuery.scala      | 182 +++++++++++----------
 6 files changed, 173 insertions(+), 92 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java 
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 3a24ef4dd2f..7ba20795790 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -61,6 +61,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
+import static 
org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
 import static org.apache.hudi.common.util.CollectionUtils.combine;
 import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
 
@@ -243,6 +244,10 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
       return Collections.emptyMap();
     }
 
+    if (specifiedQueryInstant.isPresent() && !shouldIncludePendingCommits) {
+      validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
+    }
+
     FileStatus[] allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 14a03ce60ef..a763f4d9053 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieTimeTravelException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,9 +48,11 @@ import static 
org.apache.hudi.common.config.HoodieCommonConfig.INCREMENTAL_READ_
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * TimelineUtils provides a common way to query incremental meta-data changes 
for a hoodie table.
@@ -244,8 +247,8 @@ public class TimelineUtils {
     if (lastMaxCompletionTime.isPresent()) {
       // Get 'hollow' instants that have less instant time than 
exclusiveStartInstantTime but with greater commit completion time
       HoodieDefaultTimeline hollowInstantsTimeline = (HoodieDefaultTimeline) 
timeline.getCommitsTimeline()
-          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, exclusiveStartInstantTime))
-          .filter(s -> 
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, 
lastMaxCompletionTime.get()));
+          .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, 
exclusiveStartInstantTime))
+          .filter(s -> compareTimestamps(s.getStateTransitionTime(), 
GREATER_THAN, lastMaxCompletionTime.get()));
       if (!hollowInstantsTimeline.empty()) {
         return timelineSinceLastSync.mergeTimeline(hollowInstantsTimeline);
       }
@@ -315,6 +318,29 @@ public class TimelineUtils {
     }
   }
 
+  /**
+   * Validate user-specified timestamp of time travel query against the first 
incomplete commit's timestamp.
+   * Inflight/requested replace commits for which a clustering plan is present are skipped.
+   * @throws HoodieTimeTravelException when time travel query's timestamp >= 
the first incomplete commit's timestamp
+   */
+  public static void validateTimestampAsOf(HoodieTableMetaClient metaClient, 
String timestampAsOf) {
+    Option<HoodieInstant> firstIncompleteCommit = 
metaClient.getCommitsTimeline()
+        .filterInflightsAndRequested()
+        .filter(instant ->
+            !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+                || !ClusteringUtils.getClusteringPlan(metaClient, 
instant).isPresent())
+        .firstInstant();
+
+    if (firstIncompleteCommit.isPresent()) {
+      String incompleteCommitTime = firstIncompleteCommit.get().getTimestamp();
+      if (compareTimestamps(timestampAsOf, GREATER_THAN_OR_EQUALS, 
incompleteCommitTime)) {
+        throw new HoodieTimeTravelException(String.format(
+            "Time travel's timestamp '%s' must be earlier than the first 
incomplete commit timestamp '%s'.",
+            timestampAsOf, incompleteCommitTime));
+      }
+    }
+  }
+
   /**
    * Handles hollow commit as per {@link 
HoodieCommonConfig#INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT}
    * and return filtered or non-filtered timeline for incremental query to run 
against.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java
new file mode 100644
index 00000000000..c0f703fc95a
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieTimeTravelException extends HoodieException {
+  public HoodieTimeTravelException(String msg) {
+    super(msg);
+  }
+
+  public HoodieTimeTravelException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index b38cea1ffe6..5e89ed804a8 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+import static 
org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 
 /**
  * Given a path is a part of - Hoodie table = accepts ONLY the latest version 
of each path - Non-Hoodie table = then
@@ -185,16 +187,20 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
-          if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
+          final Configuration conf = getConf();
+          final String timestampAsOf = conf.get(TIMESTAMP_AS_OF.key());
+          if (nonEmpty(timestampAsOf)) {
+            validateTimestampAsOf(metaClient, timestampAsOf);
+
             // Build FileSystemViewManager with specified time, it's necessary 
to set this config when you may
             // access old version files. For example, in spark side, using 
"hoodie.datasource.read.paths"
             // which contains old version files, if not specify this value, 
these files will be filtered.
             fsView = 
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
-                metaClient, 
HoodieInputFormatUtils.buildMetadataConfig(getConf()),
-                
metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
+                metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf),
+                
metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(timestampAsOf));
           } else {
             fsView = 
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
-                metaClient, 
HoodieInputFormatUtils.buildMetadataConfig(getConf()));
+                metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf));
           }
           String partition = FSUtils.getRelativePartitionPath(new 
Path(metaClient.getBasePath()), folder);
           List<HoodieBaseFile> latestFiles = 
fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index fea7781f84d..0f7eb27fd04 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -32,7 +32,8 @@ import org.apache.hudi.common.config.{ConfigProperty, 
HoodieMetadataConfig, Seri
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineUtils}
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, 
validateTimestampAsOf, handleHollowCommitIfNeeded}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
@@ -413,6 +414,8 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: 
Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
     queryTimestamp match {
       case Some(ts) =>
+        specifiedQueryTimestamp.foreach(t => validateTimestampAsOf(metaClient, 
t))
+
         val partitionDirs = if (globPaths.isEmpty) {
           fileIndex.listFiles(partitionFilters, dataFilters)
         } else {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 66f905abc47..cdb94907158 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -17,23 +17,27 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.testutils.HoodieTestTable
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieTimeTravelException
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.SaveMode.{Append, Overwrite}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
+import org.scalatest.Assertions.assertThrows
 
 import java.text.SimpleDateFormat
 
 class TestTimeTravelQuery extends HoodieSparkClientTestBase {
-  var spark: SparkSession =_
+  var spark: SparkSession = _
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
     "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -44,7 +48,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
 
-  @BeforeEach override def setUp() {
+  @BeforeEach override def setUp(): Unit = {
     setTableName("hoodie_test")
     initPath()
     initSparkContexts()
@@ -53,7 +57,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     initFileSystem()
   }
 
-  @AfterEach override def tearDown() = {
+  @AfterEach override def tearDown(): Unit = {
     cleanupSparkContexts()
     cleanupTestDataGenerator()
     cleanupFileSystem()
@@ -66,38 +70,22 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase 
{
     val _spark = spark
     import _spark.implicits._
 
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> ""
+    )
+
     // First write
     val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
-    df1.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val firstCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val firstCommit = writeBatch(df1, opts, Overwrite)
 
     // Second write
     val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version")
-    df2.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val secondCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val secondCommit = writeBatch(df2, opts)
 
     // Third write
     val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
-    df3.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val thirdCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val thirdCommit = writeBatch(df3, opts)
 
     // Query as of firstCommitTime
     val result1 = spark.read.format("hudi")
@@ -124,6 +112,59 @@ class TestTimeTravelQuery extends 
HoodieSparkClientTestBase {
     assertEquals(Row(1, "a1", 13, 1002), result3)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryWithIncompleteCommit(tableType: HoodieTableType): 
Unit = {
+    initMetaClient(tableType)
+    val _spark = spark
+    import _spark.implicits._
+
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> ""
+    )
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+    val firstCommit = writeBatch(df1, opts, Overwrite)
+
+    // Second write
+    val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version")
+    val secondCommit = writeBatch(df2, opts)
+
+    // Third write
+    val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
+    val thirdCommit = writeBatch(df3, opts)
+
+    // add an incomplete commit between the 1st and 2nd commits
+    // it'll be 1 ms after the 1st commit, which won't clash with the 2nd commit 
timestamp
+    val incompleteCommit = (firstCommit.toLong + 1).toString
+    tableType match {
+      case COPY_ON_WRITE => 
HoodieTestTable.of(metaClient).addInflightCommit(incompleteCommit)
+      case MERGE_ON_READ => 
HoodieTestTable.of(metaClient).addInflightDeltaCommit(incompleteCommit)
+    }
+
+    // Query as of firstCommitTime
+    val result1 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 10, 1000), result1)
+
+    // Query as of the incomplete commit or any later commit is expected to throw
+    List(incompleteCommit, secondCommit, thirdCommit)
+      .foreach(commitTime => {
+        assertThrows[HoodieTimeTravelException] {
+          spark.read.format("hudi")
+            .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, 
commitTime)
+            .load(basePath)
+            .select("id", "name", "value", "version")
+            .take(1)(0)
+        }
+      })
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieTableType])
   def testTimeTravelQueryForPartitionedTable(tableType: HoodieTableType): Unit 
= {
@@ -131,44 +172,24 @@ class TestTimeTravelQuery extends 
HoodieSparkClientTestBase {
     val _spark = spark
     import _spark.implicits._
 
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "version",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+    )
+
     // First write
     val df1 = Seq((1, "a1", 10, 1000, "2021-07-26")).toDF("id", "name", 
"value", "version", "dt")
-    df1.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
-      .option(PARTITIONPATH_FIELD.key, "dt")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    val firstCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val firstCommit = writeBatch(df1, opts, Overwrite)
 
     // Second write
     val df2 = Seq((1, "a1", 12, 1001, "2021-07-26")).toDF("id", "name", 
"value", "version", "dt")
-    df2.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
-      .option(PARTITIONPATH_FIELD.key, "dt")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val secondCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val secondCommit = writeBatch(df2, opts)
 
     // Third write
     val df3 = Seq((1, "a1", 13, 1002, "2021-07-26")).toDF("id", "name", 
"value", "version", "dt")
-    df3.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
-      .option(PARTITIONPATH_FIELD.key, "dt")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val thirdCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val thirdCommit = writeBatch(df3, opts)
 
     // query as of firstCommitTime (using 'yyyy-MM-dd HH:mm:ss' format)
     val result1 = spark.read.format("hudi")
@@ -204,6 +225,12 @@ class TestTimeTravelQuery extends 
HoodieSparkClientTestBase {
     assertTrue(result4.isEmpty)
   }
 
+  private def writeBatch(df: DataFrame, options: Map[String, String], mode: 
SaveMode = Append): String = {
+    df.write.format("hudi").options(options).mode(mode).save(basePath)
+    metaClient.reloadActiveTimeline()
+    
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+  }
+
   private def defaultDateTimeFormat(queryInstant: String): String = {
     val date = HoodieActiveTimeline.parseDateFromInstantTime(queryInstant)
     val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
@@ -223,42 +250,27 @@ class TestTimeTravelQuery extends 
HoodieSparkClientTestBase {
     val _spark = spark
     import _spark.implicits._
 
-    // First write
-    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
-    df1.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "name")
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
     metaClient = HoodieTableMetaClient.builder()
       .setBasePath(basePath)
       .setConf(spark.sessionState.newHadoopConf)
       .build()
-    val firstCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name"
+    )
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+    val firstCommit = writeBatch(df1, opts, Overwrite)
 
     // Second write
     val df2 = Seq((1, "a1", 12, 1001, "2022")).toDF("id", "name", "value", 
"version", "year")
-    df2.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "name")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val secondCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val secondCommit = writeBatch(df2, opts)
 
     // Third write
     val df3 = Seq((1, "a1", 13, 1002, "2022", "08")).toDF("id", "name", 
"value", "version", "year", "month")
-    df3.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "name")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    metaClient.reloadActiveTimeline()
-    val thirdCommit = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+    val thirdCommit = writeBatch(df3, opts)
 
     val tableSchemaResolver = new TableSchemaResolver(metaClient)
 

Reply via email to