This is an automated email from the ASF dual-hosted git repository.

nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 471bb48338bd refactor: move checkpoint metadata lookup helper to hudi-common (#18489)
471bb48338bd is described below

commit 471bb48338bd9642bd593472b029915180156038
Author: Surya Prasanna <[email protected]>
AuthorDate: Mon May 4 13:42:43 2026 -0700

    refactor: move checkpoint metadata lookup helper to hudi-common (#18489)
    
    This PR moves the checkpoint metadata lookup helper into hudi-common so
    ingestion-related code can reuse the same timeline utility instead of
    keeping the logic in utilities-only code.
---
 .../hudi/common/table/timeline/TimelineUtils.java  | 27 ++++++++++++++++++++++
 .../streamer/StreamerCheckpointUtils.java          | 22 ++++++------------
 2 files changed, 34 insertions(+), 15 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index cfc1520b2393..4b2e6ee55fc6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.text.ParseException;
 import java.util.AbstractMap;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -647,4 +648,30 @@ public class TimelineUtils {
     }
     return writerOption;
   }
+
+  /**
+   * Returns the latest reverse-ordered instant whose commit metadata contains at least one of the provided
+   * checkpoint metadata keys.
+   *
+   * @param timeline timeline to scan; expected to contain completed instants
+   * @param checkpointKeys checkpoint metadata keys to look for in commit metadata
+   * @return an {@link Option} containing a {@link Pair} of the matching instant's
+   *         {@link HoodieInstant#toString()} value and its
+   *         {@link HoodieCommitMetadata}; {@link Option#empty()} if no matching instant is found
+   * @throws IOException if reading commit metadata fails
+   */
+  @SuppressWarnings("unchecked")
+  public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(
+      HoodieTimeline timeline, String... checkpointKeys) throws IOException {
+    return (Option<Pair<String, HoodieCommitMetadata>>) timeline.getReverseOrderedInstants().map(instant -> {
+      try {
+        HoodieCommitMetadata commitMetadata = timeline.readCommitMetadata(instant);
+        boolean hasCheckpointMetadata = Arrays.stream(checkpointKeys)
+            .anyMatch(key -> !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(key)));
+        return hasCheckpointMetadata ? Option.of(Pair.of(instant.toString(), commitMetadata)) : Option.empty();
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
+      }
+    }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
index 03db67b9d03f..560d9fd2172a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -201,21 +202,12 @@ public class StreamerCheckpointUtils {
 
   public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline)
       throws IOException {
-    return (Option<Pair<String, HoodieCommitMetadata>>) timeline.getReverseOrderedInstants().map(instant -> {
-      try {
-        HoodieCommitMetadata commitMetadata = timeline.readCommitMetadata(instant);
-        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY))
-            || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))
-            || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
-            || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
-          return Option.of(Pair.of(instant.toString(), commitMetadata));
-        } else {
-          return Option.empty();
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
-      }
-    }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+    return TimelineUtils.getLatestInstantAndCommitMetadataWithValidCheckpointInfo(
+        timeline,
+        HoodieStreamer.CHECKPOINT_KEY,
+        HoodieStreamer.CHECKPOINT_RESET_KEY,
+        STREAMER_CHECKPOINT_KEY_V2,
+        STREAMER_CHECKPOINT_RESET_KEY_V2);
   }
 
   public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline) throws IOException {

Reply via email to