This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 0d244098 [FLINK-29232] File store continuous reading support from_timestamp scan mode 0d244098 is described below commit 0d2440984f6ca9c4e26caca7034280e9c3726337 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Tue Nov 22 09:52:24 2022 +0800 [FLINK-29232] File store continuous reading support from_timestamp scan mode This closes #388 --- .../store/connector/AbstractTableStoreFactory.java | 8 -- .../store/connector/source/FileStoreSource.java | 34 ++------- .../store/connector/source/FlinkSourceBuilder.java | 16 ++-- .../store/connector/ContinuousFileStoreITCase.java | 86 ++++++++++++++++++++-- .../org/apache/flink/table/store/CoreOptions.java | 8 ++ .../table/store/file/utils/SnapshotManager.java | 20 +++++ .../store/table/AppendOnlyFileStoreTable.java | 2 +- .../table/ChangelogValueCountFileStoreTable.java | 2 +- .../table/ChangelogWithKeyFileStoreTable.java | 2 +- .../table/store/table/FileStoreTableFactory.java | 19 +++++ .../table/store/table/source/DataTableScan.java | 17 ++++- .../store/table/source/SnapshotEnumerator.java | 47 +++++++++++- 12 files changed, 198 insertions(+), 63 deletions(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java index cb09a06d..eb638104 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java @@ -33,7 +33,6 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.CoreOptions.LogChangelogMode; import org.apache.flink.table.store.CoreOptions.LogConsistency; -import org.apache.flink.table.store.CoreOptions.LogStartupMode; import org.apache.flink.table.store.connector.sink.TableStoreSink; import org.apache.flink.table.store.connector.source.MetadataTableSource; import org.apache.flink.table.store.connector.source.TableStoreSource; @@ -54,7 +53,6 @@ import java.util.Set; import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY; -import static org.apache.flink.table.store.CoreOptions.LOG_SCAN; import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM; import static org.apache.flink.table.store.connector.FlinkConnectorOptions.NONE; import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory; @@ -133,12 +131,6 @@ public abstract class AbstractTableStoreFactory throw new ValidationException( "File store continuous reading dose not support eventual consistency mode."); } - LogStartupMode startupMode = options.get(LOG_SCAN); - if (startupMode == LogStartupMode.FROM_TIMESTAMP) { - throw new ValidationException( - "File store continuous reading dose not support from_timestamp scan mode, " - + "you can add timestamp filters instead."); - } } static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context context) { diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java index fff7589d..573e5f98 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java @@ -22,20 +22,19 @@ import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.utils.SnapshotManager; import org.apache.flink.table.store.table.FileStoreTable; import org.apache.flink.table.store.table.source.DataTableScan; +import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan; +import org.apache.flink.table.store.table.source.SnapshotEnumerator; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.Collection; import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT; -import static org.apache.flink.util.Preconditions.checkArgument; /** {@link Source} of file store. */ public class FileStoreSource extends FlinkSource { @@ -48,13 +47,10 @@ public class FileStoreSource extends FlinkSource { private final long discoveryInterval; - private final boolean latestContinuous; - public FileStoreSource( FileStoreTable table, boolean isContinuous, long discoveryInterval, - boolean latestContinuous, @Nullable int[][] projectedFields, @Nullable Predicate predicate, @Nullable Long limit) { @@ -62,7 +58,6 @@ public class FileStoreSource extends FlinkSource { this.table = table; this.isContinuous = isContinuous; this.discoveryInterval = discoveryInterval; - this.latestContinuous = latestContinuous; } @Override @@ -83,28 +78,9 @@ public class FileStoreSource extends FlinkSource { Long snapshotId; Collection<FileStoreSourceSplit> splits; if (checkpoint == null) { - // TODO refactor split initialization logic into split enumerator or snapshot enumerator - // first, create new enumerator, plan splits - if (latestContinuous) { - checkArgument( - isContinuous, - "The latest continuous can only be true when isContinuous is true."); - snapshotId = snapshotManager.latestSnapshotId(); - splits = new ArrayList<>(); - } else { - DataTableScan.DataFilePlan plan; - if (table.options().changelogProducer() - == CoreOptions.ChangelogProducer.FULL_COMPACTION - && isContinuous) { - // Read the results of the last full compaction. - // Only full compaction results will appear on the max level. - plan = scan.withLevel(table.options().numLevels() - 1).plan(); - } else { - plan = scan.plan(); - } - snapshotId = plan.snapshotId; - splits = new FileStoreSourceSplitGenerator().createSplits(plan); - } + DataFilePlan plan = isContinuous ? SnapshotEnumerator.startup(scan) : scan.plan(); + snapshotId = plan.snapshotId; + splits = new FileStoreSourceSplitGenerator().createSplits(plan); } else { // restore from checkpoint snapshotId = checkpoint.currentSnapshotId(); diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java index 802e0ac3..d5197542 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java @@ -117,15 +117,9 @@ public class FlinkSourceBuilder { return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis(); } - private FileStoreSource buildFileSource(boolean isContinuous, boolean continuousScanLatest) { + private FileStoreSource buildFileSource(boolean isContinuous) { return new FileStoreSource( - table, - isContinuous, - discoveryIntervalMills(), - continuousScanLatest, - projectedFields, - predicate, - limit); + table, isContinuous, discoveryIntervalMills(), projectedFields, predicate, limit); } private Source<RowData, ?, ?> buildSource() { @@ -150,20 +144,20 @@ public class FlinkSourceBuilder { LogStartupMode startupMode = conf.get(LOG_SCAN); if (logSourceProvider == null) { - return buildFileSource(true, startupMode == LogStartupMode.LATEST); + return buildFileSource(true); } else { if (startupMode != LogStartupMode.FULL) { return logSourceProvider.createSource(null); } return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder( - buildFileSource(false, false)) + buildFileSource(false)) .addSource( new LogHybridSourceFactory(logSourceProvider), Boundedness.CONTINUOUS_UNBOUNDED) .build(); } } else { - return buildFileSource(false, false); + return buildFileSource(false); } } diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java index 635993ca..1a7945ae 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java @@ -18,15 +18,22 @@ package org.apache.flink.table.store.connector; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.table.store.file.utils.BlockingIterator; +import org.apache.flink.table.store.file.utils.SnapshotManager; import org.apache.flink.types.Row; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.concurrent.TimeoutException; @@ -114,6 +121,77 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase { .containsExactlyInAnyOrder(Row.of("7", "8", "9"), Row.of("10", "11", "12")); } + @Test + public void testContinuousFromTimestamp() throws Exception { + String sql = + "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */"; + + // empty table + BlockingIterator<Row, Row> iterator = BlockingIterator.of(streamSqlIter(sql, 0)); + batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')"); + batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')"); + assertThat(iterator.collect(2)) + .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); + iterator.close(); + + SnapshotManager snapshotManager = + new SnapshotManager( + new Path(path, "default_catalog.catalog/default_database.db/T1")); + List<Snapshot> snapshots = + new ArrayList<>(ImmutableList.copyOf(snapshotManager.snapshots())); + snapshots.sort(Comparator.comparingLong(Snapshot::timeMillis)); + Snapshot first = snapshots.get(0); + Snapshot second = snapshots.get(1); + + // before second snapshot + iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis() - 1)); + batchSql("INSERT INTO T1 VALUES ('13', '14', '15')"); + assertThat(iterator.collect(3)) + .containsExactlyInAnyOrder( + Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15")); + iterator.close(); + + // from second snapshot + iterator = BlockingIterator.of(streamSqlIter(sql, second.timeMillis())); + assertThat(iterator.collect(3)) + .containsExactlyInAnyOrder( + Row.of("7", "8", "9"), Row.of("10", "11", "12"), Row.of("13", "14", "15")); + iterator.close(); + + // from start + iterator = BlockingIterator.of(streamSqlIter(sql, first.timeMillis() - 1)); + assertThat(iterator.collect(5)) + .containsExactlyInAnyOrder( + Row.of("1", "2", "3"), + Row.of("4", "5", "6"), + Row.of("7", "8", "9"), + Row.of("10", "11", "12"), + Row.of("13", "14", "15")); + iterator.close(); + + // from end + iterator = + BlockingIterator.of( + streamSqlIter( + sql, + snapshotManager + .snapshot(snapshotManager.latestSnapshotId()) + .timeMillis() + + 1)); + batchSql("INSERT INTO T1 VALUES ('16', '17', '18')"); + assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("16", "17", "18")); + iterator.close(); + } + + @Test + public void testLackStartupTimestamp() { + assertThatThrownBy( + () -> + streamSqlIter( + "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */")) + .hasMessageContaining("Unable to create a source for reading table"); + } + @Test public void testIgnoreOverwrite() throws TimeoutException { BlockingIterator<Row, Row> iterator = @@ -147,12 +225,4 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase { "SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */"), "File store continuous reading dose not support eventual consistency mode"); } - - @Test - public void testUnsupportedStartupTimestamp() { - assertThatThrownBy( - () -> streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */"), - "File store continuous reading dose not support from_timestamp scan mode, " - + "you can add timestamp filters instead."); - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java index 44164ace..f54e8461 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java @@ -527,6 +527,14 @@ public class CoreOptions implements Serializable { return options.get(CHANGELOG_PRODUCER); } + public LogStartupMode logStartupMode() { + return options.get(LOG_SCAN); + } + + public Long logScanTimestampMills() { + return options.get(LOG_SCAN_TIMESTAMP_MILLS); + } + public Duration changelogProducerFullCompactionTriggerInterval() { return options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java index 40e72c0b..515c32ed 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SnapshotManager.java @@ -90,6 +90,26 @@ public class SnapshotManager { } } + /** + * Returns a snapshot earlier than the timestamp mills. A non-existent snapshot may be returned + * if all snapshots are later than the timestamp mills. + */ + public @Nullable Long earlierThanTimeMills(long timestampMills) { + Long earliest = earliestSnapshotId(); + Long latest = latestSnapshotId(); + if (earliest == null || latest == null) { + return null; + } + + for (long i = latest; i >= earliest; i--) { + long commitTime = snapshot(i).timeMillis(); + if (commitTime < timestampMills) { + return i; + } + } + return earliest - 1; + } + public long snapshotCount() throws IOException { return listVersionedFiles(snapshotDirectory(), SNAPSHOT_PREFIX).count(); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java index 665a241a..c7e15ef4 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java @@ -66,7 +66,7 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable { @Override public DataTableScan newScan() { AppendOnlyFileStoreScan scan = store.newScan(); - return new DataTableScan(scan, tableSchema, store.pathFactory()) { + return new DataTableScan(scan, tableSchema, store.pathFactory(), options()) { @Override protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) { return new AppendOnlySplitGenerator( diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java index 05294e60..8696b9db 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java @@ -83,7 +83,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable { @Override public DataTableScan newScan() { KeyValueFileStoreScan scan = store.newScan(); - return new DataTableScan(scan, tableSchema, store.pathFactory()) { + return new DataTableScan(scan, tableSchema, store.pathFactory(), options()) { @Override protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) { return new MergeTreeSplitGenerator( diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java index baef2dfa..06563306 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java @@ -146,7 +146,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable { @Override public DataTableScan newScan() { KeyValueFileStoreScan scan = store.newScan(); - return new DataTableScan(scan, tableSchema, store.pathFactory()) { + return new DataTableScan(scan, tableSchema, store.pathFactory(), options()) { @Override protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) { return new MergeTreeSplitGenerator( diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java index f9bb7881..d33c138e 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java @@ -21,6 +21,7 @@ package org.apache.flink.table.store.table; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.CoreOptions.LogStartupMode; import org.apache.flink.table.store.file.WriteMode; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.schema.TableSchema; @@ -60,6 +61,11 @@ public class FileStoreTableFactory { Configuration newOptions = Configuration.fromMap(tableSchema.options()); dynamicOptions.toMap().forEach(newOptions::setString); newOptions.set(PATH, tablePath.toString()); + + // validate merged options + validateOptions(new CoreOptions(newOptions)); + + // copy a new table store to contain dynamic options tableSchema = tableSchema.copy(newOptions.toMap()); SchemaManager schemaManager = new SchemaManager(tablePath); @@ -73,4 +79,17 @@ public class FileStoreTableFactory { } } } + + private static void validateOptions(CoreOptions options) { + if (options.logStartupMode() == LogStartupMode.FROM_TIMESTAMP) { + if (options.logScanTimestampMills() == null) { + throw new IllegalArgumentException( + String.format( + "%s can not be null when you use %s for %s", + CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(), + CoreOptions.LogStartupMode.FROM_TIMESTAMP, + CoreOptions.LOG_SCAN.key())); + } + } + } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java index e1ed455f..e155ec1f 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/DataTableScan.java @@ -20,12 +20,14 @@ package org.apache.flink.table.store.table.source; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.file.io.DataFileMeta; import org.apache.flink.table.store.file.operation.FileStoreScan; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.predicate.PredicateBuilder; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.store.file.utils.SnapshotManager; import javax.annotation.Nullable; @@ -42,14 +44,19 @@ public abstract class DataTableScan implements TableScan { private final FileStoreScan scan; private final TableSchema tableSchema; private final FileStorePathFactory pathFactory; + private final CoreOptions options; private boolean isIncremental = false; protected DataTableScan( - FileStoreScan scan, TableSchema tableSchema, FileStorePathFactory pathFactory) { + FileStoreScan scan, + TableSchema tableSchema, + FileStorePathFactory pathFactory, + CoreOptions options) { this.scan = scan; this.tableSchema = tableSchema; this.pathFactory = pathFactory; + this.options = options; } public DataTableScan withSnapshot(long snapshotId) { @@ -142,6 +149,14 @@ public abstract class DataTableScan implements TableScan { protected abstract void withNonPartitionFilter(Predicate predicate); + public CoreOptions options() { + return options; + } + + public SnapshotManager snapshotManager() { + return new SnapshotManager(pathFactory.root()); + } + /** Scanning plan containing snapshot ID and input splits. */ public static class DataFilePlan implements Plan { diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java index edbaa1bc..067ef812 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java @@ -22,14 +22,18 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.table.store.file.utils.SnapshotManager; +import org.apache.flink.table.store.table.source.DataTableScan.DataFilePlan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.Collections; import java.util.concurrent.Callable; +import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION; + /** Enumerator to enumerate incremental snapshots. */ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.EnumeratorResult> { @@ -85,7 +89,7 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato continue; } - DataTableScan.DataFilePlan plan = scan.withSnapshot(nextSnapshotId).plan(); + DataFilePlan plan = scan.withSnapshot(nextSnapshotId).plan(); EnumeratorResult result = new EnumeratorResult(nextSnapshotId, plan); LOG.debug("Find snapshot id {}.", nextSnapshotId); @@ -99,11 +103,48 @@ public class SnapshotEnumerator implements Callable<SnapshotEnumerator.Enumerato public final long snapshotId; - public final DataTableScan.DataFilePlan plan; + public final DataFilePlan plan; - private EnumeratorResult(long snapshotId, DataTableScan.DataFilePlan plan) { + private EnumeratorResult(long snapshotId, DataFilePlan plan) { this.snapshotId = snapshotId; this.plan = plan; } } + + /** Startup snapshot enumerator, this is the first plan for continuous reading. */ + public static DataFilePlan startup(DataTableScan scan) { + CoreOptions options = scan.options(); + SnapshotManager snapshotManager = scan.snapshotManager(); + CoreOptions.LogStartupMode startupMode = options.logStartupMode(); + switch (startupMode) { + case FULL: + DataFilePlan plan; + if (options.changelogProducer() == FULL_COMPACTION) { + // Read the results of the last full compaction. + // Only full compaction results will appear on the max level. + plan = scan.withLevel(options.numLevels() - 1).plan(); + } else { + plan = scan.plan(); + } + return plan; + case LATEST: + return new DataFilePlan( + snapshotManager.latestSnapshotId(), Collections.emptyList()); + case FROM_TIMESTAMP: + Long timestampMills = options.logScanTimestampMills(); + if (timestampMills == null) { + throw new IllegalArgumentException( + String.format( + "%s can not be null when you use %s for %s", + CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(), + CoreOptions.LogStartupMode.FROM_TIMESTAMP, + CoreOptions.LOG_SCAN.key())); + } + return new DataFilePlan( + snapshotManager.earlierThanTimeMills(timestampMills), + Collections.emptyList()); + default: + throw new UnsupportedOperationException("Unsupported startup mode: " + startupMode); + } + } }