This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new b3e03e55 [40] Add commit-level metadata and source-target id mapping
b3e03e55 is described below

commit b3e03e5522af9e1dc08133b849f7e802b755dbab
Author: Daniel Tu <[email protected]>
AuthorDate: Sat Oct 26 18:01:16 2024 -0700

    [40] Add commit-level metadata and source-target id mapping
---
 .../org/apache/xtable/model/InternalSnapshot.java  |   3 +
 .../java/org/apache/xtable/model/TableChange.java  |   4 +
 .../xtable/model/metadata/TableSyncMetadata.java   |  22 ++-
 .../xtable/spi/extractor/ConversionSource.java     |  14 ++
 .../apache/xtable/spi/sync/ConversionTarget.java   |  10 ++
 .../apache/xtable/spi/sync/TableFormatSync.java    |  23 ++-
 .../model/metadata/TestTableSyncMetadata.java      |  20 ++-
 .../spi/extractor/TestExtractFromSource.java       |  10 +-
 .../xtable/spi/sync/TestTableFormatSync.java       | 132 +++++++++++++----
 .../apache/xtable/delta/DeltaConversionSource.java |  12 +-
 .../apache/xtable/delta/DeltaConversionTarget.java |  65 ++++++++-
 .../apache/xtable/hudi/HudiConversionSource.java   |   7 +
 .../apache/xtable/hudi/HudiConversionTarget.java   |  70 ++++++---
 .../xtable/iceberg/IcebergConversionSource.java    |  12 +-
 .../xtable/iceberg/IcebergConversionTarget.java    |  41 +++++-
 .../xtable/iceberg/IcebergDataFileUpdatesSync.java |  17 ++-
 .../conversion/TestConversionController.java       |  59 +++++---
 .../org/apache/xtable/delta/TestDeltaSync.java     |  78 +++++++++-
 .../apache/xtable/hudi/ITHudiConversionTarget.java | 160 +++++++++++++++++++--
 .../xtable/hudi/TestHudiConversionTarget.java      |   3 +-
 .../org/apache/xtable/iceberg/TestIcebergSync.java | 123 ++++++++++++++--
 21 files changed, 770 insertions(+), 115 deletions(-)

diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java 
b/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
index 345fda78..1963eba7 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 
 import lombok.Builder;
+import lombok.NonNull;
 import lombok.Value;
 
 import org.apache.xtable.model.storage.PartitionFileGroup;
@@ -47,4 +48,6 @@ public class InternalSnapshot {
   List<PartitionFileGroup> partitionedDataFiles;
   // pending commits before latest commit on the table.
   @Builder.Default List<Instant> pendingCommits = Collections.emptyList();
+  // commit identifier in source table
+  @NonNull String sourceIdentifier;
 }
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java 
b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
index 51f0ee0b..fe3907ee 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
@@ -19,6 +19,7 @@
 package org.apache.xtable.model;
 
 import lombok.Builder;
+import lombok.NonNull;
 import lombok.Value;
 
 import org.apache.xtable.model.storage.DataFilesDiff;
@@ -36,4 +37,7 @@ public class TableChange {
 
   /** The {@link InternalTable} at the commit time to which this table change 
belongs. */
   InternalTable tableAsOfChange;
+
+  // Commit identifier in source table
+  @NonNull String sourceIdentifier;
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
 
b/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
index fa5eeaff..d8c70791 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
@@ -54,10 +54,30 @@ public class TableSyncMetadata {
   Instant lastInstantSynced;
   List<Instant> instantsToConsiderForNextSync;
   int version;
+  String sourceTableFormat;
+  String sourceIdentifier;
 
+  /**
+   * @deprecated Use {@link #of(Instant, List, String, String)} instead. This 
method exists for
+   *     backward compatibility and will be removed in a future version.
+   */
+  @Deprecated
   public static TableSyncMetadata of(
       Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
-    return new TableSyncMetadata(lastInstantSynced, 
instantsToConsiderForNextSync, CURRENT_VERSION);
+    return TableSyncMetadata.of(lastInstantSynced, 
instantsToConsiderForNextSync, null, null);
+  }
+
+  public static TableSyncMetadata of(
+      Instant lastInstantSynced,
+      List<Instant> instantsToConsiderForNextSync,
+      String sourceTableFormat,
+      String sourceIdentifier) {
+    return new TableSyncMetadata(
+        lastInstantSynced,
+        instantsToConsiderForNextSync,
+        CURRENT_VERSION,
+        sourceTableFormat,
+        sourceIdentifier);
   }
 
   public String toJson() {
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
index 21f7f63f..42d2b26b 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
@@ -87,4 +87,18 @@ public interface ConversionSource<COMMIT> extends Closeable {
    *     false.
    */
   boolean isIncrementalSyncSafeFrom(Instant instant);
+
+  /**
+   * Extract the identifier of the provided commit. The identifier is defined 
as:
+   *
+   * <ul>
+   *   <li>Snapshot ID in Iceberg
+   *   <li>Version ID in Delta
+   *   <li>Timestamp in Hudi
+   * </ul>
+   *
+   * @param commit The provided commit
+   * @return the string version of the commit identifier
+   */
+  String getCommitIdentifier(COMMIT commit);
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
index 736f49e4..476578cb 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
@@ -90,4 +90,14 @@ public interface ConversionTarget {
 
   /** Initializes the client with provided configuration */
   void init(TargetTable targetTable, Configuration configuration);
+
+  /**
+   * Retrieves the commit identifier from the target table that corresponds to 
a given source table
+   * commit identifier
+   *
+   * @param sourceIdentifier the unique identifier of the source table commit
+   * @return an {@link Optional} containing the target commit identifier if a 
corresponding commit
+   *     exists, or an empty {@link Optional} if no match is found
+   */
+  Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java 
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
index bb340669..78a22b76 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
@@ -73,7 +73,8 @@ public class TableFormatSync {
                 internalTable,
                 target -> 
target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()),
                 startTime,
-                snapshot.getPendingCommits()));
+                snapshot.getPendingCommits(),
+                snapshot.getSourceIdentifier()));
       } catch (Exception e) {
         log.error("Failed to sync snapshot", e);
         results.put(
@@ -121,7 +122,8 @@ public class TableFormatSync {
                   change.getTableAsOfChange(),
                   target -> target.syncFilesForDiff(change.getFilesDiff()),
                   startTime,
-                  changes.getPendingCommits()));
+                  changes.getPendingCommits(),
+                  change.getSourceIdentifier()));
         } catch (Exception e) {
           log.error("Failed to sync table changes", e);
           resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL, 
startTime, e));
@@ -149,19 +151,26 @@ public class TableFormatSync {
       InternalTable tableState,
       SyncFiles fileSyncMethod,
       Instant startTime,
-      List<Instant> pendingCommits) {
+      List<Instant> pendingCommits,
+      String sourceIdentifier) {
     // initialize the sync
     conversionTarget.beginSync(tableState);
+    // Persist the latest commit time in table properties for incremental syncs
+    // Syncing metadata must precede the following steps to ensure that the 
metadata is available
+    // before committing
+    TableSyncMetadata latestState =
+        TableSyncMetadata.of(
+            tableState.getLatestCommitTime(),
+            pendingCommits,
+            tableState.getTableFormat(),
+            sourceIdentifier);
+    conversionTarget.syncMetadata(latestState);
     // sync schema updates
     conversionTarget.syncSchema(tableState.getReadSchema());
     // sync partition updates
     conversionTarget.syncPartitionSpec(tableState.getPartitioningFields());
     // Update the files in the target table
     fileSyncMethod.sync(conversionTarget);
-    // Persist the latest commit time in table properties for incremental 
syncs.
-    TableSyncMetadata latestState =
-        TableSyncMetadata.of(tableState.getLatestCommitTime(), pendingCommits);
-    conversionTarget.syncMetadata(latestState);
     conversionTarget.completeSync();
 
     return SyncResult.builder()
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
 
b/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
index 3dcc6fbe..56cce262 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
@@ -44,6 +44,7 @@ class TestTableSyncMetadata {
 
   private static Stream<Arguments> provideMetadataAndJson() {
     return Stream.of(
+        // Old version of metadata and JSON
         Arguments.of(
             TableSyncMetadata.of(
                 Instant.parse("2020-07-04T10:15:30.00Z"),
@@ -56,7 +57,24 @@ class TestTableSyncMetadata {
             
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0}"),
         Arguments.of(
             TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), 
null),
-            "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"));
+            "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"),
+        // New version of metadata and JSON with `sourceTableFormat` and 
`sourceIdentifier` fields
+        Arguments.of(
+            TableSyncMetadata.of(
+                Instant.parse("2020-07-04T10:15:30.00Z"),
+                Arrays.asList(
+                    Instant.parse("2020-08-21T11:15:30.00Z"),
+                    Instant.parse("2024-01-21T12:15:30.00Z")),
+                "TEST",
+                "0"),
+            
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
+        Arguments.of(
+            TableSyncMetadata.of(
+                Instant.parse("2020-07-04T10:15:30.00Z"), 
Collections.emptyList(), "TEST", "0"),
+            
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
+        Arguments.of(
+            TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), 
null, "TEST", "0"),
+            
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"));
   }
 
   @Test
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
 
b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
index 8503057d..5c7f8cfc 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
@@ -53,7 +53,11 @@ public class TestExtractFromSource {
     InternalTable table = 
InternalTable.builder().latestCommitTime(Instant.now()).build();
     List<PartitionFileGroup> dataFiles = Collections.emptyList();
     InternalSnapshot internalSnapshot =
-        
InternalSnapshot.builder().table(table).partitionedDataFiles(dataFiles).build();
+        InternalSnapshot.builder()
+            .table(table)
+            .partitionedDataFiles(dataFiles)
+            .sourceIdentifier("0")
+            .build();
     
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
     assertEquals(internalSnapshot, 
ExtractFromSource.of(mockConversionSource).extractSnapshot());
   }
@@ -86,6 +90,7 @@ public class TestExtractFromSource {
             .tableAsOfChange(tableAtFirstInstant)
             .filesDiff(
                 
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
+            .sourceIdentifier("0")
             .build();
     when(mockConversionSource.getTableChangeForCommit(firstCommitToSync))
         .thenReturn(tableChangeToReturnAtFirstInstant);
@@ -94,6 +99,7 @@ public class TestExtractFromSource {
             .tableAsOfChange(tableAtFirstInstant)
             .filesDiff(
                 
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
+            .sourceIdentifier("0")
             .build();
 
     // add 2 new files, remove 2 files
@@ -110,6 +116,7 @@ public class TestExtractFromSource {
                     .filesAdded(Arrays.asList(newFile2, newFile3))
                     .filesRemoved(Arrays.asList(initialFile3, newFile1))
                     .build())
+            .sourceIdentifier("1")
             .build();
     when(mockConversionSource.getTableChangeForCommit(secondCommitToSync))
         .thenReturn(tableChangeToReturnAtSecondInstant);
@@ -121,6 +128,7 @@ public class TestExtractFromSource {
                     .filesAdded(Arrays.asList(newFile2, newFile3))
                     .filesRemoved(Arrays.asList(initialFile3, newFile1))
                     .build())
+            .sourceIdentifier("1")
             .build();
 
     IncrementalTableChanges actual =
diff --git 
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
index 39480f8b..852d4e13 100644
--- 
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
+++ 
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
@@ -76,6 +76,7 @@ public class TestTableFormatSync {
             .table(startingTableState)
             .partitionedDataFiles(fileGroups)
             .pendingCommits(pendingCommitInstants)
+            .sourceIdentifier("0")
             .build();
     
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
     when(mockConversionTarget2.getTableFormat()).thenReturn(TableFormat.DELTA);
@@ -109,7 +110,10 @@ public class TestTableFormatSync {
         failureResult.getTableFormatSyncStatus());
 
     verifyBaseConversionTargetCalls(
-        mockConversionTarget2, startingTableState, pendingCommitInstants);
+        mockConversionTarget2,
+        startingTableState,
+        pendingCommitInstants,
+        snapshot.getSourceIdentifier());
     verify(mockConversionTarget2).syncFilesForSnapshot(fileGroups);
     verify(mockConversionTarget2).completeSync();
     verify(mockConversionTarget1, never()).completeSync();
@@ -127,15 +131,27 @@ public class TestTableFormatSync {
     InternalTable tableState1 = getTableState(1);
     DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
     TableChange tableChange1 =
-        
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
+        TableChange.builder()
+            .tableAsOfChange(tableState1)
+            .filesDiff(dataFilesDiff1)
+            .sourceIdentifier("0")
+            .build();
     InternalTable tableState2 = getTableState(2);
     DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
     TableChange tableChange2 =
-        
TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build();
+        TableChange.builder()
+            .tableAsOfChange(tableState2)
+            .filesDiff(dataFilesDiff2)
+            .sourceIdentifier("1")
+            .build();
     InternalTable tableState3 = getTableState(3);
     DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
     TableChange tableChange3 =
-        
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build();
+        TableChange.builder()
+            .tableAsOfChange(tableState3)
+            .filesDiff(dataFilesDiff3)
+            .sourceIdentifier("2")
+            .build();
 
     List<Instant> pendingCommitInstants = 
Collections.singletonList(Instant.now());
     
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
@@ -154,10 +170,12 @@ public class TestTableFormatSync {
     Map<ConversionTarget, TableSyncMetadata> conversionTargetWithMetadata = 
new HashMap<>();
     conversionTargetWithMetadata.put(
         mockConversionTarget1,
-        TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), 
Collections.emptyList()));
+        TableSyncMetadata.of(
+            Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), 
"TEST", "0"));
     conversionTargetWithMetadata.put(
         mockConversionTarget2,
-        TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), 
Collections.emptyList()));
+        TableSyncMetadata.of(
+            Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), 
"TEST", "1"));
 
     Map<String, List<SyncResult>> result =
         TableFormatSync.getInstance()
