This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bf7ddccac [core] Throw exception when travel to timestamp before the earliest snapshot (#5423)
6bf7ddccac is described below

commit 6bf7ddccac44e285f338b17e25507c2ed7d110dd
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Apr 9 10:28:32 2025 +0800

    [core] Throw exception when travel to timestamp before the earliest snapshot (#5423)
---
 .../StaticFromSnapshotStartingScanner.java         |  6 ++----
 .../StaticFromTimestampStartingScanner.java        | 18 ++++++++++--------
 .../org/apache/paimon/utils/SnapshotManager.java   |  4 ++--
 .../apache/paimon/utils/SnapshotManagerTest.java   | 14 ++++++--------
 .../org/apache/paimon/flink/TimeTravelITCase.java  | 16 +++++++++++-----
 .../apache/paimon/spark/SparkTimeTravelITCase.java | 22 ++++++++++++++++------
 .../spark/SparkTimeTravelWithDataFrameITCase.java  | 21 +++++++++++++--------
 7 files changed, 60 insertions(+), 41 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index 19b9da2440..3621259a6a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -52,12 +52,10 @@ public class StaticFromSnapshotStartingScanner extends 
ReadPlanStartingScanner {
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
 
         if (earliestSnapshotId == null || latestSnapshotId == null) {
-            LOG.warn("There is currently no snapshot. Waiting for snapshot 
generation.");
-            return null;
+            throw new IllegalArgumentException("There is currently no 
snapshot.");
         }
 
-        // Checks earlier whether the specified scan snapshot id is valid and 
throws the correct
-        // exception.
+        // Checks earlier whether the specified scan snapshot id is valid.
         checkArgument(
                 startingSnapshotId >= earliestSnapshotId && startingSnapshotId 
<= latestSnapshotId,
                 "The specified scan snapshotId %s is out of available 
snapshotId range [%s, %s].",
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index a5087890a9..0f20938992 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -43,19 +43,21 @@ public class StaticFromTimestampStartingScanner extends 
ReadPlanStartingScanner
         super(snapshotManager);
         this.startupMillis = startupMillis;
         Snapshot snapshot = timeTravelToTimestamp(snapshotManager, 
startupMillis);
-        if (snapshot != null) {
-            this.startingSnapshotId = snapshot.id();
+        if (snapshot == null) {
+            Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
+            throw new IllegalArgumentException(
+                    String.format(
+                            "There is currently no snapshot earlier than or 
equal to timestamp [%s], the earliest snapshot's timestamp is [%s]",
+                            startupMillis,
+                            earliestSnapshot == null
+                                    ? "null"
+                                    : 
String.valueOf(earliestSnapshot.timeMillis())));
         }
+        this.startingSnapshotId = snapshot.id();
     }
 
     @Override
     public SnapshotReader configure(SnapshotReader snapshotReader) {
-        if (startingSnapshotId == null) {
-            LOG.debug(
-                    "There is currently no snapshot earlier than or equal to 
timestamp[{}]",
-                    startupMillis);
-            return null;
-        }
         return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 42c8b2ba71..09dcc567b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -264,7 +264,7 @@ public class SnapshotManager implements Serializable {
     }
 
     /**
-     * Returns a {@link Snapshot} whoes commit time is earlier than or equal 
to given timestamp
+     * Returns a {@link Snapshot} whose commit time is earlier than or equal 
to given timestamp
      * mills. If there is no such a snapshot, returns null.
      */
     public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
@@ -275,7 +275,7 @@ public class SnapshotManager implements Serializable {
 
         Snapshot earliestSnapShot = earliestSnapshot(latest);
         if (earliestSnapShot == null || earliestSnapShot.timeMillis() > 
timestampMills) {
-            return earliestSnapShot;
+            return null;
         }
         long earliest = earliestSnapShot.id();
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 45ec66000c..ed5badb764 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -176,19 +176,17 @@ public class SnapshotManagerTest {
         }
 
         if (isRaceCondition) {
-            // The earliest snapshot has expired, so always return the second 
snapshot
-            assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 
1L).timeMillis())
-                    .isEqualTo(millis + 1000L);
-            assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
999).timeMillis())
-                    .isEqualTo(millis + 1000L);
+            // The earliest snapshot has expired, so always return the second 
snapshot, smaller than
+            // the second snapshot return null
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 
1L)).isEqualTo(null);
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
999)).isEqualTo(null);
             assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
1000).timeMillis())
                     .isEqualTo(millis + 1000L);
             assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
