This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 65714ffad6 [core] SnapshotReaderImpl.toChangesPlan should use snapshot for tags (#4841)
65714ffad6 is described below

commit 65714ffad6bbfc4604ef9904f20126438955994c
Author: jerry <[email protected]>
AuthorDate: Tue Jan 7 11:28:09 2025 +0800

    [core] SnapshotReaderImpl.toChangesPlan should use snapshot for tags (#4841)
---
 .../table/source/snapshot/SnapshotReaderImpl.java  | 10 ++++----
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 28 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index bf19ba10c6..43a6d3c872 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -391,14 +391,14 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                 groupByPartFiles(plan.files(FileKind.DELETE));
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
-
-        return toChangesPlan(true, plan, plan.snapshot().id() - 1, 
beforeFiles, dataFiles);
+        Snapshot beforeSnapshot = 
snapshotManager.snapshot(plan.snapshot().id() - 1);
+        return toChangesPlan(true, plan, beforeSnapshot, beforeFiles, 
dataFiles);
     }
 
     private Plan toChangesPlan(
             boolean isStreaming,
             FileStoreScan.Plan plan,
-            long beforeSnapshotId,
+            Snapshot beforeSnapshot,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
         Snapshot snapshot = plan.snapshot();
@@ -416,7 +416,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
beforDeletionIndexFilesMap =
                 deletionVectors
                         ? indexFileHandler.scan(
-                                beforeSnapshotId, DELETION_VECTORS_INDEX, 
beforeFiles.keySet())
+                                beforeSnapshot, DELETION_VECTORS_INDEX, 
beforeFiles.keySet())
                         : Collections.emptyMap();
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
deletionIndexFilesMap =
                 deletionVectors
@@ -476,7 +476,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 groupByPartFiles(plan.files(FileKind.ADD));
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
                 
groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD));
-        return toChangesPlan(false, plan, before.id(), beforeFiles, dataFiles);
+        return toChangesPlan(false, plan, before, beforeFiles, dataFiles);
     }
 
     private RecordComparator partitionComparator() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index d48b6e7712..d3108e3749 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -299,6 +299,34 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
                 .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
     }
 
+    @Test
+    public void testIncrementBetweenReadWithSnapshotExpiration() throws 
Exception {
+        String tableName = "T";
+        batchSql(String.format("INSERT INTO %s VALUES (1, 11, 111)", 
tableName));
+
+        paimonTable(tableName).createTag("tag1", 1);
+
+        batchSql(String.format("INSERT INTO %s VALUES (2, 22, 222)", 
tableName));
+        paimonTable(tableName).createTag("tag2", 2);
+        batchSql(String.format("INSERT INTO %s VALUES (3, 33, 333)", 
tableName));
+        paimonTable(tableName).createTag("tag3", 3);
+
+        // expire snapshot 1
+        Map<String, String> expireOptions = new HashMap<>();
+        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+        expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+        FileStoreTable table = (FileStoreTable) paimonTable(tableName);
+        table.copy(expireOptions).newCommit("").expireSnapshots();
+        assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM %s /*+ 
OPTIONS('incremental-between' = 'tag1,tag2', 'deletion-vectors.enabled' = 
'true') */",
+                                        tableName)))
+                .containsExactlyInAnyOrder(Row.of(2, 22, 222));
+    }
+
     @Test
     public void testSortSpillMerge() {
         sql(

Reply via email to