@@ -201,13 +219,29 @@ public class TestTableFormatSync {
       assertSyncResultTimes(successResults.get(i), start);
     }
 
-    verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget1,
+        tableState1,
+        pendingCommitInstants,
+        tableChange1.getSourceIdentifier());
     verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
-    verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget2,
+        tableState1,
+        pendingCommitInstants,
+        tableChange1.getSourceIdentifier());
     verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
-    verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget2,
+        tableState2,
+        pendingCommitInstants,
+        tableChange2.getSourceIdentifier());
     verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
-    verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget2,
+        tableState3,
+        pendingCommitInstants,
+        tableChange3.getSourceIdentifier());
     verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
     verify(mockConversionTarget1, times(1)).completeSync();
     verify(mockConversionTarget2, times(3)).completeSync();
@@ -219,15 +253,27 @@ public class TestTableFormatSync {
     InternalTable tableState1 = getTableState(1);
     DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
     TableChange tableChange1 =
-        
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
+        TableChange.builder()
+            .tableAsOfChange(tableState1)
+            .filesDiff(dataFilesDiff1)
+            .sourceIdentifier("0")
+            .build();
     InternalTable tableState2 = getTableState(2);
     DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
     TableChange tableChange2 =
-        
TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build();
+        TableChange.builder()
+            .tableAsOfChange(tableState2)
+            .filesDiff(dataFilesDiff2)
+            .sourceIdentifier("1")
+            .build();
     InternalTable tableState3 = getTableState(3);
     DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
     TableChange tableChange3 =
-        
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build();
+        TableChange.builder()
+            .tableAsOfChange(tableState3)
+            .filesDiff(dataFilesDiff3)
+            .sourceIdentifier("2")
+            .build();
 
     List<Instant> pendingCommitInstants = 
Collections.singletonList(Instant.now());
     
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
@@ -247,12 +293,17 @@ public class TestTableFormatSync {
         mockConversionTarget1,
         TableSyncMetadata.of(
             tableChange2.getTableAsOfChange().getLatestCommitTime(),
-            
Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime())));
+            
Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime()),
+            "TEST",
+            tableChange2.getSourceIdentifier()));
     // mockConversionTarget2 will have synced the first table change previously
     conversionTargetWithMetadata.put(
         mockConversionTarget2,
         TableSyncMetadata.of(
-            tableChange1.getTableAsOfChange().getLatestCommitTime(), 
Collections.emptyList()));
+            tableChange1.getTableAsOfChange().getLatestCommitTime(),
+            Collections.emptyList(),
+            "TEST",
+            tableChange1.getSourceIdentifier()));
 
     Map<String, List<SyncResult>> result =
         TableFormatSync.getInstance()
@@ -290,15 +341,31 @@ public class TestTableFormatSync {
     }
 
     // conversionTarget1 syncs table changes 1 and 3
-    verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget1,
+        tableState1,
+        pendingCommitInstants,
+        tableChange1.getSourceIdentifier());
     verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
