This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 73cf2aaaa8 [core] Incremental query should return empty result for some corner cases (#5339)
73cf2aaaa8 is described below

commit 73cf2aaaa82f48bc51a6465798b4ef42044ab8dd
Author: yuzelin <[email protected]>
AuthorDate: Tue Mar 25 13:27:09 2025 +0800

    [core] Incremental query should return empty result for some corner cases (#5339)
---
 .../paimon/table/source/AbstractDataTableScan.java | 27 ++++++--
 .../snapshot/IncrementalDeltaStartingScanner.java  | 38 +++---------
 .../snapshot/IncrementalDiffStartingScanner.java   | 18 +++---
 .../apache/paimon/table/IncrementalTableTest.java  | 72 +++++++++++++++++++++-
 .../IncrementalDeltaStartingScannerTest.java       | 19 ++----
 5 files changed, 113 insertions(+), 61 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 37cd0e378f..17a5b161b0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -257,13 +257,28 @@ abstract class AbstractDataTableScan implements 
DataTableScan {
                                     incrementalBetween.getLeft(), 
incrementalBetween.getRight()));
                 }
 
+                checkArgument(
+                        endId >= startId,
+                        "Ending snapshotId should >= starting snapshotId %s.",
+                        endId,
+                        startId);
+
+                if (snapshotManager.earliestSnapshot() == null) {
+                    LOG.warn("There is currently no snapshot. Waiting for 
snapshot generation.");
+                    return new EmptyResultStartingScanner(snapshotManager);
+                }
+
+                if (startId == endId) {
+                    return new EmptyResultStartingScanner(snapshotManager);
+                }
+
                 CoreOptions.IncrementalBetweenScanMode scanMode =
                         options.incrementalBetweenScanMode();
                 return scanMode == DIFF
                         ? IncrementalDiffStartingScanner.betweenSnapshotIds(
                                 startId, endId, snapshotManager)
-                        : new IncrementalDeltaStartingScanner(
-                                snapshotManager, startId, endId, 
toSnapshotScanMode(scanMode));
+                        : IncrementalDeltaStartingScanner.betweenSnapshotIds(
+                                startId, endId, snapshotManager, 
toSnapshotScanMode(scanMode));
             }
         } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
             Pair<Long, Long> incrementalBetween = 
