This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6926fc3a7a [core] Support: scan by creation timestamp (#5580)
6926fc3a7a is described below
commit 6926fc3a7a7de4ede02cada8d0ed8e878815cfd7
Author: jerry <[email protected]>
AuthorDate: Thu May 22 10:49:04 2025 +0800
[core] Support: scan by creation timestamp (#5580)
---
.../shortcodes/generated/core_configuration.html | 8 +-
.../main/java/org/apache/paimon/CoreOptions.java | 22 +++-
.../paimon/table/source/AbstractDataTableScan.java | 47 +++++++++
.../CreationTimestampStartingScannerTest.java | 116 +++++++++++++++++++++
4 files changed, 191 insertions(+), 2 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index b8fdc7b9ac..856245b64f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -809,6 +809,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Long</td>
<td>End condition "watermark" for bounded streaming mode. Stream
reading will end when a larger watermark snapshot is encountered.</td>
</tr>
+ <tr>
+ <td><h5>scan.creation-time-millis</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>Optional timestamp used in case of "from-creation-timestamp"
scan mode.</td>
+ </tr>
<tr>
<td><h5>scan.fallback-branch</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -837,7 +843,7 @@ This config option does not affect the default filesystem
metastore.</td>
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
- <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is
set the actual startup mode will be "from-snapshot". Otherwise the actual
startup mode will be "latest-full".</li><li>"latest-full": For streaming
sources, produces the latest snapshot [...]
+ <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is
set the actual startup mode will be "from-snapshot". Otherwise the actual
startup mode will be "latest-full".</li><li>"latest-full": For streaming
sources, produces the latest snapshot [...]
</tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 6473ca6240..88f8ca7582 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -754,6 +754,13 @@ public class CoreOptions implements Serializable {
+ "It is independent of snapshots, but it
is imprecise filtering (depending on whether "
+ "or not compaction occurs).");
+ // Timestamp option for the "from-creation-timestamp" scan mode. It is a Long option with no
+ // default value and is read back through scanCreationTimeMills().
+ public static final ConfigOption<Long> SCAN_CREATION_TIME_MILLIS =
+ key("scan.creation-time-millis")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional timestamp used in case of
\"from-creation-timestamp\" scan mode.");
+
public static final ConfigOption<Long> SCAN_SNAPSHOT_ID =
key("scan.snapshot-id")
.longType()
@@ -2283,6 +2290,8 @@ public class CoreOptions implements Serializable {
return StartupMode.FROM_SNAPSHOT;
} else if
(options.getOptional(SCAN_FILE_CREATION_TIME_MILLIS).isPresent()) {
return StartupMode.FROM_FILE_CREATION_TIME;
+ } else if
(options.getOptional(SCAN_CREATION_TIME_MILLIS).isPresent()) {
+ return StartupMode.FROM_CREATION_TIMESTAMP;
} else if (options.getOptional(INCREMENTAL_BETWEEN).isPresent()
||
options.getOptional(INCREMENTAL_BETWEEN_TIMESTAMP).isPresent()) {
return StartupMode.INCREMENTAL;
@@ -2312,6 +2321,10 @@ public class CoreOptions implements Serializable {
return options.get(SCAN_FILE_CREATION_TIME_MILLIS);
}
+ /**
+  * Reads the {@code scan.creation-time-millis} option.
+  *
+  * @return the configured value; the option declares no default, so this is presumably
+  *     {@code null} when it is not set (confirm against {@code Options#get})
+  */
+ public Long scanCreationTimeMills() {
+     final Long creationTimeMillis = options.get(SCAN_CREATION_TIME_MILLIS);
+     return creationTimeMillis;
+ }
+
public Long scanBoundedWatermark() {
return options.get(SCAN_BOUNDED_WATERMARK);
}
@@ -2760,9 +2773,16 @@ public class CoreOptions implements Serializable {
+ "For batch sources, produces a snapshot at timestamp
specified by \"scan.timestamp-millis\" "
+ "but does not read new changes."),
+ FROM_CREATION_TIMESTAMP(
+ "from-creation-timestamp",
+ "For streaming sources and batch sources, "
+ + "If timestamp specified by
\"scan.creation-time-millis\" is during in the range of earliest snapshot and
latest snapshot: "
+ + "mode is from-snapshot which snapshot is equal or
later the timestamp. "
+ + "If timestamp is earlier than earliest snapshot or
later than latest snapshot, mode is from-file-creation-time."),
+
FROM_FILE_CREATION_TIME(
"from-file-creation-time",
- "For streaming and batch sources, produces a snapshot and
filters the data files by creation time. "
+ "For streaming and batch sources, consumes a snapshot and
filters the data files by creation time. "
+ "For streaming sources, upon first startup, and
continue to read the latest changes."),
FROM_SNAPSHOT(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 42d4562608..7f8714522d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -47,6 +47,7 @@ import
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
+import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
@@ -231,6 +232,15 @@ abstract class AbstractDataTableScan implements
DataTableScan {
case FROM_FILE_CREATION_TIME:
Long fileCreationTimeMills =
options.scanFileCreationTimeMills();
return new FileCreationTimeStartingScanner(snapshotManager,
fileCreationTimeMills);
+ case FROM_CREATION_TIMESTAMP:
+ Long creationTimeMills = options.scanCreationTimeMills();
+ return createCreationTimestampStartingScanner(
+ snapshotManager,
+ changelogManager,
+ creationTimeMills,
+ options.changelogLifecycleDecoupled(),
+ isStreaming);
+
case FROM_SNAPSHOT:
if (options.scanSnapshotId() != null) {
return isStreaming
@@ -270,6 +280,43 @@ abstract class AbstractDataTableScan implements
DataTableScan {
}
}
+ /**
+  * Builds the starting scanner for the creation-timestamp startup mode.
+  *
+  * <p>The id returned by {@link TimeTravelUtil#earlierThanTimeMills} plus one is treated as the
+  * candidate starting snapshot. When that candidate exists as a snapshot or as a long-lived
+  * changelog, scanning starts from it: continuously for streaming reads, statically for batch
+  * reads. When no id is returned, or the candidate does not exist, a {@link
+  * FileCreationTimeStartingScanner} built from {@code creationMillis} is returned instead.
+  *
+  * @param snapshotManager used to check snapshot existence and handed to the created scanner
+  * @param changelogManager used to check long-lived changelog existence
+  * @param creationMillis timestamp (milliseconds) passed to the time-travel lookup and to the
+  *     fallback scanner
+  * @param changelogDecoupled forwarded to the time-travel lookup and the continuous scanner
+  * @param isStreaming {@code true} to create a continuous scanner, {@code false} for a static one
+  * @return the starting scanner selected as described above
+  */
+ public static StartingScanner createCreationTimestampStartingScanner(
+         SnapshotManager snapshotManager,
+         ChangelogManager changelogManager,
+         long creationMillis,
+         boolean changelogDecoupled,
+         boolean isStreaming) {
+     Long previousSnapshotId =
+             TimeTravelUtil.earlierThanTimeMills(
+                     snapshotManager, changelogManager, creationMillis, changelogDecoupled, true);
+     if (previousSnapshotId != null) {
+         Long candidateId = previousSnapshotId + 1;
+         boolean candidateExists =
+                 snapshotManager.snapshotExists(candidateId)
+                         || changelogManager.longLivedChangelogExists(candidateId);
+         if (candidateExists) {
+             if (isStreaming) {
+                 return new ContinuousFromSnapshotStartingScanner(
+                         snapshotManager, changelogManager, candidateId, changelogDecoupled);
+             }
+             return new StaticFromSnapshotStartingScanner(snapshotManager, candidateId);
+         }
+     }
+     // No usable starting snapshot for this timestamp: filter data files by creation time.
+     return new FileCreationTimeStartingScanner(snapshotManager, creationMillis);
+ }
+
private StartingScanner createIncrementalStartingScanner(SnapshotManager
snapshotManager) {
Options conf = options.toConfiguration();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CreationTimestampStartingScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CreationTimestampStartingScannerTest.java
new file mode 100644
index 0000000000..d7fee6801c
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/CreationTimestampStartingScannerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.Test;
+
+import static
org.apache.paimon.table.source.DataTableStreamScan.createCreationTimestampStartingScanner;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for createCreationTimestampStartingScanner. */
+public class CreationTimestampStartingScannerTest extends ScannerTestBase {
+
+ @Test
+ public void test() throws Exception {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ write.write(rowData(1, 40, 400L));
+
+ long t1 = System.currentTimeMillis();
+ commit.commit(0, write.prepareCommit(true, 0));
+ long t2 = System.currentTimeMillis();
+
+ write.write(rowData(1, 11, 100L));
+ write.write(rowData(1, 21, 200L));
+ write.write(rowData(1, 41, 400L));
+ commit.commit(0, write.prepareCommit(true, 0));
+ long t3 = System.currentTimeMillis();
+ write.close();
+ commit.close();
+
+ StartingScanner scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager, table.changelogManager(), t1, false,
true);
+ assertThat(scanner instanceof
FileCreationTimeStartingScanner).isTrue();
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager, table.changelogManager(), t1, false,
false);
+ assertThat(scanner instanceof
FileCreationTimeStartingScanner).isTrue();
+
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager, table.changelogManager(), t2, false,
true);
+ assertThat(scanner instanceof
ContinuousFromSnapshotStartingScanner).isTrue();
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager, table.changelogManager(), t2, false,
false);
+ assertThat(scanner instanceof
StaticFromSnapshotStartingScanner).isTrue();
+
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager, table.changelogManager(), t3, false,
true);
+ assertThat(scanner instanceof
FileCreationTimeStartingScanner).isTrue();
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager, table.changelogManager(), t3, false,
false);
+ assertThat(scanner instanceof
FileCreationTimeStartingScanner).isTrue();
+
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager,
+ table.changelogManager(),
+ snapshotManager.earliestSnapshot().timeMillis(),
+ true,
+ true);
+ assertThat(scanner instanceof
FileCreationTimeStartingScanner).isTrue();
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager,
+ table.changelogManager(),
+ snapshotManager.earliestSnapshot().timeMillis(),
+ false,
+ false);
+ assertThat(scanner instanceof
FileCreationTimeStartingScanner).isTrue();
+
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager,
+ table.changelogManager(),
+ snapshotManager.latestSnapshot().timeMillis(),
+ true,
+ true);
+ assertThat(scanner instanceof
ContinuousFromSnapshotStartingScanner).isTrue();
+ scanner =
+ createCreationTimestampStartingScanner(
+ snapshotManager,
+ table.changelogManager(),
+ snapshotManager.latestSnapshot().timeMillis(),
+ false,
+ false);
+ assertThat(scanner instanceof
StaticFromSnapshotStartingScanner).isTrue();
+ }
+}