This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cddb237be14 fix(streamer): Use checkpoint V1 for non-incremental streamer sources (#18896)
9cddb237be14 is described below

commit 9cddb237be14242be4ba096413ab057942af79aa
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Jun 4 14:42:57 2026 -0700

    fix(streamer): Use checkpoint V1 for non-incremental streamer sources (#18896)
---
 .../common/table/checkpoint/CheckpointUtils.java   |  32 +-
 .../table/checkpoint/TestCheckpointUtils.java      |  66 +---
 .../hudi/examples/common/RandomJsonSource.java     |   4 +-
 .../hudi/examples/common/TestRandomJsonSource.java |  76 +++++
 .../helpers/DFSTestSuitePathSelector.java          |   7 +-
 .../sql/hudi/streaming/HoodieStreamSourceV2.scala  |   2 +-
 .../hudi/utilities/sources/GcsEventsSource.java    |  12 +-
 .../hudi/utilities/sources/HiveIncrPullSource.java |  11 +-
 .../hudi/utilities/sources/HoodieIncrSource.java   |   4 +-
 .../apache/hudi/utilities/sources/InputBatch.java  |   9 -
 .../apache/hudi/utilities/sources/JdbcSource.java  |   8 +-
 .../apache/hudi/utilities/sources/KafkaSource.java |   7 +-
 .../hudi/utilities/sources/KinesisSource.java      |   5 +-
 .../hudi/utilities/sources/PulsarSource.java       |   4 +-
 .../org/apache/hudi/utilities/sources/Source.java  |  53 +---
 .../hudi/utilities/sources/SqlFileBasedSource.java |   4 +-
 .../utilities/sources/debezium/DebeziumSource.java |   5 +-
 .../utilities/sources/helpers/DFSPathSelector.java |   6 +-
 .../sources/helpers/DatePartitionPathSelector.java |   6 +-
 .../sources/helpers/S3EventsMetaSelector.java      |   9 +-
 .../apache/hudi/utilities/streamer/StreamSync.java |   7 +-
 .../streamer/StreamerCheckpointUtils.java          |  26 +-
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |  19 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  36 +--
 .../deltastreamer/TestSourceFormatAdapter.java     |   6 +-
 .../hudi/utilities/sources/TestDataSource.java     |   4 +-
 .../utilities/sources/TestGcsEventsSource.java     |   3 +-
 .../utilities/sources/TestHiveIncrPullSource.java  |  99 ++++++
 .../hudi/utilities/sources/TestInputBatch.java     |   6 +-
 .../hudi/utilities/sources/TestJdbcSource.java     |   5 +-
 .../TestStreamerSourceCheckpointVersion.java       | 351 +++++++++++++++++++++
 .../helpers/TestDFSPathSelectorCommonMethods.java  |  15 +
 .../streamer/TestHoodieIncrSourceE2E.java          |   9 +-
 .../TestParquetDfsCheckpointFormatOnV6.java        | 128 ++++++++
 .../hudi/utilities/streamer/TestStreamSync.java    |  31 +-
 .../streamer/TestStreamerCheckpointUtils.java      |  52 +++
 .../sources/DistributedTestDataSource.java         |   6 +-
 .../checkpoint-v6/parquet-dfs-v1-fixture.zip       | Bin 0 -> 103450 bytes
 38 files changed, 917 insertions(+), 216 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
index 15084a74ae46..443383c77297 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.checkpoint;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -71,23 +70,24 @@ public class CheckpointUtils {
     throw new HoodieException("Checkpoint is not found in the commit metadata: 
" + commitMetadata.getExtraMetadata());
   }
 
-  public static Checkpoint buildCheckpointFromGeneralSource(
-      String sourceClassName, int writeTableVersion, String 
checkpointToResume) {
-    return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, 
sourceClassName)
-        ? new StreamerCheckpointV2(checkpointToResume) : new 
StreamerCheckpointV1(checkpointToResume);
+  /**
+   * For sources that do not have a semantic change in the checkpoint, always 
use checkpoint V1.
+   *
+   * @param checkpointToResume value of the checkpoint to resume
+   * @return {@link Checkpoint} instance
+   */
+  public static Checkpoint createCheckpoint(String checkpointToResume) {
+    return new StreamerCheckpointV1(checkpointToResume);
   }
 
-  // Whenever we create checkpoint from streamer config checkpoint override, 
we should use this function
-  // to build checkpoints.
-  public static Checkpoint buildCheckpointFromConfigOverride(
-      String sourceClassName, int writeTableVersion, String 
checkpointToResume) {
-    return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, 
sourceClassName)
-        ? new UnresolvedStreamerCheckpointBasedOnCfg(checkpointToResume) : new 
StreamerCheckpointV1(checkpointToResume);
-  }
-
-  public static boolean shouldTargetCheckpointV2(int writeTableVersion, String 
sourceClassName) {
-    return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
-        && !DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(sourceClassName);
+  /**
+   * For sources that do not have a semantic change in the checkpoint, always 
use checkpoint V1.
+   *
+   * @param checkpointToResume the checkpoint to resume
+   * @return {@link Checkpoint} instance
+   */
+  public static Checkpoint createCheckpoint(Checkpoint checkpointToResume) {
+    return new StreamerCheckpointV1(checkpointToResume);
   }
 
   // TODO(yihua): for checkpoint translation, handle cases where the 
checkpoint is not exactly the
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
index a4b1be058799..876c8a53162a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.checkpoint;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -31,8 +30,6 @@ import org.apache.hudi.exception.HoodieException;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
 
 import java.util.stream.Stream;
 
@@ -56,7 +53,6 @@ public class TestCheckpointUtils {
   private HoodieActiveTimeline activeTimeline;
 
   private static final String CHECKPOINT_TO_RESUME = "20240101000000";
-  private static final String GENERAL_SOURCE = 
"org.apache.hudi.utilities.sources.GeneralSource";
 
   @BeforeEach
   public void setUp() {
@@ -197,64 +193,10 @@ public class TestCheckpointUtils {
     assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
   }
 
-  @ParameterizedTest
-  @CsvSource({
-      // version, sourceClassName, expectedResult
-      // Version >= 8 with allowed sources should return true
-      "8, org.apache.hudi.utilities.sources.TestSource, true",
-      "9, org.apache.hudi.utilities.sources.AnotherSource, true",
-      // Version < 8 should return false regardless of source
-      "7, org.apache.hudi.utilities.sources.TestSource, false",
-      "6, org.apache.hudi.utilities.sources.AnotherSource, false",
-      // Disallowed sources should return false even with version >= 8
-      "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
-      "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false",
-      "8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource, 
false",
-      "8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource, 
false"
-  })
-  public void testTargetCheckpointV2(int version, String sourceClassName, 
boolean isV2Checkpoint) {
-    assertEquals(isV2Checkpoint, 
CheckpointUtils.buildCheckpointFromGeneralSource(sourceClassName, version, 
"ignored") instanceof StreamerCheckpointV2);
-  }
-
-  @Test
-  public void testBuildCheckpointFromGeneralSource() {
-    // Test V2 checkpoint creation (newer table version + general source)
-    Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromGeneralSource(
-        GENERAL_SOURCE,
-        HoodieTableVersion.EIGHT.versionCode(),
-        CHECKPOINT_TO_RESUME
-    );
-    assertInstanceOf(StreamerCheckpointV2.class, checkpoint1);
-    assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());
-
-    // Test V1 checkpoint creation (older table version)
-    Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromGeneralSource(
-        GENERAL_SOURCE,
-        HoodieTableVersion.SEVEN.versionCode(),
-        CHECKPOINT_TO_RESUME
-    );
-    assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
-    assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
-  }
-
   @Test
-  public void testBuildCheckpointFromConfigOverride() {
-    // Test checkpoint from config creation (newer table version + general 
source)
-    Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromConfigOverride(
-        GENERAL_SOURCE,
-        HoodieTableVersion.EIGHT.versionCode(),
-        CHECKPOINT_TO_RESUME
-    );
-    assertInstanceOf(UnresolvedStreamerCheckpointBasedOnCfg.class, 
checkpoint1);
-    assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());
-
-    // Test V1 checkpoint creation (older table version)
-    Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromConfigOverride(
-        GENERAL_SOURCE,
-        HoodieTableVersion.SEVEN.versionCode(),
-        CHECKPOINT_TO_RESUME
-    );
-    assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
-    assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
+  void testCreateCheckpoint() {
+    Checkpoint checkpoint = 
CheckpointUtils.createCheckpoint(CHECKPOINT_TO_RESUME);
+    assertInstanceOf(StreamerCheckpointV1.class, checkpoint);
+    assertEquals(CHECKPOINT_TO_RESUME, checkpoint.getCheckpointKey());
   }
 }
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java
index c8fcd6b04e2a..97c0a5ec4fa7 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java
@@ -37,6 +37,8 @@ import org.apache.spark.sql.SparkSession;
 
 import java.util.List;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
+
 public class RandomJsonSource extends JsonSource {
   private final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen;
   private final TimeGenerator timeGenerator;
@@ -54,6 +56,6 @@ public class RandomJsonSource extends JsonSource {
     String commitTime = TimelineUtils.generateInstantTime(true, timeGenerator);
     List<String> inserts = 
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
 
-    return new InputBatch<>(Option.of(sparkContext.parallelize(inserts, 1)), 
commitTime);
+    return new InputBatch<>(Option.of(sparkContext.parallelize(inserts, 1)), 
createCheckpoint(commitTime));
   }
 }
diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/common/TestRandomJsonSource.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/common/TestRandomJsonSource.java
new file mode 100644
index 000000000000..0691834abcd3
--- /dev/null
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/common/TestRandomJsonSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.common;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.sources.InputBatch;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Tests for RandomJsonSource.
+ */
+class TestRandomJsonSource {
+
+  private static JavaSparkContext jsc;
+  private static SparkSession spark;
+
+  @BeforeAll
+  static void initSpark() {
+    spark = 
SparkSession.builder().master("local[1]").appName("TestRandomJsonSource").getOrCreate();
+    jsc = new JavaSparkContext(spark.sparkContext());
+  }
+
+  @AfterAll
+  static void stopSpark() {
+    if (jsc != null) {
+      jsc.stop();
+    }
+    if (spark != null) {
+      spark.stop();
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {6, 8})
+  void testFetchNextReturnsTwentyRecordsWithCorrectCheckpointVersion(int 
writeTableVersion) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(WRITE_TABLE_VERSION.key(), 
String.valueOf(writeTableVersion));
+
+    RandomJsonSource source = new RandomJsonSource(props, jsc, spark, null);
+    InputBatch<JavaRDD<String>> batch = source.fetchNext(Option.empty(), 
Long.MAX_VALUE);
+
+    assertNotNull(batch.getBatch());
+    assertEquals(20, batch.getBatch().get().count());
+    assertNotNull(batch.getCheckpointForNextBatch());
+    assertEquals(StreamerCheckpointV1.class, 
batch.getCheckpointForNextBatch().getClass());
+  }
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index 2b8d95cc7df3..c4f85c6d98dc 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -20,7 +20,6 @@ package org.apache.hudi.integ.testsuite.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
@@ -41,6 +40,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 
 /**
@@ -99,14 +99,13 @@ public class DFSTestSuitePathSelector extends 
DFSPathSelector {
       // no data to readAvro
       if (eligibleFiles.size() == 0) {
         return new ImmutablePair<>(Option.empty(),
-            lastCheckpoint.orElseGet(() -> new 
StreamerCheckpointV2(String.valueOf(Long.MIN_VALUE))));
+            lastCheckpoint.orElseGet(() -> 
createCheckpoint(String.valueOf(Long.MIN_VALUE))));
       }
       // readAvro the files out.
       String pathStr = eligibleFiles.stream().map(f -> f.getPath().toString())
           .collect(Collectors.joining(","));
 
-      return new ImmutablePair<>(Option.ofNullable(pathStr),
-          new StreamerCheckpointV2(String.valueOf(nextBatchId)));
+      return new ImmutablePair<>(Option.ofNullable(pathStr), 
createCheckpoint(String.valueOf(nextBatchId)));
     } catch (IOException ioe) {
       throw new HoodieIOException(
           "Unable to readAvro from source from checkpoint: " + lastCheckpoint, 
ioe);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index d104108f6c0c..11a44f58a0b6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -184,7 +184,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
   }
 
   private def translateCheckpoint(commitTime: String): String = {
-    if 
(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion.versionCode(), 
getClass.getName)) {
+    if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
       commitTime
     } else {
       CheckpointUtils.convertToCheckpointV1ForCommitTime(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
index 2625c7d1da49..a7e699db4e90 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -45,6 +44,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -112,7 +112,7 @@ public class GcsEventsSource extends RowSource {
 
   private final List<String> messagesToAck = new ArrayList<>();
 
-  private static final Checkpoint CHECKPOINT_VALUE_ZERO = new 
StreamerCheckpointV2("0");
+  private static final String CHECKPOINT_VALUE_ZERO = "0";
 
   public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, 
SparkSession spark,
                          SchemaProvider schemaProvider) {
@@ -153,7 +153,7 @@ public class GcsEventsSource extends RowSource {
 
     if (messageBatch.isEmpty()) {
       log.info("No new data. Returning empty batch with checkpoint value: {}", 
CHECKPOINT_VALUE_ZERO);
-      return Pair.of(Option.empty(), CHECKPOINT_VALUE_ZERO);
+      return Pair.of(Option.empty(), createCheckpoint(CHECKPOINT_VALUE_ZERO));
     }
 
     int numPartitions = (int) Math.ceil(
@@ -164,9 +164,11 @@ public class GcsEventsSource extends RowSource {
 
     StructType sourceSchema = UtilHelpers.getSourceSchema(schemaProvider);
     if (sourceSchema != null) {
-      return 
Pair.of(Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)), 
CHECKPOINT_VALUE_ZERO);
+      return Pair.of(
+          
Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)), 
createCheckpoint(CHECKPOINT_VALUE_ZERO));
     } else {
-      return Pair.of(Option.of(sparkSession.read().json(eventRecords)), 
CHECKPOINT_VALUE_ZERO);
+      return Pair.of(
+          Option.of(sparkSession.read().json(eventRecords)), 
createCheckpoint(CHECKPOINT_VALUE_ZERO));
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
index b5309debe41e..333ed7e33292 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
@@ -48,6 +47,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 
@@ -104,13 +104,13 @@ public class HiveIncrPullSource extends AvroSource {
 
     if (!latestTargetCommit.isPresent()) {
       // start from the beginning
-      return Option.of(new StreamerCheckpointV2(commitTimes.get(0)));
+      return Option.of(createCheckpoint(commitTimes.get(0)));
     }
 
     for (String instantTime : commitTimes) {
       // TODO(vc): Add an option to delete consumed commits
       if (instantTime.compareTo(latestTargetCommit.get().getCheckpointKey()) > 
0) {
-        return Option.of(new StreamerCheckpointV2(instantTime));
+        return Option.of(createCheckpoint(instantTime));
       }
     }
     return Option.empty();
@@ -123,7 +123,8 @@ public class HiveIncrPullSource extends AvroSource {
       Option<Checkpoint> commitToPull = findCommitToPull(lastCheckpoint);
 
       if (!commitToPull.isPresent()) {
-        return new InputBatch<>(Option.empty(), lastCheckpoint.isPresent() ? 
lastCheckpoint.get() : new StreamerCheckpointV2(""));
+        return new InputBatch<>(Option.empty(),
+            lastCheckpoint.isPresent() ? 
createCheckpoint(lastCheckpoint.get()) : createCheckpoint(""));
       }
 
       // read the files out.
@@ -133,7 +134,7 @@ public class HiveIncrPullSource extends AvroSource {
           AvroKey.class, NullWritable.class, 
sparkContext.hadoopConfiguration());
       sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new 
data");
       return new InputBatch<>(Option.of(avroRDD.keys().map(r -> 
((GenericRecord) r.datum()))),
-          String.valueOf(commitToPull.get()));
+          createCheckpoint(String.valueOf(commitToPull.get())));
     } catch (Exception e) {
       throw new HoodieReadFromSourceException("Unable to read from source from 
checkpoint: " + lastCheckpoint, e);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index c09196d37c48..937219d1721c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import 
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
@@ -49,6 +48,7 @@ import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.streamer.SourceProfile;
 import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
 import org.apache.hudi.utilities.streamer.StreamContext;
+import org.apache.hudi.utilities.streamer.StreamerCheckpointUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -203,7 +203,7 @@ public class HoodieIncrSource extends RowSource {
     String srcPath = getStringWithAltKeys(props, 
HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
     HoodieTableVersion sourceTableVersion = 
HoodieTableConfig.loadFromHoodieProps(
         HoodieStorageUtils.getStorage(srcPath, 
HadoopFSUtils.getStorageConf(sparkContext.hadoopConfiguration())), 
srcPath).getTableVersion();
-    if (sourceTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) && 
CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, 
getClass().getName())) {
+    if (sourceTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) && 
StreamerCheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, 
getClass().getName())) {
       return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
     } else {
       return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index 9717ba726866..94b5c22afe7e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -40,14 +39,6 @@ public class InputBatch<T> {
   @Getter(AccessLevel.NONE)
   private final SchemaProvider schemaProvider;
 
-  public InputBatch(Option<T> batch, String checkpointForNextBatch, 
SchemaProvider schemaProvider) {
-    this(batch, new StreamerCheckpointV2(checkpointForNextBatch), 
schemaProvider);
-  }
-
-  public InputBatch(Option<T> batch, String checkpointForNextBatch) {
-    this(batch, checkpointForNextBatch, null);
-  }
-
   public InputBatch(Option<T> batch, Checkpoint checkpointForNextBatch) {
     this(batch, checkpointForNextBatch, null);
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
index b236b9587a4b..0f5cd6fa68e9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -52,6 +51,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
@@ -263,12 +263,12 @@ public class JdbcSource extends RowSource {
         final String max = 
rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
         log.info("Checkpointing column {} with value: {}", incrementalColumn, 
max);
         if (max != null) {
-          return new StreamerCheckpointV2(max);
+          return createCheckpoint(max);
         }
         return lastCheckpoint.isPresent() && 
!StringUtils.isNullOrEmpty(lastCheckpoint.get().getCheckpointKey())
-            ? lastCheckpoint.get() : new 
StreamerCheckpointV2(StringUtils.EMPTY_STRING);
+            ? createCheckpoint(lastCheckpoint.get()) : 
createCheckpoint(StringUtils.EMPTY_STRING);
       } else {
-        return new StreamerCheckpointV2(StringUtils.EMPTY_STRING);
+        return createCheckpoint(StringUtils.EMPTY_STRING);
       }
     } catch (Exception e) {
       log.error("Failed to checkpoint");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 943be5f30a9d..215a9c5b8833 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -49,6 +49,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 
@@ -130,11 +131,13 @@ public abstract class KafkaSource<T> extends Source<T> {
         totalNewMsgs, offsetGen.getTopicName(), Arrays.toString(offsetRanges));
     if (totalNewMsgs <= 0) {
       
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 
0);
-      return new InputBatch<>(Option.empty(), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+      return new InputBatch<>(
+          Option.empty(), 
createCheckpoint(KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)));
     }
     
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT, 
totalNewMsgs);
     T newBatch = toBatch(offsetRanges);
-    return new InputBatch<>(Option.of(newBatch), 
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+    return new InputBatch<>(
+        Option.of(newBatch), 
createCheckpoint(KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)));
   }
 
   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
index a5ec35994d8e..91b52567fb07 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
@@ -51,6 +51,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 
 @Slf4j
@@ -97,7 +98,7 @@ public abstract class KinesisSource<T> extends Source<T> {
     if (shardRangesWithUnreadRecords.length == 0) {
       
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT,
 0);
       String checkpointStr = lastCheckpoint.isPresent() ? 
lastCheckpoint.get().getCheckpointKey() : "";
-      return new InputBatch<>(Option.empty(), checkpointStr);
+      return new InputBatch<>(Option.empty(), createCheckpoint(checkpointStr));
     }
     // STEP 3: Otherwise, do the read.
     T batch = toBatch(shardRangesWithUnreadRecords, sourceLimit);
@@ -111,7 +112,7 @@ public abstract class KinesisSource<T> extends Source<T> {
     log.info("Read {} records from Kinesis stream {} with {} shards, 
checkpoint: {}",
         totalMsgs, offsetGen.getStreamName(), 
shardRangesWithUnreadRecords.length, checkpointStr);
 
-    return new InputBatch<>(Option.of(batch), checkpointStr);
+    return new InputBatch<>(Option.of(batch), createCheckpoint(checkpointStr));
   }
 
   /** Upper bound on consecutive empty GetRecords responses before giving up 
on a shard. */
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
index d7696efdbf6c..cf6cd4093438 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.HoodieConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -54,6 +53,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
 import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -130,7 +130,7 @@ public class PulsarSource extends RowSource implements 
Closeable {
         .option("endingOffsets", endingOffsetStr)
         .load();
 
-    return Pair.of(Option.of(transform(sourceRows)), new 
StreamerCheckpointV2(endingOffsetStr));
+    return Pair.of(Option.of(transform(sourceRows)), 
createCheckpoint(endingOffsetStr));
   }
 
   @Override
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 9313fe508b5e..75cef40c1ee5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -23,7 +23,6 @@ import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -47,7 +46,6 @@ import org.apache.spark.storage.StorageLevel;
 
 import java.io.Serializable;
 
-import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.shouldTargetCheckpointV2;
 import static 
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD;
 import static 
org.apache.hudi.config.HoodieWriteConfig.TAGGED_RECORD_STORAGE_LEVEL_VALUE;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
@@ -121,57 +119,24 @@ public abstract class Source<T> implements 
SourceCommitCallback, Serializable {
    * After the checkpoint value is decided based on the existing 
configurations at
    * 
org.apache.hudi.utilities.streamer.StreamerCheckpointUtils#resolveWhatCheckpointToResume,
    *
-   * For most of the data sources the there is no difference between 
checkpoint V1 and V2, it's
-   * merely changing the wrapper class.
+   * Non-incremental sources always operate on V1 checkpoints regardless of 
the write table version,
+   * so any V2 input read from older commit metadata is normalized to V1 here.
    *
-   * Check child class method overrides to see special case handling.
+   * Hudi incremental sources have their own V1/V2 semantics (requested time 
vs completion time)
+   * and override this method.
    * */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> 
lastCheckpoint) {
     if (lastCheckpoint.isEmpty()) {
       return Option.empty();
     }
-    if (CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, 
getClass().getName())) {
-      // V2 -> V2
-      if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
-        return lastCheckpoint;
-      }
-      // V1 -> V2
-      if (lastCheckpoint.get() instanceof StreamerCheckpointV1) {
-        StreamerCheckpointV2 newCheckpoint = new 
StreamerCheckpointV2(lastCheckpoint.get());
-        newCheckpoint.addV1Props();
-        return Option.of(newCheckpoint);
-      }
-    } else {
-      // V2 -> V1
-      if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
-        return Option.of(new StreamerCheckpointV1(lastCheckpoint.get()));
-      }
-      // V1 -> V1
-      if (lastCheckpoint.get() instanceof StreamerCheckpointV1) {
-        return lastCheckpoint;
-      }
+    if (lastCheckpoint.get() instanceof StreamerCheckpointV1) {
+      return lastCheckpoint;
     }
-    throw new UnsupportedOperationException("Unsupported checkpoint type: " + 
lastCheckpoint.get());
-  }
-
-  public void assertCheckpointVersion(Option<Checkpoint> lastCheckpoint, 
Option<Checkpoint> lastCheckpointTranslated, Checkpoint checkpoint) {
-    if (checkpoint != null) {
-      boolean shouldBeV2Checkpoint = 
shouldTargetCheckpointV2(writeTableVersion, getClass().getName());
-      String errorMessage = String.format(
-          "Data source should return checkpoint version V%s. The checkpoint 
resumed in the iteration is %s, whose translated version is %s. "
-              + "The checkpoint returned after the iteration %s.",
-          shouldBeV2Checkpoint ? "2" : "1",
-          lastCheckpoint.isEmpty() ? "null" : lastCheckpointTranslated.get(),
-          lastCheckpointTranslated.isEmpty() ? "null" : 
lastCheckpointTranslated.get(),
-          checkpoint);
-      if (shouldBeV2Checkpoint && !(checkpoint instanceof 
StreamerCheckpointV2)) {
-        throw new IllegalStateException(errorMessage);
-      }
-      if (!shouldBeV2Checkpoint && !(checkpoint instanceof 
StreamerCheckpointV1)) {
-        throw new IllegalStateException(errorMessage);
-      }
+    if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
+      return Option.of(new StreamerCheckpointV1(lastCheckpoint.get()));
     }
+    throw new UnsupportedOperationException("Unsupported checkpoint type: " + 
lastCheckpoint.get());
   }
 
   /**
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java
index 60cac125a029..913806420323 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
@@ -39,6 +38,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Scanner;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -92,7 +92,7 @@ public class SqlFileBasedSource extends RowSource {
           rows = sparkSession.sql(sqlStr);
         }
       }
-      return Pair.of(Option.of(rows), shouldEmitCheckPoint ? new 
StreamerCheckpointV2(String.valueOf(System.currentTimeMillis())) : null);
+      return Pair.of(Option.of(rows), shouldEmitCheckPoint ? 
createCheckpoint(String.valueOf(System.currentTimeMillis())) : null);
     } catch (IOException ioe) {
       throw new HoodieIOException("Error reading source SQL file.", ioe);
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 030f5e0e671b..d45cac3922ff 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources.debezium;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
@@ -59,6 +58,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static 
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS;
@@ -128,7 +128,8 @@ public abstract class DebeziumSource extends RowSource {
       log.info("Spark schema of Kafka Payload for topic {}:\n{}", 
offsetGen.getTopicName(), dataset.schema().treeString());
       log.info("New checkpoint string: {}", 
CheckpointUtils.offsetsToStr(offsetRanges));
       return Pair.of(Option.of(dataset),
-              new StreamerCheckpointV2(overrideCheckpointStr.isEmpty() ? 
CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr));
+              createCheckpoint(overrideCheckpointStr.isEmpty()
+                  ? CheckpointUtils.offsetsToStr(offsetRanges) : 
overrideCheckpointStr));
     } catch (Exception e) {
       log.error("Fatal error reading and parsing incoming debezium event", e);
       throw new HoodieReadFromSourceException("Fatal error reading and parsing 
incoming debezium event", e);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 594034308ee8..071499d13a9b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -47,6 +46,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static 
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 
@@ -151,13 +151,13 @@ public class DFSPathSelector implements Serializable {
 
       // no data to read
       if (filteredFiles.isEmpty()) {
-        return new ImmutablePair<>(Option.empty(), new 
StreamerCheckpointV2(String.valueOf(newCheckpointTime)));
+        return new ImmutablePair<>(Option.empty(), 
createCheckpoint(String.valueOf(newCheckpointTime)));
       }
 
       // read the files out.
       String pathStr = filteredFiles.stream().map(f -> 
f.getPath().toString()).collect(Collectors.joining(","));
 
-      return new ImmutablePair<>(Option.ofNullable(pathStr), new 
StreamerCheckpointV2(String.valueOf(newCheckpointTime)));
+      return new ImmutablePair<>(Option.ofNullable(pathStr), 
createCheckpoint(String.valueOf(newCheckpointTime)));
     } catch (IOException ioe) {
       throw new HoodieIOException("Unable to read from source from checkpoint: 
" + lastCheckpointStr, ioe);
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index 989be9163da1..0892ca4d3564 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources.helpers;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -44,6 +43,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static 
org.apache.hudi.utilities.config.DFSPathSelectorConfig.ROOT_INPUT_PATH;
@@ -159,13 +159,13 @@ public class DatePartitionPathSelector extends 
DFSPathSelector {
 
     // no data to read
     if (filteredFiles.isEmpty()) {
-      return new ImmutablePair<>(Option.empty(), new 
StreamerCheckpointV2(String.valueOf(newCheckpointTime)));
+      return new ImmutablePair<>(Option.empty(), 
createCheckpoint(String.valueOf(newCheckpointTime)));
     }
 
     // read the files out.
     String pathStr = filteredFiles.stream().map(f -> 
f.getPath().toString()).collect(Collectors.joining(","));
 
-    return new ImmutablePair<>(Option.ofNullable(pathStr), new 
StreamerCheckpointV2(String.valueOf(newCheckpointTime)));
+    return new ImmutablePair<>(Option.ofNullable(pathStr), 
createCheckpoint(String.valueOf(newCheckpointTime)));
   }
 
   /**
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
index 3cd073e72109..b5ccaa58f374 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
@@ -20,7 +20,7 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -42,6 +42,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 
 /**
@@ -155,8 +156,10 @@ public class S3EventsMetaSelector extends 
CloudObjectsSelector {
       for (Map<String, Object> eventRecord : eventRecords) {
         
filteredEventRecords.add(SdkHttpUtils.urlDecode(MAPPER.writeValueAsString(eventRecord)));
       }
-      // Return the old checkpoint if no messages to consume from queue.
-      Checkpoint newCheckpoint = newCheckpointTime == 0 ? 
lastCheckpoint.orElse(null) : new 
StreamerCheckpointV2(String.valueOf(newCheckpointTime));
+      // Re-wrap a prior V2 checkpoint as V1 to avoid leaking it back to 
commit metadata.
+      Checkpoint newCheckpoint = newCheckpointTime == 0
+          ? lastCheckpoint.map(CheckpointUtils::createCheckpoint).orElse(null)
+          : createCheckpoint(String.valueOf(newCheckpointTime));
       return new ImmutablePair<>(filteredEventRecords, newCheckpoint);
     } catch (JSONException | IOException e) {
       throw new HoodieException("Unable to read from SQS: ", e);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 1af9c503b613..7f38d80f8308 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -153,7 +154,7 @@ import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordQualified
 import static 
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
-import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromGeneralSource;
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static 
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
 import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
@@ -173,6 +174,7 @@ import static 
org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_F
 import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 import static 
org.apache.hudi.utilities.streamer.StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo;
+import static 
org.apache.hudi.utilities.streamer.StreamerCheckpointUtils.shouldTargetCheckpointV2;
 
 /**
  * Sync's one batch of data to hoodie table.
@@ -1031,7 +1033,8 @@ public class StreamSync implements Serializable, 
Closeable {
     }
 
     // Otherwise create new checkpoint based on version
-    Checkpoint checkpoint = 
buildCheckpointFromGeneralSource(cfg.sourceClassName, versionCode, null);
+    Checkpoint checkpoint = shouldTargetCheckpointV2(versionCode, 
cfg.sourceClassName)
+        ? new StreamerCheckpointV2((String) null) : createCheckpoint((String) 
null);
 
     return checkpoint.getCheckpointCommitMetadata(cfg.checkpoint, 
cfg.ignoreCheckpoint);
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
index 323a2b95149d..2b28e09e3a2c 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
@@ -26,6 +26,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import 
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -44,8 +46,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2;
 import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.HOODIE_INCREMENTAL_SOURCES;
-import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
 import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
@@ -56,6 +58,28 @@ import static 
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowng
 @Slf4j
 public class StreamerCheckpointUtils {
 
+  /**
+   * Wraps a user-supplied checkpoint override string. For HoodieIncrSource 
family on table version
+   * 8+, returns {@link UnresolvedStreamerCheckpointBasedOnCfg} so the source 
can resolve V1/V2
+   * cursor semantics from the override's prefix; for everything else returns 
V1.
+   */
+  public static Checkpoint buildCheckpointFromConfigOverride(
+      String sourceClassName, int writeTableVersion, String 
checkpointToResume) {
+    return shouldTargetCheckpointV2(writeTableVersion, sourceClassName)
+        ? new UnresolvedStreamerCheckpointBasedOnCfg(checkpointToResume)
+        : new StreamerCheckpointV1(checkpointToResume);
+  }
+
+  /**
+   * True only for the HoodieIncrSource family on table version 8+, where V2 
(completion-time)
+   * cursor semantics differ from V1 (requested-time). Every other source 
operates on V1 only.
+   */
+  public static boolean shouldTargetCheckpointV2(int writeTableVersion, String 
sourceClassName) {
+    return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
+        && HOODIE_INCREMENTAL_SOURCES.contains(sourceClassName)
+        && !DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(sourceClassName);
+  }
+
   /**
    * The first phase of checkpoint resolution - read the checkpoint configs 
from 2 sources and resolve
    * conflicts:
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index f38f8db4f48b..8bece4c3df2c 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -83,11 +82,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 
+import static 
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Slf4j
@@ -740,10 +741,24 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       HoodieInstant lastInstant = timeline.lastInstant().get();
       HoodieCommitMetadata commitMetadata = 
timeline.readCommitMetadata(lastInstant);
       assertEquals(totalCommits, timeline.countInstants());
+      assertEquals(expected, 
commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY));
+      assertNull(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2));
+      return lastInstant;
+    }
+
+    static HoodieInstant assertCommitMetadataForIncrSource(String expected, 
String tablePath, int totalCommits)
+        throws IOException {
+      HoodieTableMetaClient meta = createMetaClient(storage.getConf(), 
tablePath);
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      HoodieInstant lastInstant = timeline.lastInstant().get();
+      HoodieCommitMetadata commitMetadata = 
timeline.readCommitMetadata(lastInstant);
+      assertEquals(totalCommits, timeline.countInstants());
       if 
(meta.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT))
 {
-        assertEquals(expected, 
commitMetadata.getMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2));
+        assertEquals(expected, 
commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2));
+        assertNull(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY));
       } else {
         assertEquals(expected, 
commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY));
+        assertNull(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2));
       }
       return lastInstant;
     }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d180dc720ea5..ca418d519560 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -461,17 +461,17 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         expectedKeyGeneratorClassName, 
metaClient.getTableConfig().getKeyGeneratorClassName());
     Dataset<Row> res = sqlContext.read().format("hudi").load(tableBasePath);
     assertEquals(1000, res.count());
-    assertUseV2Checkpoint(metaClient);
+    assertCheckpointVersion(metaClient);
   }
 
-  private static void assertUseV2Checkpoint(HoodieTableMetaClient metaClient) {
+  private static void assertCheckpointVersion(HoodieTableMetaClient 
metaClient) {
     metaClient.reloadActiveTimeline();
     Option<HoodieCommitMetadata> metadata = 
HoodieClientTestUtils.getCommitMetadataForInstant(
         metaClient, metaClient.getActiveTimeline().lastInstant().get());
     assertFalse(metadata.isEmpty());
     Map<String, String> extraMetadata = metadata.get().getExtraMetadata();
-    assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
-    assertFalse(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    assertFalse(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
   }
 
   @Test
@@ -632,7 +632,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
 
     syncOnce(cfg);
-    assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+    assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
 
     assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -645,7 +645,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + 
basePath + "/source_evolved.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     syncOnce(cfg);
-    assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+    assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     assertRecordCount(1450, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
@@ -710,7 +710,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
 
     syncOnce(cfg);
-    assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+    assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
     assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
@@ -734,7 +734,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
 
     syncOnce(cfg);
-    assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+    assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
     assertRecordCount(1450, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
     tableSchemaResolver = new TableSchemaResolver(
@@ -771,7 +771,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
 
       syncOnce(cfg);
-      assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+      assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
       assertRecordCount(1000, tableBasePath, sqlContext);
       TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
       TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
@@ -791,7 +791,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
 
       syncOnce(cfg);
-      assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+      assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
       assertRecordCount(1450, tableBasePath, sqlContext);
       TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
       tableSchemaResolver = new TableSchemaResolver(
@@ -857,7 +857,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
           tableBasePath, WriteOperationType.INSERT, hasTransformer, 
orderingField, recordType, tableType);
       syncOnce(cfg);
       // Validate.
-      assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+      assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
       assertRecordCount(1000, tableBasePath, sqlContext);
       TestHelpers.assertCommitMetadata(topicName + ",0:500,1:500", 
tableBasePath, 1);
       TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
@@ -873,7 +873,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
           tableBasePath, WriteOperationType.UPSERT, hasTransformer, 
orderingField, recordType, tableType);
       syncOnce(cfg);
       // Validate.
-      assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+      assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
       assertRecordCount(1500, tableBasePath, sqlContext);
       TestHelpers.assertCommitMetadata(topicName + ",0:1250,1:1250", 
tableBasePath, 2);
       tableSchemaResolver = new TableSchemaResolver(
@@ -1450,7 +1450,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), 
MetricsReporterType.INMEMORY.name()));
     cfg.continuousMode = false;
     syncOnce(cfg);
-    assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+    assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
     assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
     assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be 
shutdown");
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -1479,7 +1479,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), 
MetricsReporterType.INMEMORY.name()));
     cfg.continuousMode = false;
     syncOnce(cfg);
-    assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
+    assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage, 
tableBasePath));
     assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
     assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be 
shutdown");
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -2392,7 +2392,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
                   entry, metaClient, WriteOperationType.BULK_INSERT));
         }
       }
-      assertUseV2Checkpoint(createMetaClient(jsc, tableBasePath));
+      assertCheckpointVersion(createMetaClient(jsc, tableBasePath));
     } finally {
       deltaStreamer.shutdownGracefully();
     }
@@ -2512,7 +2512,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1000, downstreamTableBasePath, sqlContext);
     assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
     assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
-    
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(),
 downstreamTableBasePath, 1);
+    
TestHelpers.assertCommitMetadataForIncrSource(lastInstantForUpstreamTable.getCompletionTime(),
 downstreamTableBasePath, 1);
 
     // No new data => no commits for upstream table
     cfg.sourceLimit = 0;
@@ -2530,7 +2530,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1000, downstreamTableBasePath, sqlContext);
     assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
     assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
-    
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(),
 downstreamTableBasePath, 1);
+    
TestHelpers.assertCommitMetadataForIncrSource(lastInstantForUpstreamTable.getCompletionTime(),
 downstreamTableBasePath, 1);
 
     // upsert() #1 on upstream hudi table
     cfg.sourceLimit = 2000;
@@ -2554,7 +2554,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
     assertDistanceCountWithExactValue(2000, downstreamTableBasePath, 
sqlContext);
     HoodieInstant finalInstant =
-        
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(),
 downstreamTableBasePath, 2);
+        
TestHelpers.assertCommitMetadataForIncrSource(lastInstantForUpstreamTable.getCompletionTime(),
 downstreamTableBasePath, 2);
     counts = countsPerCommit(downstreamTableBasePath, sqlContext);
     assertEquals(2000, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index 59f8605663bc..b54be7cb1890 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -95,13 +95,13 @@ public class TestSourceFormatAdapter {
   }
 
   private void setupRowSource(Dataset<Row> ds, TypedProperties properties, 
SchemaProvider schemaProvider) {
-    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds), 
DUMMY_CHECKPOINT, schemaProvider);
+    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds), new 
StreamerCheckpointV2(DUMMY_CHECKPOINT), schemaProvider);
     testRowDataSource = new TestRowDataSource(properties, jsc, spark, 
schemaProvider, batch);
   }
 
   private void setupJsonSource(JavaRDD<String> ds, HoodieSchema schema) {
     SchemaProvider basicSchemaProvider = new BasicSchemaProvider(schema);
-    InputBatch<JavaRDD<String>> batch = new InputBatch<>(Option.of(ds), 
DUMMY_CHECKPOINT, basicSchemaProvider);
+    InputBatch<JavaRDD<String>> batch = new InputBatch<>(Option.of(ds), new 
StreamerCheckpointV2(DUMMY_CHECKPOINT), basicSchemaProvider);
     testJsonDataSource = new TestJsonDataSource(new TypedProperties(), jsc, 
spark, basicSchemaProvider, batch);
   }
 
@@ -183,7 +183,7 @@ public class TestSourceFormatAdapter {
     SchemaProvider schemaProvider = new 
TestSchemaProviderWithTransformation(sourceSchema, targetSchema);
 
     // Setup the row source
-    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(testDataset), 
DUMMY_CHECKPOINT, schemaProvider);
+    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(testDataset), 
new StreamerCheckpointV2(DUMMY_CHECKPOINT), schemaProvider);
     TestRowDataSource testSource = new TestRowDataSource(new 
TypedProperties(), jsc, spark, schemaProvider, batch);
 
     // Create SourceFormatAdapter and fetch data in Avro format
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
index 1ec7842fe4e1..ed993a837c99 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
@@ -33,6 +33,8 @@ import org.apache.spark.sql.SparkSession;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
+
 /**
  * An implementation of {@link Source}, that emits test upserts.
  */
@@ -69,6 +71,6 @@ public class TestDataSource extends AbstractBaseTestSource {
     List<GenericRecord> records =
         fetchNextBatch(props, (int) sourceLimit, 
recordInstantTime.orElse(instantTime), 
DEFAULT_PARTITION_NUM).collect(Collectors.toList());
     JavaRDD<GenericRecord> avroRDD = 
sparkContext.<GenericRecord>parallelize(records, 4);
-    return new InputBatch<>(Option.of(avroRDD), instantTime);
+    return new InputBatch<>(Option.of(avroRDD), createCheckpoint(instantTime));
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
index c45a9eaf7d10..e7b9b2b46c3c 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -60,7 +61,7 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
   protected FilebasedSchemaProvider schemaProvider;
   private TypedProperties props;
 
-  private static final Checkpoint CHECKPOINT_VALUE_ZERO = new 
StreamerCheckpointV2("0");
+  private static final Checkpoint CHECKPOINT_VALUE_ZERO = new 
StreamerCheckpointV1("0");
 
   @BeforeAll
   public static void beforeAll() throws Exception {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHiveIncrPullSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHiveIncrPullSource.java
new file mode 100644
index 000000000000..8fbb98e0ab6d
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHiveIncrPullSource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+import org.apache.hudi.utilities.config.HiveIncrPullSourceConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHiveIncrPullSource extends HoodieSparkClientTestHarness {
+
+  private TypedProperties props;
+  private String incrPullRoot;
+
+  @BeforeEach
+  void setUp() throws Exception {
+    initSparkContexts();
+    initPath();
+    initHoodieStorage();
+    incrPullRoot = basePath + "/incrPullRoot";
+    FileSystem fs = (FileSystem) storage.getFileSystem();
+    fs.mkdirs(new Path(incrPullRoot));
+    props = new TypedProperties();
+    props.setProperty(HiveIncrPullSourceConfig.ROOT_INPUT_PATH.key(), 
incrPullRoot);
+  }
+
+  @AfterEach
+  void teardown() throws Exception {
+    cleanupResources();
+  }
+
+  private void createCommitDir(String commitTime) throws Exception {
+    FileSystem fs = (FileSystem) storage.getFileSystem();
+    fs.mkdirs(new Path(incrPullRoot, commitTime));
+  }
+
+  @Test
+  void 
findCommitToPullReturnsV1CheckpointForFirstCommitWhenNoLatestTargetCommit() 
throws Exception {
+    createCommitDir("20240101");
+    createCommitDir("20240102");
+    HiveIncrPullSource source = new HiveIncrPullSource(props, jsc, 
sparkSession, null);
+    Method findCommitToPull = 
HiveIncrPullSource.class.getDeclaredMethod("findCommitToPull", Option.class);
+    findCommitToPull.setAccessible(true);
+
+    @SuppressWarnings("unchecked")
+    Option<Checkpoint> result = (Option<Checkpoint>) 
findCommitToPull.invoke(source, Option.empty());
+
+    assertTrue(result.isPresent());
+    assertInstanceOf(StreamerCheckpointV1.class, result.get());
+    assertEquals("20240101", result.get().getCheckpointKey());
+  }
+
+  @Test
+  void readFromCheckpointRewrapsV2LastCheckpointAsV1WhenNoCommitToPull() 
throws Exception {
+    createCommitDir("20240101");
+    createCommitDir("20240102");
+    HiveIncrPullSource source = new HiveIncrPullSource(props, jsc, 
sparkSession, null);
+
+    // lastCheckpoint is past the last commit, so findCommitToPull returns empty 
and readFromCheckpoint takes its early return.
+    InputBatch<?> batch = source.readFromCheckpoint(
+        Option.of(new StreamerCheckpointV2("20240103")), Long.MAX_VALUE);
+
+    assertFalse(batch.getBatch().isPresent());
+    assertInstanceOf(StreamerCheckpointV1.class, 
batch.getCheckpointForNextBatch());
+    assertEquals("20240103", 
batch.getCheckpointForNextBatch().getCheckpointKey());
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
index f7bb36076a7a..bf969ae2ced4 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
@@ -34,7 +34,7 @@ public class TestInputBatch {
 
   @Test
   public void getSchemaProviderShouldThrowException() {
-    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), 
(String) null, null);
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), 
null, null);
     Throwable t = assertThrows(HoodieException.class, 
inputBatch::getSchemaProvider);
     assertEquals("Schema provider is required for this operation and for the 
source of interest. "
         + "Please set '--schemaprovider-class' in the top level HoodieStreamer 
config for the source of interest. "
@@ -45,7 +45,7 @@ public class TestInputBatch {
 
   @Test
   public void getSchemaProviderShouldReturnNullSchemaProvider() {
-    final InputBatch<String> inputBatch = new InputBatch<>(Option.empty(), 
(String) null, null);
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.empty(), 
null, null);
     SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
     assertTrue(schemaProvider instanceof InputBatch.NullSchemaProvider);
   }
@@ -53,7 +53,7 @@ public class TestInputBatch {
   @Test
   public void getSchemaProviderShouldReturnGivenSchemaProvider() {
     SchemaProvider schemaProvider = new RowBasedSchemaProvider(null);
-    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), 
(String) null, schemaProvider);
+    final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"), 
null, schemaProvider);
     assertSame(schemaProvider, inputBatch.getSchemaProvider());
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
index 6d00c9c91bf6..9fc76f94f513 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
@@ -273,7 +274,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
       InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 100);
       Dataset<Row> rowDataset = batch.getBatch().get();
       assertEquals(100, rowDataset.count());
-      assertEquals(new StreamerCheckpointV2("100"), 
batch.getCheckpointForNextBatch());
+      assertEquals(new StreamerCheckpointV1("100"), 
batch.getCheckpointForNextBatch());
 
       // Add 100 records with commit time "001"
       insert("001", 100, connection, DATA_GENERATOR, PROPS);
@@ -361,7 +362,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
       InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10);
       Dataset<Row> rowDataset = batch.getBatch().get();
       assertEquals(10, rowDataset.count());
-      assertEquals(new StreamerCheckpointV2(""), 
batch.getCheckpointForNextBatch());
+      assertEquals(new StreamerCheckpointV1(""), 
batch.getCheckpointForNextBatch());
 
       // Get max of incremental column
       Column incrementalColumn = rowDataset
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestStreamerSourceCheckpointVersion.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestStreamerSourceCheckpointVersion.java
new file mode 100644
index 000000000000..32480348a565
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestStreamerSourceCheckpointVersion.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
+import org.apache.hudi.utilities.config.JdbcSourceConfig;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen;
+import 
org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen.KinesisShardRange;
+import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamerCheckpointUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.streaming.kafka010.OffsetRange;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Validates that every non-incremental streamer source emits a V1 checkpoint 
regardless of the
+ * configured write table version or the wrapper class of the input checkpoint 
(so V2 keys
+ * persisted by older releases are read on input but never written back as 
V2). Hudi incremental
+ * sources own their own V1/V2 semantics and are exercised by
+ * {@link #testS3AndGcsIncrSourcesStayV1OnBothTableVersions()} plus the
+ * {@code TestHoodieIncrSource} family.
+ */
+class TestStreamerSourceCheckpointVersion {
+
+  private static JavaSparkContext jsc;
+  private static SparkSession spark;
+
+  @TempDir
+  static java.nio.file.Path tempDir;
+
+  @BeforeAll
+  static void initSpark() {
+    spark = 
SparkSession.builder().master("local[1]").appName("TestSourceCheckpointVersion").getOrCreate();
+    jsc = new JavaSparkContext(spark.sparkContext());
+  }
+
+  @AfterAll
+  static void stopSpark() {
+    if (jsc != null) {
+      jsc.stop();
+    }
+    if (spark != null) {
+      spark.stop();
+    }
+  }
+
+  // Covers AvroDFSSource, JsonDFSSource, CsvDFSSource, ParquetDFSSource, 
ORCDFSSource.
+  @ParameterizedTest
+  @ValueSource(ints = {6, 8})
+  void testDfsPathSelector(int writeTableVersion) throws IOException {
+    TypedProperties props = propsWith(writeTableVersion);
+    props.setProperty(DFSPathSelectorConfig.ROOT_INPUT_PATH.key(), 
tempDir.toString());
+    Configuration hadoopConf = jsc.hadoopConfiguration();
+    FileSystem fs = FileSystem.get(hadoopConf);
+    fs.mkdirs(new Path(tempDir.toString()));
+    DFSPathSelector selector = new DFSPathSelector(props, hadoopConf);
+    Pair<Option<String>, Checkpoint> result =
+        selector.getNextFilePathsAndMaxModificationTime(jsc, Option.empty(), 
Long.MAX_VALUE);
+    assertV1(result.getRight());
+  }
+
+  // Covers AvroKafkaSource, JsonKafkaSource, ProtoKafkaSource.
+  @ParameterizedTest
+  @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+  void testKafkaSource(int writeTableVersion, InputCheckpointKind inputKind) {
+    TestableKafkaSource source = new 
TestableKafkaSource(propsWith(writeTableVersion), jsc, spark);
+    KafkaOffsetGen offsetGen = mock(KafkaOffsetGen.class);
+    when(offsetGen.getNextOffsetRanges(any(), anyLong(), any())).thenReturn(
+        new OffsetRange[] {OffsetRange.create("t", 0, 0L, 0L)});
+    when(offsetGen.getTopicName()).thenReturn("t");
+    source.offsetGen = offsetGen;
+    InputBatch<String> batch = source.fetchNext(makeInputCheckpoint(inputKind, 
"k"), 1L);
+    assertV1(batch.getCheckpointForNextBatch());
+  }
+
+  // Covers JsonKinesisSource.
+  @ParameterizedTest
+  @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+  void testKinesisSource(int writeTableVersion, InputCheckpointKind inputKind) 
{
+    TestableKinesisSource source = new 
TestableKinesisSource(propsWith(writeTableVersion), jsc, spark);
+    KinesisOffsetGen offsetGen = mock(KinesisOffsetGen.class);
+    when(offsetGen.getStreamName()).thenReturn("s");
+    when(offsetGen.getNextShardRanges(any(), anyLong())).thenReturn(
+        new KinesisShardRange[0]);
+    source.setOffsetGen(offsetGen);
+    InputBatch<JavaRDD<String>> batch = 
source.fetchNext(makeInputCheckpoint(inputKind, "s"), 1L);
+    assertV1(batch.getCheckpointForNextBatch());
+  }
+
+  @ParameterizedTest
+  @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+  void testJdbcSource(int writeTableVersion, InputCheckpointKind inputKind) 
throws Exception {
+    JdbcSource source = new JdbcSource(propsWith(writeTableVersion), jsc, 
spark, null);
+    Method m = JdbcSource.class.getDeclaredMethod(
+        "checkpoint", Dataset.class, boolean.class, Option.class);
+    m.setAccessible(true);
+    Checkpoint c = (Checkpoint) m.invoke(source, null, false, 
makeInputCheckpoint(inputKind, "k"));
+    assertV1(c);
+  }
+
+  @ParameterizedTest
+  @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+  void testSqlFileBasedSource(int writeTableVersion, InputCheckpointKind 
inputKind) throws IOException {
+    java.nio.file.Path sqlFile = tempDir.resolve("q.sql");
+    Files.write(sqlFile, "SELECT 1".getBytes());
+    TypedProperties props = propsWith(writeTableVersion);
+    props.setProperty("hoodie.streamer.source.sql.file", sqlFile.toString());
+    props.setProperty("hoodie.streamer.source.sql.checkpoint.emit", "true");
+    SqlFileBasedSource source = new SqlFileBasedSource(props, jsc, spark, 
null);
+    Pair<Option<Dataset<Row>>, Checkpoint> result =
+        invokeRowSourceFetch(source, makeInputCheckpoint(inputKind, "k"));
+    assertV1(result.getRight());
+  }
+
+  @ParameterizedTest
+  @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+  void testHiveIncrPullSource(int writeTableVersion, InputCheckpointKind 
inputKind) throws Exception {
+    Files.createDirectories(tempDir.resolve("20200101000000"));
+    TypedProperties props = propsWith(writeTableVersion);
+    props.setProperty("hoodie.streamer.source.incrpull.root", 
tempDir.toString());
+    HiveIncrPullSource source = new HiveIncrPullSource(props, jsc, spark, 
null);
+    Method m = HiveIncrPullSource.class.getDeclaredMethod("findCommitToPull", 
Option.class);
+    m.setAccessible(true);
+    @SuppressWarnings("unchecked")
+    Option<Checkpoint> result = (Option<Checkpoint>) m.invoke(
+        source, makeInputCheckpoint(inputKind, "00000000000000"));
+    assertV1(result.get());
+  }
+
+  @ParameterizedTest
+  @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+  void testGcsEventsSource(int writeTableVersion, InputCheckpointKind 
inputKind) {
+    PubsubMessagesFetcher fetcher = mock(PubsubMessagesFetcher.class);
+    when(fetcher.fetchMessages()).thenReturn(Collections.emptyList());
+    TypedProperties props = propsWith(writeTableVersion);
+    props.setProperty("hoodie.streamer.source.gcs.project.id", "p");
+    props.setProperty("hoodie.streamer.source.gcs.subscription.id", "s");
+    GcsEventsSource source = new GcsEventsSource(props, jsc, spark, null, 
fetcher);
+    Pair<Option<Dataset<Row>>, Checkpoint> result =
+        invokeRowSourceFetch(source, makeInputCheckpoint(inputKind, "k"));
+    assertV1(result.getRight());
+  }
+
+  @ParameterizedTest
+  @EnumSource(InputCheckpointKind.class)
+  void testJdbcSourceIncrementalWithMaxValueEmitsV1(InputCheckpointKind 
inputKind) throws Exception {
+    TypedProperties props = propsWith(8);
+    props.setProperty(JdbcSourceConfig.INCREMENTAL_COLUMN.key(), "idx");
+    JdbcSource source = new JdbcSource(props, jsc, spark, null);
+    StructType schema = new StructType().add("idx", DataTypes.StringType, 
true);
+    List<Row> rows = Arrays.asList(RowFactory.create("100"), 
RowFactory.create("200"));
+    Dataset<Row> dataset = spark.createDataFrame(rows, schema);
+    Method m = JdbcSource.class.getDeclaredMethod(
+        "checkpoint", Dataset.class, boolean.class, Option.class);
+    m.setAccessible(true);
+    Checkpoint c = (Checkpoint) m.invoke(source, dataset, true, 
makeInputCheckpoint(inputKind, "k"));
+    assertV1(c);
+    assertEquals("200", c.getCheckpointKey());
+  }
+
+  // Drives the isIncremental + max==null pass-through to confirm it re-wraps 
a V2 input as V1.
+  @ParameterizedTest
+  @EnumSource(InputCheckpointKind.class)
+  void testJdbcSourcePassThroughEmitsV1(InputCheckpointKind inputKind) throws 
Exception {
+    TypedProperties props = propsWith(8);
+    props.setProperty(JdbcSourceConfig.INCREMENTAL_COLUMN.key(), "idx");
+    JdbcSource source = new JdbcSource(props, jsc, spark, null);
+    StructType schema = new StructType().add("idx", DataTypes.StringType, 
true);
+    List<Row> rows = Collections.singletonList(RowFactory.create((Object) 
null));
+    Dataset<Row> nullColDataset = spark.createDataFrame(rows, schema);
+    Method m = JdbcSource.class.getDeclaredMethod(
+        "checkpoint", Dataset.class, boolean.class, Option.class);
+    m.setAccessible(true);
+    Checkpoint c = (Checkpoint) m.invoke(source, nullColDataset, true, 
makeInputCheckpoint(inputKind, "k"));
+    assertV1(c);
+  }
+
+  // Input key sorts after the only commit on disk so findCommitToPull returns 
empty and
+  // readFromCheckpoint enters its pass-through branch.
+  @ParameterizedTest
+  @EnumSource(InputCheckpointKind.class)
+  void testHiveIncrPullSourcePassThroughEmitsV1(InputCheckpointKind inputKind) 
throws IOException {
+    Files.createDirectories(tempDir.resolve("20200101000000"));
+    TypedProperties props = propsWith(8);
+    props.setProperty("hoodie.streamer.source.incrpull.root", 
tempDir.toString());
+    HiveIncrPullSource source = new HiveIncrPullSource(props, jsc, spark, 
null);
+    InputBatch<?> batch = source.readFromCheckpoint(
+        makeInputCheckpoint(inputKind, "30000000000000"), 1L);
+    assertV1(batch.getCheckpointForNextBatch());
+  }
+
+  @ParameterizedTest
+  @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+  void testTranslateCheckpointNormalizesToV1(int writeTableVersion, 
InputCheckpointKind inputKind) {
+    TestableKafkaSource source = new 
TestableKafkaSource(propsWith(writeTableVersion), jsc, spark);
+    Option<Checkpoint> translated = 
source.translateCheckpoint(makeInputCheckpoint(inputKind, "k"));
+    assertV1(translated.get());
+    assertEquals("k", translated.get().getCheckpointKey());
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {6, 8})
+  void testTranslateCheckpointPreservesEmpty(int writeTableVersion) {
+    TestableKafkaSource source = new 
TestableKafkaSource(propsWith(writeTableVersion), jsc, spark);
+    assertTrue(source.translateCheckpoint(Option.empty()).isEmpty());
+  }
+
+  @Test
+  void testS3AndGcsIncrSourcesStayV1OnBothTableVersions() {
+    String s3 = S3EventsHoodieIncrSource.class.getName();
+    String gcs = GcsEventsHoodieIncrSource.class.getName();
+    
assertTrue(CheckpointUtils.DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(s3));
+    
assertTrue(CheckpointUtils.DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(gcs));
+    assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(8, s3));
+    assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(8, gcs));
+  }
+
+  private static TypedProperties propsWith(int writeTableVersion) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(WRITE_TABLE_VERSION.key(), 
String.valueOf(writeTableVersion));
+    return props;
+  }
+
+  private static void assertV1(Checkpoint c) {
+    assertEquals(StreamerCheckpointV1.class, c.getClass());
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Pair<Option<Dataset<Row>>, Checkpoint> invokeRowSourceFetch(
+      RowSource source, Option<Checkpoint> lastCheckpoint) {
+    try {
+      Method m = source.getClass().getDeclaredMethod("fetchNextBatch", 
Option.class, long.class);
+      m.setAccessible(true);
+      return (Pair<Option<Dataset<Row>>, Checkpoint>) m.invoke(source, 
lastCheckpoint, 1L);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private enum InputCheckpointKind { V1, V2 }
+
+  private static Option<Checkpoint> makeInputCheckpoint(InputCheckpointKind 
kind, String key) {
+    switch (kind) {
+      case V1: return Option.of(new StreamerCheckpointV1(key));
+      case V2: return Option.of(new StreamerCheckpointV2(key));
+      default: throw new IllegalArgumentException("Unsupported kind: " + kind);
+    }
+  }
+
+  private static class TestableKafkaSource extends KafkaSource<String> {
+    TestableKafkaSource(TypedProperties props, JavaSparkContext jsc, 
SparkSession spark) {
+      super(props, jsc, spark, SourceType.JSON, 
mock(HoodieIngestionMetrics.class),
+          new DefaultStreamContext(null, Option.empty()));
+    }
+
+    @Override
+    protected String toBatch(OffsetRange[] offsetRanges) {
+      return "batch";
+    }
+  }
+
+  private static class TestableKinesisSource extends 
KinesisSource<JavaRDD<String>> {
+    TestableKinesisSource(TypedProperties props, JavaSparkContext jsc, 
SparkSession spark) {
+      super(props, jsc, spark, SourceType.JSON, 
mock(HoodieIngestionMetrics.class),
+          new DefaultStreamContext((SchemaProvider) null, Option.empty()));
+    }
+
+    void setOffsetGen(KinesisOffsetGen gen) {
+      this.offsetGen = gen;
+    }
+
+    @Override
+    protected JavaRDD<String> toBatch(KinesisShardRange[] shardRanges, long 
sourceLimit) {
+      return jsc.emptyRDD();
+    }
+
+    @Override
+    protected String createCheckpointFromBatch(JavaRDD<String> batch,
+        KinesisShardRange[] shardRangesWithUnreadRecords,
+        KinesisShardRange[] allOpenClosedShardRanges) {
+      return "checkpoint";
+    }
+
+    @Override
+    protected long getRecordCount(JavaRDD<String> batch) {
+      return 1L;
+    }
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
index 5b3d970c0368..30e138d5e3e9 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -44,6 +45,7 @@ import static 
org.apache.hudi.common.testutils.FileCreateUtilsLegacy.createBaseF
 import static 
org.apache.hudi.utilities.config.DFSPathSelectorConfig.ROOT_INPUT_PATH;
 import static 
org.apache.hudi.utilities.config.DatePartitionPathSelectorConfig.PARTITIONS_LIST_PARALLELISM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestDFSPathSelectorCommonMethods extends 
HoodieSparkClientTestHarness {
@@ -164,4 +166,17 @@ public class TestDFSPathSelectorCommonMethods extends 
HoodieSparkClientTestHarne
     String checkpointStr2ndRead = 
nextFilePathsAndCheckpoint.getRight().getCheckpointKey();
     assertEquals(2000L, Long.parseLong(checkpointStr2ndRead), "should read up 
to foo5 (inclusive)");
   }
+
+  @ParameterizedTest
+  @ValueSource(classes = {DFSPathSelector.class, 
DatePartitionPathSelector.class})
+  void 
getNextFilePathsAndMaxModificationTimeReturnsV1CheckpointWhenNoEligibleFiles(Class<?>
 clazz) throws Exception {
+    DFSPathSelector selector = (DFSPathSelector) 
ReflectionUtils.loadClass(clazz.getName(), props, storageConf.unwrap());
+    createBaseFile(basePath, "p1", "000", "foo1", 10, 1000);
+    createBaseFile(basePath, "p1", "000", "foo2", 10, 2000);
+    Pair<Option<String>, Checkpoint> result = selector
+        .getNextFilePathsAndMaxModificationTime(jsc, Option.of(new 
StreamerCheckpointV2("999999999")), 30);
+    assertTrue(result.getLeft().isEmpty());
+    assertInstanceOf(StreamerCheckpointV1.class, result.getRight());
+    assertEquals("999999999", result.getRight().getCheckpointKey());
+  }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
index 409b2d9fe9c8..1b010efab7cb 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -512,9 +511,9 @@ public class TestHoodieIncrSourceE2E extends 
S3EventsHoodieIncrSourceHarness {
   @Test
   public void testTargetCheckpointV2ForS3Gcs() {
     // To ensure we properly track sources that must use checkpoint V1.
-    assertFalse(CheckpointUtils.shouldTargetCheckpointV2(8, 
S3EventsHoodieIncrSource.class.getName()));
-    assertFalse(CheckpointUtils.shouldTargetCheckpointV2(6, 
S3EventsHoodieIncrSource.class.getName()));
-    assertFalse(CheckpointUtils.shouldTargetCheckpointV2(8, 
GcsEventsHoodieIncrSource.class.getName()));
-    assertFalse(CheckpointUtils.shouldTargetCheckpointV2(6, 
GcsEventsHoodieIncrSource.class.getName()));
+    assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(8, 
S3EventsHoodieIncrSource.class.getName()));
+    assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(6, 
S3EventsHoodieIncrSource.class.getName()));
+    assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(8, 
GcsEventsHoodieIncrSource.class.getName()));
+    assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(6, 
GcsEventsHoodieIncrSource.class.getName()));
   }
 }
\ No newline at end of file
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestParquetDfsCheckpointFormatOnV6.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestParquetDfsCheckpointFormatOnV6.java
new file mode 100644
index 000000000000..596be797e36b
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestParquetDfsCheckpointFormatOnV6.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Continues ingestion against a fixture table that was originally written 
with V1 checkpoint
+ * format only and is now being resumed on a Hudi 1.x release with
+ * {@code hoodie.write.table.version=6}.
+ *
+ * <p>Expected: the next commit's {@code extraMetadata} carries the V1 key
+ * {@code deltastreamer.checkpoint.key} and NOT the V2 key {@code 
streamer.checkpoint.key.v2},
+ * because non-incremental sources emit V1 regardless of write table version.
+ *
+ * <p>If a V2 key shows up on the resumed commit, that reproduces the bug 
reported against
+ * {@code hoodie.write.table.version=6}.
+ */
+public class TestParquetDfsCheckpointFormatOnV6 extends 
HoodieDeltaStreamerTestBase {
+
+  private static final String FIXTURE_RESOURCE = 
"checkpoint-v6/parquet-dfs-v1-fixture.zip";
+
+  @Test
+  public void resumedV6CommitKeepsV1CheckpointFormat() throws Exception {
+    String dirName = "checkpoint-v6-resume-" + System.currentTimeMillis();
+    String dataPath = basePath + "/" + dirName;
+    Path zipOutput = Paths.get(new URI(dataPath));
+    Files.createDirectories(zipOutput);
+    HoodieTestUtils.extractZipToDirectory(FIXTURE_RESOURCE, zipOutput, 
getClass());
+
+    // The fixture table is rooted directly at zipOutput (zip contains 
.hoodie/, partitions, etc.).
+    String tableBasePath = zipOutput.toString();
+    assertTrue(Files.exists(zipOutput.resolve(".hoodie/hoodie.properties")),
+        "Fixture did not unpack a hudi table at " + tableBasePath);
+
+    // Sanity-check that the fixture's last commit carries a V1 checkpoint and 
no V2 key.
+    HoodieCommitMetadata baselineCommit = 
readLatestCommitMetadata(tableBasePath);
+    
assertNotNull(baselineCommit.getMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1),
+        "Fixture baseline expected to carry V1 checkpoint key.");
+    
assertNull(baselineCommit.getMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2),
+        "Fixture baseline must not have a V2 checkpoint key (was built on 
master).");
+
+    // Produce a fresh parquet source file under a new root so the streamer 
has work to do.
+    String parquetSourceRoot = basePath + "/" + dirName + "-source";
+    prepareParquetDFSFiles(50, parquetSourceRoot, "resume.parquet", false, 
null, null).close();
+
+    String propsFile = dirName + "-source.properties";
+    TypedProperties extraProps = new TypedProperties();
+    extraProps.setProperty("hoodie.datasource.write.table.type", 
HoodieTableType.COPY_ON_WRITE.name());
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
+        propsFile, parquetSourceRoot, false, "partition_path", "", extraProps, 
false, false);
+
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT,
+        ParquetDFSSource.class.getName(), Collections.emptyList(), propsFile, 
false, false,
+        100_000, false, null, HoodieTableType.COPY_ON_WRITE.name(), 
"timestamp", null);
+    cfg.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=6");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+
+    // Confirm the on-disk table version stayed at 6 so we are testing the 
v6-write path, not an
+    // accidental v6 -> v9 auto-upgrade (which would leave the v6-write path 
untested).
+    HoodieTableMetaClient resumedMetaClient = 
HoodieTestUtils.createMetaClient(storage, tableBasePath);
+    assertEquals(6, 
resumedMetaClient.getTableConfig().getTableVersion().versionCode(),
+        "Table version on disk should still be 6 after resume");
+
+    HoodieCommitMetadata resumedCommit = 
readLatestCommitMetadata(tableBasePath);
+    // Under hoodie.write.table.version=6, ParquetDFSSource must keep emitting 
V1 keys; in the
+    // always-V1 design, this holds across every write table version for 
non-incremental sources.
+    
assertNotNull(resumedCommit.getMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1),
+        "Resumed v6 commit must persist V1 checkpoint key. extraMetadata="
+            + resumedCommit.getExtraMetadata());
+    
assertNull(resumedCommit.getMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2),
+        "Resumed v6 commit must NOT persist a V2 checkpoint key. 
extraMetadata="
+            + resumedCommit.getExtraMetadata());
+  }
+
+  private static HoodieCommitMetadata readLatestCommitMetadata(String 
tableBasePath) throws IOException {
+    HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storage, tableBasePath);
+    HoodieInstant lastInstant = metaClient.getActiveTimeline()
+        .getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .orElseThrow(() -> new IllegalStateException("No completed commit 
found in " + tableBasePath));
+    return metaClient.getActiveTimeline().readCommitMetadata(lastInstant);
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
index 26ba7599b451..4a851e7c0a06 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
@@ -103,10 +104,10 @@ public class TestStreamSync extends 
SparkClientFunctionalTestHarness {
     SourceFormatAdapter sourceFormatAdapter = mock(SourceFormatAdapter.class);
     SchemaProvider inputBatchSchemaProvider = getSchemaProvider("InputBatch", 
false);
     Option<Dataset<Row>> fakeDataFrame = Option.of(mock(Dataset.class));
-    InputBatch<Dataset<Row>> fakeRowInputBatch = new 
InputBatch<>(fakeDataFrame, "chkpt", inputBatchSchemaProvider);
+    InputBatch<Dataset<Row>> fakeRowInputBatch = new 
InputBatch<>(fakeDataFrame, new StreamerCheckpointV2("chkpt"), 
inputBatchSchemaProvider);
     when(sourceFormatAdapter.fetchNewDataInRowFormat(any(), 
anyLong())).thenReturn(fakeRowInputBatch);
     //batch is empty because we don't want getBatch().map() to do anything 
because it calls static method we can't mock
-    InputBatch<JavaRDD<GenericRecord>> fakeAvroInputBatch = new 
InputBatch<>(Option.empty(), "chkpt", inputBatchSchemaProvider);
+    InputBatch<JavaRDD<GenericRecord>> fakeAvroInputBatch = new 
InputBatch<>(Option.empty(), new StreamerCheckpointV2("chkpt"), 
inputBatchSchemaProvider);
     
when(sourceFormatAdapter.fetchNewDataInAvroFormat(any(),anyLong())).thenReturn(fakeAvroInputBatch);
 
     //transformer
@@ -356,11 +357,33 @@ public class TestStreamSync extends 
SparkClientFunctionalTestHarness {
   }
 
   @Test
-  public void testExtractCheckpointMetadata_WhenCheckpointIsNullV2() {
+  void 
testExtractCheckpointMetadata_WhenCheckpointIsNullNonIncrementalSourceOnV8UsesV1()
 {
     StreamSync streamSync = setupStreamSync();
     HoodieStreamer.Config cfg = new HoodieStreamer.Config();
     cfg.checkpoint = "test-checkpoint";
     cfg.ignoreCheckpoint = "test-ignore";
+    cfg.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource";
+    TypedProperties props = new TypedProperties();
+
+    InputBatch inputBatch = mock(InputBatch.class);
+    when(inputBatch.getCheckpointForNextBatch()).thenReturn(null);
+
+    Map<String, String> result = streamSync.extractCheckpointMetadata(
+        inputBatch, props, HoodieTableVersion.EIGHT.versionCode(), cfg);
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put(CHECKPOINT_IGNORE_KEY, "test-ignore");
+    expected.put(CHECKPOINT_RESET_KEY, "test-checkpoint");
+    assertEquals(expected, result, "Should fall back to V1 keys for 
non-incremental sources on v8");
+  }
+
+  @Test
+  void 
testExtractCheckpointMetadata_WhenCheckpointIsNullIncrementalSourceOnV8UsesV2() 
{
+    StreamSync streamSync = setupStreamSync();
+    HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+    cfg.checkpoint = "test-checkpoint";
+    cfg.ignoreCheckpoint = "test-ignore";
+    cfg.sourceClassName = "org.apache.hudi.utilities.sources.HoodieIncrSource";
     TypedProperties props = new TypedProperties();
 
     InputBatch inputBatch = mock(InputBatch.class);
@@ -372,7 +395,7 @@ public class TestStreamSync extends 
SparkClientFunctionalTestHarness {
     Map<String, String> expected = new HashMap<>();
     expected.put(CHECKPOINT_IGNORE_KEY, "test-ignore");
     expected.put(STREAMER_CHECKPOINT_RESET_KEY_V2, "test-checkpoint");
-    assertEquals(expected, result, "Should return default metadata when 
checkpoint is null");
+    assertEquals(expected, result, "Should fall back to V2 keys for 
incremental sources on v8");
   }
 
   @Test
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
index f4e30667b295..0d9c3f7d15f4 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
 import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import 
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -41,6 +42,8 @@ import 
org.apache.hudi.utilities.exception.HoodieStreamerException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.io.IOException;
@@ -51,11 +54,13 @@ import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorage
 import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
 import static 
org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(MockitoExtension.class)
 public class TestStreamerCheckpointUtils extends 
SparkClientFunctionalTestHarness {
+  private static final String CHECKPOINT_TO_RESUME = "20240101000000";
   private TypedProperties props;
   private HoodieStreamer.Config streamerConfig;
   protected HoodieTableMetaClient metaClient;
@@ -68,6 +73,53 @@ public class TestStreamerCheckpointUtils extends 
SparkClientFunctionalTestHarnes
     streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name();
   }
 
+  @ParameterizedTest
+  @CsvSource({
+      // version, sourceClassName, expectedV2
+      // Only HoodieIncrSource family on v8+ targets V2.
+      "8, org.apache.hudi.utilities.sources.HoodieIncrSource, true",
+      "9, org.apache.hudi.utilities.sources.HoodieIncrSource, true",
+      "8, org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource, true",
+      // v6/v7 always V1.
+      "7, org.apache.hudi.utilities.sources.HoodieIncrSource, false",
+      "6, org.apache.hudi.utilities.sources.HoodieIncrSource, false",
+      // Non-incremental sources always V1 regardless of version.
+      "8, org.apache.hudi.utilities.sources.KafkaSource, false",
+      "9, org.apache.hudi.utilities.sources.JdbcSource, false",
+      // V2-not-supported allowlist always V1.
+      "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
+      "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false",
+      "8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource, 
false",
+      "8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource, 
false"
+  })
+  void testShouldTargetCheckpointV2(int version, String sourceClassName, 
boolean expectedV2) {
+    assertEquals(expectedV2, 
StreamerCheckpointUtils.shouldTargetCheckpointV2(version, sourceClassName));
+  }
+
+  @Test
+  void testBuildCheckpointFromConfigOverride() {
+    Checkpoint hoodieIncrV8 = 
StreamerCheckpointUtils.buildCheckpointFromConfigOverride(
+        "org.apache.hudi.utilities.sources.HoodieIncrSource",
+        HoodieTableVersion.EIGHT.versionCode(),
+        CHECKPOINT_TO_RESUME);
+    assertInstanceOf(UnresolvedStreamerCheckpointBasedOnCfg.class, 
hoodieIncrV8);
+    assertEquals(CHECKPOINT_TO_RESUME, hoodieIncrV8.getCheckpointKey());
+
+    Checkpoint nonIncrV8 = 
StreamerCheckpointUtils.buildCheckpointFromConfigOverride(
+        "org.apache.hudi.utilities.sources.KafkaSource",
+        HoodieTableVersion.EIGHT.versionCode(),
+        CHECKPOINT_TO_RESUME);
+    assertInstanceOf(StreamerCheckpointV1.class, nonIncrV8);
+    assertEquals(CHECKPOINT_TO_RESUME, nonIncrV8.getCheckpointKey());
+
+    Checkpoint hoodieIncrV6 = 
StreamerCheckpointUtils.buildCheckpointFromConfigOverride(
+        "org.apache.hudi.utilities.sources.HoodieIncrSource",
+        HoodieTableVersion.SIX.versionCode(),
+        CHECKPOINT_TO_RESUME);
+    assertInstanceOf(StreamerCheckpointV1.class, hoodieIncrV6);
+    assertEquals(CHECKPOINT_TO_RESUME, hoodieIncrV6.getCheckpointKey());
+  }
+
   @Test
   public void testEmptyTimelineCase() throws IOException {
     Option<Checkpoint> checkpoint = 
StreamerCheckpointUtils.resolveCheckpointBetweenConfigAndPrevCommit(
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
index 017507f2f9ec..7b7c06d622d4 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
@@ -35,6 +35,8 @@ import org.apache.spark.sql.SparkSession;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
+
 /**
  * A Test DataSource which scales test-data generation by using spark 
parallelism.
  */
@@ -57,7 +59,7 @@ public class DistributedTestDataSource extends 
AbstractBaseTestSource {
 
     // No new data.
     if (sourceLimit <= 0) {
-      return new InputBatch<>(Option.empty(), instantTime);
+      return new InputBatch<>(Option.empty(), createCheckpoint(instantTime));
     }
 
     TypedProperties newProps = new TypedProperties();
@@ -77,6 +79,6 @@ public class DistributedTestDataSource extends 
AbstractBaseTestSource {
               }
               return fetchNextBatch(newProps, perPartitionSourceLimit, 
instantTime, p).iterator();
             }, true);
-    return new InputBatch<>(Option.of(avroRDD), instantTime);
+    return new InputBatch<>(Option.of(avroRDD), createCheckpoint(instantTime));
   }
 }
diff --git 
a/hudi-utilities/src/test/resources/checkpoint-v6/parquet-dfs-v1-fixture.zip 
b/hudi-utilities/src/test/resources/checkpoint-v6/parquet-dfs-v1-fixture.zip
new file mode 100644
index 000000000000..29c1b9804435
Binary files /dev/null and 
b/hudi-utilities/src/test/resources/checkpoint-v6/parquet-dfs-v1-fixture.zip 
differ

Reply via email to