This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new f9ee9489 Implement Paimon Source Incremental Sync (#780)
f9ee9489 is described below

commit f9ee9489592c37a60ebb3efd21f36b1f3b02f23e
Author: Mike Dias <[email protected]>
AuthorDate: Tue Jan 6 14:29:14 2026 +1100

    Implement Paimon Source Incremental Sync (#780)
    
    * Implement Paimon Incremental Sync
    
    * Spotless apply
    
    * Code review improvements
    
    * Spotless apply
---
 .../xtable/paimon/PaimonConversionSource.java      | 100 ++++++++++++++++-
 .../xtable/paimon/PaimonDataFileExtractor.java     |  81 ++++++++++++-
 .../org/apache/xtable/ITConversionController.java  |   3 -
 .../xtable/paimon/TestPaimonConversionSource.java  | 125 +++++++++++++++++----
 .../xtable/paimon/TestPaimonDataFileExtractor.java |  98 ++++++++++++++++
 5 files changed, 378 insertions(+), 29 deletions(-)

diff --git 
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
 
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
index 1ef6dd99..f57c625d 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
@@ -20,6 +20,9 @@ package org.apache.xtable.paimon;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import lombok.extern.log4j.Log4j2;
@@ -36,6 +39,7 @@ import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.spi.extractor.ConversionSource;
@@ -97,8 +101,6 @@ public class PaimonConversionSource implements 
ConversionSource<Snapshot> {
         .table(internalTable)
         .version(Long.toString(snapshot.timeMillis()))
         .partitionedDataFiles(PartitionFileGroup.fromFiles(dataFiles))
-        // TODO : Implement pending commits extraction, required for 
incremental sync
-        // https://github.com/apache/incubator-xtable/issues/754
         .sourceIdentifier(getCommitIdentifier(snapshot))
         .build();
   }
@@ -114,18 +116,106 @@ public class PaimonConversionSource implements 
ConversionSource<Snapshot> {
 
   @Override
   public TableChange getTableChangeForCommit(Snapshot snapshot) {
-    throw new UnsupportedOperationException("Incremental Sync is not supported 
yet.");
+    InternalTable tableAtSnapshot = getTable(snapshot);
+    InternalSchema internalSchema = tableAtSnapshot.getReadSchema();
+
+    InternalFilesDiff filesDiff =
+        dataFileExtractor.extractFilesDiff(paimonTable, snapshot, 
internalSchema);
+
+    return TableChange.builder()
+        .tableAsOfChange(tableAtSnapshot)
+        .filesDiff(filesDiff)
+        .sourceIdentifier(getCommitIdentifier(snapshot))
+        .build();
   }
 
   @Override
   public CommitsBacklog<Snapshot> getCommitsBacklog(
       InstantsForIncrementalSync instantsForIncrementalSync) {
-    throw new UnsupportedOperationException("Incremental Sync is not supported 
yet.");
+    Instant lastSyncInstant = instantsForIncrementalSync.getLastSyncInstant();
+    long lastSyncTimeMillis = lastSyncInstant.toEpochMilli();
+
+    log.info(
+        "Getting commits backlog for Paimon table {} from instant {}",
+        paimonTable.name(),
+        lastSyncInstant);
+
+    Iterator<Snapshot> snapshotIterator;
+    try {
+      snapshotIterator = snapshotManager.snapshots();
+    } catch (IOException e) {
+      throw new ReadException("Could not iterate over the Paimon snapshot 
list", e);
+    }
+
+    List<Snapshot> snapshotsToProcess = new ArrayList<>();
+    while (snapshotIterator.hasNext()) {
+      Snapshot snapshot = snapshotIterator.next();
+      // Only include snapshots committed after the last sync
+      if (snapshot.timeMillis() > lastSyncTimeMillis) {
+        snapshotsToProcess.add(snapshot);
+        log.debug(
+            "Including snapshot {} (time={}, commitId={}) in backlog",
+            snapshot.id(),
+            snapshot.timeMillis(),
+            snapshot.commitIdentifier());
+      }
+    }
+
+    log.info("Found {} snapshots to process for incremental sync", 
snapshotsToProcess.size());
+
+    return CommitsBacklog.<Snapshot>builder()
+        .commitsToProcess(snapshotsToProcess)
+        .inFlightInstants(Collections.emptyList())
+        .build();
   }
 
   @Override
   public boolean isIncrementalSyncSafeFrom(Instant instant) {
-    return false; // Incremental sync is not supported yet
+    long timeInMillis = instant.toEpochMilli();
+
+    Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+    Long latestSnapshotId = snapshotManager.latestSnapshotId();
+    if (earliestSnapshotId == null || latestSnapshotId == null) {
+      log.warn("No snapshots found in table {}", paimonTable.name());
+      return false;
+    }
+
+    Snapshot earliestSnapshot = snapshotManager.snapshot(earliestSnapshotId);
+    Snapshot latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
+
+    // Check 1: If instant is in the future (after latest snapshot), return 
false
+    if (timeInMillis > latestSnapshot.timeMillis()) {
+      log.warn(
+          "Instant {} is in the future. Latest snapshot {} has time {}",
+          instant,
+          latestSnapshot.id(),
+          latestSnapshot.timeMillis());
+      return false;
+    }
+
+    // Check 2: Has snapshot expiration affected this instant?
+    // If the earliest snapshot is after the requested instant,
+    // then snapshots have been expired and we can't do incremental sync
+    if (earliestSnapshot.timeMillis() > timeInMillis) {
+      log.warn(
+          "Incremental sync is not safe from instant {}. "
+              + "Earliest available snapshot {} (time={}) is newer than the 
requested instant. "
+              + "Snapshots may have been expired.",
+          instant,
+          earliestSnapshot.id(),
+          earliestSnapshot.timeMillis());
+      return false;
+    }
+
+    // Check 3: Verify a snapshot exists at or before the instant.
+    // NOTE(review): once Check 2 has passed, earliestSnapshot.timeMillis() <= timeInMillis
+    // always holds, so the fall-through warning below is unreachable — consider simplifying
+    // this to a plain "return true" after Check 2.
+    if (earliestSnapshot.timeMillis() <= timeInMillis) {
+      log.info(
+          "Incremental sync is safe from instant {} for table {}", instant, 
paimonTable.name());
+      return true;
+    }
+
+    log.warn("No snapshot found at or before instant {} for table {}", 
instant, paimonTable.name());
+    return false;
   }
 
   @Override
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
index 68ccfc3e..4555b0cf 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -18,18 +18,32 @@
  
 package org.apache.xtable.paimon;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import lombok.extern.log4j.Log4j2;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.stat.ColumnStat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 
+@Log4j2
 public class PaimonDataFileExtractor {
 
   private final PaimonPartitionExtractor partitionExtractor =
@@ -52,7 +66,16 @@ public class PaimonDataFileExtractor {
     return result;
   }
 
-  private InternalDataFile toInternalDataFile(
+  /**
+   * Converts a Paimon ManifestEntry to an InternalDataFile. This method is 
used for both full
+   * snapshot reads and incremental sync.
+   *
+   * @param table the Paimon table
+   * @param entry the manifest entry representing a data file
+   * @param internalSchema the internal schema for partition value extraction
+   * @return InternalDataFile representation
+   */
+  public InternalDataFile toInternalDataFile(
       FileStoreTable table, ManifestEntry entry, InternalSchema 
internalSchema) {
     return InternalDataFile.builder()
         .physicalPath(toFullPhysicalPath(table, entry))
@@ -84,6 +107,60 @@ public class PaimonDataFileExtractor {
     return Collections.emptyList();
   }
 
+  /**
+   * Extracts file changes (added and removed files) from delta manifests for 
a given snapshot. This
+   * method reads only the delta manifests which contain the changes 
introduced in this specific
+   * snapshot, making it efficient for incremental sync.
+   *
+   * @param table the Paimon table
+   * @param snapshot the snapshot to extract changes from
+   * @param internalSchema the internal schema for partition value extraction
+   * @return InternalFilesDiff containing added and removed files
+   */
+  public InternalFilesDiff extractFilesDiff(
+      FileStoreTable table, Snapshot snapshot, InternalSchema internalSchema) {
+
+    ManifestList manifestList = table.store().manifestListFactory().create();
+    ManifestFile manifestFile = table.store().manifestFileFactory().create();
+
+    // Read delta manifests - these contain only the changes in this snapshot
+    List<ManifestFileMeta> deltaManifests = 
manifestList.readDeltaManifests(snapshot);
+    log.debug("Found {} delta manifests for snapshot {}", 
deltaManifests.size(), snapshot.id());
+
+    Set<InternalDataFile> addedFiles = new HashSet<>();
+    Set<InternalDataFile> removedFiles = new HashSet<>();
+
+    // For primary key tables, only consider top-level files (fully compacted)
+    int topLevel = table.coreOptions().numLevels() - 1;
+    boolean hasPrimaryKeys = !table.schema().primaryKeys().isEmpty();
+
+    for (ManifestFileMeta manifestMeta : deltaManifests) {
+      List<ManifestEntry> entries = manifestFile.read(manifestMeta.fileName());
+      log.debug("Processing {} manifest entries from {}", entries.size(), 
manifestMeta.fileName());
+
+      for (ManifestEntry entry : entries) {
+        if (hasPrimaryKeys && entry.file().level() != topLevel) {
+          continue;
+        }
+
+        InternalDataFile dataFile = toInternalDataFile(table, entry, 
internalSchema);
+        if (entry.kind() == FileKind.ADD) {
+          addedFiles.add(dataFile);
+        } else if (entry.kind() == FileKind.DELETE) {
+          removedFiles.add(dataFile);
+        }
+      }
+    }
+
+    log.info(
+        "Snapshot {} has {} files added and {} files removed",
+        snapshot.id(),
+        addedFiles.size(),
+        removedFiles.size());
+
+    return 
InternalFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
+  }
+
   private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot 
snapshot) {
     // If the table has primary keys, we read only the top level files
     // which means we can only consider fully compacted files.
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java 
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 019f69ea..2da3078b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -149,9 +149,6 @@ public class ITConversionController {
     List<Arguments> arguments = new ArrayList<>();
     for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
       for (SyncMode syncMode : SyncMode.values()) {
-        if (sourceFormat.equals(PAIMON) && syncMode == SyncMode.INCREMENTAL)
-          continue; // Paimon does not support incremental sync yet
-
         for (boolean isPartitioned : new boolean[] {true, false}) {
           arguments.add(Arguments.of(sourceFormat, syncMode, isPartitioned));
         }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
index 4d8f8c2b..5e28e010 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
@@ -34,9 +34,11 @@ import org.junit.jupiter.api.io.TempDir;
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestPaimonTable;
 import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.CommitsBacklog;
 import org.apache.xtable.model.InstantsForIncrementalSync;
 import org.apache.xtable.model.InternalSnapshot;
 import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.TableChange;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
@@ -165,38 +167,123 @@ public class TestPaimonConversionSource {
   }
 
   @Test
-  void testGetTableChangeForCommitThrowsUnsupportedOperationException() {
+  void testGetCommitsBacklogReturnsCommitsAfterLastSync() {
+    // Insert initial data to create first snapshot
+    testTable.insertRows(5);
+    Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(firstSnapshot);
+
+    // Insert more data to create second snapshot
     testTable.insertRows(3);
-    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+    Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(secondSnapshot);
+    assertNotEquals(firstSnapshot.id(), secondSnapshot.id());
 
-    UnsupportedOperationException exception =
-        assertThrows(
-            UnsupportedOperationException.class,
-            () -> conversionSource.getTableChangeForCommit(snapshot));
+    // Get commits backlog from first snapshot time
+    InstantsForIncrementalSync instantsForSync =
+        InstantsForIncrementalSync.builder()
+            .lastSyncInstant(Instant.ofEpochMilli(firstSnapshot.timeMillis()))
+            .build();
+
+    CommitsBacklog<Snapshot> backlog = 
conversionSource.getCommitsBacklog(instantsForSync);
+
+    // Verify we get at least the second snapshot (may get more if insertRows 
creates multiple)
+    assertNotNull(backlog);
+    assertTrue(backlog.getCommitsToProcess().size() >= 1);
+
+    // Verify the last snapshot in the backlog is the second snapshot
+    assertEquals(
+        secondSnapshot.id(),
+        backlog.getCommitsToProcess().get(backlog.getCommitsToProcess().size() 
- 1).id());
+
+    // Verify the first snapshot is NOT in the list of commits to process
+    assertFalse(
+        backlog.getCommitsToProcess().stream()
+            .anyMatch(snapshot -> snapshot.id() == firstSnapshot.id()),
+        "First snapshot should not be in the backlog since we're syncing from 
that instant");
+    assertTrue(backlog.getInFlightInstants().isEmpty());
+  }
 
-    assertEquals("Incremental Sync is not supported yet.", 
exception.getMessage());
+  @Test
+  void testGetCommitsBacklogReturnsEmptyForFutureInstant() {
+    testTable.insertRows(5);
+
+    // Use a future instant
+    InstantsForIncrementalSync instantsForSync =
+        InstantsForIncrementalSync.builder()
+            .lastSyncInstant(Instant.now().plusSeconds(3600))
+            .build();
+
+    CommitsBacklog<Snapshot> backlog = 
conversionSource.getCommitsBacklog(instantsForSync);
+
+    // Verify no snapshots are returned
+    assertNotNull(backlog);
+    assertTrue(backlog.getCommitsToProcess().isEmpty());
   }
 
   @Test
-  void testGetCommitsBacklogThrowsUnsupportedOperationException() {
-    InstantsForIncrementalSync mockInstants =
-        
InstantsForIncrementalSync.builder().lastSyncInstant(Instant.now()).build();
+  void testGetTableChangeForCommitReturnsCorrectFilesDiff() {
+    // Insert initial data
+    testTable.insertRows(5);
+    Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(firstSnapshot);
+
+    // Insert more data to create second snapshot
+    testTable.insertRows(3);
+    Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(secondSnapshot);
 
-    UnsupportedOperationException exception =
-        assertThrows(
-            UnsupportedOperationException.class,
-            () -> conversionSource.getCommitsBacklog(mockInstants));
+    // Get table change for second snapshot
+    TableChange tableChange = 
conversionSource.getTableChangeForCommit(secondSnapshot);
 
-    assertEquals("Incremental Sync is not supported yet.", 
exception.getMessage());
+    // Verify table change structure
+    assertNotNull(tableChange);
+    assertNotNull(tableChange.getFilesDiff());
+    assertNotNull(tableChange.getTableAsOfChange());
+    assertEquals(
+        Long.toString(secondSnapshot.commitIdentifier()), 
tableChange.getSourceIdentifier());
+
+    // For append-only table, we should have added files and no removed files
+    assertTrue(tableChange.getFilesDiff().getFilesAdded().size() > 0);
+  }
+
+  @Test
+  void testIsIncrementalSyncSafeFromReturnsTrueForValidInstant() {
+    testTable.insertRows(5);
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+    Instant snapshotTime = Instant.ofEpochMilli(snapshot.timeMillis());
+
+    assertTrue(conversionSource.isIncrementalSyncSafeFrom(snapshotTime));
+  }
+
+  @Test
+  void testIsIncrementalSyncSafeFromReturnsFalseForFutureInstant() {
+    testTable.insertRows(5);
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+
+    // Use an instant way in the future (well after the snapshot)
+    Instant futureInstant = 
Instant.ofEpochMilli(snapshot.timeMillis()).plusSeconds(3600);
+
+    assertFalse(conversionSource.isIncrementalSyncSafeFrom(futureInstant));
+  }
+
+  @Test
+  void testIsIncrementalSyncSafeFromReturnsFalseForEmptyTable() {
+    // Don't insert any data
+    Instant someInstant = Instant.now();
+
+    assertFalse(conversionSource.isIncrementalSyncSafeFrom(someInstant));
   }
 
   @Test
-  void testIsIncrementalSyncSafeFromReturnsFalse() {
-    Instant testInstant = Instant.now();
+  void 
testIsIncrementalSyncSafeFromReturnsFalseForInstantBeforeFirstSnapshot() {
+    testTable.insertRows(5);
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
 
-    boolean result = conversionSource.isIncrementalSyncSafeFrom(testInstant);
+    Instant instantBeforeFirstSnapshot =
+        Instant.ofEpochMilli(snapshot.timeMillis()).minusSeconds(3600);
 
-    assertFalse(result);
+    
assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantBeforeFirstSnapshot));
   }
 
   @Test
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
index 9f906516..0f3ed30d 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.table.FileStoreTable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -37,6 +38,7 @@ import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.schema.InternalType;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
 
 public class TestPaimonDataFileExtractor {
   private static final PaimonDataFileExtractor extractor = 
PaimonDataFileExtractor.getInstance();
@@ -147,6 +149,102 @@ public class TestPaimonDataFileExtractor {
     }
   }
 
+  @Test
+  void testExtractFilesDiffWithNewFiles() {
+    createUnpartitionedTable();
+
+    // Insert initial data
+    testTable.insertRows(5);
+    Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(firstSnapshot);
+
+    // Insert more data to create a second snapshot
+    testTable.insertRows(3);
+    Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(secondSnapshot);
+
+    InternalFilesDiff filesDiff =
+        extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+
+    // Verify that the single data file was replaced in this setup
+    assertNotNull(filesDiff);
+    assertNotNull(filesDiff.getFilesAdded());
+    assertEquals(1, filesDiff.getFilesAdded().size());
+    // Note: Even for inserts, Paimon tables with primary keys (which all test tables have)
+    // may have removed files due to compaction. The compaction merges files, so old files
+    // are removed and new compacted files are added. This is expected behavior.
+    assertNotNull(filesDiff.getFilesRemoved());
+    assertEquals(1, filesDiff.getFilesRemoved().size());
+  }
+
+  @Test
+  void testExtractFilesDiffWithPartitionedTable() {
+    createPartitionedTable();
+
+    // Insert initial data
+    testTable.insertRows(5);
+    Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(firstSnapshot);
+
+    // Insert more data
+    testTable.insertRows(3);
+    Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(secondSnapshot);
+
+    InternalFilesDiff filesDiff =
+        extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+
+    // Verify we have added files with partition values
+    assertNotNull(filesDiff);
+    assertTrue(filesDiff.getFilesAdded().size() > 0);
+
+    for (InternalDataFile file : filesDiff.dataFilesAdded()) {
+      assertNotNull(file.getPartitionValues());
+    }
+  }
+
+  @Test
+  void testExtractFilesDiffWithTableWithPrimaryKeys() {
+    createTableWithPrimaryKeys();
+
+    // Insert initial data
+    testTable.insertRows(5);
+    Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(firstSnapshot);
+
+    // Insert more data to create compaction
+    testTable.insertRows(3);
+    Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(secondSnapshot);
+
+    InternalFilesDiff filesDiff =
+        extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+
+    // Verify the diff is returned (size may vary based on compaction)
+    assertNotNull(filesDiff);
+    assertNotNull(filesDiff.getFilesAdded());
+    assertNotNull(filesDiff.getFilesRemoved());
+  }
+
+  @Test
+  void testExtractFilesDiffForFirstSnapshot() {
+    createUnpartitionedTable();
+
+    // Insert data to create first snapshot
+    testTable.insertRows(5);
+    Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(firstSnapshot);
+
+    InternalFilesDiff filesDiff =
+        extractor.extractFilesDiff(paimonTable, firstSnapshot, testSchema);
+
+    // First snapshot should only have added files
+    assertNotNull(filesDiff);
+    assertTrue(filesDiff.getFilesAdded().size() > 0);
+    assertEquals(0, filesDiff.getFilesRemoved().size());
+  }
+
   private void createUnpartitionedTable() {
     testTable =
         (TestPaimonTable)

Reply via email to