-    verifyBaseConversionTargetCalls(mockConversionTarget1, tableState3, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget1,
+        tableState3,
+        pendingCommitInstants,
+        tableChange3.getSourceIdentifier());
     verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3);
     verify(mockConversionTarget1, times(2)).completeSync();
     // conversionTarget2 syncs table changes 2 and 3
-    verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget2,
+        tableState2,
+        pendingCommitInstants,
+        tableChange2.getSourceIdentifier());
     verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
-    verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget2,
+        tableState3,
+        pendingCommitInstants,
+        tableChange3.getSourceIdentifier());
     verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
     verify(mockConversionTarget2, times(2)).completeSync();
   }
@@ -309,7 +376,11 @@ public class TestTableFormatSync {
     InternalTable tableState1 = getTableState(1);
     DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
     TableChange tableChange1 =
-        
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
+        TableChange.builder()
+            .tableAsOfChange(tableState1)
+            .filesDiff(dataFilesDiff1)
+            .sourceIdentifier("0")
+            .build();
 
     List<Instant> pendingCommitInstants = Collections.emptyList();
     
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
@@ -325,11 +396,13 @@ public class TestTableFormatSync {
     Map<ConversionTarget, TableSyncMetadata> conversionTargetWithMetadata = 
new HashMap<>();
     // mockConversionTarget1 will have nothing to sync
     conversionTargetWithMetadata.put(
-        mockConversionTarget1, TableSyncMetadata.of(Instant.now(), 
Collections.emptyList()));
+        mockConversionTarget1,
+        TableSyncMetadata.of(Instant.now(), Collections.emptyList(), "TEST", 
"0"));
     // mockConversionTarget2 will have synced the first table change previously
     conversionTargetWithMetadata.put(
         mockConversionTarget2,
-        TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), 
Collections.emptyList()));
+        TableSyncMetadata.of(
+            Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), 
"TEST", "1"));
 
     Map<String, List<SyncResult>> result =
         TableFormatSync.getInstance()
@@ -348,7 +421,11 @@ public class TestTableFormatSync {
     verify(mockConversionTarget1, never()).syncFilesForDiff(any());
     verify(mockConversionTarget1, never()).completeSync();
 
-    verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, 
pendingCommitInstants);
+    verifyBaseConversionTargetCalls(
+        mockConversionTarget2,
+        tableState1,
+        pendingCommitInstants,
+        tableChange1.getSourceIdentifier());
     verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
   }
 
@@ -389,12 +466,17 @@ public class TestTableFormatSync {
   private void verifyBaseConversionTargetCalls(
       ConversionTarget mockConversionTarget,
       InternalTable startingTableState,
-      List<Instant> pendingCommitInstants) {
+      List<Instant> pendingCommitInstants,
+      String sourceIdentifier) {
     verify(mockConversionTarget).beginSync(startingTableState);
     
verify(mockConversionTarget).syncSchema(startingTableState.getReadSchema());
     
verify(mockConversionTarget).syncPartitionSpec(startingTableState.getPartitioningFields());
     verify(mockConversionTarget)
         .syncMetadata(
-            TableSyncMetadata.of(startingTableState.getLatestCommitTime(), 
pendingCommitInstants));
+            TableSyncMetadata.of(
+                startingTableState.getLatestCommitTime(),
+                pendingCommitInstants,
+                startingTableState.getTableFormat(),
+                sourceIdentifier));
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 140eb8ad..de4a2b69 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -98,6 +98,7 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
     return InternalSnapshot.builder()
         .table(table)
         .partitionedDataFiles(getInternalDataFiles(snapshot, 
table.getReadSchema()))
+        .sourceIdentifier(getCommitIdentifier(snapshot.version()))
         .build();
   }
 
@@ -167,7 +168,11 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
             .filesAdded(addedFiles.values())
             .filesRemoved(removedFiles.values())
             .build();
-    return 
TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
+    return TableChange.builder()
+        .tableAsOfChange(tableAtVersion)
+        .filesDiff(dataFilesDiff)
+        .sourceIdentifier(getCommitIdentifier(versionNumber))
+        .build();
   }
 
   @Override
@@ -200,6 +205,11 @@ public class DeltaConversionSource implements 
ConversionSource<Long> {
     return deltaCommitInstant.equals(instant) || 
deltaCommitInstant.isBefore(instant);
   }
 
+  @Override
+  public String getCommitIdentifier(Long commit) {
+    return String.valueOf(commit);
+  }
+
   private DeltaIncrementalChangesState getChangesState() {
     return deltaIncrementalChangesState.orElseThrow(
         () -> new IllegalStateException("DeltaIncrementalChangesState is not 
initialized"));
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index 343a2d21..fbe99801 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -22,6 +22,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -30,6 +31,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import lombok.extern.log4j.Log4j2;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.SparkSession;
@@ -41,14 +43,17 @@ import org.apache.spark.sql.delta.DeltaConfigs;
 import org.apache.spark.sql.delta.DeltaLog;
 import org.apache.spark.sql.delta.DeltaOperations;
 import org.apache.spark.sql.delta.OptimisticTransaction;
+import org.apache.spark.sql.delta.Snapshot;
 import org.apache.spark.sql.delta.actions.Action;
 import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.CommitInfo;
 import org.apache.spark.sql.delta.actions.Format;
 import org.apache.spark.sql.delta.actions.Metadata;
 import org.apache.spark.sql.delta.actions.RemoveFile;
 
 import scala.Option;
 import scala.Some;
+import scala.Tuple2;
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
@@ -66,6 +71,7 @@ import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.schema.SparkSchemaExtractor;
 import org.apache.xtable.spi.sync.ConversionTarget;
 
+@Log4j2
 public class DeltaConversionTarget implements ConversionTarget {
   private static final String MIN_READER_VERSION = String.valueOf(1);
   // gets access to generated columns.
@@ -216,6 +222,58 @@ public class DeltaConversionTarget implements 
ConversionTarget {
     return TableFormat.DELTA;
   }
 
+  @Override
+  public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
+    Snapshot currentSnapshot = deltaLog.currentSnapshot().snapshot();
+
+    Iterator<Tuple2<Object, Seq<Action>>> versionIterator =
+        JavaConverters.asJavaIteratorConverter(
+                deltaLog.getChanges(currentSnapshot.version(), false))
+            .asJava();
+    while (versionIterator.hasNext()) {
+      Tuple2<Object, Seq<Action>> currentChange = versionIterator.next();
+      Long targetVersion = currentSnapshot.version();
+      List<Action> actions = 
JavaConverters.seqAsJavaListConverter(currentChange._2()).asJava();
+
+      // Find the CommitInfo in the changes belongs to certain version
+      Optional<CommitInfo> commitInfo =
+          actions.stream()
+              .filter(action -> action instanceof CommitInfo)
+              .map(action -> (CommitInfo) action)
+              .findFirst();
+      if (!commitInfo.isPresent()) {
+        continue;
+      }
+
+      Option<scala.collection.immutable.Map<String, String>> tags = 
commitInfo.get().tags();
+      if (tags.isEmpty()) {
+        continue;
+      }
+
+      Option<String> sourceMetadataJson = 
tags.get().get(TableSyncMetadata.XTABLE_METADATA);
+      if (sourceMetadataJson.isEmpty()) {
+        continue;
+      }
+
+      try {
+        Optional<TableSyncMetadata> optionalMetadata =
+            TableSyncMetadata.fromJson(sourceMetadataJson.get());
+        if (!optionalMetadata.isPresent()) {
+          continue;
+        }
+
+        TableSyncMetadata metadata = optionalMetadata.get();
+        if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
+          return Optional.of(String.valueOf(targetVersion));
+        }
+      } catch (Exception e) {
+        log.warn("Failed to parse commit metadata for commit: {}", 
targetVersion, e);
+      }
+    }
+
+    return Optional.empty();
+  }
+
   @EqualsAndHashCode
   @ToString
   private class TransactionState {
@@ -265,7 +323,8 @@ public class DeltaConversionTarget implements 
ConversionTarget {
       transaction.updateMetadata(metadata, false);
       transaction.commit(
           actions,
-          new 
DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))));
+          new 
DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))),
+          ScalaUtils.convertJavaMapToScala(getCommitTags()));
     }
 
     private Map<String, String> getConfigurationsForDeltaSync() {
@@ -301,5 +360,9 @@ public class DeltaConversionTarget implements 
ConversionTarget {
       // fallback to existing deltalog value
       return deltaLog.snapshot().metadata().format();
     }
+
+    private Map<String, String> getCommitTags() {
+      return Collections.singletonMap(TableSyncMetadata.XTABLE_METADATA, 
metadata.toJson());
+    }
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
index 02423c2d..cb65c341 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
@@ -114,6 +114,7 @@ public class HudiConversionSource implements 
ConversionSource<HoodieInstant> {
                     hoodieInstant ->
                         
HudiInstantUtils.parseFromInstantTime(hoodieInstant.getTimestamp()))
                 .collect(CustomCollectors.toList(pendingInstants.size())))
