This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
     new 23d870ed Modified incremental safe check which does not depend on snapshots list order Co-authored-by: Timothy Brown <[email protected]>
23d870ed is described below
commit 23d870ede0ef54a28c0cfdc65908bc74fcffe3ff
Author: rbokka <[email protected]>
AuthorDate: Fri Mar 7 08:55:42 2025 -0500
Modified incremental safe check which does not depend on snapshots list order
Co-authored-by: Timothy Brown <[email protected]>
---
.../xtable/iceberg/IcebergConversionSource.java | 35 ++++++++----------
.../java/org/apache/xtable/TestIcebergTable.java | 2 +-
.../xtable/iceberg/ITIcebergConversionSource.java | 41 ++++++++++++++++------
3 files changed, 45 insertions(+), 33 deletions(-)
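
For readers skimming the patch, here is a minimal, self-contained sketch of the new safety check. It mirrors the traversal introduced in the diff below: start from the table's current snapshot and follow parent references backwards; incremental sync is safe only if a snapshot at or before the requested instant is reached without hitting a missing (expired) parent. The standalone class and the static helper signature are illustrative only and are not part of the patch.

    // Minimal sketch (not the exact patch): decide whether incremental sync is safe
    // by walking the current snapshot's parent chain instead of relying on the
    // ordering of Table#snapshots().
    import java.time.Instant;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    public final class IncrementalSyncCheckSketch {

      // Returns true only if a snapshot at or before "instant" is reachable from the
      // current snapshot through parent references, i.e. nothing in between expired.
      static boolean isIncrementalSyncSafeFrom(Table table, Instant instant) {
        long timeInMillis = instant.toEpochMilli();
        Snapshot current = table.currentSnapshot();

        while (current != null && current.timestampMillis() > timeInMillis) {
          Long parentId = current.parentId();
          if (parentId == null) {
            // Reached the oldest snapshot without finding one at or before the instant.
            return false;
          }
          Snapshot parent = table.snapshot(parentId);
          if (parent == null) {
            // The parent was expired, so the history needed for incremental sync is gone.
            return false;
          }
          current = parent;
        }
        // Either a suitable snapshot was found, or the table has no snapshots at all.
        return current != null;
      }
    }
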
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 5f210f1f..d4a506e7 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -246,31 +246,24 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
   public boolean isIncrementalSyncSafeFrom(Instant instant) {
     long timeInMillis = instant.toEpochMilli();
     Table iceTable = getSourceTable();
-    boolean doesInstantOfAgeExists = false;
-    Long targetSnapshotId = null;
-    for (Snapshot snapshot : iceTable.snapshots()) {
-      if (snapshot.timestampMillis() <= timeInMillis) {
-        doesInstantOfAgeExists = true;
-        targetSnapshotId = snapshot.snapshotId();
-      } else {
-        break;
+    Snapshot currentSnapshot = iceTable.currentSnapshot();
+
+    while (currentSnapshot != null && currentSnapshot.timestampMillis() > timeInMillis) {
+      Long parentSnapshotId = currentSnapshot.parentId();
+      if (parentSnapshotId == null) {
+        // no more snapshots in the chain and did not find targetSnapshot
+        return false;
       }
-    }
-    if (!doesInstantOfAgeExists) {
-      return false;
-    }
-    // Go from latest snapshot until targetSnapshotId through parent reference.
-    // nothing has to be null in this chain to guarantee safety of incremental sync.
-    Long currentSnapshotId = iceTable.currentSnapshot().snapshotId();
-    while (currentSnapshotId != null && currentSnapshotId != targetSnapshotId) {
-      Snapshot currentSnapshot = iceTable.snapshot(currentSnapshotId);
-      if (currentSnapshot == null) {
-        // The snapshot is expired.
+
+      Snapshot parentSnapshot = iceTable.snapshot(parentSnapshotId);
+      if (parentSnapshot == null) {
+        // chain is broken due to expired snapshot
+        log.info("Expired snapshot id: {}", parentSnapshotId);
         return false;
       }
-      currentSnapshotId = currentSnapshot.parentId();
+      currentSnapshot = parentSnapshot;
     }
-    return true;
+    return currentSnapshot != null;
   }
 
   @Override
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
index 0c8336fe..3b944340 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
@@ -307,7 +307,7 @@ public class TestIcebergTable implements GenericTable<Record, String> {
     return String.format("%s > 'aaa'", icebergDataHelper.getRecordKeyField());
   }
 
-  public Long getLastCommitTimestamp() {
+  public long getLastCommitTimestamp() {
     return getLatestSnapshot().timestampMillis();
   }
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
index 210b6f9d..20026cb5 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
@@ -146,7 +146,7 @@ public class ITIcebergConversionSource {
       List<TableChange> allTableChanges = new ArrayList<>();
       testIcebergTable.insertRows(50);
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      long timestamp1 = testIcebergTable.getLastCommitTimestamp();
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
       List<Record> records1 = testIcebergTable.insertRows(50);
@@ -204,7 +204,7 @@
       List<TableChange> allTableChanges = new ArrayList<>();
       List<Record> records1 = testIcebergTable.insertRows(50);
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      long timestamp1 = testIcebergTable.getLastCommitTimestamp();
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
       List<Record> records2 = testIcebergTable.insertRows(50);
@@ -264,7 +264,7 @@
       List<TableChange> allTableChanges = new ArrayList<>();
       List<Record> records1 = testIcebergTable.insertRows(50);
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      long timestamp1 = testIcebergTable.getLastCommitTimestamp();
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
       List<Record> records2 = testIcebergTable.insertRows(50);
@@ -325,7 +325,7 @@
       List<TableChange> allTableChanges = new ArrayList<>();
       List<Record> records1 = testIcebergTable.insertRows(50);
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      long timestamp1 = testIcebergTable.getLastCommitTimestamp();
       testIcebergTable.upsertRows(records1.subList(0, 20));
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
@@ -383,8 +383,8 @@
         TestIcebergTable.forStandardSchemaAndPartitioning(
             tableName, "level", tempDir, hadoopConf)) {
       // Insert 50 rows to INFO partition.
-      List<Record> commit1Rows = testIcebergTable.insertRecordsForPartition(50, "INFO");
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      List<Record> firstCommitRows = testIcebergTable.insertRecordsForPartition(50, "INFO");
+      long timestampAfterFirstCommit = testIcebergTable.getLastCommitTimestamp();
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
@@ -393,23 +393,42 @@
               .build();
       // Upsert all rows inserted before, so all files are replaced.
-      testIcebergTable.upsertRows(commit1Rows.subList(0, 50));
-      long snapshotIdAfterCommit2 = testIcebergTable.getLatestSnapshot().snapshotId();
+      testIcebergTable.upsertRows(firstCommitRows.subList(0, 50));
+      long timestampAfterSecondCommit = testIcebergTable.getLastCommitTimestamp();
+      long snapshotIdAfterSecondCommit = testIcebergTable.getLatestSnapshot().snapshotId();
       // Insert 50 rows to different (ERROR) partition.
       testIcebergTable.insertRecordsForPartition(50, "ERROR");
+      long timestampAfterThirdCommit = testIcebergTable.getLastCommitTimestamp();
       if (shouldExpireSnapshots) {
         // Expire snapshotAfterCommit2.
-        testIcebergTable.expireSnapshot(snapshotIdAfterCommit2);
+        testIcebergTable.expireSnapshot(snapshotIdAfterSecondCommit);
       }
       IcebergConversionSource conversionSource =
           sourceProvider.getConversionSourceInstance(tableConfig);
       if (shouldExpireSnapshots) {
-        assertFalse(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
+        // Since the second snapshot is expired, we cannot safely perform incremental sync from the
+        // first two commits
+        assertFalse(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterFirstCommit)));
+        assertFalse(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterSecondCommit)));
       } else {
-        assertTrue(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
+        // The full history is still present so incremental sync is safe from any of these commits
+        assertTrue(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterFirstCommit)));
+        assertTrue(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterSecondCommit)));
       }
+      // Table always has the last commit so incremental sync is safe
+      assertTrue(
+          conversionSource.isIncrementalSyncSafeFrom(
+              Instant.ofEpochMilli(timestampAfterThirdCommit)));
       // Table doesn't have instant of this older commit, hence it is not safe.
       Instant instantAsOfHourAgo = Instant.now().minus(1, ChronoUnit.HOURS);
       assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantAsOfHourAgo));
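
As a closing illustration of the shouldExpireSnapshots branch of the test above, the hypothetical snippet below uses plain Iceberg APIs to expire a mid-history snapshot and shows why the patched check then reports incremental sync as unsafe from older instants. It reuses the IncrementalSyncCheckSketch helper shown before the diff; the class name and the assumption of at least two prior commits are illustrative only.

    // Hypothetical scenario sketch: expiring a snapshot in the middle of the history
    // breaks the parent chain, so incremental sync stays safe only from commits made
    // after the expired one.
    import java.time.Instant;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    public final class ExpiredSnapshotScenarioSketch {

      static void demonstrate(Table table) {
        // Assumes the table already has at least two commits and no extra branches/tags.
        Snapshot latest = table.currentSnapshot();
        Snapshot middle = table.snapshot(latest.parentId());

        // Expire only the middle snapshot, mirroring testIcebergTable.expireSnapshot(...).
        table.expireSnapshots().expireSnapshotId(middle.snapshotId()).commit();

        // Safe: the current snapshot itself is always reachable.
        boolean safeFromLatest =
            IncrementalSyncCheckSketch.isIncrementalSyncSafeFrom(
                table, Instant.ofEpochMilli(latest.timestampMillis()));

        // Unsafe: the walk from the current snapshot hits the expired parent.
        boolean safeFromMiddle =
            IncrementalSyncCheckSketch.isIncrementalSyncSafeFrom(
                table, Instant.ofEpochMilli(middle.timestampMillis()));

        System.out.println("safe from latest commit:  " + safeFromLatest);   // true
        System.out.println("safe from expired commit: " + safeFromMiddle);   // false
      }
    }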