This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6926fc3a7a [core] Support: scan by creation timestamp (#5580)
6926fc3a7a is described below

commit 6926fc3a7a7de4ede02cada8d0ed8e878815cfd7
Author: jerry <[email protected]>
AuthorDate: Thu May 22 10:49:04 2025 +0800

    [core] Support: scan by creation timestamp (#5580)
---
 .../shortcodes/generated/core_configuration.html   |   8 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  22 +++-
 .../paimon/table/source/AbstractDataTableScan.java |  47 +++++++++
 .../CreationTimestampStartingScannerTest.java      | 116 +++++++++++++++++++++
 4 files changed, 191 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index b8fdc7b9ac..856245b64f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -809,6 +809,12 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Long</td>
             <td>End condition "watermark" for bounded streaming mode. Stream 
reading will end when a larger watermark snapshot is encountered.</td>
         </tr>
+        <tr>
+            <td><h5>scan.creation-time-millis</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>Optional timestamp used in case of "from-creation-timestamp" 
scan mode.</td>
+        </tr>
         <tr>
             <td><h5>scan.fallback-branch</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -837,7 +843,7 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">default</td>
             <td><p>Enum</p></td>
-            <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is 
set the actual startup mode will be "from-snapshot". Otherwise the actual 
startup mode will be "latest-full".</li><li>"latest-full": For streaming 
sources, produces the latest snapshot  [...]
+            <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is 
set the actual startup mode will be "from-snapshot". Otherwise the actual 
startup mode will be "latest-full".</li><li>"latest-full": For streaming 
sources, produces the latest snapshot  [...]
         </tr>
         <tr>
             <td><h5>scan.plan-sort-partition</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 6473ca6240..88f8ca7582 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -754,6 +754,13 @@ public class CoreOptions implements Serializable {
                                     + "It is independent of snapshots, but it 
is imprecise filtering (depending on whether "
                                     + "or not compaction occurs).");
 
+    public static final ConfigOption<Long> SCAN_CREATION_TIME_MILLIS = // Timestamp for "from-creation-timestamp" mode; when present (and no higher-priority option such as SCAN_FILE_CREATION_TIME_MILLIS is set) startup-mode resolution picks FROM_CREATION_TIMESTAMP. Unit presumably epoch millis like snapshot timeMillis() — confirm.
+            key("scan.creation-time-millis")
+                    .longType()
+                    .noDefaultValue() // NOTE(review): no default, and this diff adds no check that it is set when scan.mode is explicitly from-creation-timestamp
+                    .withDescription(
+                            "Optional timestamp used in case of 
\"from-creation-timestamp\" scan mode.");
+
     public static final ConfigOption<Long> SCAN_SNAPSHOT_ID =
             key("scan.snapshot-id")
                     .longType()
@@ -2283,6 +2290,8 @@ public class CoreOptions implements Serializable {
                 return StartupMode.FROM_SNAPSHOT;
             } else if 
(options.getOptional(SCAN_FILE_CREATION_TIME_MILLIS).isPresent()) {
                 return StartupMode.FROM_FILE_CREATION_TIME;
+            } else if 
(options.getOptional(SCAN_CREATION_TIME_MILLIS).isPresent()) {
+                return StartupMode.FROM_CREATION_TIMESTAMP;
             } else if (options.getOptional(INCREMENTAL_BETWEEN).isPresent()
                     || 
options.getOptional(INCREMENTAL_BETWEEN_TIMESTAMP).isPresent()) {
                 return StartupMode.INCREMENTAL;
@@ -2312,6 +2321,10 @@ public class CoreOptions implements Serializable {
         return options.get(SCAN_FILE_CREATION_TIME_MILLIS);
     }
 
+    public Long scanCreationTimeMills() { // Returns the "scan.creation-time-millis" value; presumably null when unset since the option has no default. "Mills" spelling mirrors scanFileCreationTimeMills().
+        return options.get(SCAN_CREATION_TIME_MILLIS);
+    }
+
     public Long scanBoundedWatermark() {
         return options.get(SCAN_BOUNDED_WATERMARK);
     }
@@ -2760,9 +2773,16 @@ public class CoreOptions implements Serializable {
                         + "For batch sources, produces a snapshot at timestamp 
specified by \"scan.timestamp-millis\" "
                         + "but does not read new changes."),
 
+        FROM_CREATION_TIMESTAMP( // Start from the snapshot following the one found for scan.creation-time-millis if it exists; otherwise fall back to file-creation-time filtering (see createCreationTimestampStartingScanner).
+                "from-creation-timestamp",
+                "For streaming sources and batch sources, " // NOTE(review): the description wording below ("is during in the range", "equal or later the timestamp") reads awkwardly; consider rewording it.
+                        + "If timestamp specified by 
\"scan.creation-time-millis\" is during in the range of earliest snapshot and 
latest snapshot: "
+                        + "mode is from-snapshot which snapshot is equal or 
later the timestamp. "
+                        + "If timestamp is earlier than earliest snapshot or 
later than latest snapshot, mode is from-file-creation-time."),
+
         FROM_FILE_CREATION_TIME(
                 "from-file-creation-time",
-                "For streaming and batch sources, produces a snapshot and 
filters the data files by creation time. "
+                "For streaming and batch sources, consumes a snapshot and 
filters the data files by creation time. "
                         + "For streaming sources, upon first startup, and 
continue to read the latest changes."),
 
         FROM_SNAPSHOT(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 42d4562608..7f8714522d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -47,6 +47,7 @@ import 
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner
 import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
@@ -231,6 +232,15 @@ abstract class AbstractDataTableScan implements 
DataTableScan {
             case FROM_FILE_CREATION_TIME:
                 Long fileCreationTimeMills = 
options.scanFileCreationTimeMills();
                 return new FileCreationTimeStartingScanner(snapshotManager, 
fileCreationTimeMills);
+            case FROM_CREATION_TIMESTAMP:
+                Long creationTimeMills = options.scanCreationTimeMills();
+                return createCreationTimestampStartingScanner(
+                        snapshotManager,
+                        changelogManager,
+                        creationTimeMills,
+                        options.changelogLifecycleDecoupled(),
+                        isStreaming);
+
             case FROM_SNAPSHOT:
                 if (options.scanSnapshotId() != null) {
                     return isStreaming
@@ -270,6 +280,43 @@ abstract class AbstractDataTableScan implements 
DataTableScan {
         }
     }
 
+    public static StartingScanner createCreationTimestampStartingScanner( // Chooses the starting scanner for FROM_CREATION_TIMESTAMP mode: snapshot-based if a following snapshot exists, else file-creation-time filtering.
+            SnapshotManager snapshotManager,
+            ChangelogManager changelogManager,
+            long creationMillis, // NOTE(review): primitive long, but the FROM_CREATION_TIMESTAMP caller passes a boxed Long that is unset by default; a null would NPE on unboxing
+            boolean changelogDecoupled, // forwarded to the time-travel lookup and to the continuous scanner
+            boolean isStreaming) { // true -> continuous scanner, false -> static scanner
+        Long startingSnapshotPrevId = // may be null; boundary semantics of the lookup (and its trailing 'true' flag) are defined in TimeTravelUtil — TODO confirm
+                TimeTravelUtil.earlierThanTimeMills(
+                        snapshotManager,
+                        changelogManager,
+                        creationMillis,
+                        changelogDecoupled,
+                        true);
+        final StartingScanner scanner;
+        Optional<Long> startingSnapshotId =
+                Optional.ofNullable(startingSnapshotPrevId)
+                        .map(id -> id + 1) // the snapshot right after the looked-up one
+                        .filter( // keep it only if it still exists as a snapshot or a long-lived changelog
+                                id ->
+                                        snapshotManager.snapshotExists(id)
+                                                || 
changelogManager.longLivedChangelogExists(id));
+        if (startingSnapshotId.isPresent()) { // a following snapshot is available: read from it
+            scanner =
+                    isStreaming
+                            ? new ContinuousFromSnapshotStartingScanner(
+                                    snapshotManager,
+                                    changelogManager,
+                                    startingSnapshotId.get(),
+                                    changelogDecoupled)
+                            : new StaticFromSnapshotStartingScanner(
+                                    snapshotManager, startingSnapshotId.get());
+        } else { // lookup returned null, or id + 1 does not exist: filter data files by creation time instead
+            scanner = new FileCreationTimeStartingScanner(snapshotManager, 
creationMillis);
+        }
+        return scanner;
+    }
+
     private StartingScanner createIncrementalStartingScanner(SnapshotManager 
snapshotManager) {
         Options conf = options.toConfiguration();
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CreationTimestampStartingScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CreationTimestampStartingScannerTest.java
new file mode 100644
index 0000000000..d7fee6801c
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CreationTimestampStartingScannerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.paimon.table.source.DataTableStreamScan.createCreationTimestampStartingScanner;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests which StartingScanner createCreationTimestampStartingScanner returns for timestamps before, between, after and exactly at the committed snapshots. */
+public class CreationTimestampStartingScannerTest extends ScannerTestBase {
+
+    @Test
+    public void test() throws Exception { // commits two snapshots, then probes timestamps around them in streaming and batch mode
+        SnapshotManager snapshotManager = table.snapshotManager();
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        write.write(rowData(1, 40, 400L));
+
+        long t1 = System.currentTimeMillis(); // taken before the first commit
+        commit.commit(0, write.prepareCommit(true, 0));
+        long t2 = System.currentTimeMillis(); // taken after the first commit. NOTE(review): may equal snapshot 1's timeMillis(); the earliest-time batch case below then expects a fallback, contradicting the t2 batch expectation — possible flakiness
+
+        write.write(rowData(1, 11, 100L));
+        write.write(rowData(1, 21, 200L));
+        write.write(rowData(1, 41, 400L));
+        commit.commit(0, write.prepareCommit(true, 0));
+        long t3 = System.currentTimeMillis(); // taken after the second commit. NOTE(review): may equal snapshot 2's timeMillis(); the latest-time batch case below then expects a snapshot scanner, contradicting the t3 expectation — consider a short sleep or probes derived from snapshot times
+        write.close();
+        commit.close();
+
+        StartingScanner scanner = // t1 was taken before snapshot 1 was committed -> fallback expected in both modes
+                createCreationTimestampStartingScanner(
+                        snapshotManager, table.changelogManager(), t1, false, 
true);
+        assertThat(scanner instanceof 
FileCreationTimeStartingScanner).isTrue();
+        scanner =
+                createCreationTimestampStartingScanner(
+                        snapshotManager, table.changelogManager(), t1, false, 
false);
+        assertThat(scanner instanceof 
FileCreationTimeStartingScanner).isTrue();
+
+        scanner = // t2 lies between the two commits -> snapshot-based scanner expected
+                createCreationTimestampStartingScanner(
+                        snapshotManager, table.changelogManager(), t2, false, 
true);
+        assertThat(scanner instanceof 
ContinuousFromSnapshotStartingScanner).isTrue();
+        scanner =
+                createCreationTimestampStartingScanner(
+                        snapshotManager, table.changelogManager(), t2, false, 
false);
+        assertThat(scanner instanceof 
StaticFromSnapshotStartingScanner).isTrue();
+
+        scanner = // t3 was taken after the latest commit -> fallback expected in both modes
+                createCreationTimestampStartingScanner(
+                        snapshotManager, table.changelogManager(), t3, false, 
true);
+        assertThat(scanner instanceof 
FileCreationTimeStartingScanner).isTrue();
+        scanner =
+                createCreationTimestampStartingScanner(
+                        snapshotManager, table.changelogManager(), t3, false, 
false);
+        assertThat(scanner instanceof 
FileCreationTimeStartingScanner).isTrue();
+
+        scanner = // exactly the earliest snapshot's time -> fallback expected
+                createCreationTimestampStartingScanner(
+                        snapshotManager,
+                        table.changelogManager(),
+                        snapshotManager.earliestSnapshot().timeMillis(),
+                        true,
+                        true);
+        assertThat(scanner instanceof 
FileCreationTimeStartingScanner).isTrue();
+        scanner =
+                createCreationTimestampStartingScanner(
+                        snapshotManager,
+                        table.changelogManager(),
+                        snapshotManager.earliestSnapshot().timeMillis(),
+                        false,
+                        false);
+        assertThat(scanner instanceof 
FileCreationTimeStartingScanner).isTrue();
+
+        scanner = // exactly the latest snapshot's time -> snapshot-based scanner expected
+                createCreationTimestampStartingScanner(
+                        snapshotManager,
+                        table.changelogManager(),
+                        snapshotManager.latestSnapshot().timeMillis(),
+                        true,
+                        true);
+        assertThat(scanner instanceof 
ContinuousFromSnapshotStartingScanner).isTrue();
+        scanner =
+                createCreationTimestampStartingScanner(
+                        snapshotManager,
+                        table.changelogManager(),
+                        snapshotManager.latestSnapshot().timeMillis(),
+                        false,
+                        false);
+        assertThat(scanner instanceof 
StaticFromSnapshotStartingScanner).isTrue();
+    }
+}

Reply via email to