+        .sourceIdentifier(getCommitIdentifier(latestCommit))
         .build();
   }
 
@@ -130,6 +131,7 @@ public class HudiConversionSource implements 
ConversionSource<HoodieInstant> {
         .filesDiff(
             dataFileExtractor.getDiffForCommit(
                 hoodieInstantForDiff, table, hoodieInstantForDiff, 
visibleTimeline))
+        .sourceIdentifier(getCommitIdentifier(hoodieInstantForDiff))
         .build();
   }
 
@@ -161,6 +163,11 @@ public class HudiConversionSource implements 
ConversionSource<HoodieInstant> {
     return doesCommitExistsAsOfInstant(instant) && 
!isAffectedByCleanupProcess(instant);
   }
 
+  @Override
+  public String getCommitIdentifier(HoodieInstant commit) {
+    return commit.getTimestamp();
+  }
+
   private boolean doesCommitExistsAsOfInstant(Instant instant) {
     HoodieInstant hoodieInstant = getCommitAtInstant(instant);
     return hoodieInstant != null;
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
index c3ef6f92..97c2f9e3 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
@@ -54,13 +54,13 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ExternalFilePathUtil;
@@ -287,28 +287,7 @@ public class HudiConversionTarget implements 
ConversionTarget {
                 .filterCompletedInstants()
                 .lastInstant()
                 .toJavaOptional()
-                .map(
-                    instant -> {
-                      try {
-                        if 
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-                          return HoodieReplaceCommitMetadata.fromBytes(
-                                  
client.getActiveTimeline().getInstantDetails(instant).get(),
-                                  HoodieReplaceCommitMetadata.class)
-                              .getExtraMetadata();
-                        } else {
-                          return HoodieCommitMetadata.fromBytes(
-                                  
client.getActiveTimeline().getInstantDetails(instant).get(),
-                                  HoodieCommitMetadata.class)
-                              .getExtraMetadata();
-                        }
-                      } catch (IOException ex) {
-                        throw new ReadException("Unable to read Hudi commit 
metadata", ex);
-                      }
-                    })
-                .flatMap(
-                    metadata ->
-                        TableSyncMetadata.fromJson(
-                            metadata.get(TableSyncMetadata.XTABLE_METADATA))));
+                .flatMap(instant -> getMetadata(instant, client)));
   }
 
   @Override
@@ -316,11 +295,56 @@ public class HudiConversionTarget implements 
ConversionTarget {
     return TableFormat.HUDI;
   }
 
