This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f4ecba53fd [core] Introduce incremental-to-auto-tag for reading 
changes between auto tags and let incremental-between be tag-first (#4808)
f4ecba53fd is described below

commit f4ecba53fd2ed433485a5938e0ac9f93b0a88145
Author: yuzelin <[email protected]>
AuthorDate: Mon Jan 6 22:12:25 2025 +0800

    [core] Introduce incremental-to-auto-tag for reading changes between auto 
tags and let incremental-between be tag-first (#4808)
---
 docs/content/flink/sql-query.md                    |  17 +++
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  14 ++-
 .../org/apache/paimon/schema/SchemaValidation.java |  22 +++-
 .../paimon/table/source/AbstractDataTableScan.java | 129 ++++++++++++++-------
 .../snapshot/EmptyResultStartingScanner.java       |  34 ++++++
 .../snapshot/IncrementalTagStartingScanner.java    |  83 +++++++++++--
 .../org/apache/paimon/tag/TagPeriodHandler.java    |   7 ++
 .../apache/paimon/table/IncrementalTableTest.java  |  94 +++++++++++++++
 9 files changed, 347 insertions(+), 59 deletions(-)

diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md
index 89136b0b06..6cba9f930f 100644
--- a/docs/content/flink/sql-query.md
+++ b/docs/content/flink/sql-query.md
@@ -105,6 +105,23 @@ If you want see `DELETE` records, you can use audit_log 
table:
 SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
 ```
 
+### Batch Incremental between Auto-created Tags
+
+You can use `incremental-between` to query incremental changes between two tags. However, an auto-created tag may
+never be created if the data of its period is delayed.
+
+For example, assume that tags '2024-12-01', '2024-12-02' and '2024-12-04' are auto-created daily. Data for 12/03 is delayed
+and ingested together with the data for 12/04, so tag '2024-12-03' is never created. If you query the incremental changes
+between consecutive tags without knowing that tag '2024-12-03' is missing, you will use `incremental-between` with '2024-12-01,2024-12-02',
+'2024-12-02,2024-12-03' and '2024-12-03,2024-12-04' respectively, and the last two queries fail because tag '2024-12-03' doesn't exist.
+
+The option `incremental-to-auto-tag` is introduced for this scenario. You only need to specify the end tag: Paimon finds the
+nearest earlier auto-created tag and returns the changes between the two tags. If the end tag or an earlier tag doesn't exist, the result is empty.
+
+For example, querying with 'incremental-to-auto-tag=2024-12-01' or 'incremental-to-auto-tag=2024-12-03' returns an empty result;
+querying with 'incremental-to-auto-tag=2024-12-02' returns the changes between 12/01 and 12/02; and querying with
+'incremental-to-auto-tag=2024-12-04' returns the changes between 12/02 and 12/04.
+
 ## Streaming Query
 
 By default, Streaming read produces the latest snapshot on the table upon 
first startup,
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 5e9a8139dd..003d5ba4c2 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -392,6 +392,12 @@ under the License.
             <td>String</td>
             <td>Read incremental changes between start timestamp (exclusive) 
and end timestamp, for example, 't1,t2' means changes between timestamp t1 and 
timestamp t2.</td>
         </tr>
+        <tr>
+            <td><h5>incremental-to-auto-tag</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Used to specify the auto-created tag to reading incremental 
changes.</td>
+        </tr>
         <tr>
             <td><h5>local-merge-buffer-size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index eca39855b5..0beea0a8f2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1107,6 +1107,13 @@ public class CoreOptions implements Serializable {
                             "Read incremental changes between start timestamp 
(exclusive) and end timestamp, "
                                     + "for example, 't1,t2' means changes 
between timestamp t1 and timestamp t2.");
 
+    public static final ConfigOption<String> INCREMENTAL_TO_AUTO_TAG =
+            key("incremental-to-auto-tag")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Used to specify the auto-created tag to reading 
incremental changes.");
+
     public static final ConfigOption<Boolean> END_INPUT_CHECK_PARTITION_EXPIRE 
=
             key("end-input.check-partition-expire")
                     .booleanType()
@@ -2157,6 +2164,10 @@ public class CoreOptions implements Serializable {
         return options.get(INCREMENTAL_BETWEEN_SCAN_MODE);
     }
 
+    public String incrementalToAutoTag() {
+        return options.get(INCREMENTAL_TO_AUTO_TAG);
+    }
+
     public Integer scanManifestParallelism() {
         return options.get(SCAN_MANIFEST_PARALLELISM);
     }
@@ -2808,7 +2819,8 @@ public class CoreOptions implements Serializable {
         }
 
         if ((options.contains(INCREMENTAL_BETWEEN_TIMESTAMP)
-                        || options.contains(INCREMENTAL_BETWEEN))
+                        || options.contains(INCREMENTAL_BETWEEN)
+                        || options.contains(INCREMENTAL_TO_AUTO_TAG))
                 && !options.contains(SCAN_MODE)) {
             options.set(SCAN_MODE, StartupMode.INCREMENTAL);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index f0f9284ed7..3336d1e5c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -62,6 +62,7 @@ import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
 import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
 import static org.apache.paimon.CoreOptions.SCAN_MODE;
 import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
@@ -272,7 +273,8 @@ public class SchemaValidation {
                             SCAN_FILE_CREATION_TIME_MILLIS,
                             SCAN_TAG_NAME,
                             INCREMENTAL_BETWEEN_TIMESTAMP,
-                            INCREMENTAL_BETWEEN),
+                            INCREMENTAL_BETWEEN,
+                            INCREMENTAL_TO_AUTO_TAG),
                     Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TIMESTAMP));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.FROM_SNAPSHOT) {
             checkExactOneOptionExistInMode(
@@ -288,14 +290,16 @@ public class SchemaValidation {
                             SCAN_TIMESTAMP,
                             SCAN_FILE_CREATION_TIME_MILLIS,
                             INCREMENTAL_BETWEEN_TIMESTAMP,
-                            INCREMENTAL_BETWEEN),
+                            INCREMENTAL_BETWEEN,
+                            INCREMENTAL_TO_AUTO_TAG),
                     Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.INCREMENTAL) {
             checkExactOneOptionExistInMode(
                     options,
                     options.startupMode(),
                     INCREMENTAL_BETWEEN,
-                    INCREMENTAL_BETWEEN_TIMESTAMP);
+                    INCREMENTAL_BETWEEN_TIMESTAMP,
+                    INCREMENTAL_TO_AUTO_TAG);
             checkOptionsConflict(
                     options,
                     Arrays.asList(
@@ -304,7 +308,10 @@ public class SchemaValidation {
                             SCAN_FILE_CREATION_TIME_MILLIS,
                             SCAN_TIMESTAMP,
                             SCAN_TAG_NAME),
-                    Arrays.asList(INCREMENTAL_BETWEEN, 
INCREMENTAL_BETWEEN_TIMESTAMP));
+                    Arrays.asList(
+                            INCREMENTAL_BETWEEN,
+                            INCREMENTAL_BETWEEN_TIMESTAMP,
+                            INCREMENTAL_TO_AUTO_TAG));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
             checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, 
options.startupMode());
             checkOptionsConflict(
@@ -315,7 +322,8 @@ public class SchemaValidation {
                             SCAN_FILE_CREATION_TIME_MILLIS,
                             SCAN_TAG_NAME,
                             INCREMENTAL_BETWEEN_TIMESTAMP,
-                            INCREMENTAL_BETWEEN),
+                            INCREMENTAL_BETWEEN,
+                            INCREMENTAL_TO_AUTO_TAG),
                     Collections.singletonList(SCAN_SNAPSHOT_ID));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.FROM_FILE_CREATION_TIME) {
             checkOptionExistInMode(
@@ -329,7 +337,8 @@ public class SchemaValidation {
                             SCAN_TIMESTAMP_MILLIS,
                             SCAN_TAG_NAME,
                             INCREMENTAL_BETWEEN_TIMESTAMP,
-                            INCREMENTAL_BETWEEN),
+                            INCREMENTAL_BETWEEN,
+                            INCREMENTAL_TO_AUTO_TAG),
                     Collections.singletonList(SCAN_FILE_CREATION_TIME_MILLIS));
         } else {
             checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, 
options.startupMode());
@@ -341,6 +350,7 @@ public class SchemaValidation {
             checkOptionNotExistInMode(
                     options, INCREMENTAL_BETWEEN_TIMESTAMP, 
options.startupMode());
             checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN, 
options.startupMode());
+            checkOptionNotExistInMode(options, INCREMENTAL_TO_AUTO_TAG, 
options.startupMode());
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 24c6943f54..a5810bfc24 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -20,12 +20,14 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner;
@@ -44,21 +46,29 @@ import 
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner
 import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
+import org.apache.paimon.tag.Tag;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** An abstraction layer above {@link FileStoreScan} to provide input split 
generation. */
 public abstract class AbstractDataTableScan implements DataTableScan {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDataTableScan.class);
+
     private final CoreOptions options;
     protected final SnapshotReader snapshotReader;
 
@@ -199,50 +209,87 @@ public abstract class AbstractDataTableScan implements 
DataTableScan {
                         : new 
StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId);
             case INCREMENTAL:
                 checkArgument(!isStreaming, "Cannot read incremental in 
streaming mode.");
-                Pair<String, String> incrementalBetween = 
options.incrementalBetween();
-                CoreOptions.IncrementalBetweenScanMode scanType =
-                        options.incrementalBetweenScanMode();
-                ScanMode scanMode;
-                switch (scanType) {
-                    case AUTO:
-                        scanMode =
-                                options.changelogProducer() == 
ChangelogProducer.NONE
-                                        ? ScanMode.DELTA
-                                        : ScanMode.CHANGELOG;
-                        break;
-                    case DELTA:
-                        scanMode = ScanMode.DELTA;
-                        break;
-                    case CHANGELOG:
-                        scanMode = ScanMode.CHANGELOG;
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown incremental scan type " + 
scanType.name());
-                }
-                if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) 
!= null) {
-                    try {
-                        return new IncrementalStartingScanner(
-                                snapshotManager,
-                                Long.parseLong(incrementalBetween.getLeft()),
-                                Long.parseLong(incrementalBetween.getRight()),
-                                scanMode);
-                    } catch (NumberFormatException e) {
-                        return new IncrementalTagStartingScanner(
-                                snapshotManager,
-                                incrementalBetween.getLeft(),
-                                incrementalBetween.getRight());
-                    }
-                } else {
-                    return new IncrementalTimeStampStartingScanner(
-                            snapshotManager,
-                            Long.parseLong(incrementalBetween.getLeft()),
-                            Long.parseLong(incrementalBetween.getRight()),
-                            scanMode);
-                }
+                return createIncrementalStartingScanner(snapshotManager);
             default:
                 throw new UnsupportedOperationException(
                         "Unknown startup mode " + startupMode.name());
         }
     }
+
+    private StartingScanner createIncrementalStartingScanner(SnapshotManager 
snapshotManager) {
+        CoreOptions.IncrementalBetweenScanMode scanType = 
options.incrementalBetweenScanMode();
+        ScanMode scanMode;
+        switch (scanType) {
+            case AUTO:
+                scanMode =
+                        options.changelogProducer() == ChangelogProducer.NONE
+                                ? ScanMode.DELTA
+                                : ScanMode.CHANGELOG;
+                break;
+            case DELTA:
+                scanMode = ScanMode.DELTA;
+                break;
+            case CHANGELOG:
+                scanMode = ScanMode.CHANGELOG;
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown incremental scan type " + scanType.name());
+        }
+
+        Options conf = options.toConfiguration();
+        TagManager tagManager =
+                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
+        if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
+            Pair<String, String> incrementalBetween = 
options.incrementalBetween();
+            Optional<Tag> startTag = 
tagManager.get(incrementalBetween.getLeft());
+            Optional<Tag> endTag = 
tagManager.get(incrementalBetween.getRight());
+            if (startTag.isPresent() && endTag.isPresent()) {
+                Snapshot start = startTag.get().trimToSnapshot();
+                Snapshot end = endTag.get().trimToSnapshot();
+
+                LOG.info(
+                        "{} start and end are parsed to tag with snapshot id 
{} to {}.",
+                        INCREMENTAL_BETWEEN.key(),
+                        start.id(),
+                        end.id());
+
+                if (end.id() <= start.id()) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Tag end %s with snapshot id %s should be 
larger than tag start %s with snapshot id %s",
+                                    incrementalBetween.getRight(),
+                                    end.id(),
+                                    incrementalBetween.getLeft(),
+                                    start.id()));
+                }
+                return new IncrementalTagStartingScanner(snapshotManager, 
start, end);
+            } else {
+                long startId, endId;
+                try {
+                    startId = Long.parseLong(incrementalBetween.getLeft());
+                    endId = Long.parseLong(incrementalBetween.getRight());
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Didn't find two tags for start '%s' and 
end '%s', and they are not two snapshot Ids. "
+                                            + "Please set two tags or two 
snapshot Ids.",
+                                    incrementalBetween.getLeft(), 
incrementalBetween.getRight()));
+                }
+                return new IncrementalStartingScanner(snapshotManager, 
startId, endId, scanMode);
+            }
+        } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
+            Pair<String, String> incrementalBetween = 
options.incrementalBetween();
+            return new IncrementalTimeStampStartingScanner(
+                    snapshotManager,
+                    Long.parseLong(incrementalBetween.getLeft()),
+                    Long.parseLong(incrementalBetween.getRight()),
+                    scanMode);
+        } else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) {
+            String endTag = options.incrementalToAutoTag();
+            return IncrementalTagStartingScanner.create(snapshotManager, 
endTag, options);
+        } else {
+            throw new UnsupportedOperationException("Unknown incremental read 
mode.");
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
new file mode 100644
index 0000000000..fc38e272d5
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.utils.SnapshotManager;
+
+/** This scanner always return an empty result. */
+public class EmptyResultStartingScanner extends AbstractStartingScanner {
+
+    EmptyResultStartingScanner(SnapshotManager snapshotManager) {
+        super(snapshotManager);
+    }
+
+    @Override
+    public Result scan(SnapshotReader snapshotReader) {
+        return new NoSnapshot();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index e08ac9f44c..55212b1c9a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -18,10 +18,21 @@
 
 package org.apache.paimon.table.source.snapshot;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.tag.Tag;
+import org.apache.paimon.tag.TagPeriodHandler;
+import org.apache.paimon.tag.TagTimeExtractor;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import java.time.LocalDateTime;
+import java.util.Optional;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
 /** {@link StartingScanner} for incremental changes by tag. */
 public class IncrementalTagStartingScanner extends AbstractStartingScanner {
 
@@ -29,18 +40,10 @@ public class IncrementalTagStartingScanner extends 
AbstractStartingScanner {
     private final Snapshot end;
 
     public IncrementalTagStartingScanner(
-            SnapshotManager snapshotManager, String startTagName, String 
endTagName) {
+            SnapshotManager snapshotManager, Snapshot start, Snapshot end) {
         super(snapshotManager);
-        TagManager tagManager =
-                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
-        start = tagManager.getOrThrow(startTagName).trimToSnapshot();
-        end = tagManager.getOrThrow(endTagName).trimToSnapshot();
-        if (end.id() <= start.id()) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Tag end %s with snapshot id %s should be larger 
than tag start %s with snapshot id %s",
-                            endTagName, end.id(), startTagName, start.id()));
-        }
+        this.start = start;
+        this.end = end;
         this.startingSnapshotId = start.id();
     }
 
@@ -48,4 +51,62 @@ public class IncrementalTagStartingScanner extends 
AbstractStartingScanner {
     public Result scan(SnapshotReader reader) {
         return 
StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
     }
+
+    public static AbstractStartingScanner create(
+            SnapshotManager snapshotManager, String endTagName, CoreOptions 
options) {
+        TagTimeExtractor extractor = 
TagTimeExtractor.createForAutoTag(options);
+        checkNotNull(
+                extractor,
+                "Table's tag creation mode doesn't support '%s' scan mode.",
+                CoreOptions.INCREMENTAL_TO_AUTO_TAG);
+        TagPeriodHandler periodHandler = TagPeriodHandler.create(options);
+        checkArgument(
+                periodHandler.isAutoTag(endTagName),
+                "Specified tag '%s' is not an auto-created tag.",
+                endTagName);
+
+        TagManager tagManager =
+                new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
+
+        Optional<Tag> endTag = tagManager.get(endTagName);
+        if (!endTag.isPresent()) {
+            return new EmptyResultStartingScanner(snapshotManager);
+        }
+
+        Snapshot end = endTag.get().trimToSnapshot();
+
+        Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
+        checkState(earliestSnapshot != null, "No tags can be found.");
+
+        LocalDateTime earliestTime =
+                extractor
+                        .extract(earliestSnapshot.timeMillis(), 
earliestSnapshot.watermark())
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "Cannot get valid tag time 
from the earliest snapshot."));
+        LocalDateTime earliestTagTime = 
periodHandler.normalizeToPreviousTag(earliestTime);
+
+        LocalDateTime endTagTime = periodHandler.tagToTime(endTagName);
+        LocalDateTime previousTagTime = 
periodHandler.previousTagTime(endTagTime);
+
+        Snapshot start = null;
+        while (previousTagTime.isAfter(earliestTagTime)
+                || previousTagTime.isEqual(earliestTagTime)) {
+            String previousTagName = periodHandler.timeToTag(previousTagTime);
+            Optional<Tag> previousTag = tagManager.get(previousTagName);
+            if (previousTag.isPresent()) {
+                start = previousTag.get().trimToSnapshot();
+                break;
+            } else {
+                previousTagTime = 
periodHandler.previousTagTime(previousTagTime);
+            }
+        }
+
+        if (start == null) {
+            return new EmptyResultStartingScanner(snapshotManager);
+        }
+
+        return new IncrementalTagStartingScanner(snapshotManager, start, end);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
index c0fbe718c8..4934e26b22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -88,6 +88,8 @@ public interface TagPeriodHandler {
 
     LocalDateTime nextTagTime(LocalDateTime time);
 
+    LocalDateTime previousTagTime(LocalDateTime time);
+
     boolean isAutoTag(String tagName);
 
     /** Base implementation of {@link TagPeriodHandler}. */
@@ -127,6 +129,11 @@ public interface TagPeriodHandler {
             return time.plus(onePeriod());
         }
 
+        @Override
+        public LocalDateTime previousTagTime(LocalDateTime time) {
+            return time.minus(onePeriod());
+        }
+
         @Override
         public boolean isAutoTag(String tagName) {
             try {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index 43fbe2b646..0214f0d99e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -20,18 +20,28 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.TagManager;
 
 import org.junit.jupiter.api.Test;
 
+import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
 import static org.apache.paimon.data.BinaryString.fromString;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -301,4 +311,88 @@ public class IncrementalTableTest extends TableTestBase {
         assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2")))
                 .containsExactlyInAnyOrder(GenericRow.of(1, 1, 2));
     }
+
+    @Test
+    public void testIncrementalToTagFirst() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .option("bucket", "1")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+        write(table, GenericRow.of(1, BinaryString.fromString("a")));
+        write(table, GenericRow.of(2, BinaryString.fromString("b")));
+        write(table, GenericRow.of(3, BinaryString.fromString("c")));
+
+        table.createTag("1", 1);
+        table.createTag("3", 2);
+
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "1,3")))
+                .containsExactlyInAnyOrder(GenericRow.of(2, 
BinaryString.fromString("b")));
+    }
+
+    @Test
+    public void testIncrementalToAutoTag() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .option("bucket", "1")
+                        .option("tag.automatic-creation", "watermark")
+                        .option("tag.creation-period", "daily")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = 
table.newCommit(commitUser).ignoreEmptyCommit(false);
+        TagManager tagManager = table.tagManager();
+
+        write.write(GenericRow.of(1, BinaryString.fromString("a")));
+        List<CommitMessage> commitMessages = write.prepareCommit(false, 0);
+        commit.commit(
+                new ManifestCommittable(
+                        0,
+                        utcMills("2024-12-02T10:00:00"),
+                        Collections.emptyMap(),
+                        commitMessages));
+
+        write.write(GenericRow.of(2, BinaryString.fromString("b")));
+        commitMessages = write.prepareCommit(false, 1);
+        commit.commit(
+                new ManifestCommittable(
+                        1,
+                        utcMills("2024-12-03T10:00:00"),
+                        Collections.emptyMap(),
+                        commitMessages));
+
+        write.write(GenericRow.of(3, BinaryString.fromString("c")));
+        commitMessages = write.prepareCommit(false, 2);
+        commit.commit(
+                new ManifestCommittable(
+                        2,
+                        utcMills("2024-12-05T10:00:00"),
+                        Collections.emptyMap(),
+                        commitMessages));
+
+        assertThat(tagManager.allTagNames()).containsOnly("2024-12-01", 
"2024-12-02", "2024-12-04");
+
+        assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, 
"2024-12-01"))).isEmpty();
+        assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-02")))
+                .containsExactly(GenericRow.of(2, 
BinaryString.fromString("b")));
+        assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, 
"2024-12-03"))).isEmpty();
+        assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-04")))
+                .containsExactly(GenericRow.of(3, 
BinaryString.fromString("c")));
+    }
+
+    private static long utcMills(String timestamp) {
+        return 
Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond();
+    }
 }

Reply via email to