This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new b3e03e55 [40] Add commit-level metadata and source-target id mapping
b3e03e55 is described below
commit b3e03e5522af9e1dc08133b849f7e802b755dbab
Author: Daniel Tu <[email protected]>
AuthorDate: Sat Oct 26 18:01:16 2024 -0700
[40] Add commit-level metadata and source-target id mapping
---
.../org/apache/xtable/model/InternalSnapshot.java | 3 +
.../java/org/apache/xtable/model/TableChange.java | 4 +
.../xtable/model/metadata/TableSyncMetadata.java | 22 ++-
.../xtable/spi/extractor/ConversionSource.java | 14 ++
.../apache/xtable/spi/sync/ConversionTarget.java | 10 ++
.../apache/xtable/spi/sync/TableFormatSync.java | 23 ++-
.../model/metadata/TestTableSyncMetadata.java | 20 ++-
.../spi/extractor/TestExtractFromSource.java | 10 +-
.../xtable/spi/sync/TestTableFormatSync.java | 132 +++++++++++++----
.../apache/xtable/delta/DeltaConversionSource.java | 12 +-
.../apache/xtable/delta/DeltaConversionTarget.java | 65 ++++++++-
.../apache/xtable/hudi/HudiConversionSource.java | 7 +
.../apache/xtable/hudi/HudiConversionTarget.java | 70 ++++++---
.../xtable/iceberg/IcebergConversionSource.java | 12 +-
.../xtable/iceberg/IcebergConversionTarget.java | 41 +++++-
.../xtable/iceberg/IcebergDataFileUpdatesSync.java | 17 ++-
.../conversion/TestConversionController.java | 59 +++++---
.../org/apache/xtable/delta/TestDeltaSync.java | 78 +++++++++-
.../apache/xtable/hudi/ITHudiConversionTarget.java | 160 +++++++++++++++++++--
.../xtable/hudi/TestHudiConversionTarget.java | 3 +-
.../org/apache/xtable/iceberg/TestIcebergSync.java | 123 ++++++++++++++--
21 files changed, 770 insertions(+), 115 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
b/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
index 345fda78..1963eba7 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/InternalSnapshot.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import lombok.Builder;
+import lombok.NonNull;
import lombok.Value;
import org.apache.xtable.model.storage.PartitionFileGroup;
@@ -47,4 +48,6 @@ public class InternalSnapshot {
List<PartitionFileGroup> partitionedDataFiles;
// pending commits before latest commit on the table.
@Builder.Default List<Instant> pendingCommits = Collections.emptyList();
+ // Commit identifier in source table
+ @NonNull String sourceIdentifier;
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
index 51f0ee0b..fe3907ee 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
@@ -19,6 +19,7 @@
package org.apache.xtable.model;
import lombok.Builder;
+import lombok.NonNull;
import lombok.Value;
import org.apache.xtable.model.storage.DataFilesDiff;
@@ -36,4 +37,7 @@ public class TableChange {
/** The {@link InternalTable} at the commit time to which this table change
belongs. */
InternalTable tableAsOfChange;
+
+ // Commit identifier in source table
+ @NonNull String sourceIdentifier;
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
b/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
index fa5eeaff..d8c70791 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
@@ -54,10 +54,30 @@ public class TableSyncMetadata {
Instant lastInstantSynced;
List<Instant> instantsToConsiderForNextSync;
int version;
+ String sourceTableFormat;
+ String sourceIdentifier;
+ /**
+ * @deprecated Use {@link #of(Instant, List, String, String)} instead. This
method exists for
+ * backward compatibility and will be removed in a future version.
+ */
+ @Deprecated
public static TableSyncMetadata of(
Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
- return new TableSyncMetadata(lastInstantSynced,
instantsToConsiderForNextSync, CURRENT_VERSION);
+ return TableSyncMetadata.of(lastInstantSynced,
instantsToConsiderForNextSync, null, null);
+ }
+
+ public static TableSyncMetadata of(
+ Instant lastInstantSynced,
+ List<Instant> instantsToConsiderForNextSync,
+ String sourceTableFormat,
+ String sourceIdentifier) {
+ return new TableSyncMetadata(
+ lastInstantSynced,
+ instantsToConsiderForNextSync,
+ CURRENT_VERSION,
+ sourceTableFormat,
+ sourceIdentifier);
}
public String toJson() {
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
index 21f7f63f..42d2b26b 100644
---
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
+++
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
@@ -87,4 +87,18 @@ public interface ConversionSource<COMMIT> extends Closeable {
* false.
*/
boolean isIncrementalSyncSafeFrom(Instant instant);
+
+ /**
+ * Extract the identifier of the provided commit. The identifier is defined
as:
+ *
+ * <ul>
+ * <li>Snapshot ID in Iceberg
+ * <li>Version ID in Delta
+ * <li>Timestamp in Hudi
+ * </ul>
+ *
+ * @param commit The provided commit
+ * @return the string version of the commit identifier
+ */
+ String getCommitIdentifier(COMMIT commit);
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
index 736f49e4..476578cb 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
@@ -90,4 +90,14 @@ public interface ConversionTarget {
/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);
+
+ /**
+ * Retrieves the commit identifier from the target table that corresponds to
a given source table
+ * commit identifier.
+ *
+ * @param sourceIdentifier the unique identifier of the source table commit
+ * @return an {@link Optional} containing the target commit identifier if a
corresponding commit
+ * exists, or an empty {@link Optional} if no match is found
+ */
+ Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
index bb340669..78a22b76 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
@@ -73,7 +73,8 @@ public class TableFormatSync {
internalTable,
target ->
target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()),
startTime,
- snapshot.getPendingCommits()));
+ snapshot.getPendingCommits(),
+ snapshot.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync snapshot", e);
results.put(
@@ -121,7 +122,8 @@ public class TableFormatSync {
change.getTableAsOfChange(),
target -> target.syncFilesForDiff(change.getFilesDiff()),
startTime,
- changes.getPendingCommits()));
+ changes.getPendingCommits(),
+ change.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync table changes", e);
resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL,
startTime, e));
@@ -149,19 +151,26 @@ public class TableFormatSync {
InternalTable tableState,
SyncFiles fileSyncMethod,
Instant startTime,
- List<Instant> pendingCommits) {
+ List<Instant> pendingCommits,
+ String sourceIdentifier) {
// initialize the sync
conversionTarget.beginSync(tableState);
+ // Persist the latest commit time in table properties for incremental syncs
+ // Syncing metadata must precede the following steps to ensure that the
metadata is available
+ // before committing
+ TableSyncMetadata latestState =
+ TableSyncMetadata.of(
+ tableState.getLatestCommitTime(),
+ pendingCommits,
+ tableState.getTableFormat(),
+ sourceIdentifier);
+ conversionTarget.syncMetadata(latestState);
// sync schema updates
conversionTarget.syncSchema(tableState.getReadSchema());
// sync partition updates
conversionTarget.syncPartitionSpec(tableState.getPartitioningFields());
// Update the files in the target table
fileSyncMethod.sync(conversionTarget);
- // Persist the latest commit time in table properties for incremental
syncs.
- TableSyncMetadata latestState =
- TableSyncMetadata.of(tableState.getLatestCommitTime(), pendingCommits);
- conversionTarget.syncMetadata(latestState);
conversionTarget.completeSync();
return SyncResult.builder()
diff --git
a/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
b/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
index 3dcc6fbe..56cce262 100644
---
a/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
+++
b/xtable-api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
@@ -44,6 +44,7 @@ class TestTableSyncMetadata {
private static Stream<Arguments> provideMetadataAndJson() {
return Stream.of(
+ // Old version of metadata and JSON
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"),
@@ -56,7 +57,24 @@ class TestTableSyncMetadata {
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"),
null),
- "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"));
+ "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"),
+ // New version of metadata and JSON with `sourceTableFormat` and
`sourceIdentifier` fields
+ Arguments.of(
+ TableSyncMetadata.of(
+ Instant.parse("2020-07-04T10:15:30.00Z"),
+ Arrays.asList(
+ Instant.parse("2020-08-21T11:15:30.00Z"),
+ Instant.parse("2024-01-21T12:15:30.00Z")),
+ "TEST",
+ "0"),
+
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
+ Arguments.of(
+ TableSyncMetadata.of(
+ Instant.parse("2020-07-04T10:15:30.00Z"),
Collections.emptyList(), "TEST", "0"),
+
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
+ Arguments.of(
+ TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"),
null, "TEST", "0"),
+
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"));
}
@Test
diff --git
a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
index 8503057d..5c7f8cfc 100644
---
a/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
+++
b/xtable-api/src/test/java/org/apache/xtable/spi/extractor/TestExtractFromSource.java
@@ -53,7 +53,11 @@ public class TestExtractFromSource {
InternalTable table =
InternalTable.builder().latestCommitTime(Instant.now()).build();
List<PartitionFileGroup> dataFiles = Collections.emptyList();
InternalSnapshot internalSnapshot =
-
InternalSnapshot.builder().table(table).partitionedDataFiles(dataFiles).build();
+ InternalSnapshot.builder()
+ .table(table)
+ .partitionedDataFiles(dataFiles)
+ .sourceIdentifier("0")
+ .build();
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
assertEquals(internalSnapshot,
ExtractFromSource.of(mockConversionSource).extractSnapshot());
}
@@ -86,6 +90,7 @@ public class TestExtractFromSource {
.tableAsOfChange(tableAtFirstInstant)
.filesDiff(
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
+ .sourceIdentifier("0")
.build();
when(mockConversionSource.getTableChangeForCommit(firstCommitToSync))
.thenReturn(tableChangeToReturnAtFirstInstant);
@@ -94,6 +99,7 @@ public class TestExtractFromSource {
.tableAsOfChange(tableAtFirstInstant)
.filesDiff(
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
+ .sourceIdentifier("0")
.build();
// add 2 new files, remove 2 files
@@ -110,6 +116,7 @@ public class TestExtractFromSource {
.filesAdded(Arrays.asList(newFile2, newFile3))
.filesRemoved(Arrays.asList(initialFile3, newFile1))
.build())
+ .sourceIdentifier("1")
.build();
when(mockConversionSource.getTableChangeForCommit(secondCommitToSync))
.thenReturn(tableChangeToReturnAtSecondInstant);
@@ -121,6 +128,7 @@ public class TestExtractFromSource {
.filesAdded(Arrays.asList(newFile2, newFile3))
.filesRemoved(Arrays.asList(initialFile3, newFile1))
.build())
+ .sourceIdentifier("1")
.build();
IncrementalTableChanges actual =
diff --git
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
index 39480f8b..852d4e13 100644
---
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
+++
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
@@ -76,6 +76,7 @@ public class TestTableFormatSync {
.table(startingTableState)
.partitionedDataFiles(fileGroups)
.pendingCommits(pendingCommitInstants)
+ .sourceIdentifier("0")
.build();
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
when(mockConversionTarget2.getTableFormat()).thenReturn(TableFormat.DELTA);
@@ -109,7 +110,10 @@ public class TestTableFormatSync {
failureResult.getTableFormatSyncStatus());
verifyBaseConversionTargetCalls(
- mockConversionTarget2, startingTableState, pendingCommitInstants);
+ mockConversionTarget2,
+ startingTableState,
+ pendingCommitInstants,
+ snapshot.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForSnapshot(fileGroups);
verify(mockConversionTarget2).completeSync();
verify(mockConversionTarget1, never()).completeSync();
@@ -127,15 +131,27 @@ public class TestTableFormatSync {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
-
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
+ TableChange.builder()
+ .tableAsOfChange(tableState1)
+ .filesDiff(dataFilesDiff1)
+ .sourceIdentifier("0")
+ .build();
InternalTable tableState2 = getTableState(2);
DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
TableChange tableChange2 =
-
TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build();
+ TableChange.builder()
+ .tableAsOfChange(tableState2)
+ .filesDiff(dataFilesDiff2)
+ .sourceIdentifier("1")
+ .build();
InternalTable tableState3 = getTableState(3);
DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
TableChange tableChange3 =
-
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build();
+ TableChange.builder()
+ .tableAsOfChange(tableState3)
+ .filesDiff(dataFilesDiff3)
+ .sourceIdentifier("2")
+ .build();
List<Instant> pendingCommitInstants =
Collections.singletonList(Instant.now());
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
@@ -154,10 +170,12 @@ public class TestTableFormatSync {
Map<ConversionTarget, TableSyncMetadata> conversionTargetWithMetadata =
new HashMap<>();
conversionTargetWithMetadata.put(
mockConversionTarget1,
- TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS),
Collections.emptyList()));
+ TableSyncMetadata.of(
+ Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(),
"TEST", "0"));
conversionTargetWithMetadata.put(
mockConversionTarget2,
- TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS),
Collections.emptyList()));
+ TableSyncMetadata.of(
+ Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(),
"TEST", "1"));
Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
@@ -201,13 +219,29 @@ public class TestTableFormatSync {
assertSyncResultTimes(successResults.get(i), start);
}
- verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget1,
+ tableState1,
+ pendingCommitInstants,
+ tableChange1.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
- verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget2,
+ tableState1,
+ pendingCommitInstants,
+ tableChange1.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
- verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget2,
+ tableState2,
+ pendingCommitInstants,
+ tableChange2.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
- verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget2,
+ tableState3,
+ pendingCommitInstants,
+ tableChange3.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget1, times(1)).completeSync();
verify(mockConversionTarget2, times(3)).completeSync();
@@ -219,15 +253,27 @@ public class TestTableFormatSync {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
-
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
+ TableChange.builder()
+ .tableAsOfChange(tableState1)
+ .filesDiff(dataFilesDiff1)
+ .sourceIdentifier("0")
+ .build();
InternalTable tableState2 = getTableState(2);
DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
TableChange tableChange2 =
-
TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build();
+ TableChange.builder()
+ .tableAsOfChange(tableState2)
+ .filesDiff(dataFilesDiff2)
+ .sourceIdentifier("1")
+ .build();
InternalTable tableState3 = getTableState(3);
DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
TableChange tableChange3 =
-
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build();
+ TableChange.builder()
+ .tableAsOfChange(tableState3)
+ .filesDiff(dataFilesDiff3)
+ .sourceIdentifier("2")
+ .build();
List<Instant> pendingCommitInstants =
Collections.singletonList(Instant.now());
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
@@ -247,12 +293,17 @@ public class TestTableFormatSync {
mockConversionTarget1,
TableSyncMetadata.of(
tableChange2.getTableAsOfChange().getLatestCommitTime(),
-
Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime())));
+
Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime()),
+ "TEST",
+ tableChange2.getSourceIdentifier()));
// mockConversionTarget2 will have synced the first table change previously
conversionTargetWithMetadata.put(
mockConversionTarget2,
TableSyncMetadata.of(
- tableChange1.getTableAsOfChange().getLatestCommitTime(),
Collections.emptyList()));
+ tableChange1.getTableAsOfChange().getLatestCommitTime(),
+ Collections.emptyList(),
+ "TEST",
+ tableChange1.getSourceIdentifier()));
Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
@@ -290,15 +341,31 @@ public class TestTableFormatSync {
}
// conversionTarget1 syncs table changes 1 and 3
- verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget1,
+ tableState1,
+ pendingCommitInstants,
+ tableChange1.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
- verifyBaseConversionTargetCalls(mockConversionTarget1, tableState3,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget1,
+ tableState3,
+ pendingCommitInstants,
+ tableChange3.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget1, times(2)).completeSync();
// conversionTarget2 syncs table changes 2 and 3
- verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget2,
+ tableState2,
+ pendingCommitInstants,
+ tableChange2.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
- verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget2,
+ tableState3,
+ pendingCommitInstants,
+ tableChange3.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget2, times(2)).completeSync();
}
@@ -309,7 +376,11 @@ public class TestTableFormatSync {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
-
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
+ TableChange.builder()
+ .tableAsOfChange(tableState1)
+ .filesDiff(dataFilesDiff1)
+ .sourceIdentifier("0")
+ .build();
List<Instant> pendingCommitInstants = Collections.emptyList();
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
@@ -325,11 +396,13 @@ public class TestTableFormatSync {
Map<ConversionTarget, TableSyncMetadata> conversionTargetWithMetadata =
new HashMap<>();
// mockConversionTarget1 will have nothing to sync
conversionTargetWithMetadata.put(
- mockConversionTarget1, TableSyncMetadata.of(Instant.now(),
Collections.emptyList()));
+ mockConversionTarget1,
+ TableSyncMetadata.of(Instant.now(), Collections.emptyList(), "TEST",
"0"));
// mockConversionTarget2 will have synced the first table change previously
conversionTargetWithMetadata.put(
mockConversionTarget2,
- TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS),
Collections.emptyList()));
+ TableSyncMetadata.of(
+ Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(),
"TEST", "1"));
Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
@@ -348,7 +421,11 @@ public class TestTableFormatSync {
verify(mockConversionTarget1, never()).syncFilesForDiff(any());
verify(mockConversionTarget1, never()).completeSync();
- verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1,
pendingCommitInstants);
+ verifyBaseConversionTargetCalls(
+ mockConversionTarget2,
+ tableState1,
+ pendingCommitInstants,
+ tableChange1.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
}
@@ -389,12 +466,17 @@ public class TestTableFormatSync {
private void verifyBaseConversionTargetCalls(
ConversionTarget mockConversionTarget,
InternalTable startingTableState,
- List<Instant> pendingCommitInstants) {
+ List<Instant> pendingCommitInstants,
+ String sourceIdentifier) {
verify(mockConversionTarget).beginSync(startingTableState);
verify(mockConversionTarget).syncSchema(startingTableState.getReadSchema());
verify(mockConversionTarget).syncPartitionSpec(startingTableState.getPartitioningFields());
verify(mockConversionTarget)
.syncMetadata(
- TableSyncMetadata.of(startingTableState.getLatestCommitTime(),
pendingCommitInstants));
+ TableSyncMetadata.of(
+ startingTableState.getLatestCommitTime(),
+ pendingCommitInstants,
+ startingTableState.getTableFormat(),
+ sourceIdentifier));
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 140eb8ad..de4a2b69 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -98,6 +98,7 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
return InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(getInternalDataFiles(snapshot,
table.getReadSchema()))
+ .sourceIdentifier(getCommitIdentifier(snapshot.version()))
.build();
}
@@ -167,7 +168,11 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
.filesAdded(addedFiles.values())
.filesRemoved(removedFiles.values())
.build();
- return
TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
+ return TableChange.builder()
+ .tableAsOfChange(tableAtVersion)
+ .filesDiff(dataFilesDiff)
+ .sourceIdentifier(getCommitIdentifier(versionNumber))
+ .build();
}
@Override
@@ -200,6 +205,11 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
return deltaCommitInstant.equals(instant) ||
deltaCommitInstant.isBefore(instant);
}
+ @Override
+ public String getCommitIdentifier(Long commit) {
+ return String.valueOf(commit);
+ }
+
private DeltaIncrementalChangesState getChangesState() {
return deltaIncrementalChangesState.orElseThrow(
() -> new IllegalStateException("DeltaIncrementalChangesState is not
initialized"));
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index 343a2d21..fbe99801 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -22,6 +22,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -30,6 +31,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
@@ -41,14 +43,17 @@ import org.apache.spark.sql.delta.DeltaConfigs;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaOperations;
import org.apache.spark.sql.delta.OptimisticTransaction;
+import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.Action;
import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.CommitInfo;
import org.apache.spark.sql.delta.actions.Format;
import org.apache.spark.sql.delta.actions.Metadata;
import org.apache.spark.sql.delta.actions.RemoveFile;
import scala.Option;
import scala.Some;
+import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
@@ -66,6 +71,7 @@ import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.schema.SparkSchemaExtractor;
import org.apache.xtable.spi.sync.ConversionTarget;
+@Log4j2
public class DeltaConversionTarget implements ConversionTarget {
private static final String MIN_READER_VERSION = String.valueOf(1);
// gets access to generated columns.
@@ -216,6 +222,58 @@ public class DeltaConversionTarget implements
ConversionTarget {
return TableFormat.DELTA;
}
+ @Override
+ public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
+ Snapshot currentSnapshot = deltaLog.currentSnapshot().snapshot();
+
+ Iterator<Tuple2<Object, Seq<Action>>> versionIterator =
+ JavaConverters.asJavaIteratorConverter(
+ deltaLog.getChanges(currentSnapshot.version(), false))
+ .asJava();
+ while (versionIterator.hasNext()) {
+ Tuple2<Object, Seq<Action>> currentChange = versionIterator.next();
+ Long targetVersion = currentSnapshot.version();
+ List<Action> actions =
JavaConverters.seqAsJavaListConverter(currentChange._2()).asJava();
+
+ // Find the CommitInfo among the changes that belongs to a certain version
+ Optional<CommitInfo> commitInfo =
+ actions.stream()
+ .filter(action -> action instanceof CommitInfo)
+ .map(action -> (CommitInfo) action)
+ .findFirst();
+ if (!commitInfo.isPresent()) {
+ continue;
+ }
+
+ Option<scala.collection.immutable.Map<String, String>> tags =
commitInfo.get().tags();
+ if (tags.isEmpty()) {
+ continue;
+ }
+
+ Option<String> sourceMetadataJson =
tags.get().get(TableSyncMetadata.XTABLE_METADATA);
+ if (sourceMetadataJson.isEmpty()) {
+ continue;
+ }
+
+ try {
+ Optional<TableSyncMetadata> optionalMetadata =
+ TableSyncMetadata.fromJson(sourceMetadataJson.get());
+ if (!optionalMetadata.isPresent()) {
+ continue;
+ }
+
+ TableSyncMetadata metadata = optionalMetadata.get();
+ if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
+ return Optional.of(String.valueOf(targetVersion));
+ }
+ } catch (Exception e) {
+ log.warn("Failed to parse commit metadata for commit: {}",
targetVersion, e);
+ }
+ }
+
+ return Optional.empty();
+ }
+
@EqualsAndHashCode
@ToString
private class TransactionState {
@@ -265,7 +323,8 @@ public class DeltaConversionTarget implements
ConversionTarget {
transaction.updateMetadata(metadata, false);
transaction.commit(
actions,
- new
DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))));
+ new
DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))),
+ ScalaUtils.convertJavaMapToScala(getCommitTags()));
}
private Map<String, String> getConfigurationsForDeltaSync() {
@@ -301,5 +360,9 @@ public class DeltaConversionTarget implements
ConversionTarget {
// fallback to existing deltalog value
return deltaLog.snapshot().metadata().format();
}
+
+ private Map<String, String> getCommitTags() {
+ return Collections.singletonMap(TableSyncMetadata.XTABLE_METADATA,
metadata.toJson());
+ }
}
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
index 02423c2d..cb65c341 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
@@ -114,6 +114,7 @@ public class HudiConversionSource implements
ConversionSource<HoodieInstant> {
hoodieInstant ->
HudiInstantUtils.parseFromInstantTime(hoodieInstant.getTimestamp()))
.collect(CustomCollectors.toList(pendingInstants.size())))
+ .sourceIdentifier(getCommitIdentifier(latestCommit))
.build();
}
@@ -130,6 +131,7 @@ public class HudiConversionSource implements
ConversionSource<HoodieInstant> {
.filesDiff(
dataFileExtractor.getDiffForCommit(
hoodieInstantForDiff, table, hoodieInstantForDiff,
visibleTimeline))
+ .sourceIdentifier(getCommitIdentifier(hoodieInstantForDiff))
.build();
}
@@ -161,6 +163,11 @@ public class HudiConversionSource implements
ConversionSource<HoodieInstant> {
return doesCommitExistsAsOfInstant(instant) &&
!isAffectedByCleanupProcess(instant);
}
+ @Override
+ public String getCommitIdentifier(HoodieInstant commit) {
+ return commit.getTimestamp();
+ }
+
private boolean doesCommitExistsAsOfInstant(Instant instant) {
HoodieInstant hoodieInstant = getCommitAtInstant(instant);
return hoodieInstant != null;
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
index c3ef6f92..97c2f9e3 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
@@ -54,13 +54,13 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ExternalFilePathUtil;
@@ -287,28 +287,7 @@ public class HudiConversionTarget implements
ConversionTarget {
.filterCompletedInstants()
.lastInstant()
.toJavaOptional()
- .map(
- instant -> {
- try {
- if
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- return HoodieReplaceCommitMetadata.fromBytes(
-
client.getActiveTimeline().getInstantDetails(instant).get(),
- HoodieReplaceCommitMetadata.class)
- .getExtraMetadata();
- } else {
- return HoodieCommitMetadata.fromBytes(
-
client.getActiveTimeline().getInstantDetails(instant).get(),
- HoodieCommitMetadata.class)
- .getExtraMetadata();
- }
- } catch (IOException ex) {
- throw new ReadException("Unable to read Hudi commit
metadata", ex);
- }
- })
- .flatMap(
- metadata ->
- TableSyncMetadata.fromJson(
- metadata.get(TableSyncMetadata.XTABLE_METADATA))));
+ .flatMap(instant -> getMetadata(instant, client)));
}
@Override
@@ -316,11 +295,56 @@ public class HudiConversionTarget implements
ConversionTarget {
return TableFormat.HUDI;
}
+ @Override
+ public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
+ if (!metaClient.isPresent()) {
+ return Optional.empty();
+ }
+ return getTargetCommitIdentifier(sourceIdentifier, metaClient.get());
+ }
+
+ Optional<String> getTargetCommitIdentifier(
+ String sourceIdentifier, HoodieTableMetaClient metaClient) {
+
+ HoodieTimeline commitTimeline = metaClient.getCommitsTimeline();
+
+ for (HoodieInstant instant : commitTimeline.getInstants()) {
+ try {
+ Optional<TableSyncMetadata> optionalMetadata = getMetadata(instant,
metaClient);
+ if (!optionalMetadata.isPresent()) {
+ continue;
+ }
+
+ TableSyncMetadata metadata = optionalMetadata.get();
+ if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
+ return Optional.of(instant.getTimestamp());
+ }
+ } catch (Exception e) {
+ log.warn("Failed to parse commit metadata for instant: {}", instant,
e);
+ }
+ }
+ return Optional.empty();
+ }
+
private HoodieTableMetaClient getMetaClient() {
return metaClient.orElseThrow(
() -> new IllegalStateException("beginSync must be called before
calling this method"));
}
+ private Optional<TableSyncMetadata> getMetadata(
+ HoodieInstant instant, HoodieTableMetaClient metaClient) {
+ try {
+ HoodieCommitMetadata commitMetadata =
+ TimelineUtils.getCommitMetadata(instant,
metaClient.getActiveTimeline());
+ String sourceMetadataJson =
+
commitMetadata.getExtraMetadata().get(TableSyncMetadata.XTABLE_METADATA);
+ return TableSyncMetadata.fromJson(sourceMetadataJson);
+ } catch (Exception e) {
+ log.warn("Failed to parse commit metadata for instant: {}", instant, e);
+ return Optional.empty();
+ }
+ }
+
static class CommitState {
private HoodieTableMetaClient metaClient;
@Getter private final String instantTime;
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 0d400e28..cb75b6d0 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -165,6 +165,7 @@ public class IcebergConversionSource implements
ConversionSource<Snapshot> {
.version(String.valueOf(currentSnapshot.snapshotId()))
.table(irTable)
.partitionedDataFiles(partitionedDataFiles)
+ .sourceIdentifier(getCommitIdentifier(currentSnapshot))
.build();
}
@@ -196,7 +197,11 @@ public class IcebergConversionSource implements
ConversionSource<Snapshot> {
DataFilesDiff.builder().filesAdded(dataFilesAdded).filesRemoved(dataFilesRemoved).build();
InternalTable table = getTable(snapshot);
- return
TableChange.builder().tableAsOfChange(table).filesDiff(filesDiff).build();
+ return TableChange.builder()
+ .tableAsOfChange(table)
+ .filesDiff(filesDiff)
+ .sourceIdentifier(getCommitIdentifier(snapshot))
+ .build();
}
@Override
@@ -265,6 +270,11 @@ public class IcebergConversionSource implements
ConversionSource<Snapshot> {
return true;
}
+ @Override
+ public String getCommitIdentifier(Snapshot commit) {
+ return String.valueOf(commit.snapshotId());
+ }
+
@Override
public void close() {
getTableOps().close();
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index ecdbfa26..69faf1ea 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -21,6 +21,7 @@ package org.apache.xtable.iceberg;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import lombok.extern.log4j.Log4j2;
@@ -66,6 +67,7 @@ public class IcebergConversionTarget implements
ConversionTarget {
private Transaction transaction;
private Table table;
private InternalTable internalTableState;
+ private TableSyncMetadata tableSyncMetadata;
public IcebergConversionTarget() {}
@@ -174,6 +176,8 @@ public class IcebergConversionTarget implements
ConversionTarget {
@Override
public void syncMetadata(TableSyncMetadata metadata) {
+ tableSyncMetadata = metadata;
+
UpdateProperties updateProperties = transaction.updateProperties();
updateProperties.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
if (!table.properties().containsKey(TableProperties.WRITE_DATA_LOCATION)) {
@@ -200,13 +204,18 @@ public class IcebergConversionTarget implements
ConversionTarget {
transaction,
partitionedDataFiles,
transaction.table().schema(),
- transaction.table().spec());
+ transaction.table().spec(),
+ tableSyncMetadata);
}
@Override
public void syncFilesForDiff(DataFilesDiff dataFilesDiff) {
dataFileUpdatesExtractor.applyDiff(
- transaction, dataFilesDiff, transaction.table().schema(),
transaction.table().spec());
+ transaction,
+ dataFilesDiff,
+ transaction.table().schema(),
+ transaction.table().spec(),
+ tableSyncMetadata);
}
@Override
@@ -221,6 +230,7 @@ public class IcebergConversionTarget implements
ConversionTarget {
transaction.commitTransaction();
transaction = null;
internalTableState = null;
+ tableSyncMetadata = null;
}
private void safeDelete(String file) {
@@ -242,6 +252,33 @@ public class IcebergConversionTarget implements
ConversionTarget {
return TableFormat.ICEBERG;
}
+ @Override
+ public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
+ for (Snapshot snapshot : table.snapshots()) {
+ Map<String, String> summary = snapshot.summary();
+ String sourceMetadataJson =
summary.get(TableSyncMetadata.XTABLE_METADATA);
+ if (sourceMetadataJson == null) {
+ continue;
+ }
+
+ try {
+ Optional<TableSyncMetadata> optionalMetadata =
+ TableSyncMetadata.fromJson(sourceMetadataJson);
+ if (!optionalMetadata.isPresent()) {
+ continue;
+ }
+
+ TableSyncMetadata metadata = optionalMetadata.get();
+ if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
+ return Optional.of(String.valueOf(snapshot.snapshotId()));
+ }
+ } catch (Exception e) {
+        log.warn("Failed to parse snapshot metadata for {}",
snapshot.snapshotId(), e);
+ }
+ }
+ return Optional.empty();
+ }
+
private void rollbackCorruptCommits() {
if (table == null) {
// there is no existing table so exit early
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
index 80e1559f..0c8fa9a4 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FilesDiff;
import org.apache.xtable.model.storage.InternalDataFile;
@@ -46,7 +47,8 @@ public class IcebergDataFileUpdatesSync {
Transaction transaction,
List<PartitionFileGroup> partitionedDataFiles,
Schema schema,
- PartitionSpec partitionSpec) {
+ PartitionSpec partitionSpec,
+ TableSyncMetadata metadata) {
Map<String, DataFile> previousFiles = new HashMap<>();
try (CloseableIterable<FileScanTask> iterator =
table.newScan().planFiles()) {
@@ -60,21 +62,24 @@ public class IcebergDataFileUpdatesSync {
FilesDiff<InternalDataFile, DataFile> diff =
DataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles,
previousFiles);
- applyDiff(transaction, diff.getFilesAdded(), diff.getFilesRemoved(),
schema, partitionSpec);
+ applyDiff(
+ transaction, diff.getFilesAdded(), diff.getFilesRemoved(), schema,
partitionSpec, metadata);
}
public void applyDiff(
Transaction transaction,
DataFilesDiff dataFilesDiff,
Schema schema,
- PartitionSpec partitionSpec) {
+ PartitionSpec partitionSpec,
+ TableSyncMetadata metadata) {
Collection<DataFile> filesRemoved =
dataFilesDiff.getFilesRemoved().stream()
.map(file -> getDataFile(partitionSpec, schema, file))
.collect(Collectors.toList());
- applyDiff(transaction, dataFilesDiff.getFilesAdded(), filesRemoved,
schema, partitionSpec);
+ applyDiff(
+ transaction, dataFilesDiff.getFilesAdded(), filesRemoved, schema,
partitionSpec, metadata);
}
private void applyDiff(
@@ -82,10 +87,12 @@ public class IcebergDataFileUpdatesSync {
Collection<InternalDataFile> filesAdded,
Collection<DataFile> filesRemoved,
Schema schema,
- PartitionSpec partitionSpec) {
+ PartitionSpec partitionSpec,
+ TableSyncMetadata metadata) {
OverwriteFiles overwriteFiles = transaction.newOverwrite();
filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec,
schema, f)));
filesRemoved.forEach(overwriteFiles::deleteFile);
+ overwriteFiles.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
overwriteFiles.commit();
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index 0f34103e..9bf8f010 100644
---
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -94,7 +94,7 @@ public class TestConversionController {
void testAllSnapshotSyncAsPerConfig() {
SyncMode syncMode = SyncMode.FULL;
InternalTable internalTable = getInternalTable();
- InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+ InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1",
"0");
Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour);
Map<String, SyncResult> perTableResults = new HashMap<>();
@@ -172,16 +172,19 @@ public class TestConversionController {
CommitsBacklog<Instant> commitsBacklog =
CommitsBacklog.<Instant>builder().commitsToProcess(instantsToProcess).build();
Optional<TableSyncMetadata> conversionTarget1Metadata =
- Optional.of(TableSyncMetadata.of(icebergLastSyncInstant,
pendingInstantsForIceberg));
+ Optional.of(
+ TableSyncMetadata.of(icebergLastSyncInstant,
pendingInstantsForIceberg, "TEST", "0"));
when(mockConversionTarget1.getTableMetadata()).thenReturn(conversionTarget1Metadata);
Optional<TableSyncMetadata> conversionTarget2Metadata =
- Optional.of(TableSyncMetadata.of(deltaLastSyncInstant,
pendingInstantsForDelta));
+ Optional.of(
+ TableSyncMetadata.of(deltaLastSyncInstant,
pendingInstantsForDelta, "TEST", "0"));
when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata);
when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync))
.thenReturn(commitsBacklog);
List<TableChange> tableChanges = new ArrayList<>();
- for (Instant instant : instantsToProcess) {
- TableChange tableChange = getTableChange(instant);
+ for (int i = 0; i < instantsToProcess.size(); i++) {
+ Instant instant = instantsToProcess.get(i);
+ TableChange tableChange = getTableChange(instant, String.valueOf(i));
tableChanges.add(tableChange);
when(mockConversionSource.getTableChangeForCommit(instant)).thenReturn(tableChange);
}
@@ -224,7 +227,7 @@ public class TestConversionController {
SyncMode syncMode = SyncMode.INCREMENTAL;
InternalTable internalTable = getInternalTable();
Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
- InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+ InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1",
"0");
SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour);
Map<String, SyncResult> syncResults = new HashMap<>();
syncResults.put(TableFormat.ICEBERG, syncResult);
@@ -247,9 +250,11 @@ public class TestConversionController {
// Both Iceberg and Delta last synced at instantAt5 and have no pending
instants.
when(mockConversionTarget1.getTableMetadata())
- .thenReturn(Optional.of(TableSyncMetadata.of(instantAt5,
Collections.emptyList())));
+ .thenReturn(
+ Optional.of(TableSyncMetadata.of(instantAt5,
Collections.emptyList(), "TEST", "0")));
when(mockConversionTarget2.getTableMetadata())
- .thenReturn(Optional.of(TableSyncMetadata.of(instantAt5,
Collections.emptyList())));
+ .thenReturn(
+ Optional.of(TableSyncMetadata.of(instantAt5,
Collections.emptyList(), "TEST", "0")));
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
when(tableFormatSync.syncSnapshot(
@@ -309,22 +314,26 @@ public class TestConversionController {
CommitsBacklog.<Instant>builder().commitsToProcess(instantsToProcess).build();
when(mockConversionTarget1.getTableMetadata())
.thenReturn(
- Optional.of(TableSyncMetadata.of(icebergLastSyncInstant,
pendingInstantsForIceberg)));
+ Optional.of(
+ TableSyncMetadata.of(
+ icebergLastSyncInstant, pendingInstantsForIceberg, "TEST",
"0")));
Optional<TableSyncMetadata> conversionTarget2Metadata =
- Optional.of(TableSyncMetadata.of(deltaLastSyncInstant,
pendingInstantsForDelta));
+ Optional.of(
+ TableSyncMetadata.of(deltaLastSyncInstant,
pendingInstantsForDelta, "TEST", "0"));
when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata);
when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync))
.thenReturn(commitsBacklog);
List<TableChange> tableChanges = new ArrayList<>();
- for (Instant instant : instantsToProcess) {
- TableChange tableChange = getTableChange(instant);
+ for (int i = 0; i < instantsToProcess.size(); i++) {
+ Instant instant = instantsToProcess.get(i);
+ TableChange tableChange = getTableChange(instant, String.valueOf(i));
tableChanges.add(tableChange);
when(mockConversionSource.getTableChangeForCommit(instant)).thenReturn(tableChange);
}
// Iceberg needs to sync by snapshot since instant15 is affected by table
clean-up.
InternalTable internalTable = getInternalTable();
Instant instantBeforeHour = Instant.now().minus(Duration.ofHours(1));
- InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+ InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1",
"0");
SyncResult syncResult = buildSyncResult(syncMode, instantBeforeHour);
Map<String, SyncResult> snapshotResult =
Collections.singletonMap(TableFormat.ICEBERG, syncResult);
@@ -388,10 +397,12 @@ public class TestConversionController {
CommitsBacklog<Instant> commitsBacklog =
CommitsBacklog.<Instant>builder().commitsToProcess(instantsToProcess).build();
Optional<TableSyncMetadata> conversionTarget1Metadata =
- Optional.of(TableSyncMetadata.of(icebergLastSyncInstant,
Collections.emptyList()));
+ Optional.of(
+ TableSyncMetadata.of(icebergLastSyncInstant,
Collections.emptyList(), "TEST", "0"));
when(mockConversionTarget1.getTableMetadata()).thenReturn(conversionTarget1Metadata);
Optional<TableSyncMetadata> conversionTarget2Metadata =
- Optional.of(TableSyncMetadata.of(deltaLastSyncInstant,
Collections.emptyList()));
+ Optional.of(
+ TableSyncMetadata.of(deltaLastSyncInstant,
Collections.emptyList(), "TEST", "0"));
when(mockConversionTarget2.getTableMetadata()).thenReturn(conversionTarget2Metadata);
when(mockConversionSource.getCommitsBacklog(instantsForIncrementalSync))
.thenReturn(commitsBacklog);
@@ -426,7 +437,7 @@ public class TestConversionController {
List<TargetCatalogConfig> targetCatalogs =
Arrays.asList(getTargetCatalog("1"), getTargetCatalog("2"));
InternalTable internalTable = getInternalTable();
- InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1");
+ InternalSnapshot internalSnapshot = buildSnapshot(internalTable, "v1",
"0");
// Conversion source and target mocks.
ConversionConfig conversionConfig =
getTableSyncConfig(
@@ -513,8 +524,11 @@ public class TestConversionController {
.collect(Collectors.toList());
}
- private TableChange getTableChange(Instant instant) {
- return
TableChange.builder().tableAsOfChange(getInternalTable(instant)).build();
+ private TableChange getTableChange(Instant instant, String sourceIdentifier)
{
+ return TableChange.builder()
+ .tableAsOfChange(getInternalTable(instant))
+ .sourceIdentifier(sourceIdentifier)
+ .build();
}
private SyncResult buildSyncResult(SyncMode syncMode, Instant
lastSyncedInstant) {
@@ -536,8 +550,13 @@ public class TestConversionController {
.build();
}
- private InternalSnapshot buildSnapshot(InternalTable internalTable, String
version) {
- return
InternalSnapshot.builder().table(internalTable).version(version).build();
+ private InternalSnapshot buildSnapshot(
+ InternalTable internalTable, String version, String sourceIdentifier) {
+ return InternalSnapshot.builder()
+ .table(internalTable)
+ .version(version)
+ .sourceIdentifier(sourceIdentifier)
+ .build();
}
private InternalTable getInternalTable() {
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index f0f889d2..7e921efe 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -19,6 +19,7 @@
package org.apache.xtable.delta;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@@ -36,6 +37,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -81,6 +83,7 @@ import io.delta.standalone.types.StringType;
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
@@ -159,8 +162,8 @@ public class TestDeltaSync {
InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(),
basePath);
InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(),
basePath);
- InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
- InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
+ InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1,
dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2,
dataFile3);
TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
@@ -214,7 +217,7 @@ public class TestDeltaSync {
EqualTo equalToExpr =
new EqualTo(new Column("string_field", new StringType()),
Literal.of("warning"));
- InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
+ InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1,
dataFile2, dataFile3);
TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
@@ -294,7 +297,7 @@ public class TestDeltaSync {
EqualTo equalToExpr2 = new EqualTo(new Column("int_field", new
IntegerType()), Literal.of(20));
And CombinedExpr = new And(equalToExpr1, equalToExpr2);
- InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
+ InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1,
dataFile2, dataFile3);
TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2)),
CombinedExpr);
@@ -337,7 +340,7 @@ public class TestDeltaSync {
InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath);
- InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
+ InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1,
dataFile2, dataFile3);
TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
@@ -367,6 +370,67 @@ public class TestDeltaSync {
.contains(String.format("xtable_partition_col_%s_timestamp_field",
transformType)));
}
+ @Test
+ public void testSourceTargetIdMapping() throws Exception {
+ InternalSchema baseSchema = getInternalSchema();
+ InternalTable sourceTable =
+ getInternalTable("source_table", basePath, baseSchema, null,
LAST_COMMIT_TIME);
+
+ InternalDataFile sourceDataFile1 = getDataFile(101,
Collections.emptyList(), basePath);
+ InternalDataFile sourceDataFile2 = getDataFile(102,
Collections.emptyList(), basePath);
+ InternalDataFile sourceDataFile3 = getDataFile(103,
Collections.emptyList(), basePath);
+
+ InternalSnapshot sourceSnapshot1 =
+ buildSnapshot(sourceTable, "0", sourceDataFile1, sourceDataFile2);
+ InternalSnapshot sourceSnapshot2 =
+ buildSnapshot(sourceTable, "1", sourceDataFile2, sourceDataFile3);
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget),
sourceSnapshot1);
+ Optional<String> mappedTargetId1 =
+
conversionTarget.getTargetCommitIdentifier(sourceSnapshot1.getSourceIdentifier());
+ validateDeltaTable(
+ basePath, new HashSet<>(Arrays.asList(sourceDataFile1,
sourceDataFile2)), null);
+ assertTrue(mappedTargetId1.isPresent());
+ assertEquals("0", mappedTargetId1.get());
+
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget),
sourceSnapshot2);
+ Optional<String> mappedTargetId2 =
+
conversionTarget.getTargetCommitIdentifier(sourceSnapshot2.getSourceIdentifier());
+ validateDeltaTable(
+ basePath, new HashSet<>(Arrays.asList(sourceDataFile2,
sourceDataFile3)), null);
+ assertTrue(mappedTargetId2.isPresent());
+ assertEquals("1", mappedTargetId2.get());
+
+ Optional<String> unmappedTargetId =
conversionTarget.getTargetCommitIdentifier("s3");
+ assertFalse(unmappedTargetId.isPresent());
+ }
+
+ @Test
+ public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws
Exception {
+ InternalSchema baseSchema = getInternalSchema();
+ InternalTable internalTable =
+ getInternalTable("source_table", basePath, baseSchema, null,
LAST_COMMIT_TIME);
+ InternalDataFile sourceDataFile = getDataFile(101,
Collections.emptyList(), basePath);
+ InternalSnapshot snapshot = buildSnapshot(internalTable, "0",
sourceDataFile);
+
+ // Mock the snapshot sync process like getSyncResult()
+ conversionTarget.beginSync(internalTable);
+ TableSyncMetadata tableSyncMetadata =
+ TableSyncMetadata.of(internalTable.getLatestCommitTime(),
snapshot.getPendingCommits());
+ conversionTarget.syncMetadata(tableSyncMetadata);
+ conversionTarget.syncSchema(internalTable.getReadSchema());
+ conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields());
+ conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles());
+ conversionTarget.completeSync();
+
+ // No crash should happen during the process
+ Optional<String> unmappedTargetId =
conversionTarget.getTargetCommitIdentifier("0");
+ // The targetIdentifier is expected to not be found
+ assertFalse(unmappedTargetId.isPresent());
+ }
+
private static Stream<Arguments> timestampPartitionTestingArgs() {
return Stream.of(
Arguments.of(PartitionTransformType.YEAR),
@@ -408,10 +472,12 @@ public class TestDeltaSync {
internalDataFiles.size(), count, "Number of files from DeltaScan don't
match expectation");
}
- private InternalSnapshot buildSnapshot(InternalTable table,
InternalDataFile... dataFiles) {
+ private InternalSnapshot buildSnapshot(
+ InternalTable table, String sourceIdentifier, InternalDataFile...
dataFiles) {
return InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
+ .sourceIdentifier(sourceIdentifier)
.build();
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
index 03bb6e2c..82125d25 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
@@ -22,6 +22,8 @@ import static
org.apache.xtable.hudi.HudiTestUtil.createWriteStatus;
import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig;
import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.time.Duration;
@@ -32,6 +34,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -204,7 +207,8 @@ public class ITHudiConversionTarget {
targetClient.syncFilesForDiff(dataFilesDiff);
targetClient.syncSchema(SCHEMA);
TableSyncMetadata latestState =
- TableSyncMetadata.of(initialState.getLatestCommitTime(),
Collections.emptyList());
+ TableSyncMetadata.of(
+ initialState.getLatestCommitTime(), Collections.emptyList(),
"TEST", "0");
targetClient.syncMetadata(latestState);
targetClient.completeSync();
@@ -243,7 +247,8 @@ public class ITHudiConversionTarget {
targetClient.beginSync(initialState);
targetClient.syncFilesForSnapshot(snapshot);
TableSyncMetadata latestState =
- TableSyncMetadata.of(initialState.getLatestCommitTime(),
Collections.emptyList());
+ TableSyncMetadata.of(
+ initialState.getLatestCommitTime(), Collections.emptyList(),
"TEST", "0");
targetClient.syncSchema(initialState.getReadSchema());
targetClient.syncMetadata(latestState);
targetClient.completeSync();
@@ -288,7 +293,8 @@ public class ITHudiConversionTarget {
targetClient.beginSync(initialState);
targetClient.syncFilesForSnapshot(snapshot);
TableSyncMetadata latestState =
- TableSyncMetadata.of(initialState.getLatestCommitTime(),
Collections.emptyList());
+ TableSyncMetadata.of(
+ initialState.getLatestCommitTime(), Collections.emptyList(),
"TEST", "0");
targetClient.syncMetadata(latestState);
targetClient.syncSchema(initialState.getReadSchema());
targetClient.completeSync();
@@ -311,7 +317,8 @@ public class ITHudiConversionTarget {
targetClient,
Collections.singletonList(getTestFile(partitionPath, fileName2)),
Collections.singletonList(getTestFile(partitionPath, fileName1)),
- Instant.now().minus(12, ChronoUnit.HOURS));
+ Instant.now().minus(12, ChronoUnit.HOURS),
+ "1");
assertFileGroupCorrectness(
metaClient, partitionPath, Arrays.asList(file0Pair, Pair.of(fileName2,
filePath2)));
@@ -331,7 +338,9 @@ public class ITHudiConversionTarget {
targetClient,
Collections.singletonList(getTestFile(partitionPath, fileName3)),
Collections.singletonList(getTestFile(partitionPath, fileName2)),
- Instant.now().minus(8, ChronoUnit.HOURS));
+ Instant.now().minus(8, ChronoUnit.HOURS),
+ "2");
+
System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
// create a commit that just adds fileName4
String fileName4 = "file_4.parquet";
@@ -340,7 +349,9 @@ public class ITHudiConversionTarget {
targetClient,
Collections.singletonList(getTestFile(partitionPath, fileName4)),
Collections.emptyList(),
- Instant.now());
+ Instant.now(),
+ "3");
+
System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
// create another commit that should trigger archival of the first two
commits
String fileName5 = "file_5.parquet";
@@ -349,7 +360,9 @@ public class ITHudiConversionTarget {
targetClient,
Collections.singletonList(getTestFile(partitionPath, fileName5)),
Collections.emptyList(),
- Instant.now());
+ Instant.now(),
+ "4");
+
System.out.println(metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
assertFileGroupCorrectness(
metaClient,
@@ -372,18 +385,147 @@ public class ITHudiConversionTarget {
2,
metaClient.getArchivedTimeline().reload().filterCompletedInstants().countInstants());
}
+ @ParameterizedTest
+ @ValueSource(strings = {"partition_path", ""})
+ void testSourceTargetMappingWithSnapshotAndIncrementalSync(String
partitionPath) {
+ // Step 1: Initialize Test Files for Initial Snapshot
+ String fileName0 = "file_0.parquet";
+ String fileName1 = "file_1.parquet";
+
+ List<PartitionFileGroup> initialSnapshot =
+ Collections.singletonList(
+ PartitionFileGroup.builder()
+ .files(
+ Arrays.asList(
+ getTestFile(partitionPath, fileName0),
+ getTestFile(partitionPath, fileName1)))
+ .partitionValues(
+ Collections.singletonList(
+ PartitionValue.builder()
+ .partitionField(PARTITION_FIELD)
+ .range(Range.scalar("partitionPath"))
+ .build()))
+ .build());
+
+ // Step 2: Sync Initial Snapshot
+ InternalTable initialState = getState(Instant.now().minus(24,
ChronoUnit.HOURS));
+ HudiConversionTarget targetClient = getTargetClient();
+ targetClient.beginSync(initialState);
+ targetClient.syncFilesForSnapshot(initialSnapshot);
+ TableSyncMetadata latestState =
+ TableSyncMetadata.of(
+ initialState.getLatestCommitTime(), Collections.emptyList(),
"TEST", "0");
+ targetClient.syncMetadata(latestState);
+ targetClient.syncSchema(initialState.getReadSchema());
+ targetClient.completeSync();
+
+ // Step 3: Verify Source-Target Mapping for Initial Snapshot
+ HoodieTableMetaClient metaClient =
+
HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build();
+
+ Optional<String> initialTargetIdentifier =
+
targetClient.getTargetCommitIdentifier(latestState.getSourceIdentifier(),
metaClient);
+ assertTrue(initialTargetIdentifier.isPresent());
+ assertEquals(
+ initialTargetIdentifier.get(),
+ metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
+
+ // Step 4: Perform Incremental Sync (Remove file1, Add file2)
+ String fileName2 = "file_2.parquet";
+ incrementalSync(
+ targetClient,
+ Collections.singletonList(getTestFile(partitionPath, fileName2)), //
Adding file2
+ Collections.singletonList(getTestFile(partitionPath, fileName1)), //
Removing file1
+ Instant.now().minus(12, ChronoUnit.HOURS),
+ "1"); // Incremental commit ID = "1"
+
+ // Step 5: Verify Source-Target Mapping for Incremental Sync
+ metaClient.reloadActiveTimeline();
+ Optional<String> incrementalTargetIdentifier =
+ targetClient.getTargetCommitIdentifier("1", metaClient);
+ assertTrue(incrementalTargetIdentifier.isPresent());
+ assertEquals(
+ incrementalTargetIdentifier.get(),
+ metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
+
+ // Step 6: Perform Another Incremental Sync (Remove file2, Add file3)
+ String fileName3 = "file_3.parquet";
+ incrementalSync(
+ targetClient,
+ Collections.singletonList(getTestFile(partitionPath, fileName3)), //
Adding file3
+ Collections.singletonList(getTestFile(partitionPath, fileName2)), //
Removing file2
+ Instant.now().minus(8, ChronoUnit.HOURS),
+ "2"); // Incremental commit ID = "2"
+
+ // Step 7: Verify Source-Target Mapping for Second Incremental Sync
+ metaClient.reloadActiveTimeline();
+ Optional<String> incrementalTargetIdentifier2 =
+ targetClient.getTargetCommitIdentifier("2", metaClient);
+ assertTrue(incrementalTargetIdentifier2.isPresent());
+ assertEquals(
+ incrementalTargetIdentifier2.get(),
+ metaClient.getCommitsTimeline().lastInstant().get().getTimestamp());
+
+ // Step 8: Verify Non-Existent Source ID Returns Empty
+ Optional<String> nonExistentTargetIdentifier =
+ targetClient.getTargetCommitIdentifier("3", metaClient);
+ assertFalse(nonExistentTargetIdentifier.isPresent());
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"partition_path", ""})
+ void testGetTargetCommitIdentifierWithNullSourceIdentifier(String
partitionPath) {
+ // Initialize Test Files and Snapshot
+ String fileName0 = "file_0.parquet";
+ String fileName1 = "file_1.parquet";
+
+ List<PartitionFileGroup> initialSnapshot =
+ Collections.singletonList(
+ PartitionFileGroup.builder()
+ .files(
+ Arrays.asList(
+ getTestFile(partitionPath, fileName0),
+ getTestFile(partitionPath, fileName1)))
+ .partitionValues(
+ Collections.singletonList(
+ PartitionValue.builder()
+ .partitionField(PARTITION_FIELD)
+ .range(Range.scalar("partitionPath"))
+ .build()))
+ .build());
+ InternalTable internalTable = getState(Instant.now().minus(24,
ChronoUnit.HOURS));
+ HudiConversionTarget targetClient = getTargetClient();
+
+ targetClient.beginSync(internalTable);
+ targetClient.syncFilesForSnapshot(initialSnapshot);
+ TableSyncMetadata tableSyncMetadata =
+ TableSyncMetadata.of(internalTable.getLatestCommitTime(),
Collections.emptyList());
+ targetClient.syncMetadata(tableSyncMetadata);
+ targetClient.syncSchema(internalTable.getReadSchema());
+ targetClient.completeSync();
+
+ HoodieTableMetaClient metaClient =
+
HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build();
+ // No crash should happen during the process
+ Optional<String> targetIdentifier =
targetClient.getTargetCommitIdentifier("0", metaClient);
+ // The targetIdentifier is expected to not be found
+ assertFalse(targetIdentifier.isPresent());
+ }
+
private TableSyncMetadata incrementalSync(
ConversionTarget conversionTarget,
List<InternalDataFile> filesToAdd,
List<InternalDataFile> filesToRemove,
- Instant commitStart) {
+ Instant commitStart,
+ String sourceIdentifier) {
DataFilesDiff dataFilesDiff2 =
DataFilesDiff.builder().filesAdded(filesToAdd).filesRemoved(filesToRemove).build();
InternalTable state3 = getState(commitStart);
conversionTarget.beginSync(state3);
conversionTarget.syncFilesForDiff(dataFilesDiff2);
TableSyncMetadata latestState =
- TableSyncMetadata.of(state3.getLatestCommitTime(),
Collections.emptyList());
+ TableSyncMetadata.of(
+ state3.getLatestCommitTime(), Collections.emptyList(), "TEST",
sourceIdentifier);
conversionTarget.syncMetadata(latestState);
conversionTarget.completeSync();
return latestState;
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
index fe20db05..da1ec033 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionTarget.java
@@ -141,7 +141,8 @@ public class TestHudiConversionTarget {
HudiConversionTarget targetClient = getTargetClient(null);
HudiConversionTarget.CommitState mockCommitState =
initMocksForBeginSync(targetClient).getLeft();
- TableSyncMetadata metadata = TableSyncMetadata.of(COMMIT_TIME,
Collections.emptyList());
+ TableSyncMetadata metadata =
+ TableSyncMetadata.of(COMMIT_TIME, Collections.emptyList(), "TEST",
"0");
targetClient.syncMetadata(metadata);
// validate that metadata is set in commitState
verify(mockCommitState).setTableSyncMetadata(metadata);
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index bd36dde9..c02d7f26 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -227,8 +227,8 @@ public class TestIcebergSync {
InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList());
InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
- InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
- InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
+ InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1,
dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2,
dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor =
@@ -322,9 +322,9 @@ public class TestIcebergSync {
InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
InternalDataFile dataFile4 = getDataFile(4, Collections.emptyList());
- InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
- InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
- InternalSnapshot snapshot3 = buildSnapshot(table2, dataFile3, dataFile4);
+ InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1,
dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2,
dataFile3);
+ InternalSnapshot snapshot3 = buildSnapshot(table2, "2", dataFile3,
dataFile4);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor =
@@ -398,7 +398,7 @@ public class TestIcebergSync {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
- InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
+ InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1,
dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema))
.thenReturn(icebergSchema)
@@ -461,7 +461,7 @@ public class TestIcebergSync {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
- InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
+ InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1,
dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
@@ -515,7 +515,7 @@ public class TestIcebergSync {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
- InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
+ InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1,
dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
@@ -590,7 +590,7 @@ public class TestIcebergSync {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues2);
InternalDataFile dataFile3 = getDataFile(3, partitionValues3);
- InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
+ InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1,
dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
@@ -656,7 +656,7 @@ public class TestIcebergSync {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
- InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2,
dataFile3);
+ InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1,
dataFile2, dataFile3);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
@@ -681,10 +681,111 @@ public class TestIcebergSync {
Expressions.equal(partitionField.getSourceField().getPath(),
"value1"));
}
- private InternalSnapshot buildSnapshot(InternalTable table,
InternalDataFile... dataFiles) {
+ @Test
+ public void testSourceTargetMapping() throws Exception {
+ // Prepare schemas
+ List<InternalField> fields2 = new ArrayList<>(internalSchema.getFields());
+ fields2.add(
+ InternalField.builder()
+ .name("long_field")
+
.schema(InternalSchema.builder().name("long").dataType(InternalType.LONG).build())
+ .build());
+ InternalSchema schema2 =
internalSchema.toBuilder().fields(fields2).build();
+ List<Types.NestedField> fields = new ArrayList<>(icebergSchema.columns());
+ fields.add(Types.NestedField.of(6, false, "long_field",
Types.LongType.get()));
+ Schema icebergSchema2 = new Schema(fields);
+
+ InternalTable table1 =
+ getInternalTable(tableName, basePath, internalSchema, null,
LAST_COMMIT_TIME);
+ InternalTable table2 = getInternalTable(tableName, basePath, schema2,
null, LAST_COMMIT_TIME);
+
+ // Create data files and snapshots
+ InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList());
+ InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
+ InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
+ InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1,
dataFile2);
+ InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2,
dataFile3);
+
+
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
+ when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
+ when(mockPartitionSpecExtractor.toIceberg(eq(null), any()))
+ .thenReturn(PartitionSpec.unpartitioned());
+
+ mockColStatsForFile(dataFile1, 2);
+ mockColStatsForFile(dataFile2, 2);
+ mockColStatsForFile(dataFile3, 1);
+
+ // Sync first snapshot
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+ Optional<String> targetIdentifier1 =
+
conversionTarget.getTargetCommitIdentifier(snapshot1.getSourceIdentifier());
+ assertTrue(targetIdentifier1.isPresent());
+ assertEquals(
+ targetIdentifier1.get(),
String.valueOf(getTable(basePath).currentSnapshot().snapshotId()));
+
+ // Sync second snapshot
+ TableFormatSync.getInstance()
+ .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
+ Optional<String> targetIdentifier2 =
+
conversionTarget.getTargetCommitIdentifier(snapshot2.getSourceIdentifier());
+ assertTrue(targetIdentifier2.isPresent());
+ assertEquals(
+ targetIdentifier2.get(),
String.valueOf(getTable(basePath).currentSnapshot().snapshotId()));
+
+ // Validate that an unknown source ID returns empty
+ Optional<String> emptyTargetIdentifier =
conversionTarget.getTargetCommitIdentifier("999");
+ assertFalse(emptyTargetIdentifier.isPresent());
+ }
+
+ @Test
+ public void testGetTargetCommitIdentifierWithNullSourceIdentifier() {
+ // Prepare schemas
+ List<InternalField> fields2 = new ArrayList<>(internalSchema.getFields());
+ fields2.add(
+ InternalField.builder()
+ .name("long_field")
+
.schema(InternalSchema.builder().name("long").dataType(InternalType.LONG).build())
+ .build());
+ InternalSchema schema2 =
internalSchema.toBuilder().fields(fields2).build();
+ List<Types.NestedField> fields = new ArrayList<>(icebergSchema.columns());
+ fields.add(Types.NestedField.of(6, false, "long_field",
Types.LongType.get()));
+ Schema icebergSchema2 = new Schema(fields);
+
+ InternalTable internalTable =
+ getInternalTable(tableName, basePath, internalSchema, null,
LAST_COMMIT_TIME);
+
+ // Create data files and snapshots
+ InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList());
+ InternalSnapshot snapshot = buildSnapshot(internalTable, "0", dataFile1);
+
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
+ when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
+ when(mockPartitionSpecExtractor.toIceberg(eq(null), any()))
+ .thenReturn(PartitionSpec.unpartitioned());
+ mockColStatsForFile(dataFile1, 2);
+
+ // Mock the snapshot sync process like getSyncResult()
+ conversionTarget.beginSync(internalTable);
+ TableSyncMetadata tableSyncMetadata =
+ TableSyncMetadata.of(internalTable.getLatestCommitTime(),
snapshot.getPendingCommits());
+ conversionTarget.syncMetadata(tableSyncMetadata);
+ conversionTarget.syncSchema(internalTable.getReadSchema());
+ conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields());
+ conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles());
+ conversionTarget.completeSync();
+
+ // No crash should happen during the process
+ Optional<String> targetIdentifier =
conversionTarget.getTargetCommitIdentifier("0");
+ // The targetIdentifier is expected to not be found
+ assertFalse(targetIdentifier.isPresent());
+ }
+
+ private InternalSnapshot buildSnapshot(
+ InternalTable table, String sourceIdentifier, InternalDataFile...
dataFiles) {
return InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
+ .sourceIdentifier(sourceIdentifier)
.build();
}