This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 23d870ed Modified incremental-sync safety check so it does not depend on snapshot list order Co-authored-by: Timothy Brown <t...@onehouse.ai>
23d870ed is described below

commit 23d870ede0ef54a28c0cfdc65908bc74fcffe3ff
Author: rbokka <rbo...@microsoft.com>
AuthorDate: Fri Mar 7 08:55:42 2025 -0500

    Modified incremental-sync safety check so it does not depend on snapshot list order
    Co-authored-by: Timothy Brown <t...@onehouse.ai>
---
 .../xtable/iceberg/IcebergConversionSource.java    | 35 ++++++++----------
 .../java/org/apache/xtable/TestIcebergTable.java   |  2 +-
 .../xtable/iceberg/ITIcebergConversionSource.java  | 41 ++++++++++++++++------
 3 files changed, 45 insertions(+), 33 deletions(-)

diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 5f210f1f..d4a506e7 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -246,31 +246,24 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
   public boolean isIncrementalSyncSafeFrom(Instant instant) {
     long timeInMillis = instant.toEpochMilli();
     Table iceTable = getSourceTable();
-    boolean doesInstantOfAgeExists = false;
-    Long targetSnapshotId = null;
-    for (Snapshot snapshot : iceTable.snapshots()) {
-      if (snapshot.timestampMillis() <= timeInMillis) {
-        doesInstantOfAgeExists = true;
-        targetSnapshotId = snapshot.snapshotId();
-      } else {
-        break;
+    Snapshot currentSnapshot = iceTable.currentSnapshot();
+
+    while (currentSnapshot != null && currentSnapshot.timestampMillis() > timeInMillis) {
+      Long parentSnapshotId = currentSnapshot.parentId();
+      if (parentSnapshotId == null) {
+        // no more snapshots in the chain and did not find targetSnapshot
+        return false;
       }
-    }
-    if (!doesInstantOfAgeExists) {
-      return false;
-    }
-    // Go from latest snapshot until targetSnapshotId through parent reference.
-    // nothing has to be null in this chain to guarantee safety of incremental sync.
-    Long currentSnapshotId = iceTable.currentSnapshot().snapshotId();
-    while (currentSnapshotId != null && currentSnapshotId != targetSnapshotId) {
-      Snapshot currentSnapshot = iceTable.snapshot(currentSnapshotId);
-      if (currentSnapshot == null) {
-        // The snapshot is expired.
+
+      Snapshot parentSnapshot = iceTable.snapshot(parentSnapshotId);
+      if (parentSnapshot == null) {
+        // chain is broken due to expired snapshot
+        log.info("Expired snapshot id: {}", parentSnapshotId);
         return false;
       }
-      currentSnapshotId = currentSnapshot.parentId();
+      currentSnapshot = parentSnapshot;
     }
-    return true;
+    return currentSnapshot != null;
   }
 
   @Override
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
index 0c8336fe..3b944340 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
@@ -307,7 +307,7 @@ public class TestIcebergTable implements GenericTable<Record, String> {
     return String.format("%s > 'aaa'", icebergDataHelper.getRecordKeyField());
   }
 
-  public Long getLastCommitTimestamp() {
+  public long getLastCommitTimestamp() {
     return getLatestSnapshot().timestampMillis();
   }
 
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
index 210b6f9d..20026cb5 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java
@@ -146,7 +146,7 @@ public class ITIcebergConversionSource {
       List<TableChange> allTableChanges = new ArrayList<>();
 
       testIcebergTable.insertRows(50);
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      long timestamp1 = testIcebergTable.getLastCommitTimestamp();
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
       List<Record> records1 = testIcebergTable.insertRows(50);
@@ -204,7 +204,7 @@ public class ITIcebergConversionSource {
       List<TableChange> allTableChanges = new ArrayList<>();
 
       List<Record> records1 = testIcebergTable.insertRows(50);
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      long timestamp1 = testIcebergTable.getLastCommitTimestamp();
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
       List<Record> records2 = testIcebergTable.insertRows(50);
@@ -264,7 +264,7 @@ public class ITIcebergConversionSource {
       List<TableChange> allTableChanges = new ArrayList<>();
 
       List<Record> records1 = testIcebergTable.insertRows(50);
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      long timestamp1 = testIcebergTable.getLastCommitTimestamp();
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
 
       List<Record> records2 = testIcebergTable.insertRows(50);
@@ -325,7 +325,7 @@ public class ITIcebergConversionSource {
       List<TableChange> allTableChanges = new ArrayList<>();
 
       List<Record> records1 = testIcebergTable.insertRows(50);
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      long timestamp1 = testIcebergTable.getLastCommitTimestamp();
 
       testIcebergTable.upsertRows(records1.subList(0, 20));
       allActiveFiles.add(testIcebergTable.getAllActiveFiles());
@@ -383,8 +383,8 @@ public class ITIcebergConversionSource {
         TestIcebergTable.forStandardSchemaAndPartitioning(
             tableName, "level", tempDir, hadoopConf)) {
       // Insert 50 rows to INFO partition.
-      List<Record> commit1Rows = testIcebergTable.insertRecordsForPartition(50, "INFO");
-      Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
+      List<Record> firstCommitRows = testIcebergTable.insertRecordsForPartition(50, "INFO");
+      long timestampAfterFirstCommit = testIcebergTable.getLastCommitTimestamp();
       SourceTable tableConfig =
           SourceTable.builder()
               .name(testIcebergTable.getTableName())
@@ -393,23 +393,42 @@ public class ITIcebergConversionSource {
               .build();
 
       // Upsert all rows inserted before, so all files are replaced.
-      testIcebergTable.upsertRows(commit1Rows.subList(0, 50));
-      long snapshotIdAfterCommit2 = testIcebergTable.getLatestSnapshot().snapshotId();
+      testIcebergTable.upsertRows(firstCommitRows.subList(0, 50));
+      long timestampAfterSecondCommit = testIcebergTable.getLastCommitTimestamp();
+      long snapshotIdAfterSecondCommit = testIcebergTable.getLatestSnapshot().snapshotId();
 
       // Insert 50 rows to different (ERROR) partition.
       testIcebergTable.insertRecordsForPartition(50, "ERROR");
+      long timestampAfterThirdCommit = testIcebergTable.getLastCommitTimestamp();
 
       if (shouldExpireSnapshots) {
         // Expire snapshotAfterCommit2.
-        testIcebergTable.expireSnapshot(snapshotIdAfterCommit2);
+        testIcebergTable.expireSnapshot(snapshotIdAfterSecondCommit);
       }
       IcebergConversionSource conversionSource =
           sourceProvider.getConversionSourceInstance(tableConfig);
       if (shouldExpireSnapshots) {
-        assertFalse(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
+        // Since the second snapshot is expired, we cannot safely perform incremental sync from the
+        // first two commits
+        assertFalse(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterFirstCommit)));
+        assertFalse(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterSecondCommit)));
       } else {
-        assertTrue(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
+        // The full history is still present so incremental sync is safe from any of these commits
+        assertTrue(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterFirstCommit)));
+        assertTrue(
+            conversionSource.isIncrementalSyncSafeFrom(
+                Instant.ofEpochMilli(timestampAfterSecondCommit)));
       }
+      // Table always has the last commit so incremental sync is safe
+      assertTrue(
+          conversionSource.isIncrementalSyncSafeFrom(
+              Instant.ofEpochMilli(timestampAfterThirdCommit)));
       // Table doesn't have instant of this older commit, hence it is not safe.
       Instant instantAsOfHourAgo = Instant.now().minus(1, ChronoUnit.HOURS);
       assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantAsOfHourAgo));

Reply via email to