+  @Override
+  public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
+    if (!metaClient.isPresent()) {
+      return Optional.empty();
+    }
+    return getTargetCommitIdentifier(sourceIdentifier, metaClient.get());
+  }
+
+  Optional<String> getTargetCommitIdentifier(
+      String sourceIdentifier, HoodieTableMetaClient metaClient) {
+
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline();
+
+    for (HoodieInstant instant : commitTimeline.getInstants()) {
+      try {
+        Optional<TableSyncMetadata> optionalMetadata = getMetadata(instant, 
metaClient);
+        if (!optionalMetadata.isPresent()) {
+          continue;
+        }
+
+        TableSyncMetadata metadata = optionalMetadata.get();
+        if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
+          return Optional.of(instant.getTimestamp());
+        }
+      } catch (Exception e) {
+        log.warn("Failed to parse commit metadata for instant: {}", instant, 
e);
+      }
+    }
+    return Optional.empty();
+  }
+
   private HoodieTableMetaClient getMetaClient() {
     return metaClient.orElseThrow(
         () -> new IllegalStateException("beginSync must be called before 
calling this method"));
   }
 
+  private Optional<TableSyncMetadata> getMetadata(
+      HoodieInstant instant, HoodieTableMetaClient metaClient) {
+    try {
+      HoodieCommitMetadata commitMetadata =
+          TimelineUtils.getCommitMetadata(instant, 
metaClient.getActiveTimeline());
+      String sourceMetadataJson =
+          
commitMetadata.getExtraMetadata().get(TableSyncMetadata.XTABLE_METADATA);
+      return TableSyncMetadata.fromJson(sourceMetadataJson);
+    } catch (Exception e) {
+      log.warn("Failed to parse commit metadata for instant: {}", instant, e);
+      return Optional.empty();
+    }
+  }
+
   static class CommitState {
     private HoodieTableMetaClient metaClient;
     @Getter private final String instantTime;
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 0d400e28..cb75b6d0 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -165,6 +165,7 @@ public class IcebergConversionSource implements 
ConversionSource<Snapshot> {
         .version(String.valueOf(currentSnapshot.snapshotId()))
         .table(irTable)
         .partitionedDataFiles(partitionedDataFiles)
+        .sourceIdentifier(getCommitIdentifier(currentSnapshot))
         .build();
   }
 
@@ -196,7 +197,11 @@ public class IcebergConversionSource implements 
ConversionSource<Snapshot> {
         
DataFilesDiff.builder().filesAdded(dataFilesAdded).filesRemoved(dataFilesRemoved).build();
 
     InternalTable table = getTable(snapshot);
-    return 
TableChange.builder().tableAsOfChange(table).filesDiff(filesDiff).build();
+    return TableChange.builder()
+        .tableAsOfChange(table)
+        .filesDiff(filesDiff)
+        .sourceIdentifier(getCommitIdentifier(snapshot))
+        .build();
   }
 
   @Override
@@ -265,6 +270,11 @@ public class IcebergConversionSource implements 
ConversionSource<Snapshot> {
     return true;
   }
 
+  @Override
+  public String getCommitIdentifier(Snapshot commit) {
+    return String.valueOf(commit.snapshotId());
+  }
+
   @Override
   public void close() {
     getTableOps().close();
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index ecdbfa26..69faf1ea 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -21,6 +21,7 @@ package org.apache.xtable.iceberg;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import lombok.extern.log4j.Log4j2;
@@ -66,6 +67,7 @@ public class IcebergConversionTarget implements 
ConversionTarget {
   private Transaction transaction;
   private Table table;
   private InternalTable internalTableState;
+  private TableSyncMetadata tableSyncMetadata;
 
   public IcebergConversionTarget() {}
 
@@ -174,6 +176,8 @@ public class IcebergConversionTarget implements 
ConversionTarget {
 
   @Override
   public void syncMetadata(TableSyncMetadata metadata) {
+    tableSyncMetadata = metadata;
+
     UpdateProperties updateProperties = transaction.updateProperties();
     updateProperties.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
     if (!table.properties().containsKey(TableProperties.WRITE_DATA_LOCATION)) {
@@ -200,13 +204,18 @@ public class IcebergConversionTarget implements 
ConversionTarget {
         transaction,
         partitionedDataFiles,
         transaction.table().schema(),
-        transaction.table().spec());
+        transaction.table().spec(),
+        tableSyncMetadata);
   }
 
   @Override
   public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
     dataFileUpdatesExtractor.applyDiff(
-        transaction, dataFilesDiff, transaction.table().schema(), 
transaction.table().spec());
+        transaction,
+        dataFilesDiff,
+        transaction.table().schema(),
+        transaction.table().spec(),
+        tableSyncMetadata);
   }
 
   @Override
@@ -221,6 +230,7 @@ public class IcebergConversionTarget implements 
ConversionTarget {
     transaction.commitTransaction();
     transaction = null;
     internalTableState = null;
+    tableSyncMetadata = null;
   }
 
   private void safeDelete(String file) {
@@ -242,6 +252,33 @@ public class IcebergConversionTarget implements 
ConversionTarget {
     return TableFormat.ICEBERG;
   }
 
+  @Override
+  public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
+    for (Snapshot snapshot : table.snapshots()) {
+      Map<String, String> summary = snapshot.summary();
+      String sourceMetadataJson = 
summary.get(TableSyncMetadata.XTABLE_METADATA);
+      if (sourceMetadataJson == null) {
+        continue;
+      }
+
+      try {
+        Optional<TableSyncMetadata> optionalMetadata =
+            TableSyncMetadata.fromJson(sourceMetadataJson);
+        if (!optionalMetadata.isPresent()) {
+          continue;
+        }
+
+        TableSyncMetadata metadata = optionalMetadata.get();
+        if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
+          return Optional.of(String.valueOf(snapshot.snapshotId()));
+        }
+      } catch (Exception e) {
+        log.warn("Failed to parse parse snapshot metadata for {}", 
snapshot.snapshotId(), e);
+      }
+    }
+    return Optional.empty();
+  }
+
   private void rollbackCorruptCommits() {
     if (table == null) {
       // there is no existing table so exit early
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
index 80e1559f..0c8fa9a4 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.exception.ReadException;
 import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.storage.DataFilesDiff;
 import org.apache.xtable.model.storage.FilesDiff;
 import org.apache.xtable.model.storage.InternalDataFile;
@@ -46,7 +47,8 @@ public class IcebergDataFileUpdatesSync {
       Transaction transaction,
       List<PartitionFileGroup> partitionedDataFiles,
       Schema schema,
-      PartitionSpec partitionSpec) {
+      PartitionSpec partitionSpec,
+      TableSyncMetadata metadata) {
 
     Map<String, DataFile> previousFiles = new HashMap<>();
     try (CloseableIterable<FileScanTask> iterator = 
table.newScan().planFiles()) {
@@ -60,21 +62,24 @@ public class IcebergDataFileUpdatesSync {
     FilesDiff<InternalDataFile, DataFile> diff =
         DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, 
previousFiles);
 
-    applyDiff(transaction, diff.getFilesAdded(), diff.getFilesRemoved(), 
schema, partitionSpec);
+    applyDiff(
+        transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema, 
partitionSpec, metadata);
   }
 
   public void applyDiff(
       Transaction transaction,
       DataFilesDiff dataFilesDiff,
       Schema schema,
-      PartitionSpec partitionSpec) {
+      PartitionSpec partitionSpec,
+      TableSyncMetadata metadata) {
 
     Collection<DataFile> filesRemoved =
         dataFilesDiff.getFilesRemoved().stream()
             .map(file -> getDataFile(partitionSpec, schema, file))
             .collect(Collectors.toList());
 
-    applyDiff(transaction, dataFilesDiff.getFilesAdded(), filesRemoved, 
schema, partitionSpec);
+    applyDiff(
+        transaction, dataFilesDiff.getFilesAdded(), filesRemoved, schema, 
partitionSpec, metadata);
   }
 
   private void applyDiff(
@@ -82,10 +87,12 @@ public class IcebergDataFileUpdatesSync {
       Collection<InternalDataFile> filesAdded,
       Collection<DataFile> filesRemoved,
       Schema schema,
-      PartitionSpec partitionSpec) {
+      PartitionSpec partitionSpec,
+      TableSyncMetadata metadata) {
     OverwriteFiles overwriteFiles = transaction.newOverwrite();
     filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, 
schema, f)));
     filesRemoved.forEach(overwriteFiles::deleteFile);
+    overwriteFiles.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
     overwriteFiles.commit();
   }
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index 0f34103e..9bf8f010 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -94,7 +94,7 @@ public class TestConversionController {
   void testAllSnapshotSyncAsPerConfig() {
     SyncMode syncMode = SyncMode.FULL;
     InternalTable internalTable = getInternalTable();
-    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1", 
"0");
     Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
     SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour);
     Map<String, SyncResult> perTableResults = new HashMap<>();
@@ -172,16 +172,19 @@ public class TestConversionController {
     CommitsBacklog<Instant> commitsBacklog =
         
CommitsBacklog.<Instant>builder().commitsToProcess(instantsToProcess).build();
     Optional<TableSyncMetadata> conversionTarget1Metadata =
-        Optional.of(TableSyncMetadata.of(icebergLastSyncInstant, 
pendingInstantsForIceberg));
+        Optional.of(
+            TableSyncMetadata.of(icebergLastSyncInstant, 
pendingInstantsForIceberg, "TEST", "0"));
     
when(mockConversionTarget1.getTableMetadata()).thenReturn(conversionTarget1Metadata);
     Optional<TableSyncMetadata> conversionTarget2Metadata =
-        Optional.of(TableSyncMetadata.of(deltaLastSyncInstant, 
pendingInstantsForDelta));
+        Optional.of(
+            TableSyncMetadata.of(deltaLastSyncInstant, 
pendingInstantsForDelta, "TEST", "0"));
     
when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata);
     when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync))
         .thenReturn(commitsBacklog);
     List<TableChange> tableChanges = new ArrayList<>();
-    for (Instant instant : instantsToProcess) {
-      TableChange tableChange = getTableChange(instant);
+    for (int i = 0; i < instantsToProcess.size(); i++) {
+      Instant instant = instantsToProcess.get(i);
+      TableChange tableChange = getTableChange(instant, String.valueOf(i));
       tableChanges.add(tableChange);
       
when(mockConversionSource.getTableChangeForCommit(instant)).thenReturn(tableChange);
     }
@@ -224,7 +227,7 @@ public class TestConversionController {
     SyncMode syncMode = SyncMode.INCREMENTAL;
     InternalTable internalTable = getInternalTable();
     Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
-    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1", 
"0");
     SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour);
     Map<String, SyncResult> syncResults = new HashMap<>();
     syncResults.put(TableFormat.ICEBERG, syncResult);
@@ -247,9 +250,11 @@ public class TestConversionController {
 
     // Both Iceberg and Delta last synced at instantAt5 and have no pending 
instants.
     when(mockConversionTarget1.getTableMetadata())
-        .thenReturn(Optional.of(TableSyncMetadata.of(instantAt5, 
Collections.emptyList())));
+        .thenReturn(
+            Optional.of(TableSyncMetadata.of(instantAt5, 
Collections.emptyList(), "TEST", "0")));
     when(mockConversionTarget2.getTableMetadata())
-        .thenReturn(Optional.of(TableSyncMetadata.of(instantAt5, 
Collections.emptyList())));
+        .thenReturn(
+            Optional.of(TableSyncMetadata.of(instantAt5, 
Collections.emptyList(), "TEST", "0")));
 
     
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
     when(tableFormatSync.syncSnapshot(
@@ -309,22 +314,26 @@ public class TestConversionController {
         
CommitsBacklog.<Instant>builder().commitsToProcess(instantsToProcess).build();
     when(mockConversionTarget1.getTableMetadata())
         .thenReturn(
-            Optional.of(TableSyncMetadata.of(icebergLastSyncInstant, 
pendingInstantsForIceberg)));
+            Optional.of(
+                TableSyncMetadata.of(
+                    icebergLastSyncInstant, pendingInstantsForIceberg, "TEST", 
"0")));
     Optional<TableSyncMetadata> conversionTarget2Metadata =
-        Optional.of(TableSyncMetadata.of(deltaLastSyncInstant, 
pendingInstantsForDelta));
+        Optional.of(
+            TableSyncMetadata.of(deltaLastSyncInstant, 
pendingInstantsForDelta, "TEST", "0"));
     
when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata);
     when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync))
         .thenReturn(commitsBacklog);
     List<TableChange> tableChanges = new ArrayList<>();
-    for (Instant instant : instantsToProcess) {
-      TableChange tableChange = getTableChange(instant);
+    for (int i = 0; i < instantsToProcess.size(); i++) {
+      Instant instant = instantsToProcess.get(i);
+      TableChange tableChange = getTableChange(instant, String.valueOf(i));
       tableChanges.add(tableChange);
       
when(mockConversionSource.getTableChangeForCommit(instant)).thenReturn(tableChange);
     }
     // Iceberg needs to sync by snapshot since instant15 is affected by table 
clean-up.
     InternalTable internalTable = getInternalTable();
     Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
-    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1", 
"0");
     SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour);
     Map<String, SyncResult> snapshotResult =
         Collections.singletonMap(TableFormat.ICEBERG, syncResult);
@@ -388,10 +397,12 @@ public class TestConversionController {
     CommitsBacklog<Instant> commitsBacklog =
         
CommitsBacklog.<Instant>builder().commitsToProcess(instantsToProcess).build();
     Optional<TableSyncMetadata> conversionTarget1Metadata =
-        Optional.of(TableSyncMetadata.of(icebergLastSyncInstant, 
Collections.emptyList()));
+        Optional.of(
+            TableSyncMetadata.of(icebergLastSyncInstant, 
Collections.emptyList(), "TEST", "0"));
     
when(mockConversionTarget1.getTableMetadata()).thenReturn(conversionTarget1Metadata);
     Optional<TableSyncMetadata> conversionTarget2Metadata =
-        Optional.of(TableSyncMetadata.of(deltaLastSyncInstant, 
Collections.emptyList()));
+        Optional.of(
+            TableSyncMetadata.of(deltaLastSyncInstant, 
Collections.emptyList(), "TEST", "0"));
     
when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata);
     when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync))
         .thenReturn(commitsBacklog);
