This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 75166c2cb1 [core] Support diff scan mode in incremental query (#5321)
75166c2cb1 is described below

commit 75166c2cb1c7a506f287717ba3505018a3689aa1
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 21 19:06:48 2025 +0800

    [core] Support diff scan mode in incremental query (#5321)
---
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  23 +---
 .../paimon/table/source/AbstractDataTableScan.java | 123 ++++++++++++---------
 .../snapshot/EmptyResultStartingScanner.java       |   2 +-
 ...r.java => IncrementalDeltaStartingScanner.java} |  37 +++++--
 ...er.java => IncrementalDiffStartingScanner.java} |  68 ++++++++++--
 .../IncrementalTimeStampStartingScanner.java       |  76 -------------
 .../table/source/snapshot/TimeTravelUtil.java      |   4 +-
 ...va => IncrementalDeltaStartingScannerTest.java} |  10 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  40 ++++++-
 10 files changed, 208 insertions(+), 177 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 746d83efbb..15d2ce819d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -408,7 +408,7 @@ under the License.
             <td><h5>incremental-between-scan-mode</h5></td>
             <td style="word-wrap: break-word;">auto</td>
             <td><p>Enum</p></td>
-            <td>Scan kind when Read incremental changes between start snapshot 
(exclusive) and end snapshot (inclusive). <br /><br />Possible 
values:<ul><li>"auto": Scan changelog files for the table which produces 
changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan 
newly changed files between snapshots.</li><li>"changelog": Scan changelog 
files between snapshots.</li></ul></td>
+            <td>Scan kind when Read incremental changes between start snapshot 
(exclusive) and end snapshot (inclusive). <br /><br />Possible 
values:<ul><li>"auto": Scan changelog files for the table which produces 
changelog files. Otherwise, scan newly changed files.</li><li>"delta": Scan 
newly changed files between snapshots.</li><li>"changelog": Scan changelog 
files between snapshots.</li><li>"diff": Get diff by comparing data of end 
snapshot with data of start snapshot.</li></ul></td>
         </tr>
         <tr>
             <td><h5>incremental-between-timestamp</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0b3e24eb5d..e250293446 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2925,7 +2925,8 @@ public class CoreOptions implements Serializable {
                 "auto",
                 "Scan changelog files for the table which produces changelog 
files. Otherwise, scan newly changed files."),
         DELTA("delta", "Scan newly changed files between snapshots."),
-        CHANGELOG("changelog", "Scan changelog files between snapshots.");
+        CHANGELOG("changelog", "Scan changelog files between snapshots."),
+        DIFF("diff", "Get diff by comparing data of end snapshot with data of 
start snapshot.");
 
         private final String value;
         private final String description;
@@ -2944,26 +2945,6 @@ public class CoreOptions implements Serializable {
         public InlineElement getDescription() {
             return text(description);
         }
-
-        public String getValue() {
-            return value;
-        }
-
-        @VisibleForTesting
-        public static IncrementalBetweenScanMode fromValue(String value) {
-            for (IncrementalBetweenScanMode formatType : 
IncrementalBetweenScanMode.values()) {
-                if (formatType.value.equals(value)) {
-                    return formatType;
-                }
-            }
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Invalid format type %s, only support [%s]",
-                            value,
-                            StringUtils.join(
-                                    
Arrays.stream(IncrementalBetweenScanMode.values()).iterator(),
-                                    ",")));
-        }
     }
 
     /**
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 83c2ea5c72..37cd0e378f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -34,12 +34,12 @@ import 
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartin
 import 
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner;
 import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
+import org.apache.paimon.table.source.snapshot.EmptyResultStartingScanner;
 import org.apache.paimon.table.source.snapshot.FileCreationTimeStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullStartingScanner;
-import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
-import org.apache.paimon.table.source.snapshot.IncrementalTagStartingScanner;
-import 
org.apache.paimon.table.source.snapshot.IncrementalTimeStampStartingScanner;
+import org.apache.paimon.table.source.snapshot.IncrementalDeltaStartingScanner;
+import org.apache.paimon.table.source.snapshot.IncrementalDiffStartingScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
@@ -61,7 +61,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
-import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.DIFF;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
@@ -228,56 +228,22 @@ abstract class AbstractDataTableScan implements 
DataTableScan {
     }
 
     private StartingScanner createIncrementalStartingScanner(SnapshotManager 
snapshotManager) {
-        CoreOptions.IncrementalBetweenScanMode scanType = 
options.incrementalBetweenScanMode();
-        ScanMode scanMode;
-        switch (scanType) {
-            case AUTO:
-                scanMode =
-                        options.changelogProducer() == ChangelogProducer.NONE
-                                ? ScanMode.DELTA
-                                : ScanMode.CHANGELOG;
-                break;
-            case DELTA:
-                scanMode = ScanMode.DELTA;
-                break;
-            case CHANGELOG:
-                scanMode = ScanMode.CHANGELOG;
-                break;
-            default:
-                throw new UnsupportedOperationException(
-                        "Unknown incremental scan type " + scanType.name());
-        }
-
         Options conf = options.toConfiguration();
-        TagManager tagManager =
-                new TagManager(
-                        snapshotManager.fileIO(),
-                        snapshotManager.tablePath(),
-                        snapshotManager.branch());
+
         if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
             Pair<String, String> incrementalBetween = 
options.incrementalBetween();
+
+            TagManager tagManager =
+                    new TagManager(
+                            snapshotManager.fileIO(),
+                            snapshotManager.tablePath(),
+                            snapshotManager.branch());
             Optional<Tag> startTag = 
tagManager.get(incrementalBetween.getLeft());
             Optional<Tag> endTag = 
tagManager.get(incrementalBetween.getRight());
-            if (startTag.isPresent() && endTag.isPresent()) {
-                Snapshot start = startTag.get().trimToSnapshot();
-                Snapshot end = endTag.get().trimToSnapshot();
 
-                LOG.info(
-                        "{} start and end are parsed to tag with snapshot id 
{} to {}.",
-                        INCREMENTAL_BETWEEN.key(),
-                        start.id(),
-                        end.id());
-
-                if (end.id() <= start.id()) {
-                    throw new IllegalArgumentException(
-                            String.format(
-                                    "Tag end %s with snapshot id %s should be 
larger than tag start %s with snapshot id %s",
-                                    incrementalBetween.getRight(),
-                                    end.id(),
-                                    incrementalBetween.getLeft(),
-                                    start.id()));
-                }
-                return new IncrementalTagStartingScanner(snapshotManager, 
start, end);
+            if (startTag.isPresent() && endTag.isPresent()) {
+                return IncrementalDiffStartingScanner.betweenTags(
+                        startTag.get(), endTag.get(), snapshotManager, 
incrementalBetween);
             } else {
                 long startId, endId;
                 try {
@@ -290,20 +256,67 @@ abstract class AbstractDataTableScan implements 
DataTableScan {
                                             + "Please set two tags or two 
snapshot Ids.",
                                     incrementalBetween.getLeft(), 
incrementalBetween.getRight()));
                 }
-                return new IncrementalStartingScanner(snapshotManager, 
startId, endId, scanMode);
+
+                CoreOptions.IncrementalBetweenScanMode scanMode =
+                        options.incrementalBetweenScanMode();
+                return scanMode == DIFF
+                        ? IncrementalDiffStartingScanner.betweenSnapshotIds(
+                                startId, endId, snapshotManager)
+                        : new IncrementalDeltaStartingScanner(
+                                snapshotManager, startId, endId, 
toSnapshotScanMode(scanMode));
             }
         } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
             Pair<Long, Long> incrementalBetween = 
options.incrementalBetweenTimestamp();
-            return new IncrementalTimeStampStartingScanner(
-                    snapshotManager,
-                    incrementalBetween.getLeft(),
-                    incrementalBetween.getRight(),
-                    scanMode);
+
+            Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
+            Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+            if (earliestSnapshot == null || latestSnapshot == null) {
+                return new EmptyResultStartingScanner(snapshotManager);
+            }
+
+            long startTimestamp = incrementalBetween.getLeft();
+            long endTimestamp = incrementalBetween.getRight();
+            checkArgument(
+                    endTimestamp > startTimestamp,
+                    "Ending timestamp %s should be larger than starting 
timestamp %s.",
+                    endTimestamp,
+                    startTimestamp);
+            if (startTimestamp > latestSnapshot.timeMillis()
+                    || endTimestamp < earliestSnapshot.timeMillis()) {
+                return new EmptyResultStartingScanner(snapshotManager);
+            }
+
+            CoreOptions.IncrementalBetweenScanMode scanMode = 
options.incrementalBetweenScanMode();
+
+            return scanMode == DIFF
+                    ? IncrementalDiffStartingScanner.betweenTimestamps(
+                            startTimestamp, endTimestamp, snapshotManager)
+                    : IncrementalDeltaStartingScanner.betweenTimestamps(
+                            startTimestamp,
+                            endTimestamp,
+                            snapshotManager,
+                            toSnapshotScanMode(scanMode));
         } else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) {
             String endTag = options.incrementalToAutoTag();
-            return IncrementalTagStartingScanner.create(snapshotManager, 
endTag, options);
+            return 
IncrementalDiffStartingScanner.toEndAutoTag(snapshotManager, endTag, options);
         } else {
             throw new UnsupportedOperationException("Unknown incremental read 
mode.");
         }
     }
+
+    private ScanMode toSnapshotScanMode(CoreOptions.IncrementalBetweenScanMode 
scanMode) {
+        switch (scanMode) {
+            case AUTO:
+                return options.changelogProducer() == ChangelogProducer.NONE
+                        ? ScanMode.DELTA
+                        : ScanMode.CHANGELOG;
+            case DELTA:
+                return ScanMode.DELTA;
+            case CHANGELOG:
+                return ScanMode.CHANGELOG;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported incremental scan mode " + 
scanMode.name());
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
index fc38e272d5..eb7c5c0ef2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
@@ -23,7 +23,7 @@ import org.apache.paimon.utils.SnapshotManager;
 /** This scanner always return an empty result. */
 public class EmptyResultStartingScanner extends AbstractStartingScanner {
 
-    EmptyResultStartingScanner(SnapshotManager snapshotManager) {
+    public EmptyResultStartingScanner(SnapshotManager snapshotManager) {
         super(snapshotManager);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
similarity index 84%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
index 9bfb54f2cf..6f18dde393 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
@@ -52,15 +51,18 @@ import java.util.stream.LongStream;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** {@link StartingScanner} for incremental changes by snapshot. */
-public class IncrementalStartingScanner extends AbstractStartingScanner {
+/**
+ * Get incremental data by reading delta or changelog files from snapshots 
between start and end.
+ */
+public class IncrementalDeltaStartingScanner extends AbstractStartingScanner {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalStartingScanner.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(IncrementalDeltaStartingScanner.class);
 
     private final long endingSnapshotId;
     private final ScanMode scanMode;
 
-    public IncrementalStartingScanner(
+    public IncrementalDeltaStartingScanner(
             SnapshotManager snapshotManager, long start, long end, ScanMode 
scanMode) {
         super(snapshotManager);
         this.startingSnapshotId = start;
@@ -89,13 +91,13 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
                             Snapshot snapshot = snapshotManager.snapshot(id);
                             switch (scanMode) {
                                 case DELTA:
-                                    if (snapshot.commitKind() != 
CommitKind.APPEND) {
+                                    if (snapshot.commitKind() != 
Snapshot.CommitKind.APPEND) {
                                         // ignore COMPACT and OVERWRITE
                                         return Collections.emptyList();
                                     }
                                     break;
                                 case CHANGELOG:
-                                    if (snapshot.commitKind() == 
CommitKind.OVERWRITE) {
+                                    if (snapshot.commitKind() == 
Snapshot.CommitKind.OVERWRITE) {
                                         // ignore OVERWRITE
                                         return Collections.emptyList();
                                     }
@@ -156,7 +158,7 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
      *
      * @return If the check passes return empty.
      */
-    public Optional<Result> checkScanSnapshotIdValidity() {
+    private Optional<Result> checkScanSnapshotIdValidity() {
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
 
@@ -185,4 +187,23 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
 
         return Optional.empty();
     }
+
+    public static IncrementalDeltaStartingScanner betweenTimestamps(
+            long startTimestamp,
+            long endTimestamp,
+            SnapshotManager snapshotManager,
+            ScanMode scanMode) {
+        Snapshot startingSnapshot = 
snapshotManager.earlierOrEqualTimeMills(startTimestamp);
+        Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
+        // if earliestSnapShot.timeMillis() > startTimestamp we should include 
the earliestSnapShot
+        long startId =
+                (startingSnapshot == null || earliestSnapshot.timeMillis() > 
startTimestamp)
+                        ? earliestSnapshot.id() - 1
+                        : startingSnapshot.id();
+
+        Snapshot endSnapshot = 
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
+        long endId = endSnapshot == null ? 
snapshotManager.latestSnapshot().id() : endSnapshot.id();
+
+        return new IncrementalDeltaStartingScanner(snapshotManager, startId, 
endId, scanMode);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
similarity index 60%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
index 388b36fb28..942ee23ffa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
@@ -35,24 +35,25 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** {@link StartingScanner} for incremental changes by tag. */
-public class IncrementalTagStartingScanner extends AbstractStartingScanner {
+/** Get incremental data by {@link SnapshotReader#readIncrementalDiff}. */
+public class IncrementalDiffStartingScanner extends AbstractStartingScanner {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalTagStartingScanner.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalDiffStartingScanner.class);
 
     private final Snapshot start;
     private final Snapshot end;
 
-    public IncrementalTagStartingScanner(
+    public IncrementalDiffStartingScanner(
             SnapshotManager snapshotManager, Snapshot start, Snapshot end) {
         super(snapshotManager);
         this.start = start;
         this.end = end;
         this.startingSnapshotId = start.id();
 
-        TimeTravelUtil.checkRescaleBucketForIncrementalTagQuery(
+        TimeTravelUtil.checkRescaleBucketForIncrementalDiffQuery(
                 new SchemaManager(
                         snapshotManager.fileIO(),
                         snapshotManager.tablePath(),
@@ -66,7 +67,60 @@ public class IncrementalTagStartingScanner extends 
AbstractStartingScanner {
         return 
StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
     }
 
-    public static AbstractStartingScanner create(
+    public static IncrementalDiffStartingScanner betweenTags(
+            Tag startTag,
+            Tag endTag,
+            SnapshotManager snapshotManager,
+            Pair<String, String> incrementalBetween) {
+        Snapshot start = startTag.trimToSnapshot();
+        Snapshot end = endTag.trimToSnapshot();
+
+        LOG.info(
+                "{} start and end are parsed to tag with snapshot id {} to 
{}.",
+                INCREMENTAL_BETWEEN.key(),
+                start.id(),
+                end.id());
+
+        checkArgument(
+                end.id() > start.id(),
+                "Tag end %s with snapshot id %s should be larger than tag 
start %s with snapshot id %s",
+                incrementalBetween.getRight(),
+                end.id(),
+                incrementalBetween.getLeft(),
+                start.id());
+
+        return new IncrementalDiffStartingScanner(snapshotManager, start, end);
+    }
+
+    public static IncrementalDiffStartingScanner betweenSnapshotIds(
+            long startId, long endId, SnapshotManager snapshotManager) {
+        checkArgument(
+                endId > startId,
+                "Ending snapshotId should be larger than starting snapshotId 
%s.",
+                endId,
+                startId);
+
+        Snapshot start = snapshotManager.snapshot(startId);
+        Snapshot end = snapshotManager.snapshot(endId);
+        return new IncrementalDiffStartingScanner(snapshotManager, start, end);
+    }
+
+    public static IncrementalDiffStartingScanner betweenTimestamps(
+            long startTimestamp, long endTimestamp, SnapshotManager 
snapshotManager) {
+        Snapshot startSnapshot = 
snapshotManager.earlierOrEqualTimeMills(startTimestamp);
+        if (startSnapshot == null) {
+            startSnapshot = snapshotManager.earliestSnapshot();
+        }
+
+        Snapshot endSnapshot = 
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
+        if (endSnapshot == null) {
+            endSnapshot = snapshotManager.latestSnapshot();
+        }
+
+        return new IncrementalDiffStartingScanner(snapshotManager, 
startSnapshot, endSnapshot);
+    }
+
+    public static AbstractStartingScanner toEndAutoTag(
             SnapshotManager snapshotManager, String endTagName, CoreOptions 
options) {
         TagPeriodHandler periodHandler = TagPeriodHandler.create(options);
         checkArgument(
@@ -104,6 +158,6 @@ public class IncrementalTagStartingScanner extends 
AbstractStartingScanner {
         LOG.info("Found start tag {} .", 
periodHandler.timeToTag(previousTags.get(0).getRight()));
         Snapshot start = previousTags.get(0).getLeft().trimToSnapshot();
 
-        return new IncrementalTagStartingScanner(snapshotManager, start, end);
+        return new IncrementalDiffStartingScanner(snapshotManager, start, end);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
deleted file mode 100644
index 26ece79d08..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.ScanMode;
-import org.apache.paimon.utils.SnapshotManager;
-
-/** {@link StartingScanner} for incremental changes by timestamp. */
-public class IncrementalTimeStampStartingScanner extends 
AbstractStartingScanner {
-
-    private final long startTimestamp;
-    private final long endTimestamp;
-    private final ScanMode scanMode;
-
-    public IncrementalTimeStampStartingScanner(
-            SnapshotManager snapshotManager,
-            long startTimestamp,
-            long endTimestamp,
-            ScanMode scanMode) {
-        super(snapshotManager);
-        this.startTimestamp = startTimestamp;
-        this.endTimestamp = endTimestamp;
-        this.scanMode = scanMode;
-        Snapshot startingSnapshot = 
snapshotManager.earlierOrEqualTimeMills(startTimestamp);
-        if (startingSnapshot != null) {
-            this.startingSnapshotId = startingSnapshot.id();
-        }
-    }
-
-    @Override
-    public Result scan(SnapshotReader reader) {
-        Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
-        if (earliestSnapshot == null) {
-            return new NoSnapshot();
-        }
-
-        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
-        if (startTimestamp > latestSnapshot.timeMillis()
-                || endTimestamp < earliestSnapshot.timeMillis()) {
-            return new NoSnapshot();
-        }
-        // in org.apache.paimon.utils.SnapshotManager.earlierOrEqualTimeMills
-        // 1. if earliestSnapshotId or latestSnapshotId is null 
startingSnapshotId will be null
-        // 2. if earliestSnapShot.timeMillis() > startTimestamp 
startingSnapshotId will be
-        // earliestSnapShotId
-        // if  earliestSnapShot.timeMillis() > startTimestamp we should 
include the earliestSnapShot
-        // data
-        Long startSnapshotId =
-                (startingSnapshotId == null || earliestSnapshot.timeMillis() > 
startTimestamp)
-                        ? earliestSnapshot.id() - 1
-                        : startingSnapshotId;
-        Snapshot endSnapshot = 
snapshotManager.earlierOrEqualTimeMills(endTimestamp);
-        Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() : 
endSnapshot.id();
-        IncrementalStartingScanner incrementalStartingScanner =
-                new IncrementalStartingScanner(
-                        snapshotManager, startSnapshotId, endSnapshotId, 
scanMode);
-        return incrementalStartingScanner.scan(reader);
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 140309a198..cf6feaa3c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -238,7 +238,7 @@ public class TimeTravelUtil {
         }
     }
 
-    public static void checkRescaleBucketForIncrementalTagQuery(
+    public static void checkRescaleBucketForIncrementalDiffQuery(
             SchemaManager schemaManager, Snapshot start, Snapshot end) {
         if (start.schemaId() != end.schemaId()) {
             int startBucketNumber = bucketNumber(schemaManager, 
start.schemaId());
@@ -248,7 +248,7 @@ public class TimeTravelUtil {
                         start.id(),
                         end.id(),
                         String.format(
-                                "The bucket number of two tags are different 
(%s, %s), which is not supported in incremental tag query.",
+                                "The bucket number of two snapshots are 
different (%s, %s), which is not supported in incremental diff query.",
                                 startBucketNumber, endBucketNumber));
             }
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
similarity index 94%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScannerTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
index 2da2f942d2..100b80c93f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
@@ -42,8 +42,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link IncrementalStartingScanner}. */
-public class IncrementalStartingScannerTest extends ScannerTestBase {
+/** Tests for {@link IncrementalDeltaStartingScanner}. */
+public class IncrementalDeltaStartingScannerTest extends ScannerTestBase {
 
     @Test
     public void testScan() throws Exception {
@@ -110,14 +110,14 @@ public class IncrementalStartingScannerTest extends 
ScannerTestBase {
         assertThatNoException()
                 .isThrownBy(
                         () ->
-                                new IncrementalStartingScanner(
+                                new IncrementalDeltaStartingScanner(
                                                 snapshotManager, 0, 4, 
ScanMode.DELTA)
                                         .scan(snapshotReader));
 
         // Starting snapshotId must less than ending snapshotId.
         assertThatThrownBy(
                         () ->
-                                new IncrementalStartingScanner(
+                                new IncrementalDeltaStartingScanner(
                                                 snapshotManager, 4, 3, 
ScanMode.DELTA)
                                         .scan(snapshotReader))
                 .satisfies(
@@ -127,7 +127,7 @@ public class IncrementalStartingScannerTest extends 
ScannerTestBase {
 
         assertThatThrownBy(
                         () ->
-                                new IncrementalStartingScanner(
+                                new IncrementalDeltaStartingScanner(
                                                 snapshotManager, 1, 5, 
ScanMode.DELTA)
                                         .scan(snapshotReader))
                 .satisfies(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index b16ef18deb..8d55c0e647 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -30,6 +30,7 @@ import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
@@ -694,7 +695,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
                     .satisfies(
                             anyCauseMatches(
                                     
TimeTravelUtil.InconsistentTagBucketException.class,
-                                    "The bucket number of two tags are 
different (1, 2), which is not supported in incremental tag query."));
+                                    "The bucket number of two snapshots are 
different (1, 2), which is not supported in incremental diff query."));
         }
     }
 
@@ -735,4 +736,41 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(sql("SELECT * FROM T /*+ 
OPTIONS('incremental-between-timestamp'='0,1') */"))
                 .isEmpty();
     }
+
+    @Test
+    public void testIncrementScanMode() throws Exception {
+        sql(
+                "CREATE TABLE test_scan_mode (id INT PRIMARY KEY NOT ENFORCED, 
v STRING) WITH ('changelog-producer' = 'lookup')");
+
+        // snapshot 1,2
+        sql("INSERT INTO test_scan_mode VALUES (1, 'A')");
+        // snapshot 3,4
+        sql("INSERT INTO test_scan_mode VALUES (2, 'B')");
+
+        // snapshot 5,6
+        String dataId =
+                TestValuesTableFactory.registerData(
+                        Collections.singletonList(Row.ofKind(RowKind.DELETE, 
2, "B")));
+        sEnv.executeSql(
+                "CREATE TEMPORARY TABLE source (id INT, v STRING) "
+                        + "WITH ('connector' = 'values', 'bounded' = 'true', 
'data-id' = '"
+                        + dataId
+                        + "')");
+        sEnv.executeSql("INSERT INTO test_scan_mode SELECT * FROM 
source").await();
+
+        //  snapshot 7,8
+        sql("INSERT INTO test_scan_mode VALUES (3, 'C')");
+
+        List<Row> result =
+                sql(
+                        "SELECT * FROM `test_scan_mode$audit_log` "
+                                + "/*+ 
OPTIONS('incremental-between'='1,8','incremental-between-scan-mode'='diff') 
*/");
+        assertThat(result).containsExactlyInAnyOrder(Row.of("+I", 3, "C"));
+
+        result =
+                sql(
+                        "SELECT * FROM `test_scan_mode$audit_log` "
+                                + "/*+ 
OPTIONS('incremental-between'='1,8','incremental-between-scan-mode'='delta') 
*/");
+        assertThat(result).containsExactlyInAnyOrder(Row.of("-D", 2, "B"), 
Row.of("+I", 3, "C"));
+    }
 }


Reply via email to