mikedias commented on code in PR #780:
URL: https://github.com/apache/incubator-xtable/pull/780#discussion_r2659997725
##########
xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java:
##########
@@ -114,18 +116,125 @@ private Snapshot getLastSnapshot() {
@Override
public TableChange getTableChangeForCommit(Snapshot snapshot) {
- throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+ InternalTable tableAtSnapshot = getTable(snapshot);
+ InternalSchema internalSchema = tableAtSnapshot.getReadSchema();
+
+ InternalFilesDiff filesDiff =
+ dataFileExtractor.extractFilesDiff(paimonTable, snapshot, internalSchema);
+
+ return TableChange.builder()
+ .tableAsOfChange(tableAtSnapshot)
+ .filesDiff(filesDiff)
+ .sourceIdentifier(getCommitIdentifier(snapshot))
+ .build();
}
@Override
public CommitsBacklog<Snapshot> getCommitsBacklog(
InstantsForIncrementalSync instantsForIncrementalSync) {
- throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+ Instant lastSyncInstant = instantsForIncrementalSync.getLastSyncInstant();
+ long lastSyncTimeMillis = lastSyncInstant.toEpochMilli();
+
+ log.info(
+ "Getting commits backlog for Paimon table {} from instant {}",
+ paimonTable.name(),
+ lastSyncInstant);
+
+ Iterator<Snapshot> snapshotIterator;
+ try {
+ snapshotIterator = snapshotManager.snapshots();
+ } catch (IOException e) {
+ throw new ReadException("Could not iterate over the Paimon snapshot list", e);
+ }
+
+ List<Snapshot> snapshotsToProcess = new ArrayList<>();
+ while (snapshotIterator.hasNext()) {
+ Snapshot snapshot = snapshotIterator.next();
+ // Only include snapshots committed after the last sync
+ if (snapshot.timeMillis() > lastSyncTimeMillis) {
+ snapshotsToProcess.add(snapshot);
+ log.debug(
+ "Including snapshot {} (time={}, commitId={}) in backlog",
+ snapshot.id(),
+ snapshot.timeMillis(),
+ snapshot.commitIdentifier());
+ }
+ }
+
+ log.info("Found {} snapshots to process for incremental sync", snapshotsToProcess.size());
+
+ return CommitsBacklog.<Snapshot>builder()
+ .commitsToProcess(snapshotsToProcess)
+ .inFlightInstants(Collections.emptyList())
+ .build();
}
@Override
public boolean isIncrementalSyncSafeFrom(Instant instant) {
- return false; // Incremental sync is not supported yet
+ long timeInMillis = instant.toEpochMilli();
+
+ Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ if (earliestSnapshotId == null || latestSnapshotId == null) {
+ log.warn("No snapshots found in table {}", paimonTable.name());
+ return false;
+ }
+
+ Snapshot earliestSnapshot = snapshotManager.snapshot(earliestSnapshotId);
+ Snapshot latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
+
+ // Check 1: If instant is in the future (after latest snapshot), return false
+ if (timeInMillis > latestSnapshot.timeMillis()) {
+ log.warn(
+ "Instant {} is in the future. Latest snapshot {} has time {}",
+ instant,
+ latestSnapshot.id(),
+ latestSnapshot.timeMillis());
+ return false;
+ }
+
+ // Check 2: Has snapshot expiration affected this instant?
+ // If the earliest snapshot is newer than the requested instant,
+ // then snapshots have been expired and we can't do incremental sync
+ if (earliestSnapshot.timeMillis() > timeInMillis) {
+ log.warn(
+ "Incremental sync is not safe from instant {}. "
+ + "Earliest available snapshot {} (time={}) is newer than the requested instant. "
+ + "Snapshots may have been expired.",
+ instant,
+ earliestSnapshot.id(),
+ earliestSnapshot.timeMillis());
+ return false;
+ }
+
+ // Check 3: Verify a snapshot exists at or before the instant
Review Comment:
done, much better now!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]