This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 71918811d [core] Introduce incremental-between read by timestamp 
(#1470)
71918811d is described below

commit 71918811d0014e2948f5cd4382f55f52c256e46f
Author: pongandnoon <[email protected]>
AuthorDate: Thu Jul 6 11:01:52 2023 +0800

    [core] Introduce incremental-between read by timestamp (#1470)
---
 .../shortcodes/generated/core_configuration.html   |   8 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  26 ++-
 .../org/apache/paimon/schema/SchemaValidation.java |  28 ++-
 .../table/source/AbstractInnerTableScan.java       |  15 +-
 .../snapshot/IncrementalStartingScanner.java       |  11 +-
 .../IncrementalTimeStampStartingScanner.java       |  52 +++++
 .../table/IncrementalTimeStampTableTest.java       | 220 +++++++++++++++++++++
 7 files changed, 341 insertions(+), 19 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6cb820349..9df8991c8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -164,6 +164,12 @@ under the License.
             <td>String</td>
             <td>Read incremental changes between start snapshot (exclusive) 
and end snapshot, for example, '5,10' means changes between snapshot 5 and 
snapshot 10.</td>
         </tr>
+        <tr>
+            <td><h5>incremental-between-timestamp</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Read incremental changes between start timestamp (exclusive) 
and end timestamp, for example, 't1,t2' means changes between timestamp t1 and 
timestamp t2.</td>
+        </tr>
         <tr>
             <td><h5>local-sort.max-num-file-handles</h5></td>
             <td style="word-wrap: break-word;">128</td>
@@ -366,7 +372,7 @@ under the License.
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">default</td>
             <td><p>Enum</p></td>
-            <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is 
set the actual startup mode will be "from-snapshot". Otherwise the actual 
startup mode will be "latest-full".</li><li>"latest-full": For streaming 
sources, produces the latest snapshot  [...]
+            <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is 
set the actual startup mode will be "from-snapshot". Otherwise the actual 
startup mode will be "latest-full".</li><li>"latest-full": For streaming 
sources, produces the latest snapshot  [...]
         </tr>
         <tr>
             <td><h5>scan.plan-sort-partition</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 0a1db378f..13325d536 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -697,6 +697,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Read incremental changes between start snapshot 
(exclusive) and end snapshot, "
                                     + "for example, '5,10' means changes 
between snapshot 5 and snapshot 10.");
+    public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
+            key("incremental-between-timestamp")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Read incremental changes between start timestamp 
(exclusive) and end timestamp, "
+                                    + "for example, 't1,t2' means changes 
between timestamp t1 and timestamp t2.");
 
     public static final String STATS_MODE_SUFFIX = "stats-mode";
 
@@ -965,7 +972,8 @@ public class CoreOptions implements Serializable {
             } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
                     || options.getOptional(SCAN_TAG_NAME).isPresent()) {
                 return StartupMode.FROM_SNAPSHOT;
-            } else if (options.getOptional(INCREMENTAL_BETWEEN).isPresent()) {
+            } else if (options.getOptional(INCREMENTAL_BETWEEN).isPresent()
+                    || 
options.getOptional(INCREMENTAL_BETWEEN_TIMESTAMP).isPresent()) {
                 return StartupMode.INCREMENTAL;
             } else {
                 return StartupMode.LATEST_FULL;
@@ -996,14 +1004,17 @@ public class CoreOptions implements Serializable {
     public Pair<String, String> incrementalBetween() {
         String str = options.get(INCREMENTAL_BETWEEN);
         if (str == null) {
-            return null;
+            str = options.get(INCREMENTAL_BETWEEN_TIMESTAMP);
+            if (str == null) {
+                return null;
+            }
         }
 
         String[] split = str.split(",");
         if (split.length != 2) {
             throw new IllegalArgumentException(
-                    "The incremental-between must specific start snapshot 
(exclusive) and end snapshot,"
-                            + " for example, '5,10' means changes between 
snapshot 5 and snapshot 10. But is: "
+                    "The incremental-between or incremental-between-timestamp  
must specific start(exclusive) and end snapshot or timestamp,"
+                            + " for example, 'incremental-between'='5,10' 
means changes between snapshot 5 and snapshot 10. But is: "
                             + str);
         }
         return Pair.of(split[0], split[1]);
@@ -1192,7 +1203,8 @@ public class CoreOptions implements Serializable {
                         + "produces a snapshot specified by 
\"scan.snapshot-id\" but does not read new changes."),
 
         INCREMENTAL(
-                "incremental", "Read incremental changes between start 
snapshot and end snapshot.");
+                "incremental",
+                "Read incremental changes between start and end snapshot or 
timestamp.");
 
         private final String value;
         private final String description;
@@ -1449,7 +1461,9 @@ public class CoreOptions implements Serializable {
             options.set(SCAN_MODE, StartupMode.FROM_SNAPSHOT);
         }
 
-        if (options.contains(INCREMENTAL_BETWEEN) && 
!options.contains(SCAN_MODE)) {
+        if ((options.contains(INCREMENTAL_BETWEEN_TIMESTAMP)
+                        || options.contains(INCREMENTAL_BETWEEN))
+                && !options.contains(SCAN_MODE)) {
             options.set(SCAN_MODE, StartupMode.INCREMENTAL);
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index fee830427..044f6b1f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
 import static org.apache.paimon.CoreOptions.SCAN_MODE;
 import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
 import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
@@ -191,31 +192,48 @@ public class SchemaValidation {
                     options, SCAN_TIMESTAMP_MILLIS, 
CoreOptions.StartupMode.FROM_TIMESTAMP);
             checkOptionsConflict(
                     options,
-                    Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME, 
INCREMENTAL_BETWEEN),
+                    Arrays.asList(
+                            SCAN_SNAPSHOT_ID,
+                            SCAN_TAG_NAME,
+                            INCREMENTAL_BETWEEN_TIMESTAMP,
+                            INCREMENTAL_BETWEEN),
                     Collections.singletonList(SCAN_TIMESTAMP_MILLIS));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.FROM_SNAPSHOT) {
             checkExactOneOptionExistInMode(
                     options, options.startupMode(), SCAN_SNAPSHOT_ID, 
SCAN_TAG_NAME);
             checkOptionsConflict(
                     options,
-                    Arrays.asList(SCAN_TIMESTAMP_MILLIS, INCREMENTAL_BETWEEN),
+                    Arrays.asList(
+                            SCAN_TIMESTAMP_MILLIS,
+                            INCREMENTAL_BETWEEN_TIMESTAMP,
+                            INCREMENTAL_BETWEEN),
                     Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.INCREMENTAL) {
-            checkOptionExistInMode(options, INCREMENTAL_BETWEEN, 
options.startupMode());
+            checkExactOneOptionExistInMode(
+                    options,
+                    options.startupMode(),
+                    INCREMENTAL_BETWEEN,
+                    INCREMENTAL_BETWEEN_TIMESTAMP);
             checkOptionsConflict(
                     options,
                     Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS, 
SCAN_TAG_NAME),
-                    Collections.singletonList(INCREMENTAL_BETWEEN));
+                    Arrays.asList(INCREMENTAL_BETWEEN, 
INCREMENTAL_BETWEEN_TIMESTAMP));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
             checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, 
options.startupMode());
             checkOptionsConflict(
                     options,
-                    Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME, 
INCREMENTAL_BETWEEN),
+                    Arrays.asList(
+                            SCAN_TIMESTAMP_MILLIS,
+                            SCAN_TAG_NAME,
+                            INCREMENTAL_BETWEEN_TIMESTAMP,
+                            INCREMENTAL_BETWEEN),
                     Collections.singletonList(SCAN_SNAPSHOT_ID));
         } else {
             checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, 
options.startupMode());
             checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, 
options.startupMode());
             checkOptionNotExistInMode(options, SCAN_TAG_NAME, 
options.startupMode());
+            checkOptionNotExistInMode(
+                    options, INCREMENTAL_BETWEEN_TIMESTAMP, 
options.startupMode());
             checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN, 
options.startupMode());
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 3a3d7ee62..374050af6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -34,6 +34,7 @@ import 
org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullStartingScanner;
 import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
