This is an automated email from the ASF dual-hosted git repository.
nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 471bb48338bd refactor: move checkpoint metadata lookup helper to
hudi-common (#18489)
471bb48338bd is described below
commit 471bb48338bd9642bd593472b029915180156038
Author: Surya Prasanna <[email protected]>
AuthorDate: Mon May 4 13:42:43 2026 -0700
refactor: move checkpoint metadata lookup helper to hudi-common (#18489)
This PR moves the checkpoint metadata lookup helper into hudi-common so
ingestion-related code can reuse the same timeline utility instead of keeping
the logic in utilities-only code.
---
.../hudi/common/table/timeline/TimelineUtils.java | 27 ++++++++++++++++++++++
.../streamer/StreamerCheckpointUtils.java | 22 ++++++------------
2 files changed, 34 insertions(+), 15 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index cfc1520b2393..4b2e6ee55fc6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -49,6 +49,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.util.AbstractMap;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -647,4 +648,30 @@ public class TimelineUtils {
}
return writerOption;
}
+
+  /**
+   * Walks {@code timeline} latest-first (via {@link HoodieTimeline#getReverseOrderedInstants()}) and returns the first instant
+   * whose commit metadata holds a non-empty value for at least one of the given checkpoint metadata keys.
+   *
+   * @param timeline       timeline to scan; expected to contain completed instants
+   * @param checkpointKeys checkpoint metadata keys to look for; if none (or {@code null}) are given, no commit metadata is read
+   * @return the matching instant's {@link HoodieInstant#toString()} value paired with its {@link HoodieCommitMetadata}, or {@link Option#empty()} if none matches
+   * @throws IOException declared by the signature; failures reading an instant's commit metadata are rethrown as {@link HoodieIOException}
+   */
+  public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(
+      HoodieTimeline timeline, String... checkpointKeys) throws IOException {
+    if (checkpointKeys == null || checkpointKeys.length == 0) {
+      return Option.empty(); // no key can ever match, so avoid reading every instant's commit metadata
+    }
+    // The explicit type witness pins the stream element type, so the result needs no unchecked cast.
+    return timeline.getReverseOrderedInstants().<Option<Pair<String, HoodieCommitMetadata>>>map(instant -> {
+      try {
+        HoodieCommitMetadata commitMetadata = timeline.readCommitMetadata(instant);
+        boolean hasCheckpointMetadata = Arrays.stream(checkpointKeys).anyMatch(key -> !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(key)));
+        return hasCheckpointMetadata ? Option.of(Pair.of(instant.toString(), commitMetadata)) : Option.empty();
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
+      }
+    }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+  }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
index 03db67b9d03f..560d9fd2172a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -201,21 +202,12 @@ public class StreamerCheckpointUtils {
public static Option<Pair<String, HoodieCommitMetadata>>
getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline
timeline)
throws IOException {
- return (Option<Pair<String, HoodieCommitMetadata>>)
timeline.getReverseOrderedInstants().map(instant -> {
- try {
- HoodieCommitMetadata commitMetadata =
timeline.readCommitMetadata(instant);
- if
(!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY))
- ||
!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))
- ||
!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
- ||
!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2)))
{
- return Option.of(Pair.of(instant.toString(), commitMetadata));
- } else {
- return Option.empty();
- }
- } catch (IOException e) {
- throw new HoodieIOException("Failed to parse HoodieCommitMetadata for
" + instant.toString(), e);
- }
- }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+ return
TimelineUtils.getLatestInstantAndCommitMetadataWithValidCheckpointInfo( // latest instant whose commit metadata has a non-empty value for any of these keys
+ timeline,
+ HoodieStreamer.CHECKPOINT_KEY,
+ HoodieStreamer.CHECKPOINT_RESET_KEY,
+ STREAMER_CHECKPOINT_KEY_V2,
+ STREAMER_CHECKPOINT_RESET_KEY_V2); // commit-metadata read failures are rethrown by the helper as HoodieIOException
}
public static Option<HoodieCommitMetadata>
getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws
IOException {