options.incrementalBetweenTimestamp();
@@ -277,11 +292,13 @@ abstract class AbstractDataTableScan implements 
DataTableScan {
             long startTimestamp = incrementalBetween.getLeft();
             long endTimestamp = incrementalBetween.getRight();
             checkArgument(
-                    endTimestamp > startTimestamp,
-                    "Ending timestamp %s should be larger than starting 
timestamp %s.",
+                    endTimestamp >= startTimestamp,
+                    "Ending timestamp %s should be >= starting timestamp %s.",
                     endTimestamp,
                     startTimestamp);
-            if (startTimestamp > latestSnapshot.timeMillis()
+
+            if (startTimestamp == endTimestamp
+                    || startTimestamp > latestSnapshot.timeMillis()
                     || endTimestamp < earliestSnapshot.timeMillis()) {
                 return new EmptyResultStartingScanner(snapshotManager);
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
index 6f18dde393..80a56e4f0c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
@@ -43,7 +43,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -72,11 +71,6 @@ public class IncrementalDeltaStartingScanner extends 
AbstractStartingScanner {
 
     @Override
     public Result scan(SnapshotReader reader) {
-        // Check the validity of scan staring snapshotId.
-        Optional<Result> checkResult = checkScanSnapshotIdValidity();
-        if (checkResult.isPresent()) {
-            return checkResult.get();
-        }
         Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new 
ConcurrentHashMap<>();
         ManifestsReader manifestsReader = reader.manifestsReader();
 
@@ -153,39 +147,23 @@ public class IncrementalDeltaStartingScanner extends 
AbstractStartingScanner {
         return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, 
result));
     }
 
-    /**
-     * Check the validity of staring snapshotId early.
-     *
-     * @return If the check passes return empty.
-     */
-    private Optional<Result> checkScanSnapshotIdValidity() {
-        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
-
-        if (earliestSnapshotId == null || latestSnapshotId == null) {
-            LOG.warn("There is currently no snapshot. Waiting for snapshot 
generation.");
-            return Optional.of(new NoSnapshot());
-        }
-
-        checkArgument(
-                startingSnapshotId <= endingSnapshotId,
-                "Starting snapshotId %s must less than ending snapshotId %s.",
-                startingSnapshotId,
-                endingSnapshotId);
+    public static StartingScanner betweenSnapshotIds(
+            long startId, long endId, SnapshotManager snapshotManager, 
ScanMode scanMode) {
+        long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
 
         // because of the left open right closed rule of 
IncrementalStartingScanner that is
         // different from StaticFromStartingScanner, so we should allow 
starting snapshotId to be
         // equal to the earliestSnapshotId - 1.
         checkArgument(
-                startingSnapshotId >= earliestSnapshotId - 1
-                        && endingSnapshotId <= latestSnapshotId,
+                startId >= earliestSnapshotId - 1 && endId <= latestSnapshotId,
                 "The specified scan snapshotId range [%s, %s] is out of 
available snapshotId range [%s, %s].",
-                startingSnapshotId,
-                endingSnapshotId,
+                startId,
+                endId,
                 earliestSnapshotId,
                 latestSnapshotId);
 
-        return Optional.empty();
+        return new IncrementalDeltaStartingScanner(snapshotManager, startId, 
endId, scanMode);
     }
 
     public static IncrementalDeltaStartingScanner betweenTimestamps(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
index 942ee23ffa..37c6498e95 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
@@ -67,7 +67,7 @@ public class IncrementalDiffStartingScanner extends 
AbstractStartingScanner {
         return 
StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
     }
 
-    public static IncrementalDiffStartingScanner betweenTags(
+    public static StartingScanner betweenTags(
             Tag startTag,
             Tag endTag,
             SnapshotManager snapshotManager,
@@ -82,24 +82,22 @@ public class IncrementalDiffStartingScanner extends 
AbstractStartingScanner {
                 end.id());
 
         checkArgument(
-                end.id() > start.id(),
-                "Tag end %s with snapshot id %s should be larger than tag 
start %s with snapshot id %s",
+                end.id() >= start.id(),
+                "Tag end %s with snapshot id %s should be >= tag start %s with 
snapshot id %s",
                 incrementalBetween.getRight(),
                 end.id(),
                 incrementalBetween.getLeft(),
                 start.id());
 
+        if (start.id() == end.id()) {
+            return new EmptyResultStartingScanner(snapshotManager);
+        }
+
         return new IncrementalDiffStartingScanner(snapshotManager, start, end);
     }
 
-    public static IncrementalDiffStartingScanner betweenSnapshotIds(
+    public static StartingScanner betweenSnapshotIds(
             long startId, long endId, SnapshotManager snapshotManager) {
-        checkArgument(
-                endId > startId,
-                "Ending snapshotId should be larger than starting snapshotId 
%s.",
-                endId,
-                startId);
-
         Snapshot start = snapshotManager.snapshot(startId);
         Snapshot end = snapshotManager.snapshot(endId);
         return new IncrementalDiffStartingScanner(snapshotManager, start, end);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index 416f375bdc..4810e9f4f5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 import org.junit.jupiter.api.Test;
@@ -42,6 +43,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
 import static org.apache.paimon.data.BinaryString.fromString;
 import static org.apache.paimon.io.DataFileTestUtils.row;
@@ -287,7 +289,7 @@ public class IncrementalTableTest extends TableTestBase {
 
         assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, 
"TAG2,TAG1")))
                 .hasMessageContaining(
-                        "Tag end TAG1 with snapshot id 1 should be larger than 
tag start TAG2 with snapshot id 2");
+                        "Tag end TAG1 with snapshot id 1 should be >= tag 
start TAG2 with snapshot id 2");
     }
 
     @Test
@@ -406,6 +408,74 @@ public class IncrementalTableTest extends TableTestBase {
                 .containsExactly(GenericRow.of(3, 
BinaryString.fromString("c")));
     }
 
+    @Test
+    public void testIncrementalEmptyResult() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .option("bucket", "1")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+        // no snapshot
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "1,2"))).isEmpty();
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN_TIMESTAMP, 
"2025-01-01,2025-01-02")))
+                .isEmpty();
+
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = 
table.newCommit(commitUser).ignoreEmptyCommit(false);
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        write.write(GenericRow.of(1, BinaryString.fromString("a")));
+        List<CommitMessage> commitMessages = write.prepareCommit(false, 0);
+        commit.commit(0, commitMessages);
+
+        write.write(GenericRow.of(2, BinaryString.fromString("b")));
+        commitMessages = write.prepareCommit(false, 1);
+        commit.commit(1, commitMessages);
+
+        table.createTag("tag1", 1);
+
+        long earliestTimestamp = 
snapshotManager.earliestSnapshot().timeMillis();
+        long latestTimestamp = snapshotManager.latestSnapshot().timeMillis();
+
+        // same tag
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, 
"tag1,tag1"))).isEmpty();
+
+        // same snapshot
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "1,1"))).isEmpty();
+
+        // same timestamp
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN_TIMESTAMP, 
"2025-01-01,2025-01-01")))
+                .isEmpty();
+
+        // startTimestamp > latestSnapshot.timeMillis()
+        assertThat(
+                        read(
+                                table,
+                                Pair.of(
+                                        INCREMENTAL_BETWEEN_TIMESTAMP,
+                                        String.format(
+                                                "%s,%s",
+                                                latestTimestamp + 1, 
latestTimestamp + 2))))
+                .isEmpty();
+
+        // endTimestamp < earliestSnapshot.timeMillis()
+        assertThat(
+                        read(
+                                table,
+                                Pair.of(
+                                        INCREMENTAL_BETWEEN_TIMESTAMP,
+                                        String.format(
+                                                "%s,%s",
+                                                earliestTimestamp - 2, 
earliestTimestamp - 1))))
+                .isEmpty();
+    }
+
     private static long utcMills(String timestamp) {
         return 
Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond();
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
index 100b80c93f..b19d686577 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
@@ -110,25 +110,14 @@ public class IncrementalDeltaStartingScannerTest extends 
ScannerTestBase {
         assertThatNoException()
                 .isThrownBy(
                         () ->
-                                new IncrementalDeltaStartingScanner(
-                                                snapshotManager, 0, 4, 
ScanMode.DELTA)
+                                
IncrementalDeltaStartingScanner.betweenSnapshotIds(
+                                                0, 4, snapshotManager, 
ScanMode.DELTA)
                                         .scan(snapshotReader));
 
-        // Starting snapshotId must less than ending snapshotId.
         assertThatThrownBy(
                         () ->
-                                new IncrementalDeltaStartingScanner(
-                                                snapshotManager, 4, 3, 
ScanMode.DELTA)
-                                        .scan(snapshotReader))
-                .satisfies(
-                        anyCauseMatches(
-                                IllegalArgumentException.class,
-                                "Starting snapshotId 4 must less than ending 
snapshotId 3."));
-
-        assertThatThrownBy(
-                        () ->
-                                new IncrementalDeltaStartingScanner(
-                                                snapshotManager, 1, 5, 
ScanMode.DELTA)
+                                
IncrementalDeltaStartingScanner.betweenSnapshotIds(
+                                                1, 5, snapshotManager, 
ScanMode.DELTA)
                                         .scan(snapshotReader))
                 .satisfies(
                         anyCauseMatches(

Reply via email to