@@ -426,7 +437,7 @@ public class TestConversionController {
     List<TargetCatalogConfig> targetCatalogs =
         Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2"));
     InternalTable internalTable = getInternalTable();
-    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+    InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1", 
"0");
     // Conversion source and target mocks.
     ConversionConfig conversionConfig =
         getTableSyncConfig(
@@ -513,8 +524,11 @@ public class TestConversionController {
         .collect(Collectors.toList());
   }
 
-  private TableChange getTableChange(Instant instant) {
-    return 
TableChange.builder().tableAsOfChange(getInternalTable(instant)).build();
+  private TableChange getTableChange(Instant instant, String sourceIdentifier) 
{
+    return TableChange.builder()
+        .tableAsOfChange(getInternalTable(instant))
+        .sourceIdentifier(sourceIdentifier)
+        .build();
   }
 
   private SyncResult buildSyncResult(SyncMode syncMode, Instant 
lastSyncedInstant) {
@@ -536,8 +550,13 @@ public class TestConversionController {
         .build();
   }
 
-  private InternalSnapshot buildSnapshot(InternalTable internalTable, String 
version) {
-    return 
InternalSnapshot.builder().table(internalTable).version(version).build();
+  private InternalSnapshot buildSnapshot(
+      InternalTable internalTable, String version, String sourceIdentifier) {
+    return InternalSnapshot.builder()
+        .table(internalTable)
+        .version(version)
+        .sourceIdentifier(sourceIdentifier)
+        .build();
   }
 
   private InternalTable getInternalTable() {
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index f0f889d2..7e921efe 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -19,6 +19,7 @@
 package org.apache.xtable.delta;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@@ -36,6 +37,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -81,6 +83,7 @@ import io.delta.standalone.types.StringType;
 import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
@@ -159,8 +162,8 @@ public class TestDeltaSync {
     InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), 
basePath);
     InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), 
basePath);
 
-    InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
-    InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
+    InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, 
dataFile2);
+    InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, 
dataFile3);
 
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
@@ -214,7 +217,7 @@ public class TestDeltaSync {
     EqualTo equalToExpr =
         new EqualTo(new Column("string_field", new StringType()), 
Literal.of("warning"));
 
-    InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, 
dataFile3);
+    InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, 
dataFile2, dataFile3);
 
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
@@ -294,7 +297,7 @@ public class TestDeltaSync {
     EqualTo equalToExpr2 = new EqualTo(new Column("int_field", new 
IntegerType()), Literal.of(20));
     And CombinedExpr = new And(equalToExpr1, equalToExpr2);
 
-    InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, 
dataFile3);
+    InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, 
dataFile2, dataFile3);
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
     validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2)), 
CombinedExpr);
@@ -337,7 +340,7 @@ public class TestDeltaSync {
     InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath);
     InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath);
 
-    InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, 
dataFile3);
+    InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, 
dataFile2, dataFile3);
 
     TableFormatSync.getInstance()
         .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
@@ -367,6 +370,67 @@ public class TestDeltaSync {
             .contains(String.format("xtable_partition_col_%s_timestamp_field", 
transformType)));
   }
 
