the-other-tim-brown commented on code in PR #569:
URL: https://github.com/apache/incubator-xtable/pull/569#discussion_r1898995137


##########
xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java:
##########
@@ -78,4 +78,13 @@ public interface ConversionSource<COMMIT> extends Closeable {
    *     false.
    */
   boolean isIncrementalSyncSafeFrom(Instant instant);
+
+  /**
+   * Extract the identifier of the provided commit, the identifier defined as 1. Snapshot ID in

Review Comment:
   You can use `<ul><li>` tags to format the list in the Javadoc if you want.
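
A minimal sketch of what a `<ul><li>` list in a Javadoc comment could look like. The interface name, the method name, and the list entries are hypothetical placeholders, since the original doc text is cut off in the hunk above; only the tag layout is the point.

```java
class JavadocListExample {
  public static void main(String[] args) {
    // Trivial implementation, only here so that the sketch runs.
    CommitIdentifierExtractor<Long> extractor = commit -> Long.toString(commit);
    System.out.println(extractor.getCommitIdentifier(42L));
  }
}

/** Hypothetical stand-in for the interface under review; not the real XTable type. */
interface CommitIdentifierExtractor<COMMIT> {
  /**
   * Extract the identifier of the provided commit. The identifier is defined per source format:
   *
   * <ul>
   *   <li>placeholder entry for the first table format
   *   <li>placeholder entry for the second table format
   * </ul>
   *
   * @param commit the commit to inspect
   * @return the identifier of the commit
   */
  String getCommitIdentifier(COMMIT commit);
}
```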



##########
xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java:
##########
@@ -287,40 +287,81 @@ public Optional<TableSyncMetadata> getTableMetadata() {
                 .filterCompletedInstants()
                 .lastInstant()
                 .toJavaOptional()
-                .map(
-                    instant -> {
-                      try {
-                        if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-                          return HoodieReplaceCommitMetadata.fromBytes(
-                                  client.getActiveTimeline().getInstantDetails(instant).get(),
-                                  HoodieReplaceCommitMetadata.class)
-                              .getExtraMetadata();
-                        } else {
-                          return HoodieCommitMetadata.fromBytes(
-                                  client.getActiveTimeline().getInstantDetails(instant).get(),
-                                  HoodieCommitMetadata.class)
-                              .getExtraMetadata();
-                        }
-                      } catch (IOException ex) {
-                        throw new ReadException("Unable to read Hudi commit metadata", ex);
-                      }
-                    })
-                .flatMap(
-                    metadata ->
-                        TableSyncMetadata.fromJson(
-                            metadata.get(TableSyncMetadata.XTABLE_METADATA))));
+                .flatMap(instant -> getMetadata(instant, client)));
   }
 
   @Override
   public String getTableFormat() {
     return TableFormat.HUDI;
   }
 
+  @Override
+  public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
+    if (!metaClient.isPresent()) {
+      return Optional.empty();
+    }
+    return getTargetCommitIdentifier(sourceIdentifier, metaClient.get());
+  }
+
+  Optional<String> getTargetCommitIdentifier(
+      String sourceIdentifier, HoodieTableMetaClient metaClient) {
+
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline();
+
+    for (HoodieInstant instant : commitTimeline.getInstants()) {
+      try {
+        Optional<TableSyncMetadata> optionalMetadata = getMetadata(instant, metaClient);
+        if (!optionalMetadata.isPresent()) {
+          return Optional.empty();
+        }
+
+        TableSyncMetadata metadata = optionalMetadata.get();
+        if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
+          return Optional.of(String.valueOf(instant.getTimestamp()));

Review Comment:
   `getTimestamp` already returns a string, so can we remove the `String.valueOf`?
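
To illustrate why the wrapper is redundant, here is a small sketch. `FakeInstant` is a hypothetical stand-in for a type whose `getTimestamp` already returns a `String`; it is not the Hudi class. For a non-null `String`, `String.valueOf` hands back the same object, and for a null value it would produce the text `"null"`, which could hide a missing timestamp.

```java
class StringValueOfExample {
  /** Hypothetical stand-in for an instant whose timestamp is already a String. */
  static final class FakeInstant {
    private final String timestamp;

    FakeInstant(String timestamp) {
      this.timestamp = timestamp;
    }

    String getTimestamp() {
      return timestamp;
    }
  }

  public static void main(String[] args) {
    FakeInstant instant = new FakeInstant("20240101120000000");

    // String.valueOf(Object) calls toString(), and String.toString() returns itself.
    String wrapped = String.valueOf(instant.getTimestamp());
    String direct = instant.getTimestamp();
    System.out.println(wrapped == direct);

    // With a null timestamp the wrapper yields the four-character text "null".
    FakeInstant missing = new FakeInstant(null);
    System.out.println(String.valueOf(missing.getTimestamp()).length());
  }
}
```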



##########
xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java:
##########
@@ -300,5 +358,11 @@ private Format getFileFormat() {
       // fallback to existing deltalog value
       return deltaLog.snapshot().metadata().format();
     }
+
+    private Map<String, String> getCommitTags() {
+      Map<String, String> tags = new HashMap<>();
+      tags.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());

Review Comment:
   nitpick: we can use `Collections.singletonMap` here 
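
A sketch of the suggested change, using only the JDK. `METADATA_KEY` is a hypothetical placeholder for the real constant in `TableSyncMetadata`, and the JSON string is made up. One thing to keep in mind: the map returned by `Collections.singletonMap` is immutable, so this only fits if no caller adds further tags later.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class SingletonMapExample {
  // Hypothetical key; the real constant lives in TableSyncMetadata.
  static final String METADATA_KEY = "example-metadata-key";

  // Shape of the code in the diff: a mutable map with a single entry.
  static Map<String, String> tagsWithHashMap(String json) {
    Map<String, String> tags = new HashMap<>();
    tags.put(METADATA_KEY, json);
    return tags;
  }

  // Suggested alternative: one expression, immutable result.
  static Map<String, String> tagsWithSingletonMap(String json) {
    return Collections.singletonMap(METADATA_KEY, json);
  }

  public static void main(String[] args) {
    String json = "{\"example\":true}";
    // Map equality is defined on entries, so both variants compare equal.
    System.out.println(tagsWithHashMap(json).equals(tagsWithSingletonMap(json)));
  }
}
```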



##########
xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java:
##########
@@ -287,40 +287,81 @@ public Optional<TableSyncMetadata> getTableMetadata() {
                 .filterCompletedInstants()
                 .lastInstant()
                 .toJavaOptional()
-                .map(
-                    instant -> {
-                      try {
-                        if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-                          return HoodieReplaceCommitMetadata.fromBytes(
-                                  client.getActiveTimeline().getInstantDetails(instant).get(),
-                                  HoodieReplaceCommitMetadata.class)
-                              .getExtraMetadata();
-                        } else {
-                          return HoodieCommitMetadata.fromBytes(
-                                  client.getActiveTimeline().getInstantDetails(instant).get(),
-                                  HoodieCommitMetadata.class)
-                              .getExtraMetadata();
-                        }
-                      } catch (IOException ex) {
-                        throw new ReadException("Unable to read Hudi commit metadata", ex);
-                      }
-                    })
-                .flatMap(
-                    metadata ->
-                        TableSyncMetadata.fromJson(
-                            metadata.get(TableSyncMetadata.XTABLE_METADATA))));
+                .flatMap(instant -> getMetadata(instant, client)));
   }
 
   @Override
   public String getTableFormat() {
     return TableFormat.HUDI;
   }
 
+  @Override
+  public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
+    if (!metaClient.isPresent()) {
+      return Optional.empty();
+    }
+    return getTargetCommitIdentifier(sourceIdentifier, metaClient.get());
+  }
+
+  Optional<String> getTargetCommitIdentifier(
+      String sourceIdentifier, HoodieTableMetaClient metaClient) {
+
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline();
+
+    for (HoodieInstant instant : commitTimeline.getInstants()) {
+      try {
+        Optional<TableSyncMetadata> optionalMetadata = getMetadata(instant, metaClient);
+        if (!optionalMetadata.isPresent()) {
+          return Optional.empty();
+        }
+
+        TableSyncMetadata metadata = optionalMetadata.get();
+        if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
+          return Optional.of(String.valueOf(instant.getTimestamp()));
+        }
+      } catch (Exception e) {
+        log.warn("Failed to parse commit metadata for instant: {}", instant, e);
+      }
+    }
+    return Optional.empty();
+  }
+
   private HoodieTableMetaClient getMetaClient() {
     return metaClient.orElseThrow(
         () -> new IllegalStateException("beginSync must be called before calling this method"));
   }
 
+  private Optional<TableSyncMetadata> getMetadata(
+      HoodieInstant instant, HoodieTableMetaClient metaClient) {
+    try {
+      // Get instant details
+      Option<byte[]> instantDetails = metaClient.getActiveTimeline().getInstantDetails(instant);
+      if (!instantDetails.isPresent()) {
+        return Optional.empty();
+      }
+
+      // Check action and parse the appropriate metadata
+      Map<String, String> extraMetadata;
+      if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {

Review Comment:
   I just discovered `TimelineUtils#getCommitMetadata`, which can handle this logic for us:
   https://github.com/apache/hudi/blob/release-0.14.0/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java#L271C38-L271C55
   
   `HoodieReplaceCommitMetadata` extends `HoodieCommitMetadata`, so it will work for our use case, where we only want to inspect the metadata.
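
The reasoning about the subclass can be sketched without Hudi on the classpath. `CommitMetadata` and `ReplaceCommitMetadata` below are hypothetical stand-ins for the two Hudi classes, and `extraMetadataOf` is an illustrative helper, not the `TimelineUtils` method. The point is only that code reading the extra metadata through the base type needs no branch on the action.

```java
import java.util.Collections;
import java.util.Map;

class CommitMetadataExample {
  /** Hypothetical stand-in for the base commit metadata type. */
  static class CommitMetadata {
    private final Map<String, String> extraMetadata;

    CommitMetadata(Map<String, String> extraMetadata) {
      this.extraMetadata = extraMetadata;
    }

    Map<String, String> getExtraMetadata() {
      return extraMetadata;
    }
  }

  /** Hypothetical stand-in for the replace-commit subtype. */
  static class ReplaceCommitMetadata extends CommitMetadata {
    ReplaceCommitMetadata(Map<String, String> extraMetadata) {
      super(extraMetadata);
    }
  }

  // Accepts either kind of commit, because the getter is inherited from the base type.
  static Map<String, String> extraMetadataOf(CommitMetadata metadata) {
    return metadata.getExtraMetadata();
  }

  public static void main(String[] args) {
    CommitMetadata plain = new CommitMetadata(Collections.singletonMap("k", "commit"));
    CommitMetadata replace = new ReplaceCommitMetadata(Collections.singletonMap("k", "replace"));
    System.out.println(extraMetadataOf(plain).get("k"));
    System.out.println(extraMetadataOf(replace).get("k"));
  }
}
```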



##########
xtable-api/src/main/java/org/apache/xtable/model/TableChange.java:
##########
@@ -36,4 +36,7 @@ public class TableChange {
 
   /** The {@link InternalTable} at the commit time to which this table change belongs. */
   InternalTable tableAsOfChange;
+
+  // Commit identifier in source table
+  @Builder.Default String sourceIdentifier = "";

Review Comment:
   I think this may be safer without a default and with a non-null requirement. What do you think?
   
   Similar note for `InternalSnapshot`
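
A rough sketch of the behavior I have in mind, written with a hand-rolled builder so it runs without Lombok. `Change` is a hypothetical stand-in for `TableChange`; in the real class this would presumably be expressed with Lombok annotations rather than by hand. Without a default, forgetting to set the identifier fails at build time instead of silently producing an empty string.

```java
import java.util.Objects;

class RequiredFieldExample {
  /** Hypothetical stand-in for a value class with a required source identifier. */
  static final class Change {
    private final String sourceIdentifier;

    private Change(String sourceIdentifier) {
      // No default value: a missing identifier is rejected here.
      this.sourceIdentifier =
          Objects.requireNonNull(sourceIdentifier, "sourceIdentifier must not be null");
    }

    String getSourceIdentifier() {
      return sourceIdentifier;
    }

    static Builder builder() {
      return new Builder();
    }

    static final class Builder {
      private String sourceIdentifier;

      Builder sourceIdentifier(String value) {
        this.sourceIdentifier = value;
        return this;
      }

      Change build() {
        return new Change(sourceIdentifier);
      }
    }
  }

  public static void main(String[] args) {
    Change ok = Change.builder().sourceIdentifier("commit-1").build();
    System.out.println(ok.getSourceIdentifier());
    try {
      Change.builder().build(); // identifier never set
    } catch (NullPointerException ex) {
      System.out.println(ex.getMessage());
    }
  }
}
```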



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
