mikedias commented on code in PR #780:
URL: https://github.com/apache/incubator-xtable/pull/780#discussion_r2659997725


##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java:
##########
@@ -114,18 +116,125 @@ private Snapshot getLastSnapshot() {
 
   @Override
   public TableChange getTableChangeForCommit(Snapshot snapshot) {
-    throw new UnsupportedOperationException("Incremental Sync is not supported 
yet.");
+    InternalTable tableAtSnapshot = getTable(snapshot);
+    InternalSchema internalSchema = tableAtSnapshot.getReadSchema();
+
+    InternalFilesDiff filesDiff =
+        dataFileExtractor.extractFilesDiff(paimonTable, snapshot, 
internalSchema);
+
+    return TableChange.builder()
+        .tableAsOfChange(tableAtSnapshot)
+        .filesDiff(filesDiff)
+        .sourceIdentifier(getCommitIdentifier(snapshot))
+        .build();
   }
 
   @Override
   public CommitsBacklog<Snapshot> getCommitsBacklog(
       InstantsForIncrementalSync instantsForIncrementalSync) {
-    throw new UnsupportedOperationException("Incremental Sync is not supported 
yet.");
+    Instant lastSyncInstant = instantsForIncrementalSync.getLastSyncInstant();
+    long lastSyncTimeMillis = lastSyncInstant.toEpochMilli();
+
+    log.info(
+        "Getting commits backlog for Paimon table {} from instant {}",
+        paimonTable.name(),
+        lastSyncInstant);
+
+    Iterator<Snapshot> snapshotIterator;
+    try {
+      snapshotIterator = snapshotManager.snapshots();
+    } catch (IOException e) {
+      throw new ReadException("Could not iterate over the Paimon snapshot 
list", e);
+    }
+
+    List<Snapshot> snapshotsToProcess = new ArrayList<>();
+    while (snapshotIterator.hasNext()) {
+      Snapshot snapshot = snapshotIterator.next();
+      // Only include snapshots committed after the last sync
+      if (snapshot.timeMillis() > lastSyncTimeMillis) {
+        snapshotsToProcess.add(snapshot);
+        log.debug(
+            "Including snapshot {} (time={}, commitId={}) in backlog",
+            snapshot.id(),
+            snapshot.timeMillis(),
+            snapshot.commitIdentifier());
+      }
+    }
+
+    log.info("Found {} snapshots to process for incremental sync", 
snapshotsToProcess.size());
+
+    return CommitsBacklog.<Snapshot>builder()
+        .commitsToProcess(snapshotsToProcess)
+        .inFlightInstants(Collections.emptyList())
+        .build();
   }
 
   @Override
   public boolean isIncrementalSyncSafeFrom(Instant instant) {
-    return false; // Incremental sync is not supported yet
+    long timeInMillis = instant.toEpochMilli();
+
+    Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+    Long latestSnapshotId = snapshotManager.latestSnapshotId();
+    if (earliestSnapshotId == null || latestSnapshotId == null) {
+      log.warn("No snapshots found in table {}", paimonTable.name());
+      return false;
+    }
+
+    Snapshot earliestSnapshot = snapshotManager.snapshot(earliestSnapshotId);
+    Snapshot latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
+
+    // Check 1: If instant is in the future (after latest snapshot), return 
false
+    if (timeInMillis > latestSnapshot.timeMillis()) {
+      log.warn(
+          "Instant {} is in the future. Latest snapshot {} has time {}",
+          instant,
+          latestSnapshot.id(),
+          latestSnapshot.timeMillis());
+      return false;
+    }
+
+    // Check 2: Has snapshot expiration affected this instant?
+    // If the earliest snapshot is newer than the requested instant,
+    // then snapshots have been expired and we can't do incremental sync
+    if (earliestSnapshot.timeMillis() > timeInMillis) {
+      log.warn(
+          "Incremental sync is not safe from instant {}. "
+              + "Earliest available snapshot {} (time={}) is newer than the 
requested instant. "
+              + "Snapshots may have been expired.",
+          instant,
+          earliestSnapshot.id(),
+          earliestSnapshot.timeMillis());
+      return false;
+    }
+
+    // Check 3: Verify a snapshot exists at or before the instant

Review Comment:
   done, much better now!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to