+  @Test
+  public void testSourceTargetIdMapping() throws Exception {
+    InternalSchema baseSchema = getInternalSchema();
+    InternalTable sourceTable =
+        getInternalTable("source_table", basePath, baseSchema, null, 
LAST_COMMIT_TIME);
+
+    InternalDataFile sourceDataFile1 = getDataFile(101, 
Collections.emptyList(), basePath);
+    InternalDataFile sourceDataFile2 = getDataFile(102, 
Collections.emptyList(), basePath);
+    InternalDataFile sourceDataFile3 = getDataFile(103, 
Collections.emptyList(), basePath);
+
+    InternalSnapshot sourceSnapshot1 =
+        buildSnapshot(sourceTable, "0", sourceDataFile1, sourceDataFile2);
+    InternalSnapshot sourceSnapshot2 =
+        buildSnapshot(sourceTable, "1", sourceDataFile2, sourceDataFile3);
+
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), 
sourceSnapshot1);
+    Optional<String> mappedTargetId1 =
+        
conversionTarget.getTargetCommitIdentifier(sourceSnapshot1.getSourceIdentifier());
+    validateDeltaTable(
+        basePath, new HashSet<>(Arrays.asList(sourceDataFile1, 
sourceDataFile2)), null);
+    assertTrue(mappedTargetId1.isPresent());
+    assertEquals("0", mappedTargetId1.get());
+
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), 
sourceSnapshot2);
+    Optional<String> mappedTargetId2 =
+        
conversionTarget.getTargetCommitIdentifier(sourceSnapshot2.getSourceIdentifier());
+    validateDeltaTable(
+        basePath, new HashSet<>(Arrays.asList(sourceDataFile2, 
sourceDataFile3)), null);
+    assertTrue(mappedTargetId2.isPresent());
+    assertEquals("1", mappedTargetId2.get());
+
+    Optional<String> unmappedTargetId = 
conversionTarget.getTargetCommitIdentifier("s3");
+    assertFalse(unmappedTargetId.isPresent());
+  }
+
+  @Test
+  public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws 
Exception {
+    InternalSchema baseSchema = getInternalSchema();
+    InternalTable internalTable =
+        getInternalTable("source_table", basePath, baseSchema, null, 
LAST_COMMIT_TIME);
+    InternalDataFile sourceDataFile = getDataFile(101, 
Collections.emptyList(), basePath);
+    InternalSnapshot snapshot = buildSnapshot(internalTable, "0", 
sourceDataFile);
+
+    // Mock the snapshot sync process like getSyncResult()
+    conversionTarget.beginSync(internalTable);
+    TableSyncMetadata tableSyncMetadata =
+        TableSyncMetadata.of(internalTable.getLatestCommitTime(), 
snapshot.getPendingCommits());
+    conversionTarget.syncMetadata(tableSyncMetadata);
+    conversionTarget.syncSchema(internalTable.getReadSchema());
+    conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields());
+    conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles());
+    conversionTarget.completeSync();
+
+    // No crash should happen during the process
+    Optional<String> unmappedTargetId = 
conversionTarget.getTargetCommitIdentifier("0");
+    // The targetIdentifier is expected to not be found
+    assertFalse(unmappedTargetId.isPresent());
+  }
+
   private static Stream<Arguments> timestampPartitionTestingArgs() {
     return Stream.of(
         Arguments.of(PartitionTransformType.YEAR),
@@ -408,10 +472,12 @@ public class TestDeltaSync {
         internalDataFiles.size(), count, "Number of files from DeltaScan don't 
match expectation");
   }
 
-  private InternalSnapshot buildSnapshot(InternalTable table, 
InternalDataFile... dataFiles) {
+  private InternalSnapshot buildSnapshot(
+      InternalTable table, String sourceIdentifier, InternalDataFile... 
dataFiles) {
     return InternalSnapshot.builder()
         .table(table)
         
.partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
+        .sourceIdentifier(sourceIdentifier)
         .build();
   }
 
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
index 03bb6e2c..82125d25 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
@@ -22,6 +22,8 @@ import static 
org.apache.xtable.hudi.HudiTestUtil.createWriteStatus;
 import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig;
 import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
 import java.time.Duration;
@@ -32,6 +34,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -204,7 +207,8 @@ public class ITHudiConversionTarget {
     targetClient.syncFilesForDiff(dataFilesDiff);
     targetClient.syncSchema(SCHEMA);
     TableSyncMetadata latestState =
-        TableSyncMetadata.of(initialState.getLatestCommitTime(), 
Collections.emptyList());
+        TableSyncMetadata.of(
+            initialState.getLatestCommitTime(), Collections.emptyList(), 
"TEST", "0");
     targetClient.syncMetadata(latestState);
     targetClient.completeSync();
 
@@ -243,7 +247,8 @@ public class ITHudiConversionTarget {
     targetClient.beginSync(initialState);
     targetClient.syncFilesForSnapshot(snapshot);
     TableSyncMetadata latestState =
-        TableSyncMetadata.of(initialState.getLatestCommitTime(), 
Collections.emptyList());
+        TableSyncMetadata.of(
+            initialState.getLatestCommitTime(), Collections.emptyList(), 
"TEST", "0");
     targetClient.syncSchema(initialState.getReadSchema());
     targetClient.syncMetadata(latestState);
     targetClient.completeSync();
@@ -288,7 +293,8 @@ public class ITHudiConversionTarget {
     targetClient.beginSync(initialState);
     targetClient.syncFilesForSnapshot(snapshot);
     TableSyncMetadata latestState =
-        TableSyncMetadata.of(initialState.getLatestCommitTime(), 
Collections.emptyList());
+        TableSyncMetadata.of(
+            initialState.getLatestCommitTime(), Collections.emptyList(), 
"TEST", "0");
     targetClient.syncMetadata(latestState);
     targetClient.syncSchema(initialState.getReadSchema());
     targetClient.completeSync();
@@ -311,7 +317,8 @@ public class ITHudiConversionTarget {
         targetClient,
         Collections.singletonList(getTestFile(partitionPath, fileName2)),
         Collections.singletonList(getTestFile(partitionPath, fileName1)),
-        Instant.now().minus(12, ChronoUnit.HOURS));
+        Instant.now().minus(12, ChronoUnit.HOURS),
+        "1");
 
     assertFileGroupCorrectness(
         metaClient, partitionPath, Arrays.asList(file0Pair, Pair.of(fileName2, 
filePath2)));
@@ -331,7 +338,9 @@ public class ITHudiConversionTarget {
         targetClient,
         Collections.singletonList(getTestFile(partitionPath, fileName3)),
         Collections.singletonList(getTestFile(partitionPath, fileName2)),
-        Instant.now().minus(8, ChronoUnit.HOURS));
+        Instant.now().minus(8, ChronoUnit.HOURS),
+        "2");
+    
System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
 
     // create a commit that just adds fileName4
     String fileName4 = "file_4.parquet";
@@ -340,7 +349,9 @@ public class ITHudiConversionTarget {
         targetClient,
         Collections.singletonList(getTestFile(partitionPath, fileName4)),
         Collections.emptyList(),
-        Instant.now());
+        Instant.now(),
+        "3");
+    
System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
 
     // create another commit that should trigger archival of the first two 
commits
     String fileName5 = "file_5.parquet";
@@ -349,7 +360,9 @@ public class ITHudiConversionTarget {
         targetClient,
         Collections.singletonList(getTestFile(partitionPath, fileName5)),
         Collections.emptyList(),
-        Instant.now());
+        Instant.now(),
+        "4");
+    
System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
 
     assertFileGroupCorrectness(
         metaClient,
@@ -372,18 +385,147 @@ public class ITHudiConversionTarget {
         2, 
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"partition_path", ""})
+  void testSourceTargetMappingWithSnapshotAndIncrementalSync(String 
partitionPath) {
+    // Step 1: Initialize Test Files for Initial Snapshot
+    String fileName0 = "file_0.parquet";
+    String fileName1 = "file_1.parquet";
+
+    List<PartitionFileGroup> initialSnapshot =
+        Collections.singletonList(
+            PartitionFileGroup.builder()
+                .files(
+                    Arrays.asList(
+                        getTestFile(partitionPath, fileName0),
+                        getTestFile(partitionPath, fileName1)))
+                .partitionValues(
+                    Collections.singletonList(
+                        PartitionValue.builder()
+                            .partitionField(PARTITION_FIELD)
+                            .range(Range.scalar("partitionPath"))
+                            .build()))
+                .build());
+
+    // Step 2: Sync Initial Snapshot
+    InternalTable initialState = getState(Instant.now().minus(24, 
ChronoUnit.HOURS));
+    HudiConversionTarget targetClient = getTargetClient();
+    targetClient.beginSync(initialState);
+    targetClient.syncFilesForSnapshot(initialSnapshot);
+    TableSyncMetadata latestState =
+        TableSyncMetadata.of(
+            initialState.getLatestCommitTime(), Collections.emptyList(), 
"TEST", "0");
+    targetClient.syncMetadata(latestState);
+    targetClient.syncSchema(initialState.getReadSchema());
+    targetClient.completeSync();
+
+    // Step 3: Verify Source-Target Mapping for Initial Snapshot
+    HoodieTableMetaClient metaClient =
+        
HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build();
+
+    Optional<String> initialTargetIdentifier =
+        
targetClient.getTargetCommitIdentifier(latestState.getSourceIdentifier(), 
metaClient);
+    assertTrue(initialTargetIdentifier.isPresent());
+    assertEquals(
+        initialTargetIdentifier.get(),
+        metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
+
+    // Step 4: Perform Incremental Sync (Remove file1, Add file2)
+    String fileName2 = "file_2.parquet";
+    incrementalSync(
+        targetClient,
+        Collections.singletonList(getTestFile(partitionPath, fileName2)), // 
Adding file2
+        Collections.singletonList(getTestFile(partitionPath, fileName1)), // 
Removing file1
+        Instant.now().minus(12, ChronoUnit.HOURS),
+        "1"); // Incremental commit ID = "1"
+
+    // Step 5: Verify Source-Target Mapping for Incremental Sync
+    metaClient.reloadActiveTimeline();
+    Optional<String> incrementalTargetIdentifier =
+        targetClient.getTargetCommitIdentifier("1", metaClient);
+    assertTrue(incrementalTargetIdentifier.isPresent());
+    assertEquals(
+        incrementalTargetIdentifier.get(),
+        metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
+
+    // Step 6: Perform Another Incremental Sync (Remove file2, Add file3)
+    String fileName3 = "file_3.parquet";
+    incrementalSync(
+        targetClient,
+        Collections.singletonList(getTestFile(partitionPath, fileName3)), // 
Adding file3
+        Collections.singletonList(getTestFile(partitionPath, fileName2)), // 
Removing file2
+        Instant.now().minus(8, ChronoUnit.HOURS),
+        "2"); // Incremental commit ID = "2"
+
+    // Step 7: Verify Source-Target Mapping for Second Incremental Sync
+    metaClient.reloadActiveTimeline();
+    Optional<String> incrementalTargetIdentifier2 =
+        targetClient.getTargetCommitIdentifier("2", metaClient);
+    assertTrue(incrementalTargetIdentifier2.isPresent());
+    assertEquals(
+        incrementalTargetIdentifier2.get(),
+        metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
+
+    // Step 8: Verify Non-Existent Source ID Returns Empty
+    Optional<String> nonExistentTargetIdentifier =
+        targetClient.getTargetCommitIdentifier("3", metaClient);
+    assertFalse(nonExistentTargetIdentifier.isPresent());
+  }
+
  @ParameterizedTest
  @ValueSource(strings = {"partition_path", ""})
  void testGetTargetCommitIdentifierWithNullSourceIdentifier(String partitionPath) {
    // Initialize Test Files and Snapshot
    String fileName0 = "file_0.parquet";
    String fileName1 = "file_1.parquet";

    List<PartitionFileGroup> initialSnapshot =
        Collections.singletonList(
            PartitionFileGroup.builder()
                .files(
                    Arrays.asList(
                        getTestFile(partitionPath, fileName0),
                        getTestFile(partitionPath, fileName1)))
                .partitionValues(
                    Collections.singletonList(
                        PartitionValue.builder()
                            .partitionField(PARTITION_FIELD)
                            .range(Range.scalar("partitionPath"))
                            .build()))
                .build());
    InternalTable internalTable = getState(Instant.now().minus(24, ChronoUnit.HOURS));
    HudiConversionTarget targetClient = getTargetClient();

    // Sync using the 2-arg TableSyncMetadata.of(...) overload: the commit is written WITHOUT a
    // sourceIdentifier on purpose — the lookup below must tolerate such legacy/partial metadata.
    targetClient.beginSync(internalTable);
    targetClient.syncFilesForSnapshot(initialSnapshot);
    TableSyncMetadata tableSyncMetadata =
        TableSyncMetadata.of(internalTable.getLatestCommitTime(), Collections.emptyList());
    targetClient.syncMetadata(tableSyncMetadata);
    targetClient.syncSchema(internalTable.getReadSchema());
    targetClient.completeSync();

    HoodieTableMetaClient metaClient =
        HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build();
    // No crash should happen during the process
    Optional<String> targetIdentifier = targetClient.getTargetCommitIdentifier("0", metaClient);
    // The targetIdentifier is expected to not be found
    assertFalse(targetIdentifier.isPresent());
  }
+
   private TableSyncMetadata incrementalSync(
       ConversionTarget conversionTarget,
       List<InternalDataFile> filesToAdd,
       List<InternalDataFile> filesToRemove,
-      Instant commitStart) {
+      Instant commitStart,
+      String sourceIdentifier) {
     DataFilesDiff dataFilesDiff2 =
         
DataFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build();
     InternalTable state3 = getState(commitStart);
     conversionTarget.beginSync(state3);
     conversionTarget.syncFilesForDiff(dataFilesDiff2);
     TableSyncMetadata latestState =
-        TableSyncMetadata.of(state3.getLatestCommitTime(), 
Collections.emptyList());
+        TableSyncMetadata.of(
+            state3.getLatestCommitTime(), Collections.emptyList(), "TEST", 
sourceIdentifier);
     conversionTarget.syncMetadata(latestState);
     conversionTarget.completeSync();
     return latestState;
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
index fe20db05..da1ec033 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
@@ -141,7 +141,8 @@ public class TestHudiConversionTarget {
     HudiConversionTarget targetClient = getTargetClient(null);
     HudiConversionTarget.CommitState mockCommitState =
         initMocksForBeginSync(targetClient).getLeft();
-    TableSyncMetadata metadata = TableSyncMetadata.of(COMMIT_TIME, 
Collections.emptyList());
+    TableSyncMetadata metadata =
+        TableSyncMetadata.of(COMMIT_TIME, Collections.emptyList(), "TEST", 
"0");
     targetClient.syncMetadata(metadata);
     // validate that metadata is set in commitState
     verify(mockCommitState).setTableSyncMetadata(metadata);
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index bd36dde9..c02d7f26 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -227,8 +227,8 @@ public class TestIcebergSync {
     InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList());
     InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
     InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
-    InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
-    InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
+    InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, 
dataFile2);
+    InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, 
dataFile3);
     
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
     when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
     ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor =
@@ -322,9 +322,9 @@ public class TestIcebergSync {
     InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
     InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
     InternalDataFile dataFile4 = getDataFile(4, Collections.emptyList());
-    InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
-    InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
-    InternalSnapshot snapshot3 = buildSnapshot(table2, dataFile3, dataFile4);
+    InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, 
dataFile2);
+    InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, 
dataFile3);
+    InternalSnapshot snapshot3 = buildSnapshot(table2, "2", dataFile3, 
dataFile4);
     
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
     when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
     ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor =
@@ -398,7 +398,7 @@ public class TestIcebergSync {
     InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
     InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
     InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
-    InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, 
dataFile3);
+    InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, 
dataFile2, dataFile3);
 
     when(mockSchemaExtractor.toIceberg(internalSchema))
         .thenReturn(icebergSchema)
@@ -461,7 +461,7 @@ public class TestIcebergSync {
     InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
     InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
     InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
-    InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, 
dataFile3);
+    InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, 
dataFile2, dataFile3);
 
     
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
     PartitionSpec partitionSpec =
@@ -515,7 +515,7 @@ public class TestIcebergSync {
     InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
     InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
     InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
-    InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, 
dataFile3);
+    InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, 
dataFile2, dataFile3);
 
     
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
     PartitionSpec partitionSpec =
@@ -590,7 +590,7 @@ public class TestIcebergSync {
     InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
     InternalDataFile dataFile2 = getDataFile(2, partitionValues2);
     InternalDataFile dataFile3 = getDataFile(3, partitionValues3);
-    InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, 
dataFile3);
+    InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, 
dataFile2, dataFile3);
 
     
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
     PartitionSpec partitionSpec =
