This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new f9ee9489 Implement Paimon Source Incremental Sync (#780)
f9ee9489 is described below
commit f9ee9489592c37a60ebb3efd21f36b1f3b02f23e
Author: Mike Dias <[email protected]>
AuthorDate: Tue Jan 6 14:29:14 2026 +1100
Implement Paimon Source Incremental Sync (#780)
* Implement Paimon Incremental Sync
* Spotless apply
* Code review improvements
* Spotless apply
---
.../xtable/paimon/PaimonConversionSource.java | 100 ++++++++++++++++-
.../xtable/paimon/PaimonDataFileExtractor.java | 81 ++++++++++++-
.../org/apache/xtable/ITConversionController.java | 3 -
.../xtable/paimon/TestPaimonConversionSource.java | 125 +++++++++++++++++----
.../xtable/paimon/TestPaimonDataFileExtractor.java | 98 ++++++++++++++++
5 files changed, 378 insertions(+), 29 deletions(-)
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
index 1ef6dd99..f57c625d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
@@ -20,6 +20,9 @@ package org.apache.xtable.paimon;
import java.io.IOException;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import lombok.extern.log4j.Log4j2;
@@ -36,6 +39,7 @@ import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.extractor.ConversionSource;
@@ -97,8 +101,6 @@ public class PaimonConversionSource implements ConversionSource<Snapshot> {
.table(internalTable)
.version(Long.toString(snapshot.timeMillis()))
.partitionedDataFiles(PartitionFileGroup.fromFiles(dataFiles))
- // TODO : Implement pending commits extraction, required for
incremental sync
- // https://github.com/apache/incubator-xtable/issues/754
.sourceIdentifier(getCommitIdentifier(snapshot))
.build();
}
@@ -114,18 +116,106 @@ public class PaimonConversionSource implements ConversionSource<Snapshot> {
@Override
public TableChange getTableChangeForCommit(Snapshot snapshot) {
- throw new UnsupportedOperationException("Incremental Sync is not supported
yet.");
+ InternalTable tableAtSnapshot = getTable(snapshot);
+ InternalSchema internalSchema = tableAtSnapshot.getReadSchema();
+
+ InternalFilesDiff filesDiff =
+ dataFileExtractor.extractFilesDiff(paimonTable, snapshot,
internalSchema);
+
+ return TableChange.builder()
+ .tableAsOfChange(tableAtSnapshot)
+ .filesDiff(filesDiff)
+ .sourceIdentifier(getCommitIdentifier(snapshot))
+ .build();
}
@Override
public CommitsBacklog<Snapshot> getCommitsBacklog(
InstantsForIncrementalSync instantsForIncrementalSync) {
- throw new UnsupportedOperationException("Incremental Sync is not supported
yet.");
+ Instant lastSyncInstant = instantsForIncrementalSync.getLastSyncInstant();
+ long lastSyncTimeMillis = lastSyncInstant.toEpochMilli();
+
+ log.info(
+ "Getting commits backlog for Paimon table {} from instant {}",
+ paimonTable.name(),
+ lastSyncInstant);
+
+ Iterator<Snapshot> snapshotIterator;
+ try {
+ snapshotIterator = snapshotManager.snapshots();
+ } catch (IOException e) {
+ throw new ReadException("Could not iterate over the Paimon snapshot
list", e);
+ }
+
+ List<Snapshot> snapshotsToProcess = new ArrayList<>();
+ while (snapshotIterator.hasNext()) {
+ Snapshot snapshot = snapshotIterator.next();
+ // Only include snapshots committed after the last sync
+ if (snapshot.timeMillis() > lastSyncTimeMillis) {
+ snapshotsToProcess.add(snapshot);
+ log.debug(
+ "Including snapshot {} (time={}, commitId={}) in backlog",
+ snapshot.id(),
+ snapshot.timeMillis(),
+ snapshot.commitIdentifier());
+ }
+ }
+
+ log.info("Found {} snapshots to process for incremental sync",
snapshotsToProcess.size());
+
+ return CommitsBacklog.<Snapshot>builder()
+ .commitsToProcess(snapshotsToProcess)
+ .inFlightInstants(Collections.emptyList())
+ .build();
}
@Override
public boolean isIncrementalSyncSafeFrom(Instant instant) {
- return false; // Incremental sync is not supported yet
+ long timeInMillis = instant.toEpochMilli();
+
+ Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+ Long latestSnapshotId = snapshotManager.latestSnapshotId();
+ if (earliestSnapshotId == null || latestSnapshotId == null) {
+ log.warn("No snapshots found in table {}", paimonTable.name());
+ return false;
+ }
+
+ Snapshot earliestSnapshot = snapshotManager.snapshot(earliestSnapshotId);
+ Snapshot latestSnapshot = snapshotManager.snapshot(latestSnapshotId);
+
+ // Check 1: If instant is in the future (after latest snapshot), return
false
+ if (timeInMillis > latestSnapshot.timeMillis()) {
+ log.warn(
+ "Instant {} is in the future. Latest snapshot {} has time {}",
+ instant,
+ latestSnapshot.id(),
+ latestSnapshot.timeMillis());
+ return false;
+ }
+
+ // Check 2: Has snapshot expiration affected this instant?
+ // If the earliest snapshot is after the requested instant,
+ // then snapshots have been expired and we can't do incremental sync
+ if (earliestSnapshot.timeMillis() > timeInMillis) {
+ log.warn(
+ "Incremental sync is not safe from instant {}. "
+ + "Earliest available snapshot {} (time={}) is newer than the
requested instant. "
+ + "Snapshots may have been expired.",
+ instant,
+ earliestSnapshot.id(),
+ earliestSnapshot.timeMillis());
+ return false;
+ }
+
+ // Check 3: Verify a snapshot exists at or before the instant
+ if (earliestSnapshot.timeMillis() <= timeInMillis) {
+ log.info(
+ "Incremental sync is safe from instant {} for table {}", instant,
paimonTable.name());
+ return true;
+ }
+
+ log.warn("No snapshot found at or before instant {} for table {}",
instant, paimonTable.name());
+ return false;
}
@Override
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
index 68ccfc3e..4555b0cf 100644
--- a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -18,18 +18,32 @@
package org.apache.xtable.paimon;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import lombok.extern.log4j.Log4j2;
import org.apache.paimon.Snapshot;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
+@Log4j2
public class PaimonDataFileExtractor {
private final PaimonPartitionExtractor partitionExtractor =
@@ -52,7 +66,16 @@ public class PaimonDataFileExtractor {
return result;
}
- private InternalDataFile toInternalDataFile(
+ /**
+ * Converts a Paimon ManifestEntry to an InternalDataFile. This method is
used for both full
+ * snapshot reads and incremental sync.
+ *
+ * @param table the Paimon table
+ * @param entry the manifest entry representing a data file
+ * @param internalSchema the internal schema for partition value extraction
+ * @return InternalDataFile representation
+ */
+ public InternalDataFile toInternalDataFile(
FileStoreTable table, ManifestEntry entry, InternalSchema
internalSchema) {
return InternalDataFile.builder()
.physicalPath(toFullPhysicalPath(table, entry))
@@ -84,6 +107,60 @@ public class PaimonDataFileExtractor {
return Collections.emptyList();
}
+ /**
+ * Extracts file changes (added and removed files) from delta manifests for
a given snapshot. This
+ * method reads only the delta manifests which contain the changes
introduced in this specific
+ * snapshot, making it efficient for incremental sync.
+ *
+ * @param table the Paimon table
+ * @param snapshot the snapshot to extract changes from
+ * @param internalSchema the internal schema for partition value extraction
+ * @return InternalFilesDiff containing added and removed files
+ */
+ public InternalFilesDiff extractFilesDiff(
+ FileStoreTable table, Snapshot snapshot, InternalSchema internalSchema) {
+
+ ManifestList manifestList = table.store().manifestListFactory().create();
+ ManifestFile manifestFile = table.store().manifestFileFactory().create();
+
+ // Read delta manifests - these contain only the changes in this snapshot
+ List<ManifestFileMeta> deltaManifests =
manifestList.readDeltaManifests(snapshot);
+ log.debug("Found {} delta manifests for snapshot {}",
deltaManifests.size(), snapshot.id());
+
+ Set<InternalDataFile> addedFiles = new HashSet<>();
+ Set<InternalDataFile> removedFiles = new HashSet<>();
+
+ // For primary key tables, only consider top-level files (fully compacted)
+ int topLevel = table.coreOptions().numLevels() - 1;
+ boolean hasPrimaryKeys = !table.schema().primaryKeys().isEmpty();
+
+ for (ManifestFileMeta manifestMeta : deltaManifests) {
+ List<ManifestEntry> entries = manifestFile.read(manifestMeta.fileName());
+ log.debug("Processing {} manifest entries from {}", entries.size(),
manifestMeta.fileName());
+
+ for (ManifestEntry entry : entries) {
+ if (hasPrimaryKeys && entry.file().level() != topLevel) {
+ continue;
+ }
+
+ InternalDataFile dataFile = toInternalDataFile(table, entry,
internalSchema);
+ if (entry.kind() == FileKind.ADD) {
+ addedFiles.add(dataFile);
+ } else if (entry.kind() == FileKind.DELETE) {
+ removedFiles.add(dataFile);
+ }
+ }
+ }
+
+ log.info(
+ "Snapshot {} has {} files added and {} files removed",
+ snapshot.id(),
+ addedFiles.size(),
+ removedFiles.size());
+
+ return
InternalFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
+ }
+
private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot
snapshot) {
// If the table has primary keys, we read only the top level files
// which means we can only consider fully compacted files.
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 019f69ea..2da3078b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -149,9 +149,6 @@ public class ITConversionController {
List<Arguments> arguments = new ArrayList<>();
for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
for (SyncMode syncMode : SyncMode.values()) {
- if (sourceFormat.equals(PAIMON) && syncMode == SyncMode.INCREMENTAL)
- continue; // Paimon does not support incremental sync yet
-
for (boolean isPartitioned : new boolean[] {true, false}) {
arguments.add(Arguments.of(sourceFormat, syncMode, isPartitioned));
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
index 4d8f8c2b..5e28e010 100644
--- a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
@@ -34,9 +34,11 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.xtable.GenericTable;
import org.apache.xtable.TestPaimonTable;
import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
@@ -165,38 +167,123 @@ public class TestPaimonConversionSource {
}
@Test
- void testGetTableChangeForCommitThrowsUnsupportedOperationException() {
+ void testGetCommitsBacklogReturnsCommitsAfterLastSync() {
+ // Insert initial data to create first snapshot
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create second snapshot
testTable.insertRows(3);
- Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+ assertNotEquals(firstSnapshot.id(), secondSnapshot.id());
- UnsupportedOperationException exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> conversionSource.getTableChangeForCommit(snapshot));
+ // Get commits backlog from first snapshot time
+ InstantsForIncrementalSync instantsForSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.ofEpochMilli(firstSnapshot.timeMillis()))
+ .build();
+
+ CommitsBacklog<Snapshot> backlog =
conversionSource.getCommitsBacklog(instantsForSync);
+
+ // Verify we get at least the second snapshot (may get more if insertRows
creates multiple)
+ assertNotNull(backlog);
+ assertTrue(backlog.getCommitsToProcess().size() >= 1);
+
+ // Verify the last snapshot in the backlog is the second snapshot
+ assertEquals(
+ secondSnapshot.id(),
+ backlog.getCommitsToProcess().get(backlog.getCommitsToProcess().size()
- 1).id());
+
+ // Verify the first snapshot is NOT in the list of commits to process
+ assertFalse(
+ backlog.getCommitsToProcess().stream()
+ .anyMatch(snapshot -> snapshot.id() == firstSnapshot.id()),
+ "First snapshot should not be in the backlog since we're syncing from
that instant");
+ assertTrue(backlog.getInFlightInstants().isEmpty());
+ }
- assertEquals("Incremental Sync is not supported yet.",
exception.getMessage());
+ @Test
+ void testGetCommitsBacklogReturnsEmptyForFutureInstant() {
+ testTable.insertRows(5);
+
+ // Use a future instant
+ InstantsForIncrementalSync instantsForSync =
+ InstantsForIncrementalSync.builder()
+ .lastSyncInstant(Instant.now().plusSeconds(3600))
+ .build();
+
+ CommitsBacklog<Snapshot> backlog =
conversionSource.getCommitsBacklog(instantsForSync);
+
+ // Verify no snapshots are returned
+ assertNotNull(backlog);
+ assertTrue(backlog.getCommitsToProcess().isEmpty());
}
@Test
- void testGetCommitsBacklogThrowsUnsupportedOperationException() {
- InstantsForIncrementalSync mockInstants =
-
InstantsForIncrementalSync.builder().lastSyncInstant(Instant.now()).build();
+ void testGetTableChangeForCommitReturnsCorrectFilesDiff() {
+ // Insert initial data
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create second snapshot
+ testTable.insertRows(3);
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
- UnsupportedOperationException exception =
- assertThrows(
- UnsupportedOperationException.class,
- () -> conversionSource.getCommitsBacklog(mockInstants));
+ // Get table change for second snapshot
+ TableChange tableChange =
conversionSource.getTableChangeForCommit(secondSnapshot);
- assertEquals("Incremental Sync is not supported yet.",
exception.getMessage());
+ // Verify table change structure
+ assertNotNull(tableChange);
+ assertNotNull(tableChange.getFilesDiff());
+ assertNotNull(tableChange.getTableAsOfChange());
+ assertEquals(
+ Long.toString(secondSnapshot.commitIdentifier()),
tableChange.getSourceIdentifier());
+
+ // For append-only table, we should have added files and no removed files
+ assertTrue(tableChange.getFilesDiff().getFilesAdded().size() > 0);
+ }
+
+ @Test
+ void testIsIncrementalSyncSafeFromReturnsTrueForValidInstant() {
+ testTable.insertRows(5);
+ Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+ Instant snapshotTime = Instant.ofEpochMilli(snapshot.timeMillis());
+
+ assertTrue(conversionSource.isIncrementalSyncSafeFrom(snapshotTime));
+ }
+
+ @Test
+ void testIsIncrementalSyncSafeFromReturnsFalseForFutureInstant() {
+ testTable.insertRows(5);
+ Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+
+ // Use an instant way in the future (well after the snapshot)
+ Instant futureInstant =
Instant.ofEpochMilli(snapshot.timeMillis()).plusSeconds(3600);
+
+ assertFalse(conversionSource.isIncrementalSyncSafeFrom(futureInstant));
+ }
+
+ @Test
+ void testIsIncrementalSyncSafeFromReturnsFalseForEmptyTable() {
+ // Don't insert any data
+ Instant someInstant = Instant.now();
+
+ assertFalse(conversionSource.isIncrementalSyncSafeFrom(someInstant));
}
@Test
- void testIsIncrementalSyncSafeFromReturnsFalse() {
- Instant testInstant = Instant.now();
+ void
testIsIncrementalSyncSafeFromReturnsFalseForInstantBeforeFirstSnapshot() {
+ testTable.insertRows(5);
+ Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
- boolean result = conversionSource.isIncrementalSyncSafeFrom(testInstant);
+ Instant instantBeforeFirstSnapshot =
+ Instant.ofEpochMilli(snapshot.timeMillis()).minusSeconds(3600);
- assertFalse(result);
+
assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantBeforeFirstSnapshot));
}
@Test
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
index 9f906516..0f3ed30d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.table.FileStoreTable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -37,6 +38,7 @@ import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFilesDiff;
public class TestPaimonDataFileExtractor {
private static final PaimonDataFileExtractor extractor =
PaimonDataFileExtractor.getInstance();
@@ -147,6 +149,102 @@ public class TestPaimonDataFileExtractor {
}
}
+ @Test
+ void testExtractFilesDiffWithNewFiles() {
+ createUnpartitionedTable();
+
+ // Insert initial data
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create a second snapshot
+ testTable.insertRows(3);
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+
+ InternalFilesDiff filesDiff =
+ extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+
+ // Verify we have replaced the single file on this setup
+ assertNotNull(filesDiff);
+ assertNotNull(filesDiff.getFilesAdded());
+ assertEquals(1, filesDiff.getFilesAdded().size());
+ // Note: Even for inserts, Paimon tables with primary keys (which all test
tables have)
+ // may have removed files due to compaction. The compaction merges files,
so old files are
+ // removed
+ // and new compacted files are added. This is expected behavior.
+ assertNotNull(filesDiff.getFilesRemoved());
+ assertEquals(1, filesDiff.getFilesRemoved().size());
+ }
+
+ @Test
+ void testExtractFilesDiffWithPartitionedTable() {
+ createPartitionedTable();
+
+ // Insert initial data
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data
+ testTable.insertRows(3);
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+
+ InternalFilesDiff filesDiff =
+ extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+
+ // Verify we have added files with partition values
+ assertNotNull(filesDiff);
+ assertTrue(filesDiff.getFilesAdded().size() > 0);
+
+ for (InternalDataFile file : filesDiff.dataFilesAdded()) {
+ assertNotNull(file.getPartitionValues());
+ }
+ }
+
+ @Test
+ void testExtractFilesDiffWithTableWithPrimaryKeys() {
+ createTableWithPrimaryKeys();
+
+ // Insert initial data
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ // Insert more data to create compaction
+ testTable.insertRows(3);
+ Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(secondSnapshot);
+
+ InternalFilesDiff filesDiff =
+ extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema);
+
+ // Verify the diff is returned (size may vary based on compaction)
+ assertNotNull(filesDiff);
+ assertNotNull(filesDiff.getFilesAdded());
+ assertNotNull(filesDiff.getFilesRemoved());
+ }
+
+ @Test
+ void testExtractFilesDiffForFirstSnapshot() {
+ createUnpartitionedTable();
+
+ // Insert data to create first snapshot
+ testTable.insertRows(5);
+ Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+ assertNotNull(firstSnapshot);
+
+ InternalFilesDiff filesDiff =
+ extractor.extractFilesDiff(paimonTable, firstSnapshot, testSchema);
+
+ // First snapshot should only have added files
+ assertNotNull(filesDiff);
+ assertTrue(filesDiff.getFilesAdded().size() > 0);
+ assertEquals(0, filesDiff.getFilesRemoved().size());
+ }
+
private void createUnpartitionedTable() {
testTable =
(TestPaimonTable)