This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 65714ffad6 [core] SnapshotReaderImpl.toChangesPlan should use snapshot
for tags (#4841)
65714ffad6 is described below
commit 65714ffad6bbfc4604ef9904f20126438955994c
Author: jerry <[email protected]>
AuthorDate: Tue Jan 7 11:28:09 2025 +0800
[core] SnapshotReaderImpl.toChangesPlan should use snapshot for tags (#4841)
---
.../table/source/snapshot/SnapshotReaderImpl.java | 10 ++++----
.../apache/paimon/flink/BatchFileStoreITCase.java | 28 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 5 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index bf19ba10c6..43a6d3c872 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -391,14 +391,14 @@ public class SnapshotReaderImpl implements SnapshotReader
{
groupByPartFiles(plan.files(FileKind.DELETE));
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
groupByPartFiles(plan.files(FileKind.ADD));
-
- return toChangesPlan(true, plan, plan.snapshot().id() - 1,
beforeFiles, dataFiles);
+ Snapshot beforeSnapshot =
snapshotManager.snapshot(plan.snapshot().id() - 1);
+ return toChangesPlan(true, plan, beforeSnapshot, beforeFiles,
dataFiles);
}
private Plan toChangesPlan(
boolean isStreaming,
FileStoreScan.Plan plan,
- long beforeSnapshotId,
+ Snapshot beforeSnapshot,
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
Snapshot snapshot = plan.snapshot();
@@ -416,7 +416,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>>
beforDeletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
- beforeSnapshotId, DELETION_VECTORS_INDEX,
beforeFiles.keySet())
+ beforeSnapshot, DELETION_VECTORS_INDEX,
beforeFiles.keySet())
: Collections.emptyMap();
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>>
deletionIndexFilesMap =
deletionVectors
@@ -476,7 +476,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
groupByPartFiles(plan.files(FileKind.ADD));
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles =
groupByPartFiles(scan.withSnapshot(before).plan().files(FileKind.ADD));
- return toChangesPlan(false, plan, before.id(), beforeFiles, dataFiles);
+ return toChangesPlan(false, plan, before, beforeFiles, dataFiles);
}
private RecordComparator partitionComparator() {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index d48b6e7712..d3108e3749 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -299,6 +299,34 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22,
222));
}
+ @Test
+ public void testIncrementBetweenReadWithSnapshotExpiration() throws
Exception {
+ String tableName = "T";
+ batchSql(String.format("INSERT INTO %s VALUES (1, 11, 111)",
tableName));
+
+ paimonTable(tableName).createTag("tag1", 1);
+
+ batchSql(String.format("INSERT INTO %s VALUES (2, 22, 222)",
tableName));
+ paimonTable(tableName).createTag("tag2", 2);
+ batchSql(String.format("INSERT INTO %s VALUES (3, 33, 333)",
tableName));
+ paimonTable(tableName).createTag("tag3", 3);
+
+ // expire snapshot 1
+ Map<String, String> expireOptions = new HashMap<>();
+ expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+ expireOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+ FileStoreTable table = (FileStoreTable) paimonTable(tableName);
+ table.copy(expireOptions).newCommit("").expireSnapshots();
+ assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
+
+ assertThat(
+ batchSql(
+ String.format(
+ "SELECT * FROM %s /*+
OPTIONS('incremental-between' = 'tag1,tag2', 'deletion-vectors.enabled' =
'true') */",
+ tableName)))
+ .containsExactlyInAnyOrder(Row.of(2, 22, 222));
+ }
+
@Test
public void testSortSpillMerge() {
sql(