1001).timeMillis())
                     .isEqualTo(millis + 1000L);
         } else {
-            // there is no snapshot smaller than "millis - 1L" return the 
earliest snapshot
-            assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 
1L).timeMillis())
-                    .isEqualTo(millis);
+            // there is no snapshot smaller than "millis - 1L" return null
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 
1L)).isEqualTo(null);
 
             // smaller than the second snapshot return the first snapshot
             assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
999).timeMillis())
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
index 1c9aa0547c..f3707f3258 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TimeTravelITCase.java
@@ -27,7 +27,9 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
 
+import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT case for flink time travel. */
 public class TimeTravelITCase extends CatalogITCaseBase {
@@ -109,13 +111,17 @@ public class TimeTravelITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testTravelToNonExistedTimestamp() {
+    public void testTravelToTimestampBeforeTheEarliestSnapshot() {
         sql("CREATE TABLE t (k INT, v STRING)");
         sql("INSERT INTO t VALUES(1, 'hello'), (2, 'world')");
-        assertThat(
-                        sql("SELECT * FROM t FOR SYSTEM_TIME AS OF TIMESTAMP 
'1900-01-01 00:00:00'")
-                                .toString())
-                .isEqualTo("[+I[1, hello], +I[2, world]]");
+        assertThatThrownBy(
+                        () ->
+                                sql("SELECT * FROM t FOR SYSTEM_TIME AS OF 
TIMESTAMP '1900-01-01 00:00:00'")
+                                        .toString())
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "There is currently no snapshot earlier than 
or equal to timestamp"));
     }
 
     @Test
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 16a38c3a33..8e13cb18f5 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -151,19 +151,29 @@ public class SparkTimeTravelITCase extends 
SparkReadTestBase {
     public void testTravelToNonExistedVersion() {
         spark.sql("CREATE TABLE t (k INT, v STRING)");
 
-        assertThat(spark.sql("SELECT * FROM t VERSION AS OF 
2").collectAsList()).isEmpty();
+        assertThatThrownBy(() -> spark.sql("SELECT * FROM t VERSION AS OF 
2").collectAsList())
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class, "There is 
currently no snapshot."));
     }
 
     @Test
-    public void testTravelToNonExistedTimestamp() {
+    public void testTravelToTimestampBeforeTheEarliestSnapshot() {
         long anchor = System.currentTimeMillis() / 1000;
 
         spark.sql("CREATE TABLE t (k INT, v STRING)");
 
-        assertThat(
-                        spark.sql(String.format("SELECT * FROM t TIMESTAMP AS 
OF %s", anchor))
-                                .collectAsList())
-                .isEmpty();
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                                String.format(
+                                                        "SELECT * FROM t 
TIMESTAMP AS OF %s",
+                                                        anchor))
+                                        .collectAsList())
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "There is currently no snapshot earlier than 
or equal to timestamp"));
     }
 
     @Test
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
index 9f613c1b92..518813b35e 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelWithDataFrameITCase.java
@@ -125,14 +125,19 @@ public class SparkTimeTravelWithDataFrameITCase extends 
SparkReadTestBase {
     }
 
     @Test
-    public void testTravelToNonExistedTimestamp() {
-        Dataset<Row> dataset =
-                spark.read()
-                        .format("paimon")
-                        .option("path", tablePath1.toString())
-                        .option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 0)
-                        .load();
-        assertThat(dataset.collectAsList().toString()).isEqualTo("[[1,2,1], 
[5,6,3]]");
+    public void testTravelToTimestampBeforeTheEarliestSnapshot() {
+        assertThatThrownBy(
+                        () ->
+                                spark.read()
+                                        .format("paimon")
+                                        .option("path", tablePath1.toString())
+                                        
.option(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), 0)
+                                        .load()
+                                        .collectAsList())
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class,
+                                "There is currently no snapshot earlier than 
or equal to timestamp [0]"));
     }
 
     @Test

Reply via email to