+import 
org.apache.paimon.table.source.snapshot.IncrementalTimeStampStartingScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
@@ -134,10 +135,16 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
                         : new 
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
             case INCREMENTAL:
                 checkArgument(!isStreaming, "Cannot read incremental in 
streaming mode.");
-                Pair<String, String> incremental = 
options.incrementalBetween();
-                return new IncrementalStartingScanner(
-                        Long.parseLong(incremental.getLeft()),
-                        Long.parseLong(incremental.getRight()));
+                Pair<String, String> incrementalBetween = 
options.incrementalBetween();
+                if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) 
!= null) {
+                    return new IncrementalStartingScanner(
+                            Long.parseLong(incrementalBetween.getLeft()),
+                            Long.parseLong(incrementalBetween.getRight()));
+                } else {
+                    return new IncrementalTimeStampStartingScanner(
+                            Long.parseLong(incrementalBetween.getLeft()),
+                            Long.parseLong(incrementalBetween.getRight()));
+                }
             default:
                 throw new UnsupportedOperationException(
                         "Unknown startup mode " + startupMode.name());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 74972f2ab..40d40c830 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -33,11 +33,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-/** {@link StartingScanner} for incremental changes. */
+/** {@link StartingScanner} for incremental changes by snapshot. */
 public class IncrementalStartingScanner implements StartingScanner {
 
-    private final long start;
-    private final long end;
+    private long start;
+    private long end;
 
     public IncrementalStartingScanner(long start, long end) {
         this.start = start;
@@ -46,6 +46,11 @@ public class IncrementalStartingScanner implements 
StartingScanner {
 
     @Override
     public Result scan(SnapshotManager manager, SnapshotReader reader) {
+        long earliestSnapshotId = manager.earliestSnapshotId();
+        long latestSnapshotId = manager.latestSnapshotId();
+        start = (start < earliestSnapshotId) ? earliestSnapshotId - 1 : start;
+        end = (end > latestSnapshotId) ? latestSnapshotId : end;
+
         Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new 
HashMap<>();
         for (long i = start + 1; i < end + 1; i++) {
             List<DataSplit> splits = readDeltaSplits(reader, 
manager.snapshot(i));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
new file mode 100644
index 000000000..ed8613a26
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTimeStampStartingScanner.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SnapshotManager;
+
+/** {@link StartingScanner} for incremental changes by timestamp. */
+public class IncrementalTimeStampStartingScanner implements StartingScanner {
+
+    private final long startTimestamp;
+    private final long endTimestamp;
+
+    public IncrementalTimeStampStartingScanner(long startTimestamp, long 
endTimestamp) {
+        this.startTimestamp = startTimestamp;
+        this.endTimestamp = endTimestamp;
+    }
+
+    @Override
+    public Result scan(SnapshotManager manager, SnapshotReader reader) {
+        Snapshot earliestSnapshot = 
manager.snapshot(manager.earliestSnapshotId());
+        Snapshot latestSnapshot = manager.latestSnapshot();
+        if (startTimestamp > latestSnapshot.timeMillis()
+                || endTimestamp < earliestSnapshot.timeMillis()) {
+            return new NoSnapshot();
+        }
+        Snapshot startSnapshot = 
manager.earlierOrEqualTimeMills(startTimestamp);
+        Long startSnapshotId =
+                (startSnapshot == null) ? earliestSnapshot.id() - 1 : 
startSnapshot.id();
+        Snapshot endSnapshot = manager.earlierOrEqualTimeMills(endTimestamp);
+        Long endSnapshotId = (endSnapshot == null) ? latestSnapshot.id() : 
endSnapshot.id();
+        IncrementalStartingScanner incrementalStartingScanner =
+                new IncrementalStartingScanner(startSnapshotId, endSnapshotId);
+        return incrementalStartingScanner.scan(manager, reader);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
new file mode 100644
index 000000000..56486ea8a
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CoreOptions#INCREMENTAL_BETWEEN_TIMESTAMP}. */
+public class IncrementalTimeStampTableTest extends TableTestBase {
+
+    @Test
+    public void testPrimaryKeyTable() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        Table table = catalog.getTable(identifier);
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
+        SnapshotManager snapshotManager = new 
SnapshotManager(LocalFileIO.create(), tablePath);
+
+        Long timestampEarliest = System.currentTimeMillis();
+        // snapshot 1: append
+        write(
+                table,
+                GenericRow.of(1, 1, 1),
+                GenericRow.of(1, 2, 1),
+                GenericRow.of(1, 3, 1),
+                GenericRow.of(2, 1, 1));
+
+        // snapshot 2: append
+        write(
+                table,
+                GenericRow.of(1, 1, 2),
+                GenericRow.of(1, 2, 2),
+                GenericRow.of(1, 4, 1),
+                GenericRow.of(2, 1, 2));
+        Long timestampSnapshot2 = snapshotManager.snapshot(2).timeMillis();
+
+        // snapshot 3: compact
+        compact(table, row(1), 0);
+
+        // snapshot 4: append
+        write(
+                table,
+                GenericRow.of(1, 1, 3),
+                GenericRow.of(1, 2, 3),
+                GenericRow.of(2, 1, 3),
+                GenericRow.of(2, 2, 1));
+
+        // snapshot 5: append
+        write(table, GenericRow.of(1, 1, 4), GenericRow.of(1, 2, 4), 
GenericRow.of(2, 1, 4));
+        Long timestampSnapshot4 = snapshotManager.snapshot(5).timeMillis();
+
+        // snapshot 6: append
+        write(table, GenericRow.of(1, 1, 5), GenericRow.of(1, 2, 5), 
GenericRow.of(2, 1, 5));
+
+        List<InternalRow> result1 =
+                read(
+                        table,
+                        Pair.of(
+                                INCREMENTAL_BETWEEN_TIMESTAMP,
+                                String.format("%s,%s", timestampEarliest - 1, 
timestampEarliest)));
+        assertThat(result1).isEmpty();
+
+        List<InternalRow> result2 =
+                read(
+                        table,
+                        Pair.of(
+                                INCREMENTAL_BETWEEN_TIMESTAMP,
+                                String.format("%s,%s", timestampEarliest, 
timestampSnapshot2)));
+        assertThat(result2)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 2),
+                        GenericRow.of(1, 2, 2),
+                        GenericRow.of(1, 3, 1),
+                        GenericRow.of(1, 4, 1),
+                        GenericRow.of(2, 1, 2));
+
+        List<InternalRow> result3 =
+                read(
+                        table,
+                        Pair.of(
+                                INCREMENTAL_BETWEEN_TIMESTAMP,
+                                String.format("%s,%s", timestampSnapshot2, 
timestampSnapshot4)));
+        assertThat(result3)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 4),
+                        GenericRow.of(1, 2, 4),
+                        GenericRow.of(2, 1, 4),
+                        GenericRow.of(2, 2, 1));
+    }
+
+    @Test
+    public void testAppendTable() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        Table table = catalog.getTable(identifier);
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
+        SnapshotManager snapshotManager = new 
SnapshotManager(LocalFileIO.create(), tablePath);
+        Long timestampEarliest = System.currentTimeMillis();
+        // snapshot 1: append
+        write(
+                table,
+                GenericRow.of(1, 1, 1),
+                GenericRow.of(1, 2, 1),
+                GenericRow.of(1, 3, 1),
+                GenericRow.of(2, 1, 1));
+
+        // snapshot 2: append
+        write(
+                table,
+                GenericRow.of(1, 1, 2),
+                GenericRow.of(1, 2, 2),
+                GenericRow.of(1, 4, 1),
+                GenericRow.of(2, 1, 2));
+        Long timestampSnapshot2 = snapshotManager.snapshot(2).timeMillis();
+
+        // snapshot 3: append
+        write(
+                table,
+                GenericRow.of(1, 1, 3),
+                GenericRow.of(1, 2, 3),
+                GenericRow.of(2, 1, 3),
+                GenericRow.of(2, 2, 1));
+
+        // snapshot 4: append
+        write(table, GenericRow.of(1, 1, 4), GenericRow.of(1, 2, 4), 
GenericRow.of(2, 1, 4));
+
+        // snapshot 5: append
+        write(table, GenericRow.of(1, 1, 5), GenericRow.of(1, 2, 5), 
GenericRow.of(2, 1, 5));
+
+        Long timestampSnapshot4 = snapshotManager.snapshot(4).timeMillis();
+
+        List<InternalRow> result1 =
+                read(
+                        table,
+                        Pair.of(
+                                INCREMENTAL_BETWEEN_TIMESTAMP,
+                                String.format("%s,%s", timestampEarliest - 1, 
timestampEarliest)));
+        assertThat(result1).isEmpty();
+
+        List<InternalRow> result2 =
+                read(
+                        table,
+                        Pair.of(
+                                INCREMENTAL_BETWEEN_TIMESTAMP,
+                                String.format("%s,%s", timestampEarliest, 
timestampSnapshot2)));
+        assertThat(result2)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 1),
+                        GenericRow.of(1, 1, 2),
+                        GenericRow.of(1, 2, 1),
+                        GenericRow.of(1, 2, 2),
+                        GenericRow.of(1, 3, 1),
+                        GenericRow.of(1, 4, 1),
+                        GenericRow.of(2, 1, 1),
+                        GenericRow.of(2, 1, 2));
+
+        List<InternalRow> result3 =
+                read(
+                        table,
+                        Pair.of(
+                                INCREMENTAL_BETWEEN_TIMESTAMP,
+                                String.format("%s,%s", timestampSnapshot2, 
timestampSnapshot4)));
+        assertThat(result3)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 3),
+                        GenericRow.of(1, 2, 3),
+                        GenericRow.of(2, 1, 3),
+                        GenericRow.of(2, 2, 1),
+                        GenericRow.of(1, 1, 4),
+                        GenericRow.of(1, 2, 4),
+                        GenericRow.of(2, 1, 4));
+    }
+}

Reply via email to