This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d244098 [FLINK-29232] File store continuous reading support from_timestamp scan mode
0d244098 is described below

commit 0d2440984f6ca9c4e26caca7034280e9c3726337
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Tue Nov 22 09:52:24 2022 +0800

    [FLINK-29232] File store continuous reading support from_timestamp scan mode
    
    This closes #388
---
 .../store/connector/AbstractTableStoreFactory.java |  8 --
 .../store/connector/source/FileStoreSource.java    | 34 ++-------
 .../store/connector/source/FlinkSourceBuilder.java | 16 ++--
 .../store/connector/ContinuousFileStoreITCase.java | 86 ++++++++++++++++++++--
 .../org/apache/flink/table/store/CoreOptions.java  |  8 ++
 .../table/store/file/utils/SnapshotManager.java    | 20 +++++
 .../store/table/AppendOnlyFileStoreTable.java      |  2 +-
 .../table/ChangelogValueCountFileStoreTable.java   |  2 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |  2 +-
 .../table/store/table/FileStoreTableFactory.java   | 19 +++++
 .../table/store/table/source/DataTableScan.java    | 17 ++++-
 .../store/table/source/SnapshotEnumerator.java     | 47 +++++++++++-
 12 files changed, 198 insertions(+), 63 deletions(-)
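
For context, a minimal sketch of how the new scan mode is used end to end. The
table name T1, the environment setup, and the timestamp value are illustrative;
the option keys 'log.scan' and 'log.scan.timestamp-millis' come from the tests
in this commit:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FromTimestampReadSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            // Assumes a table store table T1 is already registered (not shown here).
            long startMillis = System.currentTimeMillis() - 3_600_000L; // e.g. one hour back
            tEnv.executeSql(
                            String.format(
                                    "SELECT * FROM T1 /*+ OPTIONS("
                                            + "'log.scan'='from-timestamp', "
                                            + "'log.scan.timestamp-millis'='%s') */",
                                    startMillis))
                    .print();
        }
    }

Reading starts from the first snapshot committed at or after the given
timestamp; earlier data is skipped rather than filtered.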

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
index cb09a06d..eb638104 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.CoreOptions.LogConsistency;
-import org.apache.flink.table.store.CoreOptions.LogStartupMode;
 import org.apache.flink.table.store.connector.sink.TableStoreSink;
 import org.apache.flink.table.store.connector.source.MetadataTableSource;
 import org.apache.flink.table.store.connector.source.TableStoreSource;
@@ -54,7 +53,6 @@ import java.util.Set;
 
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.NONE;
 import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory;
@@ -133,12 +131,6 @@ public abstract class AbstractTableStoreFactory
             throw new ValidationException(
                     "File store continuous reading does not support eventual consistency mode.");
         }
-        LogStartupMode startupMode = options.get(LOG_SCAN);
-        if (startupMode == LogStartupMode.FROM_TIMESTAMP) {
-            throw new ValidationException(
-                    "File store continuous reading does not support from_timestamp scan mode, "
-                            + "you can add timestamp filters instead.");
-        }
     }
 
     static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context context) {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index fff7589d..573e5f98 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -22,20 +22,19 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
+import org.apache.flink.table.store.table.source.SnapshotEnumerator;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Collection;
 
 import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
-import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** {@link Source} of file store. */
 public class FileStoreSource extends FlinkSource {
@@ -48,13 +47,10 @@ public class FileStoreSource extends FlinkSource {
 
     private final long discoveryInterval;
 
-    private final boolean latestContinuous;
-
     public FileStoreSource(
             FileStoreTable table,
             boolean isContinuous,
             long discoveryInterval,
-            boolean latestContinuous,
             @Nullable int[][] projectedFields,
             @Nullable Predicate predicate,
             @Nullable Long limit) {
@@ -62,7 +58,6 @@ public class FileStoreSource extends FlinkSource {
         this.table = table;
         this.isContinuous = isContinuous;
         this.discoveryInterval = discoveryInterval;
-        this.latestContinuous = latestContinuous;
     }
 
     @Override
@@ -83,28 +78,9 @@ public class FileStoreSource extends FlinkSource {
         Long snapshotId;
         Collection<FileStoreSourceSplit> splits;
         if (checkpoint == null) {
-            // TODO refactor split initialization logic into split enumerator or snapshot enumerator
-            // first, create new enumerator, plan splits
-            if (latestContinuous) {
-                checkArgument(
-                        isContinuous,
-                        "The latest continuous can only be true when isContinuous is true.");
-                snapshotId = snapshotManager.latestSnapshotId();
-                splits = new ArrayList<>();
-            } else {
-                DataTableScan.DataFilePlan plan;
-                if (table.options().changelogProducer()
-                                == CoreOptions.ChangelogProducer.FULL_COMPACTION
-                        && isContinuous) {
-                    // Read the results of the last full compaction.
-                    // Only full compaction results will appear on the max level.
-                    plan = scan.withLevel(table.options().numLevels() - 1).plan();
-                } else {
-                    plan = scan.plan();
-                }
-                snapshotId = plan.snapshotId;
-                splits = new FileStoreSourceSplitGenerator().createSplits(plan);
-            }
+            DataFilePlan plan = isContinuous ? SnapshotEnumerator.startup(scan) : scan.plan();
+            snapshotId = plan.snapshotId;
+            splits = new FileStoreSourceSplitGenerator().createSplits(plan);
         } else {
             // restore from checkpoint
             snapshotId = checkpoint.currentSnapshotId();
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index 802e0ac3..d5197542 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -117,15 +117,9 @@ public class FlinkSourceBuilder {
         return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
     }
 
-    private FileStoreSource buildFileSource(boolean isContinuous, boolean continuousScanLatest) {
+    private FileStoreSource buildFileSource(boolean isContinuous) {
         return new FileStoreSource(
-                table,
-                isContinuous,
-                discoveryIntervalMills(),
-                continuousScanLatest,
-                projectedFields,
-                predicate,
-                limit);
+                table, isContinuous, discoveryIntervalMills(), projectedFields, predicate, limit);
     }
 
     private Source<RowData, ?, ?> buildSource() {
@@ -150,20 +144,20 @@ public class FlinkSourceBuilder {
 
             LogStartupMode startupMode = conf.get(LOG_SCAN);
             if (logSourceProvider == null) {
-                return buildFileSource(true, startupMode == LogStartupMode.LATEST);
+                return buildFileSource(true);
             } else {
                 if (startupMode != LogStartupMode.FULL) {
                     return logSourceProvider.createSource(null);
                 }
                 return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder(
-                                buildFileSource(false, false))
+                                buildFileSource(false))
                         .addSource(
                                 new LogHybridSourceFactory(logSourceProvider),
                                 Boundedness.CONTINUOUS_UNBOUNDED)
                         .build();
             }
         } else {
-            return buildFileSource(false, false);
+            return buildFileSource(false);
         }
     }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index 635993ca..1a7945ae 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -18,15 +18,22 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
@@ -114,6 +121,77 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase {
                 .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12"));
     }
 
+    @Test
+    public void testContinuousFromTimestamp() throws Exception {
+        String sql =
+                "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */";
+
+        // empty table
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(streamSqlIter(sql, 0));
+        batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
+        batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+        iterator.close();
+
+        SnapshotManager snapshotManager =
+                new SnapshotManager(
+                        new Path(path, "default_catalog.catalog/default_database.db/T1"));
+        List<Snapshot> snapshots =
+                new ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots()));
+        snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis));
+        Snapshot first = snapshots.get(0);
+        Snapshot second = snapshots.get(1);
+
+        // before second snapshot
+        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis() - 1));
+        batchSql("INSERT INTO T1 VALUES ('13', '14', '15')");
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15"));
+        iterator.close();
+
+        // from second snapshot
+        iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis()));
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15"));
+        iterator.close();
+
+        // from start
+        iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() - 1));
+        assertThat(iterator.collect(5))
+                .containsExactlyInAnyOrder(
+                        Row.of("1", "2", "3"),
+                        Row.of("4", "5", "6"),
+                        Row.of("7", "8", "9"),
+                        Row.of("10", "11", "12"),
+                        Row.of("13", "14", "15"));
+        iterator.close();
+
+        // from end
+        iterator =
+                BlockingIterator.of(
+                        streamSqlIter(
+                                sql,
+                                snapshotManager
+                                                .snapshot(snapshotManager.latestSnapshotId())
+                                                .timeMillis()
+                                        + 1));
+        batchSql("INSERT INTO T1 VALUES ('16', '17', '18')");
+        assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("16", "17", "18"));
+        iterator.close();
+    }
+
+    @Test
+    public void testLackStartupTimestamp() {
+        assertThatThrownBy(
+                        () ->
+                                streamSqlIter(
+                                        "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */"))
+                .hasMessageContaining("Unable to create a source for reading table");
+    }
+
+
     @Test
     public void testIgnoreOverwrite() throws TimeoutException {
         BlockingIterator<Row, Row> iterator =
@@ -147,12 +225,4 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase {
                                 "SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */"),
                 "File store continuous reading does not support eventual consistency mode");
     }
-
-    @Test
-    public void testUnsupportedStartupTimestamp() {
-        assertThatThrownBy(
-                () -> streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */"),
-                "File store continuous reading does not support from_timestamp scan mode, "
-                        + "you can add timestamp filters instead.");
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 44164ace..f54e8461 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -527,6 +527,14 @@ public class CoreOptions implements Serializable {
         return options.get(CHANGELOG_PRODUCER);
     }
 
+    public LogStartupMode logStartupMode() {
+        return options.get(LOG_SCAN);
+    }
+
+    public Long logScanTimestampMills() {
+        return options.get(LOG_SCAN_TIMESTAMP_MILLS);
+    }
+
     public Duration changelogProducerFullCompactionTriggerInterval() {
         return options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
index 40e72c0b..515c32ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java
@@ -90,6 +90,26 @@ public class SnapshotManager {
         }
     }
 
+    /**
+     * Returns the id of the latest snapshot committed strictly earlier than the given timestamp
+     * millis. A non-existent snapshot id (earliest - 1) is returned if every snapshot is at or
+     * later than the timestamp, and null if there are no snapshots at all.
+     */
+    public @Nullable Long earlierThanTimeMills(long timestampMills) {
+        Long earliest = earliestSnapshotId();
+        Long latest = latestSnapshotId();
+        if (earliest == null || latest == null) {
+            return null;
+        }
+
+        for (long i = latest; i >= earliest; i--) {
+            long commitTime = snapshot(i).timeMillis();
+            if (commitTime < timestampMills) {
+                return i;
+            }
+        }
+        return earliest - 1;
+    }
+
     public long snapshotCount() throws IOException {
         return listVersionedFiles(snapshotDirectory(), SNAPSHOT_PREFIX).count();
     }
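
The "earlier than" in earlierThanTimeMills is strict, which matters at snapshot
boundaries. A worked sketch of the expected return values, assuming a
hypothetical table with snapshots 1, 2, 3 committed at 1000, 2000 and 3000 ms
(ids and times are made up):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.file.utils.SnapshotManager;

    class EarlierThanTimeMillsSketch {
        static void walkthrough(Path tablePath) {
            SnapshotManager sm = new SnapshotManager(tablePath);
            assert sm.earlierThanTimeMills(2000L) == 1L; // strict: snapshot 2 itself is excluded
            assert sm.earlierThanTimeMills(2001L) == 2L;
            assert sm.earlierThanTimeMills(5000L) == 3L; // every snapshot is earlier -> latest id
            assert sm.earlierThanTimeMills(1000L) == 0L; // earliest - 1, a non-existent id
            // With no snapshots at all the method returns null.
        }
    }

Returning earliest - 1 instead of null lets a continuous reader start from the
very first snapshot when the requested timestamp predates the table.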
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 665a241a..c7e15ef4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -66,7 +66,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     @Override
     public DataTableScan newScan() {
         AppendOnlyFileStoreScan scan = store.newScan();
-        return new DataTableScan(scan, tableSchema, store.pathFactory()) {
+        return new DataTableScan(scan, tableSchema, store.pathFactory(), options()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new AppendOnlySplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 05294e60..8696b9db 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -83,7 +83,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     @Override
     public DataTableScan newScan() {
         KeyValueFileStoreScan scan = store.newScan();
-        return new DataTableScan(scan, tableSchema, store.pathFactory()) {
+        return new DataTableScan(scan, tableSchema, store.pathFactory(), options()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new MergeTreeSplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index baef2dfa..06563306 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -146,7 +146,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     @Override
     public DataTableScan newScan() {
         KeyValueFileStoreScan scan = store.newScan();
-        return new DataTableScan(scan, tableSchema, store.pathFactory()) {
+        return new DataTableScan(scan, tableSchema, store.pathFactory(), options()) {
             @Override
             protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                 return new MergeTreeSplitGenerator(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index f9bb7881..d33c138e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.LogStartupMode;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -60,6 +61,11 @@ public class FileStoreTableFactory {
         Configuration newOptions = Configuration.fromMap(tableSchema.options());
         dynamicOptions.toMap().forEach(newOptions::setString);
         newOptions.set(PATH, tablePath.toString());
+
+        // validate merged options
+        validateOptions(new CoreOptions(newOptions));
+
+        // copy the table schema so it carries the merged dynamic options
         tableSchema = tableSchema.copy(newOptions.toMap());
 
         SchemaManager schemaManager = new SchemaManager(tablePath);
@@ -73,4 +79,17 @@ public class FileStoreTableFactory {
             }
         }
     }
+
+    private static void validateOptions(CoreOptions options) {
+        if (options.logStartupMode() == LogStartupMode.FROM_TIMESTAMP) {
+            if (options.logScanTimestampMills() == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "%s can not be null when you use %s for %s",
+                                CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
+                                CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                                CoreOptions.LOG_SCAN.key()));
+            }
+        }
+    }
 }
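
Since validation now runs on the merged schema and dynamic options inside the
factory, a missing timestamp fails fast at table creation time. A minimal repro
sketch; the Configuration setup is illustrative, the option keys are from this
diff:

    import org.apache.flink.configuration.Configuration;

    class ValidationSketch {
        static Configuration badDynamicOptions() {
            Configuration conf = new Configuration();
            conf.setString("log.scan", "from-timestamp");
            // 'log.scan.timestamp-millis' is deliberately left unset: passing these
            // dynamic options into FileStoreTableFactory.create(...) now throws
            // IllegalArgumentException instead of failing later in the source.
            return conf;
        }
    }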
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
index e1ed455f..e155ec1f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java
@@ -20,12 +20,14 @@ package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
@@ -42,14 +44,19 @@ public abstract class DataTableScan implements TableScan {
     private final FileStoreScan scan;
     private final TableSchema tableSchema;
     private final FileStorePathFactory pathFactory;
+    private final CoreOptions options;
 
     private boolean isIncremental = false;
 
     protected DataTableScan(
-            FileStoreScan scan, TableSchema tableSchema, FileStorePathFactory pathFactory) {
+            FileStoreScan scan,
+            TableSchema tableSchema,
+            FileStorePathFactory pathFactory,
+            CoreOptions options) {
         this.scan = scan;
         this.tableSchema = tableSchema;
         this.pathFactory = pathFactory;
+        this.options = options;
     }
 
     public DataTableScan withSnapshot(long snapshotId) {
@@ -142,6 +149,14 @@ public abstract class DataTableScan implements TableScan {
 
     protected abstract void withNonPartitionFilter(Predicate predicate);
 
+    public CoreOptions options() {
+        return options;
+    }
+
+    public SnapshotManager snapshotManager() {
+        return new SnapshotManager(pathFactory.root());
+    }
+
     /** Scanning plan containing snapshot ID and input splits. */
     public static class DataFilePlan implements Plan {
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
index edbaa1bc..067ef812 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
@@ -22,14 +22,18 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 
+import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+
 /** Enumerator to enumerate incremental snapshots. */
 public class SnapshotEnumerator implements Callable<SnapshotEnumerator.EnumeratorResult> {
 
@@ -85,7 +89,7 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
                 continue;
             }
 
-            DataTableScan.DataFilePlan plan = scan.withSnapshot(nextSnapshotId).plan();
+            DataFilePlan plan = scan.withSnapshot(nextSnapshotId).plan();
             EnumeratorResult result = new EnumeratorResult(nextSnapshotId, plan);
             LOG.debug("Find snapshot id {}.", nextSnapshotId);
 
@@ -99,11 +103,48 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato
 
         public final long snapshotId;
 
-        public final DataTableScan.DataFilePlan plan;
+        public final DataFilePlan plan;
 
-        private EnumeratorResult(long snapshotId, DataTableScan.DataFilePlan plan) {
+        private EnumeratorResult(long snapshotId, DataFilePlan plan) {
             this.snapshotId = snapshotId;
             this.plan = plan;
         }
     }
+
+    /** Creates the startup plan, i.e. the first plan for continuous reading. */
+    public static DataFilePlan startup(DataTableScan scan) {
+        CoreOptions options = scan.options();
+        SnapshotManager snapshotManager = scan.snapshotManager();
+        CoreOptions.LogStartupMode startupMode = options.logStartupMode();
+        switch (startupMode) {
+            case FULL:
+                DataFilePlan plan;
+                if (options.changelogProducer() == FULL_COMPACTION) {
+                    // Read the results of the last full compaction.
+                    // Only full compaction results will appear on the max level.
+                    plan = scan.withLevel(options.numLevels() - 1).plan();
+                } else {
+                    plan = scan.plan();
+                }
+                return plan;
+            case LATEST:
+                return new DataFilePlan(
+                        snapshotManager.latestSnapshotId(), Collections.emptyList());
+            case FROM_TIMESTAMP:
+                Long timestampMills = options.logScanTimestampMills();
+                if (timestampMills == null) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "%s can not be null when you use %s for %s",
+                                    CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(),
+                                    CoreOptions.LogStartupMode.FROM_TIMESTAMP,
+                                    CoreOptions.LOG_SCAN.key()));
+                }
+                return new DataFilePlan(
+                        snapshotManager.earlierThanTimeMills(timestampMills),
+                        Collections.emptyList());
+            default:
+                throw new UnsupportedOperationException("Unsupported startup mode: " + startupMode);
+        }
+    }
 }
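
Note how startup() composes with the incremental loop above: for LATEST and
FROM_TIMESTAMP the first plan deliberately carries no splits and only pins the
snapshot id that enumeration continues after. A simplified sketch, assuming
(consistently with the FileStoreSource hunk earlier in this commit) that the
enumerator then plans snapshots after plan.snapshotId:

    import org.apache.flink.table.store.table.source.DataTableScan;
    import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan;
    import org.apache.flink.table.store.table.source.SnapshotEnumerator;

    class StartupSketch {
        static void firstPlan(DataTableScan scan) {
            DataFilePlan plan = SnapshotEnumerator.startup(scan);
            // FULL:           snapshotId = current snapshot, splits = the full data set
            //                 (max level only under the full-compaction changelog producer)
            // LATEST:         snapshotId = latest id, splits empty -> only new commits are read
            // FROM_TIMESTAMP: snapshotId = earlierThanTimeMills(t), splits empty -> the first
            //                 snapshot read is the earliest one committed at or after t
            Long next = plan.snapshotId == null ? null : plan.snapshotId + 1;
            // The Callable loop above then keeps planning snapshot ids from `next` onwards.
        }
    }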
