This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
     new 23d870ed Modified incremental safe check which does not depend on snapshots list order Co-authored-by: Timothy Brown <t...@onehouse.ai>
23d870ed is described below

commit 23d870ede0ef54a28c0cfdc65908bc74fcffe3ff
Author: rbokka <rbo...@microsoft.com>
AuthorDate: Fri Mar 7 08:55:42 2025 -0500

    Modified incremental safe check which does not depend on snapshots list order

    Co-authored-by: Timothy Brown <t...@onehouse.ai>
---
 .../xtable/iceberg/IcebergConversionSource.java    | 35 ++++++++----------
 .../java/org/apache/xtable/TestIcebergTable.java   |  2 +-
 .../xtable/iceberg/ITIcebergConversionSource.java  | 41 ++++++++++++++++------
 3 files changed, 45 insertions(+), 33 deletions(-)

diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 5f210f1f..d4a506e7 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -246,31 +246,24 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
   public boolean isIncrementalSyncSafeFrom(Instant instant) {
     long timeInMillis = instant.toEpochMilli();
     Table iceTable = getSourceTable();
-    boolean doesInstantOfAgeExists = false;
-    Long targetSnapshotId = null;
-    for (Snapshot snapshot : iceTable.snapshots()) {
-      if (snapshot.timestampMillis() <= timeInMillis) {
-        doesInstantOfAgeExists = true;
-        targetSnapshotId = snapshot.snapshotId();
-      } else {
-        break;
+    Snapshot currentSnapshot = iceTable.currentSnapshot();
+
+    while (currentSnapshot != null && currentSnapshot.timestampMillis() > timeInMillis) {
+      Long parentSnapshotId = currentSnapshot.parentId();
+      if (parentSnapshotId == null) {
+        // no more snapshots in the chain and did not find targetSnapshot
+        return false;
       }
-    }
-    if (!doesInstantOfAgeExists) {
-      return false;
-    }
-    // Go from latest snapshot until targetSnapshotId through parent reference.
-    // nothing has to be null in this chain to guarantee safety of incremental sync.
-    Long currentSnapshotId = iceTable.currentSnapshot().snapshotId();
-    while (currentSnapshotId != null && currentSnapshotId != targetSnapshotId) {
-      Snapshot currentSnapshot = iceTable.snapshot(currentSnapshotId);
-      if (currentSnapshot == null) {
-        // The snapshot is expired.
+
+      Snapshot parentSnapshot = iceTable.snapshot(parentSnapshotId);
+      if (parentSnapshot == null) {
+        // chain is broken due to expired snapshot
+        log.info("Expired snapshot id: {}", parentSnapshotId);
         return false;
       }
-      currentSnapshotId = currentSnapshot.parentId();
+      currentSnapshot = parentSnapshot;
     }
-    return true;
+    return currentSnapshot != null;
   }
 
   @Override
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
index 0c8336fe..3b944340 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
@@ -307,7 +307,7 @@ public class TestIcebergTable implements GenericTable<Record, String> {
     return String.format("%s > 'aaa'", icebergDataHelper.getRecordKeyField());
   }
 
-  public Long getLastCommitTimestamp() {
+  public long getLastCommitTimestamp() {
     return getLatestSnapshot().timestampMillis();
   }
 
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
index 210b6f9d..20026cb5 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
@@ -146,7 +146,7 @@ public class ITIcebergConversionSource {
      List<TableChange> allTableChanges = new ArrayList<>();
 
      testIcebergTable.insertRows(50);
-     Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+     long timestamp1 = testIcebergTable.getLastCommitTimestamp();
      allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
      List<Record> records1 = testIcebergTable.insertRows(50);
@@ -204,7 +204,7 @@ public class ITIcebergConversionSource {
      List<TableChange> allTableChanges = new ArrayList<>();
 
      List<Record> records1 = testIcebergTable.insertRows(50);
-     Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+     long timestamp1 = testIcebergTable.getLastCommitTimestamp();
      allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
      List<Record> records2 = testIcebergTable.insertRows(50);
@@ -264,7 +264,7 @@ public class ITIcebergConversionSource {
      List<TableChange> allTableChanges = new ArrayList<>();
 
      List<Record> records1 = testIcebergTable.insertRows(50);
-     Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+     long timestamp1 = testIcebergTable.getLastCommitTimestamp();
      allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
      List<Record> records2 = testIcebergTable.insertRows(50);
@@ -325,7 +325,7 @@ public class ITIcebergConversionSource {
      List<TableChange> allTableChanges = new ArrayList<>();
 
      List<Record> records1 = testIcebergTable.insertRows(50);
-     Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+     long timestamp1 = testIcebergTable.getLastCommitTimestamp();
 
      testIcebergTable.upsertRows(records1.subList(0, 20));
      allActiveFiles.add(testIcebergTable.getAllActiveFiles());
@@ -383,8 +383,8 @@ public class ITIcebergConversionSource {
         TestIcebergTable.forStandardSchemaAndPartitioning(
             tableName, "level", tempDir, hadoopConf)) {
       // Insert 50 rows to INFO partition.
-      List<Record> commit1Rows = testIcebergTable.insertRecordsForPartition(50, "INFO");
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      List<Record> firstCommitRows = testIcebergTable.insertRecordsForPartition(50, "INFO");
+      long timestampAfterFirstCommit = testIcebergTable.getLastCommitTimestamp();
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
@@ -393,23 +393,42 @@ public class ITIcebergConversionSource {
               .build();
 
       // Upsert all rows inserted before, so all files are replaced.
-      testIcebergTable.upsertRows(commit1Rows.subList(0, 50));
-      long snapshotIdAfterCommit2 = testIcebergTable.getLatestSnapshot().snapshotId();
+      testIcebergTable.upsertRows(firstCommitRows.subList(0, 50));
+      long timestampAfterSecondCommit = testIcebergTable.getLastCommitTimestamp();
+      long snapshotIdAfterSecondCommit = testIcebergTable.getLatestSnapshot().snapshotId();
 
       // Insert 50 rows to different (ERROR) partition.
       testIcebergTable.insertRecordsForPartition(50, "ERROR");
+      long timestampAfterThirdCommit = testIcebergTable.getLastCommitTimestamp();
 
       if (shouldExpireSnapshots) {
         // Expire snapshotAfterCommit2.
-        testIcebergTable.expireSnapshot(snapshotIdAfterCommit2);
+        testIcebergTable.expireSnapshot(snapshotIdAfterSecondCommit);
       }
       IcebergConversionSource conversionSource =
           sourceProvider.getConversionSourceInstance(tableConfig);
       if (shouldExpireSnapshots) {
-        assertFalse(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
+        // Since the second snapshot is expired, we cannot safely perform incremental sync from the
+        // first two commits
+        assertFalse(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterFirstCommit)));
+        assertFalse(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterSecondCommit)));
       } else {
-        assertTrue(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
+        // The full history is still present so incremental sync is safe from any of these commits
+        assertTrue(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterFirstCommit)));
+        assertTrue(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterSecondCommit)));
       }
+      // Table always has the last commit so incremental sync is safe
+      assertTrue(
+          conversionSource.isIncrementalSyncSafeFrom(
+              Instant.ofEpochMilli(timestampAfterThirdCommit)));
       // Table doesn't have instant of this older commit, hence it is not safe.
       Instant instantAsOfHourAgo = Instant.now().minus(1, ChronoUnit.HOURS);
       assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantAsOfHourAgo));
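For context on what the reworked check guarantees: it now walks the snapshot ancestry from Table#currentSnapshot() via Snapshot#parentId() instead of relying on the order of Table#snapshots(), and reports incremental sync as safe only when an unbroken parent chain reaches a snapshot at or before the requested instant. Below is a minimal, hypothetical caller sketch in Java; the SyncPlanner and SyncMode names are illustrative assumptions and not part of this commit, and only isIncrementalSyncSafeFrom comes from the patched class.

// Hypothetical usage sketch (not part of this commit): decide whether a sync can
// resume incrementally from the last successful sync instant or must fall back to
// a full re-sync.
import java.time.Instant;

import org.apache.xtable.iceberg.IcebergConversionSource;

public class SyncPlanner {

  public enum SyncMode {
    INCREMENTAL,
    FULL
  }

  public static SyncMode chooseSyncMode(IcebergConversionSource source, Instant lastSyncInstant) {
    // After this patch, the check follows currentSnapshot -> parentId() and returns
    // true only if an unbroken chain reaches a snapshot at or before lastSyncInstant,
    // independent of snapshot list order. An expired intermediate snapshot breaks the
    // chain, so the caller falls back to a full sync.
    return source.isIncrementalSyncSafeFrom(lastSyncInstant)
        ? SyncMode.INCREMENTAL
        : SyncMode.FULL;
  }
}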