@@ -656,7 +656,7 @@ public class TestIcebergSync {
     InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
     InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
     InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
-    InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, 
dataFile3);
+    InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, 
dataFile2, dataFile3);
 
     
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
     PartitionSpec partitionSpec =
@@ -681,10 +681,111 @@ public class TestIcebergSync {
         Expressions.equal(partitionField.getSourceField().getPath(), 
"value1"));
   }
 
-  private InternalSnapshot buildSnapshot(InternalTable table, 
InternalDataFile... dataFiles) {
+  @Test
+  public void testSourceTargetMapping() throws Exception {
+    // Prepare schemas
+    List<InternalField> fields2 = new ArrayList<>(internalSchema.getFields());
+    fields2.add(
+        InternalField.builder()
+            .name("long_field")
+            
.schema(InternalSchema.builder().name("long").dataType(InternalType.LONG).build())
+            .build());
+    InternalSchema schema2 = 
internalSchema.toBuilder().fields(fields2).build();
+    List<Types.NestedField> fields = new ArrayList<>(icebergSchema.columns());
+    fields.add(Types.NestedField.of(6, false, "long_field", 
Types.LongType.get()));
+    Schema icebergSchema2 = new Schema(fields);
+
+    InternalTable table1 =
+        getInternalTable(tableName, basePath, internalSchema, null, 
LAST_COMMIT_TIME);
+    InternalTable table2 = getInternalTable(tableName, basePath, schema2, 
null, LAST_COMMIT_TIME);
+
+    // Create data files and snapshots
+    InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList());
+    InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
+    InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
+    InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, 
dataFile2);
+    InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, 
dataFile3);
+
+    
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
+    when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
+    when(mockPartitionSpecExtractor.toIceberg(eq(null), any()))
+        .thenReturn(PartitionSpec.unpartitioned());
+
+    mockColStatsForFile(dataFile1, 2);
+    mockColStatsForFile(dataFile2, 2);
+    mockColStatsForFile(dataFile3, 1);
+
+    // Sync first snapshot
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+    Optional<String> targetIdentifier1 =
+        
conversionTarget.getTargetCommitIdentifier(snapshot1.getSourceIdentifier());
+    assertTrue(targetIdentifier1.isPresent());
+    assertEquals(
+        targetIdentifier1.get(), 
String.valueOf(getTable(basePath).currentSnapshot().snapshotId()));
+
+    // Sync second snapshot
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
+    Optional<String> targetIdentifier2 =
+        
conversionTarget.getTargetCommitIdentifier(snapshot2.getSourceIdentifier());
+    assertTrue(targetIdentifier2.isPresent());
+    assertEquals(
+        targetIdentifier2.get(), 
String.valueOf(getTable(basePath).currentSnapshot().snapshotId()));
+
+    // Validate that an unknown source ID returns empty
+    Optional<String> emptyTargetIdentifier = 
conversionTarget.getTargetCommitIdentifier("999");
+    assertFalse(emptyTargetIdentifier.isPresent());
+  }
+
+  @Test
+  public void testGetTargetCommitIdentifierWithNullSourceIdentifier() {
+    // Prepare schemas
+    List<InternalField> fields2 = new ArrayList<>(internalSchema.getFields());
+    fields2.add(
+        InternalField.builder()
+            .name("long_field")
+            
.schema(InternalSchema.builder().name("long").dataType(InternalType.LONG).build())
+            .build());
+    InternalSchema schema2 = 
internalSchema.toBuilder().fields(fields2).build();
+    List<Types.NestedField> fields = new ArrayList<>(icebergSchema.columns());
+    fields.add(Types.NestedField.of(6, false, "long_field", 
Types.LongType.get()));
+    Schema icebergSchema2 = new Schema(fields);
+
+    InternalTable internalTable =
+        getInternalTable(tableName, basePath, internalSchema, null, 
LAST_COMMIT_TIME);
+
+    // Create data files and snapshots
+    InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList());
+    InternalSnapshot snapshot = buildSnapshot(internalTable, "0", dataFile1);
+    
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
+    when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
+    when(mockPartitionSpecExtractor.toIceberg(eq(null), any()))
+        .thenReturn(PartitionSpec.unpartitioned());
+    mockColStatsForFile(dataFile1, 2);
+
+    // Mock the snapshot sync process like getSyncResult()
+    conversionTarget.beginSync(internalTable);
+    TableSyncMetadata tableSyncMetadata =
+        TableSyncMetadata.of(internalTable.getLatestCommitTime(), 
snapshot.getPendingCommits());
+    conversionTarget.syncMetadata(tableSyncMetadata);
+    conversionTarget.syncSchema(internalTable.getReadSchema());
+    conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields());
+    conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles());
+    conversionTarget.completeSync();
+
+    // No crash should happen during the process
+    Optional<String> targetIdentifier = 
conversionTarget.getTargetCommitIdentifier("0");
+    // The targetIdentifier is expected to not be found
+    assertFalse(targetIdentifier.isPresent());
+  }
+
+  private InternalSnapshot buildSnapshot(
+      InternalTable table, String sourceIdentifier, InternalDataFile... 
dataFiles) {
     return InternalSnapshot.builder()
         .table(table)
         
.partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
+        .sourceIdentifier(sourceIdentifier)
         .build();
   }
 

Reply via email to