This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9cddb237be14 fix(streamer): Use checkpoint V1 for non-incremental streamer sources (#18896)
9cddb237be14 is described below
commit 9cddb237be14242be4ba096413ab057942af79aa
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Jun 4 14:42:57 2026 -0700
fix(streamer): Use checkpoint V1 for non-incremental streamer sources (#18896)
---
.../common/table/checkpoint/CheckpointUtils.java | 32 +-
.../table/checkpoint/TestCheckpointUtils.java | 66 +---
.../hudi/examples/common/RandomJsonSource.java | 4 +-
.../hudi/examples/common/TestRandomJsonSource.java | 76 +++++
.../helpers/DFSTestSuitePathSelector.java | 7 +-
.../sql/hudi/streaming/HoodieStreamSourceV2.scala | 2 +-
.../hudi/utilities/sources/GcsEventsSource.java | 12 +-
.../hudi/utilities/sources/HiveIncrPullSource.java | 11 +-
.../hudi/utilities/sources/HoodieIncrSource.java | 4 +-
.../apache/hudi/utilities/sources/InputBatch.java | 9 -
.../apache/hudi/utilities/sources/JdbcSource.java | 8 +-
.../apache/hudi/utilities/sources/KafkaSource.java | 7 +-
.../hudi/utilities/sources/KinesisSource.java | 5 +-
.../hudi/utilities/sources/PulsarSource.java | 4 +-
.../org/apache/hudi/utilities/sources/Source.java | 53 +---
.../hudi/utilities/sources/SqlFileBasedSource.java | 4 +-
.../utilities/sources/debezium/DebeziumSource.java | 5 +-
.../utilities/sources/helpers/DFSPathSelector.java | 6 +-
.../sources/helpers/DatePartitionPathSelector.java | 6 +-
.../sources/helpers/S3EventsMetaSelector.java | 9 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 7 +-
.../streamer/StreamerCheckpointUtils.java | 26 +-
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 19 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 36 +--
.../deltastreamer/TestSourceFormatAdapter.java | 6 +-
.../hudi/utilities/sources/TestDataSource.java | 4 +-
.../utilities/sources/TestGcsEventsSource.java | 3 +-
.../utilities/sources/TestHiveIncrPullSource.java | 99 ++++++
.../hudi/utilities/sources/TestInputBatch.java | 6 +-
.../hudi/utilities/sources/TestJdbcSource.java | 5 +-
.../TestStreamerSourceCheckpointVersion.java | 351 +++++++++++++++++++++
.../helpers/TestDFSPathSelectorCommonMethods.java | 15 +
.../streamer/TestHoodieIncrSourceE2E.java | 9 +-
.../TestParquetDfsCheckpointFormatOnV6.java | 128 ++++++++
.../hudi/utilities/streamer/TestStreamSync.java | 31 +-
.../streamer/TestStreamerCheckpointUtils.java | 52 +++
.../sources/DistributedTestDataSource.java | 6 +-
.../checkpoint-v6/parquet-dfs-v1-fixture.zip | Bin 0 -> 103450 bytes
38 files changed, 917 insertions(+), 216 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
index 15084a74ae46..443383c77297 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.checkpoint;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -71,23 +70,24 @@ public class CheckpointUtils {
throw new HoodieException("Checkpoint is not found in the commit metadata:
" + commitMetadata.getExtraMetadata());
}
- public static Checkpoint buildCheckpointFromGeneralSource(
- String sourceClassName, int writeTableVersion, String
checkpointToResume) {
- return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
sourceClassName)
- ? new StreamerCheckpointV2(checkpointToResume) : new
StreamerCheckpointV1(checkpointToResume);
+ /**
+ * For sources that do not have a semantic change in the checkpoint, always use checkpoint V1.
+ *
+ * @param checkpointToResume value of the checkpoint to resume
+ * @return {@link Checkpoint} instance
+ */
+ public static Checkpoint createCheckpoint(String checkpointToResume) {
+ return new StreamerCheckpointV1(checkpointToResume);
}
- // Whenever we create checkpoint from streamer config checkpoint override, we should use this function
- // to build checkpoints.
- public static Checkpoint buildCheckpointFromConfigOverride(
- String sourceClassName, int writeTableVersion, String
checkpointToResume) {
- return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
sourceClassName)
- ? new UnresolvedStreamerCheckpointBasedOnCfg(checkpointToResume) : new
StreamerCheckpointV1(checkpointToResume);
- }
-
- public static boolean shouldTargetCheckpointV2(int writeTableVersion, String
sourceClassName) {
- return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
- && !DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(sourceClassName);
+ /**
+ * For sources that do not have a semantic change in the checkpoint, always use checkpoint V1.
+ *
+ * @param checkpointToResume the checkpoint to resume
+ * @return {@link Checkpoint} instance
+ */
+ public static Checkpoint createCheckpoint(Checkpoint checkpointToResume) {
+ return new StreamerCheckpointV1(checkpointToResume);
}
// TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
index a4b1be058799..876c8a53162a 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.checkpoint;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -31,8 +30,6 @@ import org.apache.hudi.exception.HoodieException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
import java.util.stream.Stream;
@@ -56,7 +53,6 @@ public class TestCheckpointUtils {
private HoodieActiveTimeline activeTimeline;
private static final String CHECKPOINT_TO_RESUME = "20240101000000";
- private static final String GENERAL_SOURCE =
"org.apache.hudi.utilities.sources.GeneralSource";
@BeforeEach
public void setUp() {
@@ -197,64 +193,10 @@ public class TestCheckpointUtils {
assertEquals(completionTime, translatedCheckpoint.getCheckpointKey());
}
- @ParameterizedTest
- @CsvSource({
- // version, sourceClassName, expectedResult
- // Version >= 8 with allowed sources should return true
- "8, org.apache.hudi.utilities.sources.TestSource, true",
- "9, org.apache.hudi.utilities.sources.AnotherSource, true",
- // Version < 8 should return false regardless of source
- "7, org.apache.hudi.utilities.sources.TestSource, false",
- "6, org.apache.hudi.utilities.sources.AnotherSource, false",
- // Disallowed sources should return false even with version >= 8
- "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
- "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false",
- "8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource,
false",
- "8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource,
false"
- })
- public void testTargetCheckpointV2(int version, String sourceClassName,
boolean isV2Checkpoint) {
- assertEquals(isV2Checkpoint,
CheckpointUtils.buildCheckpointFromGeneralSource(sourceClassName, version,
"ignored") instanceof StreamerCheckpointV2);
- }
-
- @Test
- public void testBuildCheckpointFromGeneralSource() {
- // Test V2 checkpoint creation (newer table version + general source)
- Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromGeneralSource(
- GENERAL_SOURCE,
- HoodieTableVersion.EIGHT.versionCode(),
- CHECKPOINT_TO_RESUME
- );
- assertInstanceOf(StreamerCheckpointV2.class, checkpoint1);
- assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());
-
- // Test V1 checkpoint creation (older table version)
- Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromGeneralSource(
- GENERAL_SOURCE,
- HoodieTableVersion.SEVEN.versionCode(),
- CHECKPOINT_TO_RESUME
- );
- assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
- assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
- }
-
@Test
- public void testBuildCheckpointFromConfigOverride() {
- // Test checkpoint from config creation (newer table version + general
source)
- Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromConfigOverride(
- GENERAL_SOURCE,
- HoodieTableVersion.EIGHT.versionCode(),
- CHECKPOINT_TO_RESUME
- );
- assertInstanceOf(UnresolvedStreamerCheckpointBasedOnCfg.class,
checkpoint1);
- assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());
-
- // Test V1 checkpoint creation (older table version)
- Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromConfigOverride(
- GENERAL_SOURCE,
- HoodieTableVersion.SEVEN.versionCode(),
- CHECKPOINT_TO_RESUME
- );
- assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
- assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
+ void testCreateCheckpoint() {
+ Checkpoint checkpoint =
CheckpointUtils.createCheckpoint(CHECKPOINT_TO_RESUME);
+ assertInstanceOf(StreamerCheckpointV1.class, checkpoint);
+ assertEquals(CHECKPOINT_TO_RESUME, checkpoint.getCheckpointKey());
}
}
diff --git
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java
index c8fcd6b04e2a..97c0a5ec4fa7 100644
---
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java
+++
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java
@@ -37,6 +37,8 @@ import org.apache.spark.sql.SparkSession;
import java.util.List;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
+
public class RandomJsonSource extends JsonSource {
private final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen;
private final TimeGenerator timeGenerator;
@@ -54,6 +56,6 @@ public class RandomJsonSource extends JsonSource {
String commitTime = TimelineUtils.generateInstantTime(true, timeGenerator);
List<String> inserts =
dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
- return new InputBatch<>(Option.of(sparkContext.parallelize(inserts, 1)),
commitTime);
+ return new InputBatch<>(Option.of(sparkContext.parallelize(inserts, 1)),
createCheckpoint(commitTime));
}
}
diff --git
a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/common/TestRandomJsonSource.java
b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/common/TestRandomJsonSource.java
new file mode 100644
index 000000000000..0691834abcd3
--- /dev/null
+++
b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/common/TestRandomJsonSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.common;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.sources.InputBatch;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Tests for RandomJsonSource.
+ */
+class TestRandomJsonSource {
+
+ private static JavaSparkContext jsc;
+ private static SparkSession spark;
+
+ @BeforeAll
+ static void initSpark() {
+ spark =
SparkSession.builder().master("local[1]").appName("TestRandomJsonSource").getOrCreate();
+ jsc = new JavaSparkContext(spark.sparkContext());
+ }
+
+ @AfterAll
+ static void stopSpark() {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ if (spark != null) {
+ spark.stop();
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {6, 8})
+ void testFetchNextReturnsTwentyRecordsWithCorrectCheckpointVersion(int
writeTableVersion) {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(WRITE_TABLE_VERSION.key(),
String.valueOf(writeTableVersion));
+
+ RandomJsonSource source = new RandomJsonSource(props, jsc, spark, null);
+ InputBatch<JavaRDD<String>> batch = source.fetchNext(Option.empty(),
Long.MAX_VALUE);
+
+ assertNotNull(batch.getBatch());
+ assertEquals(20, batch.getBatch().get().count());
+ assertNotNull(batch.getCheckpointForNextBatch());
+ assertEquals(StreamerCheckpointV1.class,
batch.getCheckpointForNextBatch().getClass());
+ }
+}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index 2b8d95cc7df3..c4f85c6d98dc 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -20,7 +20,6 @@ package org.apache.hudi.integ.testsuite.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
@@ -41,6 +40,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
/**
@@ -99,14 +99,13 @@ public class DFSTestSuitePathSelector extends
DFSPathSelector {
// no data to readAvro
if (eligibleFiles.size() == 0) {
return new ImmutablePair<>(Option.empty(),
- lastCheckpoint.orElseGet(() -> new
StreamerCheckpointV2(String.valueOf(Long.MIN_VALUE))));
+ lastCheckpoint.orElseGet(() ->
createCheckpoint(String.valueOf(Long.MIN_VALUE))));
}
// readAvro the files out.
String pathStr = eligibleFiles.stream().map(f -> f.getPath().toString())
.collect(Collectors.joining(","));
- return new ImmutablePair<>(Option.ofNullable(pathStr),
- new StreamerCheckpointV2(String.valueOf(nextBatchId)));
+ return new ImmutablePair<>(Option.ofNullable(pathStr),
createCheckpoint(String.valueOf(nextBatchId)));
} catch (IOException ioe) {
throw new HoodieIOException(
"Unable to readAvro from source from checkpoint: " + lastCheckpoint,
ioe);
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index d104108f6c0c..11a44f58a0b6 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -184,7 +184,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
}
private def translateCheckpoint(commitTime: String): String = {
- if
(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion.versionCode(),
getClass.getName)) {
+ if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
commitTime
} else {
CheckpointUtils.convertToCheckpointV1ForCommitTime(
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
index 2625c7d1da49..a7e699db4e90 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -45,6 +44,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -112,7 +112,7 @@ public class GcsEventsSource extends RowSource {
private final List<String> messagesToAck = new ArrayList<>();
- private static final Checkpoint CHECKPOINT_VALUE_ZERO = new
StreamerCheckpointV2("0");
+ private static final String CHECKPOINT_VALUE_ZERO = "0";
public GcsEventsSource(TypedProperties props, JavaSparkContext jsc,
SparkSession spark,
SchemaProvider schemaProvider) {
@@ -153,7 +153,7 @@ public class GcsEventsSource extends RowSource {
if (messageBatch.isEmpty()) {
log.info("No new data. Returning empty batch with checkpoint value: {}",
CHECKPOINT_VALUE_ZERO);
- return Pair.of(Option.empty(), CHECKPOINT_VALUE_ZERO);
+ return Pair.of(Option.empty(), createCheckpoint(CHECKPOINT_VALUE_ZERO));
}
int numPartitions = (int) Math.ceil(
@@ -164,9 +164,11 @@ public class GcsEventsSource extends RowSource {
StructType sourceSchema = UtilHelpers.getSourceSchema(schemaProvider);
if (sourceSchema != null) {
- return
Pair.of(Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)),
CHECKPOINT_VALUE_ZERO);
+ return Pair.of(
+
Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)),
createCheckpoint(CHECKPOINT_VALUE_ZERO));
} else {
- return Pair.of(Option.of(sparkSession.read().json(eventRecords)),
CHECKPOINT_VALUE_ZERO);
+ return Pair.of(
+ Option.of(sparkSession.read().json(eventRecords)),
createCheckpoint(CHECKPOINT_VALUE_ZERO));
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
index b5309debe41e..333ed7e33292 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.HiveIncrementalPuller;
@@ -48,6 +47,7 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -104,13 +104,13 @@ public class HiveIncrPullSource extends AvroSource {
if (!latestTargetCommit.isPresent()) {
// start from the beginning
- return Option.of(new StreamerCheckpointV2(commitTimes.get(0)));
+ return Option.of(createCheckpoint(commitTimes.get(0)));
}
for (String instantTime : commitTimes) {
// TODO(vc): Add an option to delete consumed commits
if (instantTime.compareTo(latestTargetCommit.get().getCheckpointKey()) >
0) {
- return Option.of(new StreamerCheckpointV2(instantTime));
+ return Option.of(createCheckpoint(instantTime));
}
}
return Option.empty();
@@ -123,7 +123,8 @@ public class HiveIncrPullSource extends AvroSource {
Option<Checkpoint> commitToPull = findCommitToPull(lastCheckpoint);
if (!commitToPull.isPresent()) {
- return new InputBatch<>(Option.empty(), lastCheckpoint.isPresent() ?
lastCheckpoint.get() : new StreamerCheckpointV2(""));
+ return new InputBatch<>(Option.empty(),
+ lastCheckpoint.isPresent() ?
createCheckpoint(lastCheckpoint.get()) : createCheckpoint(""));
}
// read the files out.
@@ -133,7 +134,7 @@ public class HiveIncrPullSource extends AvroSource {
AvroKey.class, NullWritable.class,
sparkContext.hadoopConfiguration());
sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new
data");
return new InputBatch<>(Option.of(avroRDD.keys().map(r ->
((GenericRecord) r.datum()))),
- String.valueOf(commitToPull.get()));
+ createCheckpoint(String.valueOf(commitToPull.get())));
} catch (Exception e) {
throw new HoodieReadFromSourceException("Unable to read from source from
checkpoint: " + lastCheckpoint, e);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index c09196d37c48..937219d1721c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
@@ -49,6 +48,7 @@ import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
import org.apache.hudi.utilities.streamer.StreamContext;
+import org.apache.hudi.utilities.streamer.StreamerCheckpointUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
@@ -203,7 +203,7 @@ public class HoodieIncrSource extends RowSource {
String srcPath = getStringWithAltKeys(props,
HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH);
HoodieTableVersion sourceTableVersion =
HoodieTableConfig.loadFromHoodieProps(
HoodieStorageUtils.getStorage(srcPath,
HadoopFSUtils.getStorageConf(sparkContext.hadoopConfiguration())),
srcPath).getTableVersion();
- if (sourceTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) &&
CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
getClass().getName())) {
+ if (sourceTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) &&
StreamerCheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
getClass().getName())) {
return fetchNextBatchBasedOnCompletionTime(lastCheckpoint, sourceLimit);
} else {
return fetchNextBatchBasedOnRequestedTime(lastCheckpoint, sourceLimit);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index 9717ba726866..94b5c22afe7e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -40,14 +39,6 @@ public class InputBatch<T> {
@Getter(AccessLevel.NONE)
private final SchemaProvider schemaProvider;
- public InputBatch(Option<T> batch, String checkpointForNextBatch,
SchemaProvider schemaProvider) {
- this(batch, new StreamerCheckpointV2(checkpointForNextBatch),
schemaProvider);
- }
-
- public InputBatch(Option<T> batch, String checkpointForNextBatch) {
- this(batch, checkpointForNextBatch, null);
- }
-
public InputBatch(Option<T> batch, Checkpoint checkpointForNextBatch) {
this(batch, checkpointForNextBatch, null);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
index b236b9587a4b..0f5cd6fa68e9 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JdbcSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -52,6 +51,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
@@ -263,12 +263,12 @@ public class JdbcSource extends RowSource {
final String max =
rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first().getString(0);
log.info("Checkpointing column {} with value: {}", incrementalColumn,
max);
if (max != null) {
- return new StreamerCheckpointV2(max);
+ return createCheckpoint(max);
}
return lastCheckpoint.isPresent() &&
!StringUtils.isNullOrEmpty(lastCheckpoint.get().getCheckpointKey())
- ? lastCheckpoint.get() : new
StreamerCheckpointV2(StringUtils.EMPTY_STRING);
+ ? createCheckpoint(lastCheckpoint.get()) :
createCheckpoint(StringUtils.EMPTY_STRING);
} else {
- return new StreamerCheckpointV2(StringUtils.EMPTY_STRING);
+ return createCheckpoint(StringUtils.EMPTY_STRING);
}
} catch (Exception e) {
log.error("Failed to checkpoint");
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 943be5f30a9d..215a9c5b8833 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -49,6 +49,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
@@ -130,11 +131,13 @@ public abstract class KafkaSource<T> extends Source<T> {
totalNewMsgs, offsetGen.getTopicName(), Arrays.toString(offsetRanges));
if (totalNewMsgs <= 0) {
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT,
0);
- return new InputBatch<>(Option.empty(),
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+ return new InputBatch<>(
+ Option.empty(),
createCheckpoint(KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)));
}
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KAFKA_MESSAGE_IN_COUNT,
totalNewMsgs);
T newBatch = toBatch(offsetRanges);
- return new InputBatch<>(Option.of(newBatch),
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
+ return new InputBatch<>(
+ Option.of(newBatch),
createCheckpoint(KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)));
}
/**
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
index a5ec35994d8e..91b52567fb07 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java
@@ -51,6 +51,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
@Slf4j
@@ -97,7 +98,7 @@ public abstract class KinesisSource<T> extends Source<T> {
if (shardRangesWithUnreadRecords.length == 0) {
metrics.updateStreamerSourceNewMessageCount(METRIC_NAME_KINESIS_MESSAGE_IN_COUNT,
0);
String checkpointStr = lastCheckpoint.isPresent() ?
lastCheckpoint.get().getCheckpointKey() : "";
- return new InputBatch<>(Option.empty(), checkpointStr);
+ return new InputBatch<>(Option.empty(), createCheckpoint(checkpointStr));
}
// STEP 3: Otherwise, do the read.
T batch = toBatch(shardRangesWithUnreadRecords, sourceLimit);
@@ -111,7 +112,7 @@ public abstract class KinesisSource<T> extends Source<T> {
log.info("Read {} records from Kinesis stream {} with {} shards,
checkpoint: {}",
totalMsgs, offsetGen.getStreamName(),
shardRangesWithUnreadRecords.length, checkpointStr);
- return new InputBatch<>(Option.of(batch), checkpointStr);
+ return new InputBatch<>(Option.of(batch), createCheckpoint(checkpointStr));
}
/** Upper bound on consecutive empty GetRecords responses before giving up
on a shard. */
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
index d7696efdbf6c..cf6cd4093438 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -54,6 +53,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.getLongWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -130,7 +130,7 @@ public class PulsarSource extends RowSource implements
Closeable {
.option("endingOffsets", endingOffsetStr)
.load();
- return Pair.of(Option.of(transform(sourceRows)), new
StreamerCheckpointV2(endingOffsetStr));
+ return Pair.of(Option.of(transform(sourceRows)),
createCheckpoint(endingOffsetStr));
}
@Override
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 9313fe508b5e..75cef40c1ee5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -23,7 +23,6 @@ import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.ConfigUtils;
@@ -47,7 +46,6 @@ import org.apache.spark.storage.StorageLevel;
import java.io.Serializable;
-import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.shouldTargetCheckpointV2;
import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD;
import static
org.apache.hudi.config.HoodieWriteConfig.TAGGED_RECORD_STORAGE_LEVEL_VALUE;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
@@ -121,57 +119,24 @@ public abstract class Source<T> implements
SourceCommitCallback, Serializable {
* After the checkpoint value is decided based on the existing
configurations at
*
org.apache.hudi.utilities.streamer.StreamerCheckpointUtils#resolveWhatCheckpointToResume,
*
- * For most of the data sources the there is no difference between
checkpoint V1 and V2, it's
- * merely changing the wrapper class.
+ * Non-incremental sources always operate on V1 checkpoints regardless of
the write table version,
+ * so any V2 input read from older commit metadata is normalized to V1 here.
*
- * Check child class method overrides to see special case handling.
+ * Hudi incremental sources have their own V1/V2 semantics (requested time
vs completion time)
+ * and override this method.
* */
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint>
lastCheckpoint) {
if (lastCheckpoint.isEmpty()) {
return Option.empty();
}
- if (CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion,
getClass().getName())) {
- // V2 -> V2
- if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
- return lastCheckpoint;
- }
- // V1 -> V2
- if (lastCheckpoint.get() instanceof StreamerCheckpointV1) {
- StreamerCheckpointV2 newCheckpoint = new
StreamerCheckpointV2(lastCheckpoint.get());
- newCheckpoint.addV1Props();
- return Option.of(newCheckpoint);
- }
- } else {
- // V2 -> V1
- if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
- return Option.of(new StreamerCheckpointV1(lastCheckpoint.get()));
- }
- // V1 -> V1
- if (lastCheckpoint.get() instanceof StreamerCheckpointV1) {
- return lastCheckpoint;
- }
+ if (lastCheckpoint.get() instanceof StreamerCheckpointV1) {
+ return lastCheckpoint;
}
- throw new UnsupportedOperationException("Unsupported checkpoint type: " +
lastCheckpoint.get());
- }
-
- public void assertCheckpointVersion(Option<Checkpoint> lastCheckpoint,
Option<Checkpoint> lastCheckpointTranslated, Checkpoint checkpoint) {
- if (checkpoint != null) {
- boolean shouldBeV2Checkpoint =
shouldTargetCheckpointV2(writeTableVersion, getClass().getName());
- String errorMessage = String.format(
- "Data source should return checkpoint version V%s. The checkpoint
resumed in the iteration is %s, whose translated version is %s. "
- + "The checkpoint returned after the iteration %s.",
- shouldBeV2Checkpoint ? "2" : "1",
- lastCheckpoint.isEmpty() ? "null" : lastCheckpointTranslated.get(),
- lastCheckpointTranslated.isEmpty() ? "null" :
lastCheckpointTranslated.get(),
- checkpoint);
- if (shouldBeV2Checkpoint && !(checkpoint instanceof
StreamerCheckpointV2)) {
- throw new IllegalStateException(errorMessage);
- }
- if (!shouldBeV2Checkpoint && !(checkpoint instanceof
StreamerCheckpointV1)) {
- throw new IllegalStateException(errorMessage);
- }
+ if (lastCheckpoint.get() instanceof StreamerCheckpointV2) {
+ return Option.of(new StreamerCheckpointV1(lastCheckpoint.get()));
}
+ throw new UnsupportedOperationException("Unsupported checkpoint type: " +
lastCheckpoint.get());
}
/**
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java
index 60cac125a029..913806420323 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlFileBasedSource.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
@@ -39,6 +38,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Scanner;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -92,7 +92,7 @@ public class SqlFileBasedSource extends RowSource {
rows = sparkSession.sql(sqlStr);
}
}
- return Pair.of(Option.of(rows), shouldEmitCheckPoint ? new
StreamerCheckpointV2(String.valueOf(System.currentTimeMillis())) : null);
+ return Pair.of(Option.of(rows), shouldEmitCheckPoint ?
createCheckpoint(String.valueOf(System.currentTimeMillis())) : null);
} catch (IOException ioe) {
throw new HoodieIOException("Error reading source SQL file.", ioe);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 030f5e0e671b..d45cac3922ff 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources.debezium;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
@@ -59,6 +58,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static
org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS;
@@ -128,7 +128,8 @@ public abstract class DebeziumSource extends RowSource {
log.info("Spark schema of Kafka Payload for topic {}:\n{}",
offsetGen.getTopicName(), dataset.schema().treeString());
log.info("New checkpoint string: {}",
CheckpointUtils.offsetsToStr(offsetRanges));
return Pair.of(Option.of(dataset),
- new StreamerCheckpointV2(overrideCheckpointStr.isEmpty() ?
CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr));
+ createCheckpoint(overrideCheckpointStr.isEmpty()
+ ? CheckpointUtils.offsetsToStr(offsetRanges) :
overrideCheckpointStr));
} catch (Exception e) {
log.error("Fatal error reading and parsing incoming debezium event", e);
throw new HoodieReadFromSourceException("Fatal error reading and parsing
incoming debezium event", e);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 594034308ee8..071499d13a9b 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -20,7 +20,6 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -47,6 +46,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static
org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -151,13 +151,13 @@ public class DFSPathSelector implements Serializable {
// no data to read
if (filteredFiles.isEmpty()) {
- return new ImmutablePair<>(Option.empty(), new
StreamerCheckpointV2(String.valueOf(newCheckpointTime)));
+ return new ImmutablePair<>(Option.empty(),
createCheckpoint(String.valueOf(newCheckpointTime)));
}
// read the files out.
String pathStr = filteredFiles.stream().map(f ->
f.getPath().toString()).collect(Collectors.joining(","));
- return new ImmutablePair<>(Option.ofNullable(pathStr), new
StreamerCheckpointV2(String.valueOf(newCheckpointTime)));
+ return new ImmutablePair<>(Option.ofNullable(pathStr),
createCheckpoint(String.valueOf(newCheckpointTime)));
} catch (IOException ioe) {
throw new HoodieIOException("Unable to read from source from checkpoint:
" + lastCheckpointStr, ioe);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index 989be9163da1..0892ca4d3564 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -44,6 +43,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static
org.apache.hudi.utilities.config.DFSPathSelectorConfig.ROOT_INPUT_PATH;
@@ -159,13 +159,13 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
// no data to read
if (filteredFiles.isEmpty()) {
- return new ImmutablePair<>(Option.empty(), new
StreamerCheckpointV2(String.valueOf(newCheckpointTime)));
+ return new ImmutablePair<>(Option.empty(),
createCheckpoint(String.valueOf(newCheckpointTime)));
}
// read the files out.
String pathStr = filteredFiles.stream().map(f ->
f.getPath().toString()).collect(Collectors.joining(","));
- return new ImmutablePair<>(Option.ofNullable(pathStr), new
StreamerCheckpointV2(String.valueOf(newCheckpointTime)));
+ return new ImmutablePair<>(Option.ofNullable(pathStr),
createCheckpoint(String.valueOf(newCheckpointTime)));
}
/**
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
index 3cd073e72109..b5ccaa58f374 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/S3EventsMetaSelector.java
@@ -20,7 +20,7 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -42,6 +42,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
/**
@@ -155,8 +156,10 @@ public class S3EventsMetaSelector extends
CloudObjectsSelector {
for (Map<String, Object> eventRecord : eventRecords) {
filteredEventRecords.add(SdkHttpUtils.urlDecode(MAPPER.writeValueAsString(eventRecord)));
}
- // Return the old checkpoint if no messages to consume from queue.
- Checkpoint newCheckpoint = newCheckpointTime == 0 ?
lastCheckpoint.orElse(null) : new
StreamerCheckpointV2(String.valueOf(newCheckpointTime));
+ // Re-wrap a prior V2 checkpoint as V1 to avoid leaking it back to
commit metadata.
+ Checkpoint newCheckpoint = newCheckpointTime == 0
+ ? lastCheckpoint.map(CheckpointUtils::createCheckpoint).orElse(null)
+ : createCheckpoint(String.valueOf(newCheckpointTime));
return new ImmutablePair<>(filteredEventRecords, newCheckpoint);
} catch (JSONException | IOException e) {
throw new HoodieException("Unable to read from SQS: ", e);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 1af9c503b613..7f38d80f8308 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -153,7 +154,7 @@ import static
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordQualified
import static
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
import static
org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
-import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromGeneralSource;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
@@ -173,6 +174,7 @@ import static
org.apache.hudi.utilities.config.HoodieStreamerConfig.CHECKPOINT_F
import static
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
import static
org.apache.hudi.utilities.streamer.StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo;
+import static
org.apache.hudi.utilities.streamer.StreamerCheckpointUtils.shouldTargetCheckpointV2;
/**
* Sync's one batch of data to hoodie table.
@@ -1031,7 +1033,8 @@ public class StreamSync implements Serializable,
Closeable {
}
// Otherwise create new checkpoint based on version
- Checkpoint checkpoint =
buildCheckpointFromGeneralSource(cfg.sourceClassName, versionCode, null);
+ Checkpoint checkpoint = shouldTargetCheckpointV2(versionCode,
cfg.sourceClassName)
+ ? new StreamerCheckpointV2((String) null) : createCheckpoint((String)
null);
return checkpoint.getCheckpointCommitMetadata(cfg.checkpoint,
cfg.ignoreCheckpoint);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
index 323a2b95149d..2b28e09e3a2c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
@@ -26,6 +26,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -44,8 +46,8 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2;
import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.HOODIE_INCREMENTAL_SOURCES;
-import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
@@ -56,6 +58,28 @@ import static
org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowng
@Slf4j
public class StreamerCheckpointUtils {
+ /**
+ * Wraps a user-supplied checkpoint override string. For HoodieIncrSource
family on table version
+ * 8+, returns {@link UnresolvedStreamerCheckpointBasedOnCfg} so the source
can resolve V1/V2
+ * cursor semantics from the override's prefix; for everything else returns
V1.
+ */
+ public static Checkpoint buildCheckpointFromConfigOverride(
+ String sourceClassName, int writeTableVersion, String
checkpointToResume) {
+ return shouldTargetCheckpointV2(writeTableVersion, sourceClassName)
+ ? new UnresolvedStreamerCheckpointBasedOnCfg(checkpointToResume)
+ : new StreamerCheckpointV1(checkpointToResume);
+ }
+
+ /**
+ * True only for the HoodieIncrSource family on table version 8+, where V2
(completion-time)
+ * cursor semantics differ from V1 (requested-time). Every other source
operates on V1 only.
+ */
+ public static boolean shouldTargetCheckpointV2(int writeTableVersion, String
sourceClassName) {
+ return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
+ && HOODIE_INCREMENTAL_SOURCES.contains(sourceClassName)
+ && !DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(sourceClassName);
+ }
+
/**
* The first phase of checkpoint resolution - read the checkpoint configs
from 2 sources and resolve
* conflicts:
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index f38f8db4f48b..8bece4c3df2c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -83,11 +82,13 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
+import static
org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
@@ -740,10 +741,24 @@ public class HoodieDeltaStreamerTestBase extends
UtilitiesTestBase {
HoodieInstant lastInstant = timeline.lastInstant().get();
HoodieCommitMetadata commitMetadata =
timeline.readCommitMetadata(lastInstant);
assertEquals(totalCommits, timeline.countInstants());
+ assertEquals(expected,
commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY));
+ assertNull(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2));
+ return lastInstant;
+ }
+
+ static HoodieInstant assertCommitMetadataForIncrSource(String expected,
String tablePath, int totalCommits)
+ throws IOException {
+ HoodieTableMetaClient meta = createMetaClient(storage.getConf(),
tablePath);
+ HoodieTimeline timeline =
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ HoodieInstant lastInstant = timeline.lastInstant().get();
+ HoodieCommitMetadata commitMetadata =
timeline.readCommitMetadata(lastInstant);
+ assertEquals(totalCommits, timeline.countInstants());
if
(meta.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT))
{
- assertEquals(expected,
commitMetadata.getMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2));
+ assertEquals(expected,
commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2));
+ assertNull(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY));
} else {
assertEquals(expected,
commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY));
+ assertNull(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2));
}
return lastInstant;
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index d180dc720ea5..ca418d519560 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -461,17 +461,17 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
expectedKeyGeneratorClassName,
metaClient.getTableConfig().getKeyGeneratorClassName());
Dataset<Row> res = sqlContext.read().format("hudi").load(tableBasePath);
assertEquals(1000, res.count());
- assertUseV2Checkpoint(metaClient);
+ assertCheckpointVersion(metaClient);
}
- private static void assertUseV2Checkpoint(HoodieTableMetaClient metaClient) {
+ private static void assertCheckpointVersion(HoodieTableMetaClient
metaClient) {
metaClient.reloadActiveTimeline();
Option<HoodieCommitMetadata> metadata =
HoodieClientTestUtils.getCommitMetadataForInstant(
metaClient, metaClient.getActiveTimeline().lastInstant().get());
assertFalse(metadata.isEmpty());
Map<String, String> extraMetadata = metadata.get().getExtraMetadata();
- assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
- assertFalse(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V1));
+ assertFalse(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
}
@Test
@@ -632,7 +632,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
syncOnce(cfg);
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -645,7 +645,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" +
basePath + "/source_evolved.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
syncOnce(cfg);
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are
deletes.
assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
@@ -710,7 +710,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
syncOnce(cfg);
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
@@ -734,7 +734,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
syncOnce(cfg);
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
tableSchemaResolver = new TableSchemaResolver(
@@ -771,7 +771,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
syncOnce(cfg);
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
@@ -791,7 +791,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
syncOnce(cfg);
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
tableSchemaResolver = new TableSchemaResolver(
@@ -857,7 +857,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
tableBasePath, WriteOperationType.INSERT, hasTransformer,
orderingField, recordType, tableType);
syncOnce(cfg);
// Validate.
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata(topicName + ",0:500,1:500",
tableBasePath, 1);
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
@@ -873,7 +873,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
tableBasePath, WriteOperationType.UPSERT, hasTransformer,
orderingField, recordType, tableType);
syncOnce(cfg);
// Validate.
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(1500, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata(topicName + ",0:1250,1:1250",
tableBasePath, 2);
tableSchemaResolver = new TableSchemaResolver(
@@ -1450,7 +1450,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add(String.format("%s=%s",
HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(),
MetricsReporterType.INMEMORY.name()));
cfg.continuousMode = false;
syncOnce(cfg);
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be
shutdown");
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -1479,7 +1479,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
cfg.configs.add(String.format("%s=%s",
HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(),
MetricsReporterType.INMEMORY.name()));
cfg.continuousMode = false;
syncOnce(cfg);
- assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
+ assertCheckpointVersion(HoodieTestUtils.createMetaClient(storage,
tableBasePath));
assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be
shutdown");
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -2392,7 +2392,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
entry, metaClient, WriteOperationType.BULK_INSERT));
}
}
- assertUseV2Checkpoint(createMetaClient(jsc, tableBasePath));
+ assertCheckpointVersion(createMetaClient(jsc, tableBasePath));
} finally {
deltaStreamer.shutdownGracefully();
}
@@ -2512,7 +2512,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertRecordCount(1000, downstreamTableBasePath, sqlContext);
assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
assertDistanceCountWithExactValue(1000, downstreamTableBasePath,
sqlContext);
-
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(),
downstreamTableBasePath, 1);
+
TestHelpers.assertCommitMetadataForIncrSource(lastInstantForUpstreamTable.getCompletionTime(),
downstreamTableBasePath, 1);
// No new data => no commits for upstream table
cfg.sourceLimit = 0;
@@ -2530,7 +2530,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertRecordCount(1000, downstreamTableBasePath, sqlContext);
assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
assertDistanceCountWithExactValue(1000, downstreamTableBasePath,
sqlContext);
-
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(),
downstreamTableBasePath, 1);
+
TestHelpers.assertCommitMetadataForIncrSource(lastInstantForUpstreamTable.getCompletionTime(),
downstreamTableBasePath, 1);
// upsert() #1 on upstream hudi table
cfg.sourceLimit = 2000;
@@ -2554,7 +2554,7 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
assertDistanceCountWithExactValue(2000, downstreamTableBasePath,
sqlContext);
HoodieInstant finalInstant =
-
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable.getCompletionTime(),
downstreamTableBasePath, 2);
+
TestHelpers.assertCommitMetadataForIncrSource(lastInstantForUpstreamTable.getCompletionTime(),
downstreamTableBasePath, 2);
counts = countsPerCommit(downstreamTableBasePath, sqlContext);
assertEquals(2000, counts.stream().mapToLong(entry ->
entry.getLong(1)).sum());
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index 59f8605663bc..b54be7cb1890 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -95,13 +95,13 @@ public class TestSourceFormatAdapter {
}
private void setupRowSource(Dataset<Row> ds, TypedProperties properties,
SchemaProvider schemaProvider) {
- InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds),
DUMMY_CHECKPOINT, schemaProvider);
+ InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds), new
StreamerCheckpointV2(DUMMY_CHECKPOINT), schemaProvider);
testRowDataSource = new TestRowDataSource(properties, jsc, spark,
schemaProvider, batch);
}
private void setupJsonSource(JavaRDD<String> ds, HoodieSchema schema) {
SchemaProvider basicSchemaProvider = new BasicSchemaProvider(schema);
- InputBatch<JavaRDD<String>> batch = new InputBatch<>(Option.of(ds),
DUMMY_CHECKPOINT, basicSchemaProvider);
+ InputBatch<JavaRDD<String>> batch = new InputBatch<>(Option.of(ds), new
StreamerCheckpointV2(DUMMY_CHECKPOINT), basicSchemaProvider);
testJsonDataSource = new TestJsonDataSource(new TypedProperties(), jsc,
spark, basicSchemaProvider, batch);
}
@@ -183,7 +183,7 @@ public class TestSourceFormatAdapter {
SchemaProvider schemaProvider = new
TestSchemaProviderWithTransformation(sourceSchema, targetSchema);
// Setup the row source
- InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(testDataset),
DUMMY_CHECKPOINT, schemaProvider);
+ InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(testDataset),
new StreamerCheckpointV2(DUMMY_CHECKPOINT), schemaProvider);
TestRowDataSource testSource = new TestRowDataSource(new
TypedProperties(), jsc, spark, schemaProvider, batch);
// Create SourceFormatAdapter and fetch data in Avro format
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
index 1ec7842fe4e1..ed993a837c99 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
@@ -33,6 +33,8 @@ import org.apache.spark.sql.SparkSession;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
+
/**
* An implementation of {@link Source}, that emits test upserts.
*/
@@ -69,6 +71,6 @@ public class TestDataSource extends AbstractBaseTestSource {
List<GenericRecord> records =
fetchNextBatch(props, (int) sourceLimit,
recordInstantTime.orElse(instantTime),
DEFAULT_PARTITION_NUM).collect(Collectors.toList());
JavaRDD<GenericRecord> avroRDD =
sparkContext.<GenericRecord>parallelize(records, 4);
- return new InputBatch<>(Option.of(avroRDD), instantTime);
+ return new InputBatch<>(Option.of(avroRDD), createCheckpoint(instantTime));
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
index c45a9eaf7d10..e7b9b2b46c3c 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -60,7 +61,7 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
protected FilebasedSchemaProvider schemaProvider;
private TypedProperties props;
- private static final Checkpoint CHECKPOINT_VALUE_ZERO = new
StreamerCheckpointV2("0");
+ private static final Checkpoint CHECKPOINT_VALUE_ZERO = new
StreamerCheckpointV1("0");
@BeforeAll
public static void beforeAll() throws Exception {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHiveIncrPullSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHiveIncrPullSource.java
new file mode 100644
index 000000000000..8fbb98e0ab6d
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHiveIncrPullSource.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+import org.apache.hudi.utilities.config.HiveIncrPullSourceConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHiveIncrPullSource extends HoodieSparkClientTestHarness {
+
+ private TypedProperties props;
+ private String incrPullRoot;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ initSparkContexts();
+ initPath();
+ initHoodieStorage();
+ incrPullRoot = basePath + "/incrPullRoot";
+ FileSystem fs = (FileSystem) storage.getFileSystem();
+ fs.mkdirs(new Path(incrPullRoot));
+ props = new TypedProperties();
+ props.setProperty(HiveIncrPullSourceConfig.ROOT_INPUT_PATH.key(),
incrPullRoot);
+ }
+
+ @AfterEach
+ void teardown() throws Exception {
+ cleanupResources();
+ }
+
+ private void createCommitDir(String commitTime) throws Exception {
+ FileSystem fs = (FileSystem) storage.getFileSystem();
+ fs.mkdirs(new Path(incrPullRoot, commitTime));
+ }
+
+ @Test
+ void
findCommitToPullReturnsV1CheckpointForFirstCommitWhenNoLatestTargetCommit()
throws Exception {
+ createCommitDir("20240101");
+ createCommitDir("20240102");
+ HiveIncrPullSource source = new HiveIncrPullSource(props, jsc,
sparkSession, null);
+ Method findCommitToPull =
HiveIncrPullSource.class.getDeclaredMethod("findCommitToPull", Option.class);
+ findCommitToPull.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ Option<Checkpoint> result = (Option<Checkpoint>)
findCommitToPull.invoke(source, Option.empty());
+
+ assertTrue(result.isPresent());
+ assertInstanceOf(StreamerCheckpointV1.class, result.get());
+ assertEquals("20240101", result.get().getCheckpointKey());
+ }
+
+ @Test
+ void readFromCheckpointRewrapsV2LastCheckpointAsV1WhenNoCommitToPull()
throws Exception {
+ createCommitDir("20240101");
+ createCommitDir("20240102");
+ HiveIncrPullSource source = new HiveIncrPullSource(props, jsc,
sparkSession, null);
+
+ // lastCheckpoint is past the last commit, so findCommitToPull returns empty
and the early return is taken.
+ InputBatch<?> batch = source.readFromCheckpoint(
+ Option.of(new StreamerCheckpointV2("20240103")), Long.MAX_VALUE);
+
+ assertFalse(batch.getBatch().isPresent());
+ assertInstanceOf(StreamerCheckpointV1.class,
batch.getCheckpointForNextBatch());
+ assertEquals("20240103",
batch.getCheckpointForNextBatch().getCheckpointKey());
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
index f7bb36076a7a..bf969ae2ced4 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
@@ -34,7 +34,7 @@ public class TestInputBatch {
@Test
public void getSchemaProviderShouldThrowException() {
- final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"),
(String) null, null);
+ final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"),
null, null);
Throwable t = assertThrows(HoodieException.class,
inputBatch::getSchemaProvider);
assertEquals("Schema provider is required for this operation and for the
source of interest. "
+ "Please set '--schemaprovider-class' in the top level HoodieStreamer
config for the source of interest. "
@@ -45,7 +45,7 @@ public class TestInputBatch {
@Test
public void getSchemaProviderShouldReturnNullSchemaProvider() {
- final InputBatch<String> inputBatch = new InputBatch<>(Option.empty(),
(String) null, null);
+ final InputBatch<String> inputBatch = new InputBatch<>(Option.empty(),
null, null);
SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
assertTrue(schemaProvider instanceof InputBatch.NullSchemaProvider);
}
@@ -53,7 +53,7 @@ public class TestInputBatch {
@Test
public void getSchemaProviderShouldReturnGivenSchemaProvider() {
SchemaProvider schemaProvider = new RowBasedSchemaProvider(null);
- final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"),
(String) null, schemaProvider);
+ final InputBatch<String> inputBatch = new InputBatch<>(Option.of("foo"),
null, schemaProvider);
assertSame(schemaProvider, inputBatch.getSchemaProvider());
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
index 6d00c9c91bf6..9fc76f94f513 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
@@ -273,7 +274,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 100);
Dataset<Row> rowDataset = batch.getBatch().get();
assertEquals(100, rowDataset.count());
- assertEquals(new StreamerCheckpointV2("100"),
batch.getCheckpointForNextBatch());
+ assertEquals(new StreamerCheckpointV1("100"),
batch.getCheckpointForNextBatch());
// Add 100 records with commit time "001"
insert("001", 100, connection, DATA_GENERATOR, PROPS);
@@ -361,7 +362,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
InputBatch<Dataset<Row>> batch = runSource(Option.empty(), 10);
Dataset<Row> rowDataset = batch.getBatch().get();
assertEquals(10, rowDataset.count());
- assertEquals(new StreamerCheckpointV2(""),
batch.getCheckpointForNextBatch());
+ assertEquals(new StreamerCheckpointV1(""),
batch.getCheckpointForNextBatch());
// Get max of incremental column
Column incrementalColumn = rowDataset
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestStreamerSourceCheckpointVersion.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestStreamerSourceCheckpointVersion.java
new file mode 100644
index 000000000000..32480348a565
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestStreamerSourceCheckpointVersion.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.DFSPathSelectorConfig;
+import org.apache.hudi.utilities.config.JdbcSourceConfig;
+import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
+import org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen;
+import
org.apache.hudi.utilities.sources.helpers.KinesisOffsetGen.KinesisShardRange;
+import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
+import org.apache.hudi.utilities.streamer.DefaultStreamContext;
+import org.apache.hudi.utilities.streamer.StreamerCheckpointUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.streaming.kafka010.OffsetRange;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Validates that every non-incremental streamer source emits a V1 checkpoint
regardless of the
+ * configured write table version or the wrapper class of the input checkpoint
(so V2 keys
+ * persisted by older releases are read on input but never written back as
V2). Hudi incremental
+ * sources own their own V1/V2 semantics and are exercised by
+ * {@link #testS3AndGcsIncrSourcesStayV1OnBothTableVersions()} plus
+ * the {@code TestHoodieIncrSource} family.
+ */
+class TestStreamerSourceCheckpointVersion {
+
+ private static JavaSparkContext jsc;
+ private static SparkSession spark;
+
+ @TempDir
+ static java.nio.file.Path tempDir;
+
+ @BeforeAll
+ static void initSpark() {
+ spark =
SparkSession.builder().master("local[1]").appName("TestSourceCheckpointVersion").getOrCreate();
+ jsc = new JavaSparkContext(spark.sparkContext());
+ }
+
+ @AfterAll
+ static void stopSpark() {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ if (spark != null) {
+ spark.stop();
+ }
+ }
+
+ // Covers AvroDFSSource, JsonDFSSource, CsvDFSSource, ParquetDFSSource,
ORCDFSSource.
+ @ParameterizedTest
+ @ValueSource(ints = {6, 8})
+ void testDfsPathSelector(int writeTableVersion) throws IOException {
+ TypedProperties props = propsWith(writeTableVersion);
+ props.setProperty(DFSPathSelectorConfig.ROOT_INPUT_PATH.key(),
tempDir.toString());
+ Configuration hadoopConf = jsc.hadoopConfiguration();
+ FileSystem fs = FileSystem.get(hadoopConf);
+ fs.mkdirs(new Path(tempDir.toString()));
+ DFSPathSelector selector = new DFSPathSelector(props, hadoopConf);
+ Pair<Option<String>, Checkpoint> result =
+ selector.getNextFilePathsAndMaxModificationTime(jsc, Option.empty(),
Long.MAX_VALUE);
+ assertV1(result.getRight());
+ }
+
+ // Covers AvroKafkaSource, JsonKafkaSource, ProtoKafkaSource.
+ @ParameterizedTest
+ @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+ void testKafkaSource(int writeTableVersion, InputCheckpointKind inputKind) {
+ TestableKafkaSource source = new
TestableKafkaSource(propsWith(writeTableVersion), jsc, spark);
+ KafkaOffsetGen offsetGen = mock(KafkaOffsetGen.class);
+ when(offsetGen.getNextOffsetRanges(any(), anyLong(), any())).thenReturn(
+ new OffsetRange[] {OffsetRange.create("t", 0, 0L, 0L)});
+ when(offsetGen.getTopicName()).thenReturn("t");
+ source.offsetGen = offsetGen;
+ InputBatch<String> batch = source.fetchNext(makeInputCheckpoint(inputKind,
"k"), 1L);
+ assertV1(batch.getCheckpointForNextBatch());
+ }
+
+ // Covers JsonKinesisSource.
+ @ParameterizedTest
+ @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+ void testKinesisSource(int writeTableVersion, InputCheckpointKind inputKind)
{
+ TestableKinesisSource source = new
TestableKinesisSource(propsWith(writeTableVersion), jsc, spark);
+ KinesisOffsetGen offsetGen = mock(KinesisOffsetGen.class);
+ when(offsetGen.getStreamName()).thenReturn("s");
+ when(offsetGen.getNextShardRanges(any(), anyLong())).thenReturn(
+ new KinesisShardRange[0]);
+ source.setOffsetGen(offsetGen);
+ InputBatch<JavaRDD<String>> batch =
source.fetchNext(makeInputCheckpoint(inputKind, "s"), 1L);
+ assertV1(batch.getCheckpointForNextBatch());
+ }
+
+ @ParameterizedTest
+ @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+ void testJdbcSource(int writeTableVersion, InputCheckpointKind inputKind)
throws Exception {
+ JdbcSource source = new JdbcSource(propsWith(writeTableVersion), jsc,
spark, null);
+ Method m = JdbcSource.class.getDeclaredMethod(
+ "checkpoint", Dataset.class, boolean.class, Option.class);
+ m.setAccessible(true);
+ Checkpoint c = (Checkpoint) m.invoke(source, null, false,
makeInputCheckpoint(inputKind, "k"));
+ assertV1(c);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+ void testSqlFileBasedSource(int writeTableVersion, InputCheckpointKind
inputKind) throws IOException {
+ java.nio.file.Path sqlFile = tempDir.resolve("q.sql");
+ Files.write(sqlFile, "SELECT 1".getBytes());
+ TypedProperties props = propsWith(writeTableVersion);
+ props.setProperty("hoodie.streamer.source.sql.file", sqlFile.toString());
+ props.setProperty("hoodie.streamer.source.sql.checkpoint.emit", "true");
+ SqlFileBasedSource source = new SqlFileBasedSource(props, jsc, spark,
null);
+ Pair<Option<Dataset<Row>>, Checkpoint> result =
+ invokeRowSourceFetch(source, makeInputCheckpoint(inputKind, "k"));
+ assertV1(result.getRight());
+ }
+
+ @ParameterizedTest
+ @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+ void testHiveIncrPullSource(int writeTableVersion, InputCheckpointKind
inputKind) throws Exception {
+ Files.createDirectories(tempDir.resolve("20200101000000"));
+ TypedProperties props = propsWith(writeTableVersion);
+ props.setProperty("hoodie.streamer.source.incrpull.root",
tempDir.toString());
+ HiveIncrPullSource source = new HiveIncrPullSource(props, jsc, spark,
null);
+ Method m = HiveIncrPullSource.class.getDeclaredMethod("findCommitToPull",
Option.class);
+ m.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Option<Checkpoint> result = (Option<Checkpoint>) m.invoke(
+ source, makeInputCheckpoint(inputKind, "00000000000000"));
+ assertV1(result.get());
+ }
+
+ @ParameterizedTest
+ @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+ void testGcsEventsSource(int writeTableVersion, InputCheckpointKind
inputKind) {
+ PubsubMessagesFetcher fetcher = mock(PubsubMessagesFetcher.class);
+ when(fetcher.fetchMessages()).thenReturn(Collections.emptyList());
+ TypedProperties props = propsWith(writeTableVersion);
+ props.setProperty("hoodie.streamer.source.gcs.project.id", "p");
+ props.setProperty("hoodie.streamer.source.gcs.subscription.id", "s");
+ GcsEventsSource source = new GcsEventsSource(props, jsc, spark, null,
fetcher);
+ Pair<Option<Dataset<Row>>, Checkpoint> result =
+ invokeRowSourceFetch(source, makeInputCheckpoint(inputKind, "k"));
+ assertV1(result.getRight());
+ }
+
+ @ParameterizedTest
+ @EnumSource(InputCheckpointKind.class)
+ void testJdbcSourceIncrementalWithMaxValueEmitsV1(InputCheckpointKind
inputKind) throws Exception {
+ TypedProperties props = propsWith(8);
+ props.setProperty(JdbcSourceConfig.INCREMENTAL_COLUMN.key(), "idx");
+ JdbcSource source = new JdbcSource(props, jsc, spark, null);
+ StructType schema = new StructType().add("idx", DataTypes.StringType,
true);
+ List<Row> rows = Arrays.asList(RowFactory.create("100"),
RowFactory.create("200"));
+ Dataset<Row> dataset = spark.createDataFrame(rows, schema);
+ Method m = JdbcSource.class.getDeclaredMethod(
+ "checkpoint", Dataset.class, boolean.class, Option.class);
+ m.setAccessible(true);
+ Checkpoint c = (Checkpoint) m.invoke(source, dataset, true,
makeInputCheckpoint(inputKind, "k"));
+ assertV1(c);
+ assertEquals("200", c.getCheckpointKey());
+ }
+
+ // Drives the isIncremental + max==null pass-through to confirm it re-wraps
a V2 input as V1.
+ @ParameterizedTest
+ @EnumSource(InputCheckpointKind.class)
+ void testJdbcSourcePassThroughEmitsV1(InputCheckpointKind inputKind) throws
Exception {
+ TypedProperties props = propsWith(8);
+ props.setProperty(JdbcSourceConfig.INCREMENTAL_COLUMN.key(), "idx");
+ JdbcSource source = new JdbcSource(props, jsc, spark, null);
+ StructType schema = new StructType().add("idx", DataTypes.StringType,
true);
+ List<Row> rows = Collections.singletonList(RowFactory.create((Object)
null));
+ Dataset<Row> nullColDataset = spark.createDataFrame(rows, schema);
+ Method m = JdbcSource.class.getDeclaredMethod(
+ "checkpoint", Dataset.class, boolean.class, Option.class);
+ m.setAccessible(true);
+ Checkpoint c = (Checkpoint) m.invoke(source, nullColDataset, true,
makeInputCheckpoint(inputKind, "k"));
+ assertV1(c);
+ }
+
+ // Input key sorts after the only commit on disk so findCommitToPull returns
empty and
+ // readFromCheckpoint enters its pass-through branch.
+ @ParameterizedTest
+ @EnumSource(InputCheckpointKind.class)
+ void testHiveIncrPullSourcePassThroughEmitsV1(InputCheckpointKind inputKind)
throws IOException {
+ Files.createDirectories(tempDir.resolve("20200101000000"));
+ TypedProperties props = propsWith(8);
+ props.setProperty("hoodie.streamer.source.incrpull.root",
tempDir.toString());
+ HiveIncrPullSource source = new HiveIncrPullSource(props, jsc, spark,
null);
+ InputBatch<?> batch = source.readFromCheckpoint(
+ makeInputCheckpoint(inputKind, "30000000000000"), 1L);
+ assertV1(batch.getCheckpointForNextBatch());
+ }
+
+ @ParameterizedTest
+ @CsvSource({"6, V1", "6, V2", "8, V1", "8, V2"})
+ void testTranslateCheckpointNormalizesToV1(int writeTableVersion,
InputCheckpointKind inputKind) {
+ TestableKafkaSource source = new
TestableKafkaSource(propsWith(writeTableVersion), jsc, spark);
+ Option<Checkpoint> translated =
source.translateCheckpoint(makeInputCheckpoint(inputKind, "k"));
+ assertV1(translated.get());
+ assertEquals("k", translated.get().getCheckpointKey());
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {6, 8})
+ void testTranslateCheckpointPreservesEmpty(int writeTableVersion) {
+ TestableKafkaSource source = new
TestableKafkaSource(propsWith(writeTableVersion), jsc, spark);
+ assertTrue(source.translateCheckpoint(Option.empty()).isEmpty());
+ }
+
+ @Test
+ void testS3AndGcsIncrSourcesStayV1OnBothTableVersions() {
+ String s3 = S3EventsHoodieIncrSource.class.getName();
+ String gcs = GcsEventsHoodieIncrSource.class.getName();
+
assertTrue(CheckpointUtils.DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(s3));
+
assertTrue(CheckpointUtils.DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(gcs));
+ assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(8, s3));
+ assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(8, gcs));
+ }
+
+ private static TypedProperties propsWith(int writeTableVersion) {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(WRITE_TABLE_VERSION.key(),
String.valueOf(writeTableVersion));
+ return props;
+ }
+
+ private static void assertV1(Checkpoint c) {
+ assertEquals(StreamerCheckpointV1.class, c.getClass());
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Pair<Option<Dataset<Row>>, Checkpoint> invokeRowSourceFetch(
+ RowSource source, Option<Checkpoint> lastCheckpoint) {
+ try {
+ Method m = source.getClass().getDeclaredMethod("fetchNextBatch",
Option.class, long.class);
+ m.setAccessible(true);
+ return (Pair<Option<Dataset<Row>>, Checkpoint>) m.invoke(source,
lastCheckpoint, 1L);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private enum InputCheckpointKind { V1, V2 }
+
+ private static Option<Checkpoint> makeInputCheckpoint(InputCheckpointKind
kind, String key) {
+ switch (kind) {
+ case V1: return Option.of(new StreamerCheckpointV1(key));
+ case V2: return Option.of(new StreamerCheckpointV2(key));
+ default: throw new IllegalArgumentException("Unsupported kind: " + kind);
+ }
+ }
+
+ private static class TestableKafkaSource extends KafkaSource<String> {
+ TestableKafkaSource(TypedProperties props, JavaSparkContext jsc,
SparkSession spark) {
+ super(props, jsc, spark, SourceType.JSON,
mock(HoodieIngestionMetrics.class),
+ new DefaultStreamContext(null, Option.empty()));
+ }
+
+ @Override
+ protected String toBatch(OffsetRange[] offsetRanges) {
+ return "batch";
+ }
+ }
+
+ private static class TestableKinesisSource extends
KinesisSource<JavaRDD<String>> {
+ TestableKinesisSource(TypedProperties props, JavaSparkContext jsc,
SparkSession spark) {
+ super(props, jsc, spark, SourceType.JSON,
mock(HoodieIngestionMetrics.class),
+ new DefaultStreamContext((SchemaProvider) null, Option.empty()));
+ }
+
+ void setOffsetGen(KinesisOffsetGen gen) {
+ this.offsetGen = gen;
+ }
+
+ @Override
+ protected JavaRDD<String> toBatch(KinesisShardRange[] shardRanges, long
sourceLimit) {
+ return jsc.emptyRDD();
+ }
+
+ @Override
+ protected String createCheckpointFromBatch(JavaRDD<String> batch,
+ KinesisShardRange[] shardRangesWithUnreadRecords,
+ KinesisShardRange[] allOpenClosedShardRanges) {
+ return "checkpoint";
+ }
+
+ @Override
+ protected long getRecordCount(JavaRDD<String> batch) {
+ return 1L;
+ }
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
index 5b3d970c0368..30e138d5e3e9 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDFSPathSelectorCommonMethods.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -44,6 +45,7 @@ import static
org.apache.hudi.common.testutils.FileCreateUtilsLegacy.createBaseF
import static
org.apache.hudi.utilities.config.DFSPathSelectorConfig.ROOT_INPUT_PATH;
import static
org.apache.hudi.utilities.config.DatePartitionPathSelectorConfig.PARTITIONS_LIST_PARALLELISM;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestDFSPathSelectorCommonMethods extends
HoodieSparkClientTestHarness {
@@ -164,4 +166,17 @@ public class TestDFSPathSelectorCommonMethods extends
HoodieSparkClientTestHarne
String checkpointStr2ndRead =
nextFilePathsAndCheckpoint.getRight().getCheckpointKey();
assertEquals(2000L, Long.parseLong(checkpointStr2ndRead), "should read up
to foo5 (inclusive)");
}
+
+ @ParameterizedTest
+ @ValueSource(classes = {DFSPathSelector.class,
DatePartitionPathSelector.class})
+ void
getNextFilePathsAndMaxModificationTimeReturnsV1CheckpointWhenNoEligibleFiles(Class<?>
clazz) throws Exception {
+ DFSPathSelector selector = (DFSPathSelector)
ReflectionUtils.loadClass(clazz.getName(), props, storageConf.unwrap());
+ createBaseFile(basePath, "p1", "000", "foo1", 10, 1000);
+ createBaseFile(basePath, "p1", "000", "foo2", 10, 2000);
+ Pair<Option<String>, Checkpoint> result = selector
+ .getNextFilePathsAndMaxModificationTime(jsc, Option.of(new
StreamerCheckpointV2("999999999")), 30);
+ assertTrue(result.getLeft().isEmpty());
+ assertInstanceOf(StreamerCheckpointV1.class, result.getRight());
+ assertEquals("999999999", result.getRight().getCheckpointKey());
+ }
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
index 409b2d9fe9c8..1b010efab7cb 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieIncrSourceE2E.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableVersion;
-import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -512,9 +511,9 @@ public class TestHoodieIncrSourceE2E extends
S3EventsHoodieIncrSourceHarness {
@Test
public void testTargetCheckpointV2ForS3Gcs() {
// To ensure we properly track sources that must use checkpoint V1.
- assertFalse(CheckpointUtils.shouldTargetCheckpointV2(8,
S3EventsHoodieIncrSource.class.getName()));
- assertFalse(CheckpointUtils.shouldTargetCheckpointV2(6,
S3EventsHoodieIncrSource.class.getName()));
- assertFalse(CheckpointUtils.shouldTargetCheckpointV2(8,
GcsEventsHoodieIncrSource.class.getName()));
- assertFalse(CheckpointUtils.shouldTargetCheckpointV2(6,
GcsEventsHoodieIncrSource.class.getName()));
+ assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(8,
S3EventsHoodieIncrSource.class.getName()));
+ assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(6,
S3EventsHoodieIncrSource.class.getName()));
+ assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(8,
GcsEventsHoodieIncrSource.class.getName()));
+ assertFalse(StreamerCheckpointUtils.shouldTargetCheckpointV2(6,
GcsEventsHoodieIncrSource.class.getName()));
}
}
\ No newline at end of file
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestParquetDfsCheckpointFormatOnV6.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestParquetDfsCheckpointFormatOnV6.java
new file mode 100644
index 000000000000..596be797e36b
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestParquetDfsCheckpointFormatOnV6.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Continues ingestion against a fixture table that was originally written
with V1 checkpoint
+ * format only and is now being resumed on a Hudi 1.x release with
+ * {@code hoodie.write.table.version=6}.
+ *
+ * <p>Expected: the next commit's {@code extraMetadata} carries the V1 key
+ * {@code deltastreamer.checkpoint.key} and NOT the V2 key {@code
streamer.checkpoint.key.v2},
+ * because non-incremental sources emit V1 regardless of write table version.
+ *
+ * <p>If a V2 key shows up on the resumed commit, that reproduces the bug
reported against
+ * {@code hoodie.write.table.version=6}.
+ */
+public class TestParquetDfsCheckpointFormatOnV6 extends
HoodieDeltaStreamerTestBase {
+
+ private static final String FIXTURE_RESOURCE =
"checkpoint-v6/parquet-dfs-v1-fixture.zip";
+
+ @Test
+ public void resumedV6CommitKeepsV1CheckpointFormat() throws Exception {
+ String dirName = "checkpoint-v6-resume-" + System.currentTimeMillis();
+ String dataPath = basePath + "/" + dirName;
+ Path zipOutput = Paths.get(new URI(dataPath));
+ Files.createDirectories(zipOutput);
+ HoodieTestUtils.extractZipToDirectory(FIXTURE_RESOURCE, zipOutput,
getClass());
+
+ // The fixture table is rooted directly at zipOutput (zip contains
.hoodie/, partitions, etc.).
+ String tableBasePath = zipOutput.toString();
+ assertTrue(Files.exists(zipOutput.resolve(".hoodie/hoodie.properties")),
+ "Fixture did not unpack a hudi table at " + tableBasePath);
+
+ // Sanity-check that the fixture's last commit carries a V1 checkpoint and
no V2 key.
+ HoodieCommitMetadata baselineCommit =
readLatestCommitMetadata(tableBasePath);
+
assertNotNull(baselineCommit.getMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1),
+ "Fixture baseline expected to carry V1 checkpoint key.");
+
assertNull(baselineCommit.getMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2),
+ "Fixture baseline must not have a V2 checkpoint key (was built on
master).");
+
+ // Produce a fresh parquet source file under a new root so the streamer
has work to do.
+ String parquetSourceRoot = basePath + "/" + dirName + "-source";
+ prepareParquetDFSFiles(50, parquetSourceRoot, "resume.parquet", false,
null, null).close();
+
+ String propsFile = dirName + "-source.properties";
+ TypedProperties extraProps = new TypedProperties();
+ extraProps.setProperty("hoodie.datasource.write.table.type",
HoodieTableType.COPY_ON_WRITE.name());
+ prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
+ propsFile, parquetSourceRoot, false, "partition_path", "", extraProps,
false, false);
+
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT,
+ ParquetDFSSource.class.getName(), Collections.emptyList(), propsFile,
false, false,
+ 100_000, false, null, HoodieTableType.COPY_ON_WRITE.name(),
"timestamp", null);
+ cfg.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=6");
+ new HoodieDeltaStreamer(cfg, jsc).sync();
+
+ // Confirm the on-disk table version stayed at 6 so we are testing the
v6-write path, not an
+ // accidental v6 -> v9 auto-upgrade (which would leave the v6 write path
untested).
+ HoodieTableMetaClient resumedMetaClient =
HoodieTestUtils.createMetaClient(storage, tableBasePath);
+ assertEquals(6,
resumedMetaClient.getTableConfig().getTableVersion().versionCode(),
+ "Table version on disk should still be 6 after resume");
+
+ HoodieCommitMetadata resumedCommit =
readLatestCommitMetadata(tableBasePath);
+ // Under hoodie.write.table.version=6, ParquetDFSSource must keep emitting
V1 keys; in the
+ // always-V1 design, this holds across every write table version for
non-incremental sources.
+
assertNotNull(resumedCommit.getMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1),
+ "Resumed v6 commit must persist V1 checkpoint key. extraMetadata="
+ + resumedCommit.getExtraMetadata());
+
assertNull(resumedCommit.getMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2),
+ "Resumed v6 commit must NOT persist a V2 checkpoint key.
extraMetadata="
+ + resumedCommit.getExtraMetadata());
+ }
+
+ private static HoodieCommitMetadata readLatestCommitMetadata(String
tableBasePath) throws IOException {
+ HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(storage, tableBasePath);
+ HoodieInstant lastInstant = metaClient.getActiveTimeline()
+ .getCommitsTimeline()
+ .filterCompletedInstants()
+ .lastInstant()
+ .orElseThrow(() -> new IllegalStateException("No completed commit
found in " + tableBasePath));
+ return metaClient.getActiveTimeline().readCommitMetadata(lastInstant);
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
index 26ba7599b451..4a851e7c0a06 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSync.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
@@ -103,10 +104,10 @@ public class TestStreamSync extends
SparkClientFunctionalTestHarness {
SourceFormatAdapter sourceFormatAdapter = mock(SourceFormatAdapter.class);
SchemaProvider inputBatchSchemaProvider = getSchemaProvider("InputBatch",
false);
Option<Dataset<Row>> fakeDataFrame = Option.of(mock(Dataset.class));
- InputBatch<Dataset<Row>> fakeRowInputBatch = new
InputBatch<>(fakeDataFrame, "chkpt", inputBatchSchemaProvider);
+ InputBatch<Dataset<Row>> fakeRowInputBatch = new
InputBatch<>(fakeDataFrame, new StreamerCheckpointV2("chkpt"),
inputBatchSchemaProvider);
when(sourceFormatAdapter.fetchNewDataInRowFormat(any(),
anyLong())).thenReturn(fakeRowInputBatch);
//batch is empty because we don't want getBatch().map() to do anything
because it calls static method we can't mock
- InputBatch<JavaRDD<GenericRecord>> fakeAvroInputBatch = new
InputBatch<>(Option.empty(), "chkpt", inputBatchSchemaProvider);
+ InputBatch<JavaRDD<GenericRecord>> fakeAvroInputBatch = new
InputBatch<>(Option.empty(), new StreamerCheckpointV2("chkpt"),
inputBatchSchemaProvider);
when(sourceFormatAdapter.fetchNewDataInAvroFormat(any(),anyLong())).thenReturn(fakeAvroInputBatch);
//transformer
@@ -356,11 +357,33 @@ public class TestStreamSync extends
SparkClientFunctionalTestHarness {
}
@Test
- public void testExtractCheckpointMetadata_WhenCheckpointIsNullV2() {
+ void
testExtractCheckpointMetadata_WhenCheckpointIsNullNonIncrementalSourceOnV8UsesV1()
{
StreamSync streamSync = setupStreamSync();
HoodieStreamer.Config cfg = new HoodieStreamer.Config();
cfg.checkpoint = "test-checkpoint";
cfg.ignoreCheckpoint = "test-ignore";
+ cfg.sourceClassName = "org.apache.hudi.utilities.sources.KafkaSource";
+ TypedProperties props = new TypedProperties();
+
+ InputBatch inputBatch = mock(InputBatch.class);
+ when(inputBatch.getCheckpointForNextBatch()).thenReturn(null);
+
+ Map<String, String> result = streamSync.extractCheckpointMetadata(
+ inputBatch, props, HoodieTableVersion.EIGHT.versionCode(), cfg);
+
+ Map<String, String> expected = new HashMap<>();
+ expected.put(CHECKPOINT_IGNORE_KEY, "test-ignore");
+ expected.put(CHECKPOINT_RESET_KEY, "test-checkpoint");
+ assertEquals(expected, result, "Should fall back to V1 keys for
non-incremental sources on v8");
+ }
+
+ @Test
+ void
testExtractCheckpointMetadata_WhenCheckpointIsNullIncrementalSourceOnV8UsesV2()
{
+ StreamSync streamSync = setupStreamSync();
+ HoodieStreamer.Config cfg = new HoodieStreamer.Config();
+ cfg.checkpoint = "test-checkpoint";
+ cfg.ignoreCheckpoint = "test-ignore";
+ cfg.sourceClassName = "org.apache.hudi.utilities.sources.HoodieIncrSource";
TypedProperties props = new TypedProperties();
InputBatch inputBatch = mock(InputBatch.class);
@@ -372,7 +395,7 @@ public class TestStreamSync extends
SparkClientFunctionalTestHarness {
Map<String, String> expected = new HashMap<>();
expected.put(CHECKPOINT_IGNORE_KEY, "test-ignore");
expected.put(STREAMER_CHECKPOINT_RESET_KEY_V2, "test-checkpoint");
- assertEquals(expected, result, "Should return default metadata when
checkpoint is null");
+ assertEquals(expected, result, "Should fall back to V2 keys for
incremental sources on v8");
}
@Test
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
index f4e30667b295..0d9c3f7d15f4 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -41,6 +42,8 @@ import
org.apache.hudi.utilities.exception.HoodieStreamerException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
@@ -51,11 +54,13 @@ import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorage
import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
import static
org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(MockitoExtension.class)
public class TestStreamerCheckpointUtils extends
SparkClientFunctionalTestHarness {
+ private static final String CHECKPOINT_TO_RESUME = "20240101000000";
private TypedProperties props;
private HoodieStreamer.Config streamerConfig;
protected HoodieTableMetaClient metaClient;
@@ -68,6 +73,53 @@ public class TestStreamerCheckpointUtils extends
SparkClientFunctionalTestHarnes
streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name();
}
+ @ParameterizedTest
+ @CsvSource({
+ // version, sourceClassName, expectedV2
+ // Only HoodieIncrSource family on v8+ targets V2.
+ "8, org.apache.hudi.utilities.sources.HoodieIncrSource, true",
+ "9, org.apache.hudi.utilities.sources.HoodieIncrSource, true",
+ "8, org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource, true",
+ // v6/v7 always V1.
+ "7, org.apache.hudi.utilities.sources.HoodieIncrSource, false",
+ "6, org.apache.hudi.utilities.sources.HoodieIncrSource, false",
+ // Non-incremental sources always V1 regardless of version.
+ "8, org.apache.hudi.utilities.sources.KafkaSource, false",
+ "9, org.apache.hudi.utilities.sources.JdbcSource, false",
+ // Sources on the V2-not-supported list always use V1, even on v8+.
+ "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
+ "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false",
+ "8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource,
false",
+ "8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource,
false"
+ })
+ void testShouldTargetCheckpointV2(int version, String sourceClassName,
boolean expectedV2) {
+ assertEquals(expectedV2,
StreamerCheckpointUtils.shouldTargetCheckpointV2(version, sourceClassName));
+ }
+
+ @Test
+ void testBuildCheckpointFromConfigOverride() {
+ Checkpoint hoodieIncrV8 =
StreamerCheckpointUtils.buildCheckpointFromConfigOverride(
+ "org.apache.hudi.utilities.sources.HoodieIncrSource",
+ HoodieTableVersion.EIGHT.versionCode(),
+ CHECKPOINT_TO_RESUME);
+ assertInstanceOf(UnresolvedStreamerCheckpointBasedOnCfg.class,
hoodieIncrV8);
+ assertEquals(CHECKPOINT_TO_RESUME, hoodieIncrV8.getCheckpointKey());
+
+ Checkpoint nonIncrV8 =
StreamerCheckpointUtils.buildCheckpointFromConfigOverride(
+ "org.apache.hudi.utilities.sources.KafkaSource",
+ HoodieTableVersion.EIGHT.versionCode(),
+ CHECKPOINT_TO_RESUME);
+ assertInstanceOf(StreamerCheckpointV1.class, nonIncrV8);
+ assertEquals(CHECKPOINT_TO_RESUME, nonIncrV8.getCheckpointKey());
+
+ Checkpoint hoodieIncrV6 =
StreamerCheckpointUtils.buildCheckpointFromConfigOverride(
+ "org.apache.hudi.utilities.sources.HoodieIncrSource",
+ HoodieTableVersion.SIX.versionCode(),
+ CHECKPOINT_TO_RESUME);
+ assertInstanceOf(StreamerCheckpointV1.class, hoodieIncrV6);
+ assertEquals(CHECKPOINT_TO_RESUME, hoodieIncrV6.getCheckpointKey());
+ }
+
@Test
public void testEmptyTimelineCase() throws IOException {
Option<Checkpoint> checkpoint =
StreamerCheckpointUtils.resolveCheckpointBetweenConfigAndPrevCommit(
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
index 017507f2f9ec..7b7c06d622d4 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
@@ -35,6 +35,8 @@ import org.apache.spark.sql.SparkSession;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.hudi.common.table.checkpoint.CheckpointUtils.createCheckpoint;
+
/**
* A Test DataSource which scales test-data generation by using spark
parallelism.
*/
@@ -57,7 +59,7 @@ public class DistributedTestDataSource extends
AbstractBaseTestSource {
// No new data.
if (sourceLimit <= 0) {
- return new InputBatch<>(Option.empty(), instantTime);
+ return new InputBatch<>(Option.empty(), createCheckpoint(instantTime));
}
TypedProperties newProps = new TypedProperties();
@@ -77,6 +79,6 @@ public class DistributedTestDataSource extends
AbstractBaseTestSource {
}
return fetchNextBatch(newProps, perPartitionSourceLimit,
instantTime, p).iterator();
}, true);
- return new InputBatch<>(Option.of(avroRDD), instantTime);
+ return new InputBatch<>(Option.of(avroRDD), createCheckpoint(instantTime));
}
}
diff --git
a/hudi-utilities/src/test/resources/checkpoint-v6/parquet-dfs-v1-fixture.zip
b/hudi-utilities/src/test/resources/checkpoint-v6/parquet-dfs-v1-fixture.zip
new file mode 100644
index 000000000000..29c1b9804435
Binary files /dev/null and
b/hudi-utilities/src/test/resources/checkpoint-v6/parquet-dfs-v1-fixture.zip
differ