This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ca77fda51fe [HUDI-7618] Add ability to ignore checkpoints in delta
streamer (#11018)
ca77fda51fe is described below
commit ca77fda51fe3036f86d4ddb8b0e58a2f160882dc
Author: Sampan S Nayak <[email protected]>
AuthorDate: Fri Apr 19 11:55:43 2024 +0530
[HUDI-7618] Add ability to ignore checkpoints in delta streamer (#11018)
---
.../hudi/utilities/streamer/HoodieStreamer.java | 7 +++
.../apache/hudi/utilities/streamer/StreamSync.java | 13 ++++-
.../streamer/TestStreamSyncUnitTests.java | 61 ++++++++++++++++++++++
3 files changed, 79 insertions(+), 2 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 59c1bf3d164..0dd488bffcb 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -428,6 +428,13 @@ public class HoodieStreamer implements Serializable {
@Parameter(names = {"--config-hot-update-strategy-class"}, description =
"Configuration hot update in continuous mode")
public String configHotUpdateStrategyClass = "";
+  // Opaque marker (e.g. a timestamp or UUID). It is recorded in commit metadata and, when it differs from the
+  // recorded value, the next sync discards the last committed checkpoint instead of resuming from it.
+  @Parameter(names = {"--ignore-checkpoint"}, description = "Set this config with a unique value, recommend using a timestamp value or UUID."
+      + " Setting this config indicates that the subsequent sync should ignore the last committed checkpoint for the source."
+      + " The config value is stored in the commit metadata and compared with the value recorded by the commit the checkpoint"
+      + " would be resumed from, so re-running with the same value does not have any effect. When a new value is supplied it"
+      + " also takes precedence over --checkpoint for that sync. This config can be used in scenarios like a kafka topic change,"
+      + " where we would want to start ingesting from the latest or earliest offset after switching the topic (in this case we"
+      + " would want to ignore the previously committed checkpoint, and rely on other configs to pick the starting offsets).")
+  public String ignoreCheckpoint = null;
+
public boolean isAsyncCompactionEnabled() {
return continuousMode && !forceDisableCompaction
&&
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index c9521058b12..2f5bd1fd3ff 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -164,6 +164,7 @@ public class StreamSync implements Serializable, Closeable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(StreamSync.class);
private static final String NULL_PLACEHOLDER = "[null]";
+  /**
+   * Commit-metadata key under which the {@code --ignore-checkpoint} value of a run is recorded, so that a later run
+   * can tell whether the same ignore request has already been applied.
+   */
+  public static final String CHECKPOINT_IGNORE_KEY = "deltastreamer.checkpoint.ignore_key";
/**
* Delta Sync Config.
@@ -733,7 +734,8 @@ public class StreamSync implements Serializable, Closeable {
* @return the checkpoint to resume from if applicable.
* @throws IOException
*/
- private Option<String> getCheckpointToResume(Option<HoodieTimeline>
commitsTimelineOpt) throws IOException {
+ @VisibleForTesting
+ Option<String> getCheckpointToResume(Option<HoodieTimeline>
commitsTimelineOpt) throws IOException {
Option<String> resumeCheckpointStr = Option.empty();
// try get checkpoint from commits(including commit and deltacommit)
// in COW migrating to MOR case, the first batch of the deltastreamer will
lost the checkpoint from COW table, cause the dataloss
@@ -750,7 +752,11 @@ public class StreamSync implements Serializable, Closeable
{
if (commitMetadataOption.isPresent()) {
HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
LOG.debug("Checkpoint reset from metadata: " +
commitMetadata.getMetadata(CHECKPOINT_RESET_KEY));
- if (cfg.checkpoint != null &&
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
+ if (cfg.ignoreCheckpoint != null &&
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY))
+ ||
!cfg.ignoreCheckpoint.equals(commitMetadata.getMetadata(CHECKPOINT_IGNORE_KEY))))
{
+ // we ignore any existing checkpoint and start ingesting afresh
+ resumeCheckpointStr = Option.empty();
+ } else if (cfg.checkpoint != null &&
(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
||
!cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
} else if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
@@ -852,6 +858,9 @@ public class StreamSync implements Serializable, Closeable {
if (cfg.checkpoint != null) {
checkpointCommitMetadata.put(CHECKPOINT_RESET_KEY, cfg.checkpoint);
}
+ if (cfg.ignoreCheckpoint != null) {
+ checkpointCommitMetadata.put(CHECKPOINT_IGNORE_KEY,
cfg.ignoreCheckpoint);
+ }
}
if (hasErrors) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
index c22c948e70b..8ff5b6ee933 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
@@ -22,7 +22,10 @@ package org.apache.hudi.utilities.streamer;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.storage.HoodieStorage;
@@ -43,9 +46,13 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.io.IOException;
import java.util.stream.Stream;
import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA;
+import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
+import static
org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_RESET_KEY;
+import static
org.apache.hudi.utilities.streamer.StreamSync.CHECKPOINT_IGNORE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -130,6 +137,60 @@ public class TestStreamSyncUnitTests {
HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue());
}
+  /**
+   * Verifies which checkpoint {@code StreamSync#getCheckpointToResume} picks for a given streamer config and the
+   * commit metadata returned for the latest commit.
+   */
+  @ParameterizedTest
+  @MethodSource("getCheckpointToResumeCases")
+  void testGetCheckpointToResume(HoodieStreamer.Config cfg, HoodieCommitMetadata commitMetadata,
+                                 Option<String> expectedResumeCheckpoint) throws IOException {
+    // Timeline whose filter() returns itself and whose last instant is a mock, so a latest commit is always present.
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    HoodieInstant latestInstant = mock(HoodieInstant.class);
+    when(timeline.filter(any())).thenReturn(timeline);
+    when(timeline.lastInstant()).thenReturn(Option.of(latestInstant));
+
+    StreamSync streamSync = spy(new StreamSync(cfg, mock(SparkSession.class), new TypedProperties(),
+        mock(HoodieSparkEngineContext.class), mock(FileSystem.class), mock(Configuration.class),
+        client -> true, null, Option.empty(), null, Option.empty(), true, true));
+    // Skip the real commit-metadata lookup and feed the parameterized metadata instead.
+    doReturn(Option.of(commitMetadata)).when(streamSync).getLatestCommitMetadataWithValidCheckpointInfo(any());
+
+    Option<String> actualResumeCheckpoint = streamSync.getCheckpointToResume(Option.of(timeline));
+    assertEquals(expectedResumeCheckpoint, actualResumeCheckpoint);
+  }
+
+  /**
+   * Cases for {@link #testGetCheckpointToResume}: (streamer config, latest commit metadata, expected resume checkpoint).
+   * Arguments of the helpers are (reset-checkpoint, ignore-checkpoint[, previous checkpoint]).
+   */
+  private static Stream<Arguments> getCheckpointToResumeCases() {
+    return Stream.of(
+        // Checkpoint has been manually overridden (new reset-checkpoint)
+        Arguments.of(generateDeltaStreamerConfig("new-reset-checkpoint", null),
+            generateCommitMetadata("old-reset-checkpoint", null, null),
+            Option.of("new-reset-checkpoint")),
+        // Checkpoint neither reset nor ignored: continue from the previous run's checkpoint
+        Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint", null),
+            generateCommitMetadata("old-reset-checkpoint", null, "checkpoint-prev-run"),
+            Option.of("checkpoint-prev-run")),
+        // Ignore-checkpoint value unchanged since the last commit: continue from the previous run's checkpoint
+        Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint", "123445"),
+            generateCommitMetadata("old-reset-checkpoint", "123445", "checkpoint-prev-run"),
+            Option.of("checkpoint-prev-run")),
+        // New ignore-checkpoint value: the existing checkpoint is ignored
+        Arguments.of(generateDeltaStreamerConfig("old-reset-checkpoint", "123445"),
+            generateCommitMetadata("old-reset-checkpoint", "123422", "checkpoint-prev-run"),
+            Option.empty()),
+        // New ignore-checkpoint value takes precedence over a new reset-checkpoint
+        Arguments.of(generateDeltaStreamerConfig("new-reset-checkpoint", "123445"),
+            generateCommitMetadata("old-reset-checkpoint", "123422", "checkpoint-prev-run"),
+            Option.empty()),
+        // First use of ignore-checkpoint (nothing recorded yet): the existing checkpoint is ignored
+        Arguments.of(generateDeltaStreamerConfig(null, "123445"),
+            generateCommitMetadata(null, null, "checkpoint-prev-run"),
+            Option.empty()),
+        // Ignore-checkpoint already applied and no reset-checkpoint: continue from the previous run's checkpoint
+        Arguments.of(generateDeltaStreamerConfig(null, "123445"),
+            generateCommitMetadata(null, "123445", "checkpoint-prev-run"),
+            Option.of("checkpoint-prev-run")),
+        // Ignore-checkpoint already applied but a new reset-checkpoint is configured: the reset-checkpoint wins
+        Arguments.of(generateDeltaStreamerConfig("new-reset-checkpoint", "123445"),
+            generateCommitMetadata("old-reset-checkpoint", "123445", "checkpoint-prev-run"),
+            Option.of("new-reset-checkpoint"))
+    );
+  }
+
+  /**
+   * Builds a MERGE_ON_READ streamer config carrying the given {@code --checkpoint} and {@code --ignore-checkpoint}
+   * values (either may be {@code null}).
+   */
+  private static HoodieStreamer.Config generateDeltaStreamerConfig(String checkpoint, String ignoreCheckpoint) {
+    HoodieStreamer.Config streamerConfig = new HoodieStreamer.Config();
+    streamerConfig.tableType = "MERGE_ON_READ";
+    streamerConfig.checkpoint = checkpoint;
+    streamerConfig.ignoreCheckpoint = ignoreCheckpoint;
+    return streamerConfig;
+  }
+
+  /**
+   * Builds commit metadata holding the reset-checkpoint, ignore-checkpoint and checkpoint entries as they would be
+   * read back by StreamSync; {@code null} arguments are passed through to {@code addMetadata} unchanged.
+   */
+  private static HoodieCommitMetadata generateCommitMetadata(String resetCheckpointValue,
+                                                             String ignoreCheckpointValue,
+                                                             String checkpointValue) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata(CHECKPOINT_RESET_KEY, resetCheckpointValue);
+    metadata.addMetadata(CHECKPOINT_IGNORE_KEY, ignoreCheckpointValue);
+    metadata.addMetadata(CHECKPOINT_KEY, checkpointValue);
+    return metadata;
+  }
+
private SchemaProvider getSchemaProvider(String name, boolean
isNullTargetSchema) {
SchemaProvider schemaProvider = mock(SchemaProvider.class);
Schema sourceSchema = mock(Schema.class);