This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new c233679e  [FLINK-28387] Organizing Options in table store
c233679e is described below

commit c233679e5e0a3dea7d98c4d3487466d1139da6b8
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Tue Jul 5 14:43:37 2022 +0800

    [FLINK-28387] Organizing Options in table store

    This closes #197
---
 docs/content/docs/development/create-table.md       |   4 +-
 .../store/connector/TableStoreManagedFactory.java   |  30 +--
 .../store/connector/AbstractTableStoreFactory.java  |  51 ++--
 ...toryOptions.java => FlinkConnectorOptions.java}  |  41 ++-
 .../store/connector/sink/FlinkSinkBuilder.java      |   6 +-
 .../table/store/connector/sink/TableStoreSink.java  |  30 +--
 .../store/connector/source/FlinkSourceBuilder.java  |  25 +-
 .../store/connector/source/TableStoreSource.java    |  33 ++-
 .../ComputedColumnAndWatermarkTableITCase.java      |  16 +-
 .../table/store/connector/CreateTableITCase.java    |   7 +-
 .../table/store/connector/DropTableITCase.java      |   4 +-
 .../store/connector/FileStoreTableITCase.java       |   6 +-
 .../store/connector/FileSystemCatalogITCase.java    |   8 +-
 .../store/connector/ReadWriteTableITCase.java       |   6 +-
 .../store/connector/ReadWriteTableTestBase.java     |  23 +-
 .../store/connector/StreamingWarehouseITCase.java   |   2 +-
 .../connector/TableStoreManagedFactoryTest.java     |  44 +---
 .../table/store/connector/TableStoreTestBase.java   |  14 +-
 .../store/connector/sink/LogStoreSinkITCase.java    |   9 +-
 .../source/TestChangelogDataReadWrite.java          |   2 +-
 .../apache/flink/table/store/CatalogOptions.java    |  25 +-
 .../org/apache/flink/table/store/CoreOptions.java   | 200 ++++++--------
 .../table/store/file/AppendOnlyFileStore.java       |   2 +-
 .../flink/table/store/file/FileStoreOptions.java    | 289 ---------------------
 .../flink/table/store/file/KeyValueFileStore.java   |   2 +
 .../table/store/file/catalog/CatalogFactory.java    |  16 +-
 .../store/file/mergetree/MergeTreeOptions.java      | 178 -------------
 .../file/operation/KeyValueFileStoreWrite.java      |  26 +-
 .../apache/flink/table/store/log/LogOptions.java    | 194 --------------
 .../table/store/log/LogStoreTableFactory.java       |  24 +-
 .../table/store/table/sink/MemoryTableWrite.java    |   2 +-
 .../table/store/file/mergetree/MergeTreeTest.java   |  19 +-
 .../flink/table/store/tests/LogStoreE2eTest.java    |   2 +-
 .../flink/table/store/hive/HiveCatalogFactory.java  |  10 +-
 .../flink/table/store/kafka/KafkaLogOptions.java    |   2 +-
 .../table/store/kafka/KafkaLogStoreFactory.java     |  21 +-
 .../flink/table/store/kafka/KafkaLogTestUtils.java  |   4 +-
 37 files changed, 310 insertions(+), 1067 deletions(-)

diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md
index cba5a9da..cec6c481 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -115,14 +115,14 @@ Important options include the following:
       <td>The log system used to keep changes of the table, supports 'kafka'.</td>
     </tr>
     <tr>
-      <td><h5>log.kafka.bootstrap.servers</h5></td>
+      <td><h5>kafka.bootstrap.servers</h5></td>
       <td>No</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Required Kafka server connection string for log store.</td>
     </tr>
    <tr>
-      <td><h5>log.topic</h5></td>
+      <td><h5>kafka.topic</h5></td>
       <td>No</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
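For context on the docs change above: after this commit the Kafka log options are written directly under the 'kafka.' prefix instead of 'log.kafka.bootstrap.servers' and 'log.topic'. A rough DDL sketch of the new spelling follows; the table name, columns, topic, and broker address are made up for illustration, and only the option keys come from the updated docs and tests:

    CREATE TABLE word_count (
        word STRING,
        cnt BIGINT
    ) WITH (
        'log.system' = 'kafka',
        'kafka.bootstrap.servers' = 'localhost:9092',  -- formerly 'log.kafka.bootstrap.servers'
        'kafka.topic' = 'word_count_log'               -- formerly 'log.topic'
    );

The same un-prefixed keys appear in the updated FileSystemCatalogITCase and StreamingWarehouseITCase further down in this diff.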
diff --git a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 431e5c9b..37dfda61 100644
--- a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -18,11 +18,9 @@ package org.apache.flink.table.store.connector; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.factories.ManagedTableFactory; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.file.schema.SchemaManager; @@ -31,6 +29,7 @@ import org.apache.flink.table.store.file.utils.JsonSerdeUtil; import org.apache.flink.table.store.log.LogStoreTableFactory; import org.apache.flink.util.Preconditions; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.HashMap; @@ -39,13 +38,13 @@ import java.util.Objects; import java.util.Optional; import static org.apache.flink.table.store.CoreOptions.BUCKET; -import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX; import static org.apache.flink.table.store.CoreOptions.PATH; -import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX; import static org.apache.flink.table.store.CoreOptions.WRITE_MODE; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_PARTITION_SPEC; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath; import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY; /** Default implementation of {@link ManagedTableFactory}.
*/ @@ -85,21 +84,12 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory createOptionalLogStoreFactory(context.getClassLoader(), enrichedOptions); logFactory.ifPresent( factory -> - factory.enrichOptions(createLogContext(context, enrichedOptions)) - .forEach((k, v) -> enrichedOptions.putIfAbsent(LOG_PREFIX + k, v))); + factory.enrichOptions(new TableStoreDynamicContext(context, enrichedOptions)) + .forEach(enrichedOptions::putIfAbsent)); return enrichedOptions; } - @VisibleForTesting - static String relativeTablePath(ObjectIdentifier tableIdentifier) { - return String.format( - "%s.catalog/%s.db/%s", - tableIdentifier.getCatalogName(), - tableIdentifier.getDatabaseName(), - tableIdentifier.getObjectName()); - } - @Override public void onCreateTable(Context context, boolean ignoreIfExists) { Map<String, String> options = context.getCatalogTable().getOptions(); @@ -151,7 +141,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory .ifPresent( factory -> factory.onCreateTable( - createLogContext(context), + context, Integer.parseInt( options.getOrDefault( BUCKET.key(), @@ -179,7 +169,7 @@ public class TableStoreManagedFactory extends AbstractTableStoreFactory createOptionalLogStoreFactory(context) .ifPresent( factory -> - factory.onDropTable(createLogContext(context), ignoreIfNotExists)); + factory.onDropTable(context, ignoreIfNotExists)); } @Override diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java index 54de82ff..4f729bf6 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java @@ -21,13 +21,15 @@ package org.apache.flink.table.store.connector; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.CoreOptions.LogChangelogMode; +import org.apache.flink.table.store.CoreOptions.LogConsistency; +import org.apache.flink.table.store.CoreOptions.LogStartupMode; import org.apache.flink.table.store.connector.sink.TableStoreSink; import org.apache.flink.table.store.connector.source.TableStoreSource; import org.apache.flink.table.store.file.schema.TableSchema; @@ -39,17 +41,16 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY; -import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX; import static org.apache.flink.table.store.CoreOptions.LOG_SCAN; 
-import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM; import static org.apache.flink.table.store.log.LogStoreTableFactory.discoverLogStoreFactory; /** Abstract table store factory to create table source and table sink. */ @@ -63,7 +64,7 @@ public abstract class AbstractTableStoreFactory buildFileStoreTable(context), context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING, - createLogContext(context), + context, createOptionalLogStoreFactory(context).orElse(null)); } @@ -72,7 +73,7 @@ public abstract class AbstractTableStoreFactory return new TableStoreSink( context.getObjectIdentifier(), buildFileStoreTable(context), - createLogContext(context), + context, createOptionalLogStoreFactory(context).orElse(null)); } @@ -83,9 +84,9 @@ public abstract class AbstractTableStoreFactory @Override public Set<ConfigOption<?>> optionalOptions() { - Set<ConfigOption<?>> options = CoreOptions.allOptions(); - options.addAll(CoreOptions.allOptions()); - options.addAll(TableStoreFactoryOptions.allOptions()); + Set<ConfigOption<?>> options = new HashSet<>(); + options.addAll(FlinkConnectorOptions.getOptions()); + options.addAll(CoreOptions.getOptions()); return options; } @@ -112,44 +113,24 @@ public abstract class AbstractTableStoreFactory } private static void validateFileStoreContinuous(Configuration options) { - Configuration logOptions = new DelegatingConfiguration(options, LOG_PREFIX); - CoreOptions.LogChangelogMode changelogMode = logOptions.get(LOG_CHANGELOG_MODE); - if (changelogMode == CoreOptions.LogChangelogMode.UPSERT) { + LogChangelogMode changelogMode = options.get(LOG_CHANGELOG_MODE); + if (changelogMode == LogChangelogMode.UPSERT) { throw new ValidationException( "File store continuous reading dose not support upsert changelog mode."); } - CoreOptions.LogConsistency consistency = logOptions.get(LOG_CONSISTENCY); - if (consistency == CoreOptions.LogConsistency.EVENTUAL) { + LogConsistency consistency = options.get(LOG_CONSISTENCY); + if (consistency == LogConsistency.EVENTUAL) { throw new ValidationException( "File store continuous reading dose not support eventual consistency mode."); } - CoreOptions.LogStartupMode startupMode = logOptions.get(LOG_SCAN); - if (startupMode == CoreOptions.LogStartupMode.FROM_TIMESTAMP) { + LogStartupMode startupMode = options.get(LOG_SCAN); + if (startupMode == LogStartupMode.FROM_TIMESTAMP) { throw new ValidationException( "File store continuous reading dose not support from_timestamp scan mode, " + "you can add timestamp filters instead."); } } - static DynamicTableFactory.Context createLogContext(DynamicTableFactory.Context context) { - return createLogContext(context, context.getCatalogTable().getOptions()); - } - - static DynamicTableFactory.Context createLogContext( - DynamicTableFactory.Context context, Map<String, String> options) { - return new TableStoreDynamicContext(context, filterLogStoreOptions(options)); - } - - static Map<String, String> filterLogStoreOptions(Map<String, String> options) { - return options.entrySet().stream() - .filter(entry -> !entry.getKey().equals(LOG_SYSTEM.key())) // exclude log.system - .filter(entry -> entry.getKey().startsWith(LOG_PREFIX)) - .collect( - Collectors.toMap( - entry -> entry.getKey().substring(LOG_PREFIX.length()), - Map.Entry::getValue)); - } - static FileStoreTable buildFileStoreTable(DynamicTableFactory.Context context) { FileStoreTable table 
= FileStoreTableFactory.create( diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java similarity index 71% rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java index 38d6a209..e74a8e5d 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java @@ -21,14 +21,19 @@ package org.apache.flink.table.store.connector; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.factories.FactoryUtil; -import java.util.HashSet; -import java.util.Set; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; -/** Options for {@link TableStoreManagedFactory}. */ -public class TableStoreFactoryOptions { +/** Options for flink connector. */ +public class FlinkConnectorOptions { + public static final String TABLE_STORE_PREFIX = "table-store."; + + @Internal public static final ConfigOption<String> ROOT_PATH = ConfigOptions.key("root-path") .stringType() @@ -68,11 +73,27 @@ public class TableStoreFactoryOptions { + "By default, if this option is not defined, the planner will derive the parallelism " + "for each statement individually by also considering the global configuration."); - public static Set<ConfigOption<?>> allOptions() { - Set<ConfigOption<?>> allOptions = new HashSet<>(); - allOptions.add(LOG_SYSTEM); - allOptions.add(SINK_PARALLELISM); - allOptions.add(SCAN_PARALLELISM); - return allOptions; + public static String relativeTablePath(ObjectIdentifier tableIdentifier) { + return String.format( + "%s.catalog/%s.db/%s", + tableIdentifier.getCatalogName(), + tableIdentifier.getDatabaseName(), + tableIdentifier.getObjectName()); + } + + @Internal + public static List<ConfigOption<?>> getOptions() { + final Field[] fields = FlinkConnectorOptions.class.getFields(); + final List<ConfigOption<?>> list = new ArrayList<>(fields.length); + for (Field field : fields) { + if (ConfigOption.class.isAssignableFrom(field.getType())) { + try { + list.add((ConfigOption<?>) field.get(FlinkConnectorOptions.class)); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + return list; } } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java index d0d9cccb..fa3cfeb4 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java @@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.data.RowData; import org.apache.flink.table.store.CoreOptions; -import 
org.apache.flink.table.store.connector.TableStoreFactoryOptions; +import org.apache.flink.table.store.connector.FlinkConnectorOptions; import org.apache.flink.table.store.file.utils.JsonSerdeUtil; import org.apache.flink.table.store.table.FileStoreTable; import org.apache.flink.table.store.table.sink.LogSinkFunction; @@ -81,7 +81,7 @@ public class FlinkSinkBuilder { @SuppressWarnings("unchecked") @Nullable private Map<String, String> getCompactPartSpec() { - String json = conf.get(TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC); + String json = conf.get(FlinkConnectorOptions.COMPACTION_PARTITION_SPEC); if (json == null) { return null; } @@ -103,7 +103,7 @@ public class FlinkSinkBuilder { new StoreSink( tableIdentifier, table, - conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED), + conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED), getCompactPartSpec(), lockFactory, overwritePartition, diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java index 0f4e5dac..f2fcb843 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java @@ -19,7 +19,6 @@ package org.apache.flink.table.store.connector.sink; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -28,9 +27,9 @@ import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.factories.DynamicTableFactory; -import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.CoreOptions.LogChangelogMode; +import org.apache.flink.table.store.connector.FlinkConnectorOptions; import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider; -import org.apache.flink.table.store.connector.TableStoreFactoryOptions; import org.apache.flink.table.store.log.LogSinkProvider; import org.apache.flink.table.store.log.LogStoreTableFactory; import org.apache.flink.table.store.table.AppendOnlyFileStoreTable; @@ -45,12 +44,14 @@ import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE; + /** Table sink to create {@link StoreSink}. 
*/ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning { private final ObjectIdentifier tableIdentifier; private final FileStoreTable table; - private final DynamicTableFactory.Context logStoreContext; + private final DynamicTableFactory.Context context; @Nullable private final LogStoreTableFactory logStoreTableFactory; private Map<String, String> staticPartitions = new HashMap<>(); @@ -60,11 +61,11 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp public TableStoreSink( ObjectIdentifier tableIdentifier, FileStoreTable table, - DynamicTableFactory.Context logStoreContext, + DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) { this.tableIdentifier = tableIdentifier; this.table = table; - this.logStoreContext = logStoreContext; + this.context = context; this.logStoreTableFactory = logStoreTableFactory; } @@ -76,12 +77,8 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp // no primary key, sink all changelogs return requestedMode; } else if (table instanceof ChangelogWithKeyFileStoreTable) { - Configuration logOptions = - new DelegatingConfiguration( - Configuration.fromMap(table.schema().options()), - CoreOptions.LOG_PREFIX); - if (logOptions.get(CoreOptions.LOG_CHANGELOG_MODE) - != CoreOptions.LogChangelogMode.ALL) { + Configuration options = Configuration.fromMap(table.schema().options()); + if (options.get(LOG_CHANGELOG_MODE) != LogChangelogMode.ALL) { // with primary key, default sink upsert ChangelogMode.Builder builder = ChangelogMode.newBuilder(); for (RowKind kind : requestedMode.getContainedKinds()) { @@ -110,13 +107,13 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { LogSinkProvider logSinkProvider = null; if (logStoreTableFactory != null) { - logSinkProvider = logStoreTableFactory.createSinkProvider(logStoreContext, context); + logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context); } Configuration conf = Configuration.fromMap(table.schema().options()); // Do not sink to log store when overwrite mode final LogSinkFunction logSinkFunction = - overwrite || conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED) + overwrite || conf.get(FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED) ? null : (logSinkProvider == null ? null : logSinkProvider.createSink()); return new TableStoreDataStreamSinkProvider( @@ -129,15 +126,14 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp .withLockFactory(lockFactory) .withLogSinkFunction(logSinkFunction) .withOverwritePartition(overwrite ? 
staticPartitions : null) - .withParallelism( - conf.get(TableStoreFactoryOptions.SINK_PARALLELISM)) + .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM)) .build()); } @Override public DynamicTableSink copy() { TableStoreSink copied = - new TableStoreSink(tableIdentifier, table, logStoreContext, logStoreTableFactory); + new TableStoreSink(tableIdentifier, table, context, logStoreTableFactory); copied.staticPartitions = new HashMap<>(staticPartitions); copied.overwrite = overwrite; copied.lockFactory = lockFactory; diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java index 396f6d94..2bdaf7c3 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.connector.base.source.hybrid.HybridSource; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -30,8 +29,8 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.apache.flink.table.store.CoreOptions; -import org.apache.flink.table.store.connector.TableStoreFactoryOptions; +import org.apache.flink.table.store.CoreOptions.LogStartupMode; +import org.apache.flink.table.store.CoreOptions.MergeEngine; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.log.LogSourceProvider; import org.apache.flink.table.store.table.FileStoreTable; @@ -43,6 +42,11 @@ import javax.annotation.Nullable; import java.util.Optional; +import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; +import static org.apache.flink.table.store.CoreOptions.LOG_SCAN; +import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED; + /** Source builder to build a Flink {@link Source}. 
*/ public class FlinkSourceBuilder { @@ -94,7 +98,7 @@ public class FlinkSourceBuilder { } private long discoveryIntervalMills() { - return conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(); + return conf.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis(); } private FileStoreSource buildFileSource(boolean isContinuous, boolean continuousScanLatest) { @@ -111,19 +115,16 @@ public class FlinkSourceBuilder { if (isContinuous) { // TODO move validation to a dedicated method if (table.schema().primaryKeys().size() > 0 - && conf.get(CoreOptions.MERGE_ENGINE) - == CoreOptions.MergeEngine.PARTIAL_UPDATE) { + && conf.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) { throw new ValidationException( "Partial update continuous reading is not supported."); } - CoreOptions.LogStartupMode startupMode = - new DelegatingConfiguration(conf, CoreOptions.LOG_PREFIX) - .get(CoreOptions.LOG_SCAN); + LogStartupMode startupMode = conf.get(LOG_SCAN); if (logSourceProvider == null) { - return buildFileSource(true, startupMode == CoreOptions.LogStartupMode.LATEST); + return buildFileSource(true, startupMode == LogStartupMode.LATEST); } else { - if (startupMode != CoreOptions.LogStartupMode.FULL) { + if (startupMode != LogStartupMode.FULL) { return logSourceProvider.createSource(null); } return HybridSource.<RowData, StaticFileStoreSplitEnumerator>builder( @@ -151,7 +152,7 @@ public class FlinkSourceBuilder { .orElse(rowType); DataStreamSource<RowData> dataStream = env.fromSource( - conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED) + conf.get(COMPACTION_MANUAL_TRIGGERED) ? new FileStoreEmptySource() : buildSource(), WatermarkStrategy.noWatermarks(), diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java index 880c6a9c..c5545658 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java @@ -19,7 +19,6 @@ package org.apache.flink.table.store.connector.source; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -28,9 +27,10 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.factories.DynamicTableFactory; -import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.CoreOptions.LogChangelogMode; +import org.apache.flink.table.store.CoreOptions.LogConsistency; +import org.apache.flink.table.store.connector.FlinkConnectorOptions; import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider; -import org.apache.flink.table.store.connector.TableStoreFactoryOptions; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.predicate.PredicateBuilder; import org.apache.flink.table.store.file.predicate.PredicateConverter; @@ -47,6 +47,9 @@ import javax.annotation.Nullable; import java.util.ArrayList; import 
java.util.List; +import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE; +import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY; + /** * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled. * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link @@ -59,7 +62,7 @@ public class TableStoreSource private final ObjectIdentifier tableIdentifier; private final FileStoreTable table; private final boolean streaming; - private final DynamicTableFactory.Context logStoreContext; + private final DynamicTableFactory.Context context; @Nullable private final LogStoreTableFactory logStoreTableFactory; @Nullable private Predicate predicate; @@ -69,12 +72,12 @@ public class TableStoreSource ObjectIdentifier tableIdentifier, FileStoreTable table, boolean streaming, - DynamicTableFactory.Context logStoreContext, + DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) { this.tableIdentifier = tableIdentifier; this.table = table; this.streaming = streaming; - this.logStoreContext = logStoreContext; + this.context = context; this.logStoreTableFactory = logStoreTableFactory; } @@ -92,14 +95,9 @@ public class TableStoreSource } else if (table instanceof ChangelogWithKeyFileStoreTable) { // optimization: transaction consistency and all changelog mode avoid the generation of // normalized nodes. See TableStoreSink.getChangelogMode validation. - Configuration logOptions = - new DelegatingConfiguration( - Configuration.fromMap(table.schema().options()), - CoreOptions.LOG_PREFIX); - return logOptions.get(CoreOptions.LOG_CONSISTENCY) - == CoreOptions.LogConsistency.TRANSACTIONAL - && logOptions.get(CoreOptions.LOG_CHANGELOG_MODE) - == CoreOptions.LogChangelogMode.ALL + Configuration options = Configuration.fromMap(table.schema().options()); + return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL + && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL ? 
ChangelogMode.all() : ChangelogMode.upsert(); } else { @@ -113,8 +111,7 @@ public class TableStoreSource LogSourceProvider logSourceProvider = null; if (logStoreTableFactory != null) { logSourceProvider = - logStoreTableFactory.createSourceProvider( - logStoreContext, scanContext, projectFields); + logStoreTableFactory.createSourceProvider(context, scanContext, projectFields); } FlinkSourceBuilder sourceBuilder = @@ -125,7 +122,7 @@ public class TableStoreSource .withPredicate(predicate) .withParallelism( Configuration.fromMap(table.schema().options()) - .get(TableStoreFactoryOptions.SCAN_PARALLELISM)); + .get(FlinkConnectorOptions.SCAN_PARALLELISM)); return new TableStoreDataStreamScanProvider( !streaming, env -> sourceBuilder.withEnv(env).build()); @@ -135,7 +132,7 @@ public class TableStoreSource public DynamicTableSource copy() { TableStoreSource copied = new TableStoreSource( - tableIdentifier, table, streaming, logStoreContext, logStoreTableFactory); + tableIdentifier, table, streaming, context, logStoreTableFactory); copied.predicate = predicate; copied.projectFields = projectFields; return copied; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java index be277d35..febc784d 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ComputedColumnAndWatermarkTableITCase.java @@ -19,7 +19,7 @@ package org.apache.flink.table.store.connector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.CoreOptions.LogStartupMode; import org.junit.Test; @@ -29,7 +29,7 @@ import java.util.Collections; import java.util.stream.Collectors; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; -import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX; +import static org.apache.flink.table.store.CoreOptions.LOG_SCAN; import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates; import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.ratesWithTimestamp; @@ -146,8 +146,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas null, false, Collections.singletonMap( - LOG_PREFIX + CoreOptions.LOG_SCAN.key(), - CoreOptions.LogStartupMode.LATEST.name().toLowerCase()), + LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()), "rate_by_to_currency IS NULL", Arrays.asList( "corrected_rate_by_to_currency", @@ -191,8 +190,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"), false, Collections.singletonMap( - LOG_PREFIX + CoreOptions.LOG_SCAN.key(), - CoreOptions.LogStartupMode.LATEST.name().toLowerCase()), + LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()), lateEventFilter, Collections.emptyList(), // projection Collections.singletonList( @@ -215,8 +213,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas WatermarkSpec.of("ts1", "ts1 - INTERVAL '3' YEAR"), false, Collections.singletonMap( - LOG_PREFIX + CoreOptions.LOG_SCAN.key(), - CoreOptions.LogStartupMode.LATEST.name().toLowerCase()), + LOG_SCAN.key(), 
LogStartupMode.LATEST.name().toLowerCase()), lateEventFilter.replaceAll("ts", "ts1"), Arrays.asList("currency", "rate", "ts1"), Collections.singletonList( @@ -240,8 +237,7 @@ public class ComputedColumnAndWatermarkTableITCase extends ReadWriteTableTestBas WatermarkSpec.of("ts", "ts - INTERVAL '3' YEAR"), false, Collections.singletonMap( - LOG_PREFIX + CoreOptions.LOG_SCAN.key(), - CoreOptions.LogStartupMode.LATEST.name().toLowerCase()), + LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()), lateEventFilter, Arrays.asList( "currency", diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java index 9e3be19f..68703685 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CreateTableITCase.java @@ -26,7 +26,6 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; -import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; @@ -42,6 +41,7 @@ import java.util.List; import java.util.UUID; import static org.apache.flink.table.store.CoreOptions.BUCKET; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -79,8 +79,7 @@ public class CreateTableITCase extends TableStoreTestBase { assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier)) .isPresent(); // check table store - assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile()) - .exists(); + assertThat(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile()).exists(); // check log store assertThat(topicExists(tableIdentifier.asSummaryString())).isEqualTo(enableLogStore); } else { @@ -129,7 +128,7 @@ public class CreateTableITCase extends TableStoreTestBase { } } else if (expectedResult.expectedMessage.startsWith("Failed to create file store path.")) { // failed when creating file store - Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile().mkdirs(); + Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile().mkdirs(); } else if (expectedResult.expectedMessage.startsWith("Failed to create kafka topic.")) { // failed when creating log store createTopicIfNotExists(tableIdentifier.asSummaryString(), BUCKET.defaultValue()); diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java index e7278976..177d16c4 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/DropTableITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl; import 
org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.exceptions.TableNotExistException; -import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; @@ -40,6 +39,7 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -80,7 +80,7 @@ public class DropTableITCase extends TableStoreTestBase { assertThat(((TableEnvironmentImpl) tEnv).getCatalogManager().getTable(tableIdentifier)) .isNotPresent(); // check table store - assertThat(Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile()) + assertThat(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile()) .doesNotExist(); // check log store assertThat(topicExists(tableIdentifier.asSummaryString())).isFalse(); diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java index 4e313402..754d1354 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java @@ -51,9 +51,9 @@ import java.time.Duration; import java.util.List; import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; -import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX; -import static org.apache.flink.table.store.CoreOptions.relativeTablePath; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath; import static org.junit.jupiter.api.Assertions.fail; /** ITCase for file store table api. 
*/ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java index 01e84211..69ee10c2 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java @@ -60,8 +60,8 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase { String.format( "CREATE TABLE T (a STRING, b STRING, c STRING) WITH (" + "'log.system'='kafka', " - + "'log.kafka.bootstrap.servers'='%s'," - + "'log.topic'='%s'" + + "'kafka.bootstrap.servers'='%s'," + + "'kafka.topic'='%s'" + ")", getBootstrapServers(), topic)); innerTestWriteRead(); @@ -85,8 +85,8 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase { + "d AS CAST(c as INT) + 1" + ") WITH (" + "'log.system'='kafka', " - + "'log.kafka.bootstrap.servers'='%s'," - + "'log.topic'='%s'" + + "'kafka.bootstrap.servers'='%s'," + + "'kafka.topic'='%s'" + ")", getBootstrapServers(), topic)); BlockingIterator<Row, Row> iterator = diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java index 10adddfb..0f4d5ea5 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java @@ -58,14 +58,14 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.SCAN_PARALLELISM; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.SINK_PARALLELISM; import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRates; import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithUB; import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.dailyRatesChangelogWithoutUB; import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.rates; import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM; import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java index 33d28349..3c9fcb76 100644 --- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableTestBase.java @@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogTable; -import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.CoreOptions.LogStartupMode; import org.apache.flink.table.store.file.utils.BlockingIterator; import org.apache.flink.table.store.kafka.KafkaTableTestBase; import org.apache.flink.types.Row; @@ -49,7 +49,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX; +import static org.apache.flink.table.store.CoreOptions.LOG_SCAN; +import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath; import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithChangelogRecords; import static org.apache.flink.table.store.connector.ReadWriteTableTestUtil.prepareHelperSourceWithInsertOnlyRecords; import static org.apache.flink.table.store.connector.ShowCreateUtil.buildInsertIntoQuery; @@ -57,8 +61,6 @@ import static org.apache.flink.table.store.connector.ShowCreateUtil.buildInsertO import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSelectQuery; import static org.apache.flink.table.store.connector.ShowCreateUtil.buildSimpleSelectQuery; import static org.apache.flink.table.store.connector.ShowCreateUtil.createTableLikeDDL; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH; import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS; import static org.assertj.core.api.Assertions.assertThat; @@ -71,7 +73,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase { protected void checkFileStorePath( StreamTableEnvironment tEnv, String managedTable, @Nullable String partitionList) { String relativeFilePath = - CoreOptions.relativeTablePath( + relativeTablePath( ObjectIdentifier.of( tEnv.getCurrentCatalog(), tEnv.getCurrentDatabase(), managedTable)); // check snapshot file path @@ -230,9 +232,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase { List<Row> expected) throws Exception { Map<String, String> hints = new HashMap<>(); - hints.put( - LOG_PREFIX + CoreOptions.LOG_SCAN.key(), - CoreOptions.LogStartupMode.LATEST.name().toLowerCase()); + hints.put(LOG_SCAN.key(), LogStartupMode.LATEST.name().toLowerCase()); collectAndCheckUnderSameEnv( true, true, @@ -258,9 +258,8 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase { List<Row> expected) throws Exception { Map<String, String> hints = new HashMap<>(); - hints.put(LOG_PREFIX + CoreOptions.LOG_SCAN.key(), "from-timestamp"); - hints.put( - LOG_PREFIX + CoreOptions.LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp)); + hints.put(LOG_SCAN.key(), 
"from-timestamp"); + hints.put(LOG_SCAN_TIMESTAMP_MILLS.key(), String.valueOf(timestamp)); collectAndCheckUnderSameEnv( true, true, @@ -292,7 +291,7 @@ public class ReadWriteTableTestBase extends KafkaTableTestBase { tableOptions.put(ROOT_PATH.key(), rootPath); if (enableLogStore) { tableOptions.put(LOG_SYSTEM.key(), "kafka"); - tableOptions.put(LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers()); + tableOptions.put(BOOTSTRAP_SERVERS.key(), getBootstrapServers()); } String sourceTable = "source_table_" + UUID.randomUUID(); String managedTable = "managed_table_" + UUID.randomUUID(); diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java index 598fb75f..15424e02 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/StreamingWarehouseITCase.java @@ -90,7 +90,7 @@ public class StreamingWarehouseITCase extends ReadWriteTableTestBase { + "WITH (\n" + " 'path' = '%s',\n" + " 'log.system' = 'kafka', " - + " 'log.kafka.bootstrap.servers' = '%s');", + + " 'kafka.bootstrap.servers' = '%s');", rootPath, getBootstrapServers()); streamTableEnv.executeSql(orderSource); streamTableEnv.executeSql(cleanedOrders); diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java index 31832f59..843b99f5 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java @@ -57,13 +57,13 @@ import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.flink.table.store.CoreOptions.BUCKET; import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY; -import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX; import static org.apache.flink.table.store.CoreOptions.PATH; -import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX; import static org.apache.flink.table.store.CoreOptions.path; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.COMPACTION_PARTITION_SPEC; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_PARTITION_SPEC; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath; import static org.apache.flink.table.store.connector.TableStoreTestBase.createResolvedTable; import static org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PART_TYPE; import static 
org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE; @@ -122,20 +122,20 @@ public class TableStoreManagedFactoryTest { Map<String, String> sessionMap = new HashMap<>(); sessionMap.put("table-store.root-path", "my_path"); sessionMap.put("table-store.log.system", "kafka"); - sessionMap.put("table-store.log.topic", "my_topic"); + sessionMap.put("table-store.kafka.topic", "my_topic"); context = createNonEnrichedContext(sessionMap, emptyMap()); assertThatThrownBy(() -> tableStoreManagedFactory.enrichOptions(context)) .hasMessage( "Managed table can not contain custom topic. You need to remove topic in table options or session config."); - sessionMap.remove("table-store.log.topic"); + sessionMap.remove("table-store.kafka.topic"); context = createNonEnrichedContext(sessionMap, emptyMap()); Map<String, String> enriched = tableStoreManagedFactory.enrichOptions(context); Map<String, String> expected = new HashMap<>(); expected.put("path", "my_path/catalog.catalog/database.db/table"); expected.put("log.system", "kafka"); - expected.put("log.topic", "catalog.database.table"); + expected.put("kafka.topic", "catalog.database.table"); assertThat(enriched).containsExactlyEntriesOf(expected); } @@ -188,27 +188,6 @@ public class TableStoreManagedFactoryTest { } } - @Test - public void testFilterLogStoreOptions() { - // mix invalid key and leave value to empty to emphasize the deferred validation - Map<String, String> expectedLogOptions = - of( - CoreOptions.LOG_SCAN.key(), - "", - CoreOptions.LOG_RETENTION.key(), - "", - "dummy.key", - "", - CoreOptions.LOG_CHANGELOG_MODE.key(), - ""); - Map<String, String> enrichedOptions = - addPrefix(expectedLogOptions, LOG_PREFIX, (key) -> true); - enrichedOptions.put("foo", "bar"); - - assertThat(TableStoreManagedFactory.filterLogStoreOptions(enrichedOptions)) - .containsExactlyInAnyOrderEntriesOf(expectedLogOptions); - } - @ParameterizedTest @MethodSource("provideResolvedTable") public void testCreateAndCheckTableStore( @@ -322,9 +301,9 @@ public class TableStoreManagedFactoryTest { BUCKET.defaultValue().toString(), ROOT_PATH.key(), sharedTempDir.toString(), - LOG_PREFIX + BOOTSTRAP_SERVERS.key(), + BOOTSTRAP_SERVERS.key(), "localhost:9092", - LOG_PREFIX + LOG_CONSISTENCY.key(), + LOG_CONSISTENCY.key(), LOG_CONSISTENCY.defaultValue().name()); // set configuration under session level @@ -365,8 +344,7 @@ public class TableStoreManagedFactoryTest { Map<String, String> expected = new HashMap<>(enrichedOptions); String rootPath = expected.remove(ROOT_PATH.key()); if (rootPath != null) { - String path = - rootPath + "/" + TableStoreManagedFactory.relativeTablePath(TABLE_IDENTIFIER); + String path = rootPath + "/" + relativeTablePath(TABLE_IDENTIFIER); expected.put(PATH.key(), path); } return expected; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java index 1f321af9..b3284952 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java @@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; -import 
org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.kafka.KafkaTableTestBase; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; @@ -50,10 +49,10 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.table.store.CoreOptions.LOG_PREFIX; -import static org.apache.flink.table.store.CoreOptions.TABLE_STORE_PREFIX; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM; -import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.LOG_SYSTEM; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.ROOT_PATH; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.TABLE_STORE_PREFIX; +import static org.apache.flink.table.store.connector.FlinkConnectorOptions.relativeTablePath; import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS; /** End-to-end test base for table store. */ @@ -111,7 +110,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase { Configuration configuration = tEnv.getConfig().getConfiguration(); configuration.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), rootPath); configuration.setString( - TABLE_STORE_PREFIX + LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers()); + TABLE_STORE_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers()); if (enableLogStore) { configuration.setString(TABLE_STORE_PREFIX + LOG_SYSTEM.key(), "kafka"); } @@ -153,8 +152,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase { } protected void deleteTablePath() { - FileUtils.deleteQuietly( - Paths.get(rootPath, CoreOptions.relativeTablePath(tableIdentifier)).toFile()); + FileUtils.deleteQuietly(Paths.get(rootPath, relativeTablePath(tableIdentifier)).toFile()); } /** Expected result wrapper. */ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java index 59465a73..1f4cb4e5 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java @@ -21,7 +21,8 @@ package org.apache.flink.table.store.connector.sink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.DynamicTableFactory; -import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.CoreOptions.LogChangelogMode; +import org.apache.flink.table.store.CoreOptions.LogConsistency; import org.apache.flink.table.store.connector.source.FlinkSourceBuilder; import org.apache.flink.table.store.file.utils.BlockingIterator; import org.apache.flink.table.store.kafka.KafkaLogSinkProvider; @@ -102,10 +103,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase { KafkaLogTestUtils.testContext( name, getBootstrapServers(), - CoreOptions.LogChangelogMode.AUTO, - transaction - ? CoreOptions.LogConsistency.TRANSACTIONAL - : CoreOptions.LogConsistency.EVENTUAL, + LogChangelogMode.AUTO, + transaction ? LogConsistency.TRANSACTIONAL : LogConsistency.EVENTUAL, TABLE_TYPE, hasPk ? 
new int[] {2} : new int[0]); diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java index 4a4be635..16134667 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java @@ -156,7 +156,7 @@ public class TestChangelogDataReadWrite { .createEmptyWriter(partition, bucket, service); ((MemoryOwner) writer) .setMemoryPool( - new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize)); + new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return writer; } } diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java similarity index 58% copy from flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java index 8cdb6d0c..925a0d13 100644 --- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CatalogOptions.java @@ -16,23 +16,30 @@ * limitations under the License. */ -package org.apache.flink.table.store.kafka; +package org.apache.flink.table.store; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory; -/** Options for kafka log. */ -public class KafkaLogOptions { +/** Catalog options for table store. 
*/ +public class CatalogOptions { + private CatalogOptions() {} - public static final ConfigOption<String> BOOTSTRAP_SERVERS = - ConfigOptions.key("kafka.bootstrap.servers") + public static final ConfigOption<String> WAREHOUSE = + ConfigOptions.key("warehouse") .stringType() .noDefaultValue() - .withDescription("Required Kafka server connection string"); + .withDescription("The warehouse root path of catalog."); - public static final ConfigOption<String> TOPIC = - ConfigOptions.key("topic") + public static final ConfigOption<String> METASTORE = + ConfigOptions.key("metastore") + .stringType() + .defaultValue(FileSystemCatalogFactory.IDENTIFIER); + + public static final ConfigOption<String> URI = + ConfigOptions.key("uri") .stringType() .noDefaultValue() - .withDescription("Topic of this kafka table."); + .withDescription("Uri of metastore server."); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java index 1df27218..07557caa 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java @@ -18,6 +18,7 @@ package org.apache.flink.table.store; +import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; @@ -26,16 +27,16 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.InlineElement; import org.apache.flink.core.fs.Path; -import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.store.file.WriteMode; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.util.Preconditions; import java.io.Serializable; +import java.lang.reflect.Field; import java.time.Duration; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.Set; import static org.apache.flink.configuration.ConfigOptions.key; import static org.apache.flink.configuration.description.TextElement.text; @@ -43,8 +44,6 @@ import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption; /** Core options for table store. 
*/ public class CoreOptions implements Serializable { - public static final String LOG_PREFIX = "log."; - public static final String TABLE_STORE_PREFIX = "table-store."; public static final ConfigOption<Integer> BUCKET = ConfigOptions.key("bucket") @@ -52,6 +51,7 @@ public class CoreOptions implements Serializable { .defaultValue(1) .withDescription("Bucket number for file store."); + @Internal public static final ConfigOption<String> PATH = ConfigOptions.key("path") .stringType() @@ -116,18 +116,17 @@ public class CoreOptions implements Serializable { .defaultValue(Duration.ofSeconds(1)) .withDescription("The discovery interval of continuous reading."); - public static final ConfigOption<CoreOptions.MergeEngine> MERGE_ENGINE = + public static final ConfigOption<MergeEngine> MERGE_ENGINE = ConfigOptions.key("merge-engine") - .enumType(CoreOptions.MergeEngine.class) - .defaultValue(CoreOptions.MergeEngine.DEDUPLICATE) + .enumType(MergeEngine.class) + .defaultValue(MergeEngine.DEDUPLICATE) .withDescription( Description.builder() .text("Specify the merge engine for table with primary key.") .linebreak() .list( - formatEnumOption(CoreOptions.MergeEngine.DEDUPLICATE), - formatEnumOption( - CoreOptions.MergeEngine.PARTIAL_UPDATE)) + formatEnumOption(MergeEngine.DEDUPLICATE), + formatEnumOption(MergeEngine.PARTIAL_UPDATE)) .build()); public static final ConfigOption<WriteMode> WRITE_MODE = @@ -230,7 +229,7 @@ public class CoreOptions implements Serializable { + "it can be read directly during stream reads."); public static final ConfigOption<LogStartupMode> LOG_SCAN = - ConfigOptions.key("scan") + ConfigOptions.key("log.scan") .enumType(LogStartupMode.class) .defaultValue(LogStartupMode.FULL) .withDescription( @@ -243,21 +242,21 @@ public class CoreOptions implements Serializable { .build()); public static final ConfigOption<Long> LOG_SCAN_TIMESTAMP_MILLS = - ConfigOptions.key("scan.timestamp-millis") + ConfigOptions.key("log.scan.timestamp-millis") .longType() .noDefaultValue() .withDescription( "Optional timestamp used in case of \"from-timestamp\" scan mode"); public static final ConfigOption<Duration> LOG_RETENTION = - ConfigOptions.key("retention") + ConfigOptions.key("log.retention") .durationType() .noDefaultValue() .withDescription( "It means how long changes log will be kept. 
The default value is from the log system cluster."); public static final ConfigOption<LogConsistency> LOG_CONSISTENCY = - ConfigOptions.key("consistency") + ConfigOptions.key("log.consistency") .enumType(LogConsistency.class) .defaultValue(LogConsistency.TRANSACTIONAL) .withDescription( @@ -270,7 +269,7 @@ public class CoreOptions implements Serializable { .build()); public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE = - ConfigOptions.key("changelog-mode") + ConfigOptions.key("log.changelog-mode") .enumType(LogChangelogMode.class) .defaultValue(LogChangelogMode.AUTO) .withDescription( @@ -284,79 +283,27 @@ public class CoreOptions implements Serializable { .build()); public static final ConfigOption<String> LOG_KEY_FORMAT = - ConfigOptions.key("key.format") + ConfigOptions.key("log.key.format") .stringType() .defaultValue("json") .withDescription( "Specify the key message format of log system with primary key."); public static final ConfigOption<String> LOG_FORMAT = - ConfigOptions.key("format") + ConfigOptions.key("log.format") .stringType() .defaultValue("debezium-json") .withDescription("Specify the message format of log system."); - public long writeBufferSize; + private final Configuration options; - public int pageSize; - - public long targetFileSize; - - public int numSortedRunCompactionTrigger; - - public int numSortedRunStopTrigger; - - public int numLevels; - - public boolean commitForceCompact; - - public int maxSizeAmplificationPercent; - - public int sizeRatio; - - public boolean enableChangelogFile; - - private Configuration options; - - public CoreOptions( - long writeBufferSize, - int pageSize, - long targetFileSize, - int numSortedRunCompactionTrigger, - int numSortedRunStopTrigger, - Integer numLevels, - boolean commitForceCompact, - int maxSizeAmplificationPercent, - int sizeRatio, - boolean enableChangelogFile) { - this.writeBufferSize = writeBufferSize; - this.pageSize = pageSize; - this.targetFileSize = targetFileSize; - this.numSortedRunCompactionTrigger = numSortedRunCompactionTrigger; - this.numSortedRunStopTrigger = - Math.max(numSortedRunCompactionTrigger, numSortedRunStopTrigger); - // By default, this ensures that the compaction does not fall to level 0, but at least to - // level 1 - this.numLevels = numLevels == null ? 
numSortedRunCompactionTrigger + 1 : numLevels; - this.commitForceCompact = commitForceCompact; - this.maxSizeAmplificationPercent = maxSizeAmplificationPercent; - this.sizeRatio = sizeRatio; - this.enableChangelogFile = enableChangelogFile; + public CoreOptions(Map<String, String> options) { + this(Configuration.fromMap(options)); } - public CoreOptions(Configuration config) { - this( - config.get(WRITE_BUFFER_SIZE).getBytes(), - (int) config.get(PAGE_SIZE).getBytes(), - config.get(TARGET_FILE_SIZE).getBytes(), - config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER), - config.get(NUM_SORTED_RUNS_STOP_TRIGGER), - config.get(NUM_LEVELS), - config.get(COMMIT_FORCE_COMPACT), - config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT), - config.get(COMPACTION_SIZE_RATIO), - config.get(CHANGELOG_FILE)); - this.options = config; + public CoreOptions(Configuration options) { + this.options = options; + // TODO validate all keys Preconditions.checkArgument( snapshotNumRetainMin() > 0, SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1"); @@ -367,39 +314,6 @@ public class CoreOptions implements Serializable { + SNAPSHOT_NUM_RETAINED_MAX.key()); } - public CoreOptions(Map<String, String> options) { - this(Configuration.fromMap(options)); - } - - public static Set<ConfigOption<?>> allOptions() { - Set<ConfigOption<?>> allOptions = new HashSet<>(); - allOptions.add(BUCKET); - allOptions.add(PATH); - allOptions.add(FILE_FORMAT); - allOptions.add(MANIFEST_FORMAT); - allOptions.add(MANIFEST_TARGET_FILE_SIZE); - allOptions.add(MANIFEST_MERGE_MIN_COUNT); - allOptions.add(PARTITION_DEFAULT_NAME); - allOptions.add(SNAPSHOT_NUM_RETAINED_MIN); - allOptions.add(SNAPSHOT_NUM_RETAINED_MAX); - allOptions.add(SNAPSHOT_TIME_RETAINED); - allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL); - allOptions.add(MERGE_ENGINE); - allOptions.add(WRITE_MODE); - allOptions.add(SOURCE_SPLIT_TARGET_SIZE); - allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST); - allOptions.add(WRITE_BUFFER_SIZE); - allOptions.add(PAGE_SIZE); - allOptions.add(TARGET_FILE_SIZE); - allOptions.add(NUM_SORTED_RUNS_COMPACTION_TRIGGER); - allOptions.add(NUM_SORTED_RUNS_STOP_TRIGGER); - allOptions.add(NUM_LEVELS); - allOptions.add(COMMIT_FORCE_COMPACT); - allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT); - allOptions.add(COMPACTION_SIZE_RATIO); - return allOptions; - } - public int bucket() { return options.get(BUCKET); } @@ -416,14 +330,6 @@ public class CoreOptions implements Serializable { return new Path(options.get(PATH)); } - public static String relativeTablePath(ObjectIdentifier tableIdentifier) { - return String.format( - "%s.catalog/%s.db/%s", - tableIdentifier.getCatalogName(), - tableIdentifier.getDatabaseName(), - tableIdentifier.getObjectName()); - } - public FileFormat fileFormat() { return FileFormat.fromTableOptions(options, FILE_FORMAT); } @@ -464,6 +370,50 @@ public class CoreOptions implements Serializable { return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes(); } + public long writeBufferSize() { + return options.get(WRITE_BUFFER_SIZE).getBytes(); + } + + public int pageSize() { + return (int) options.get(PAGE_SIZE).getBytes(); + } + + public long targetFileSize() { + return options.get(TARGET_FILE_SIZE).getBytes(); + } + + public int numSortedRunCompactionTrigger() { + return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER); + } + + public int numSortedRunStopTrigger() { + return Math.max(numSortedRunCompactionTrigger(), options.get(NUM_SORTED_RUNS_STOP_TRIGGER)); + } + + public int numLevels() { + // By default, this ensures that the 
compaction does not fall to level 0, but at least to + // level 1 + Integer numLevels = options.get(NUM_LEVELS); + numLevels = numLevels == null ? numSortedRunCompactionTrigger() + 1 : numLevels; + return numLevels; + } + + public boolean commitForceCompact() { + return options.get(COMMIT_FORCE_COMPACT); + } + + public int maxSizeAmplificationPercent() { + return options.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT); + } + + public int sizeRatio() { + return options.get(COMPACTION_SIZE_RATIO); + } + + public boolean enableChangelogFile() { + return options.get(CHANGELOG_FILE); + } + /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), @@ -579,4 +529,20 @@ public class CoreOptions implements Serializable { return text(description); } } + + @Internal + public static List<ConfigOption<?>> getOptions() { + final Field[] fields = CoreOptions.class.getFields(); + final List<ConfigOption<?>> list = new ArrayList<>(fields.length); + for (Field field : fields) { + if (ConfigOption.class.isAssignableFrom(field.getType())) { + try { + list.add((ConfigOption<?>) field.get(CoreOptions.class)); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + return list; + } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java index 6236a39f..897ab2dc 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java @@ -62,7 +62,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> { pathFactory(), snapshotManager(), newScan(true), - options.targetFileSize); + options.targetFileSize()); } private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) { diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java deleted file mode 100644 index 43065064..00000000 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.store.file; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DescribedEnum; -import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.description.Description; -import org.apache.flink.configuration.description.InlineElement; -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.store.file.mergetree.MergeTreeOptions; -import org.apache.flink.table.store.format.FileFormat; -import org.apache.flink.util.Preconditions; - -import java.io.Serializable; -import java.time.Duration; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.apache.flink.configuration.ConfigOptions.key; -import static org.apache.flink.configuration.description.TextElement.text; -import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption; - -/** Options for {@link FileStore}. */ -public class FileStoreOptions implements Serializable { - - public static final String TABLE_STORE_PREFIX = "table-store."; - - public static final ConfigOption<Integer> BUCKET = - ConfigOptions.key("bucket") - .intType() - .defaultValue(1) - .withDescription("Bucket number for file store."); - - public static final ConfigOption<String> PATH = - ConfigOptions.key("path") - .stringType() - .noDefaultValue() - .withDescription("The file path of this table in the filesystem."); - - public static final ConfigOption<String> FILE_FORMAT = - ConfigOptions.key("file.format") - .stringType() - .defaultValue("orc") - .withDescription("Specify the message format of data files."); - - public static final ConfigOption<String> MANIFEST_FORMAT = - ConfigOptions.key("manifest.format") - .stringType() - .defaultValue("avro") - .withDescription("Specify the message format of manifest files."); - - public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE = - ConfigOptions.key("manifest.target-file-size") - .memoryType() - .defaultValue(MemorySize.ofMebiBytes(8)) - .withDescription("Suggested file size of a manifest file."); - - public static final ConfigOption<Integer> MANIFEST_MERGE_MIN_COUNT = - ConfigOptions.key("manifest.merge-min-count") - .intType() - .defaultValue(30) - .withDescription( - "To avoid frequent manifest merges, this parameter specifies the minimum number " - + "of ManifestFileMeta to merge."); - - public static final ConfigOption<String> PARTITION_DEFAULT_NAME = - key("partition.default-name") - .stringType() - .defaultValue("__DEFAULT_PARTITION__") - .withDescription( - "The default partition name in case the dynamic partition" - + " column value is null/empty string."); - - public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN = - ConfigOptions.key("snapshot.num-retained.min") - .intType() - .defaultValue(10) - .withDescription("The minimum number of completed snapshots to retain."); - - public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MAX = - ConfigOptions.key("snapshot.num-retained.max") - .intType() - .defaultValue(Integer.MAX_VALUE) - .withDescription("The maximum number of completed snapshots to retain."); - - public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED = - ConfigOptions.key("snapshot.time-retained") - .durationType() - .defaultValue(Duration.ofHours(1)) - .withDescription("The maximum time of completed snapshots to 
retain."); - - public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL = - ConfigOptions.key("continuous.discovery-interval") - .durationType() - .defaultValue(Duration.ofSeconds(1)) - .withDescription("The discovery interval of continuous reading."); - - public static final ConfigOption<MergeEngine> MERGE_ENGINE = - ConfigOptions.key("merge-engine") - .enumType(MergeEngine.class) - .defaultValue(MergeEngine.DEDUPLICATE) - .withDescription( - Description.builder() - .text("Specify the merge engine for table with primary key.") - .linebreak() - .list( - formatEnumOption(MergeEngine.DEDUPLICATE), - formatEnumOption(MergeEngine.PARTIAL_UPDATE)) - .build()); - - public static final ConfigOption<WriteMode> WRITE_MODE = - ConfigOptions.key("write-mode") - .enumType(WriteMode.class) - .defaultValue(WriteMode.CHANGE_LOG) - .withDescription( - Description.builder() - .text("Specify the write mode for table.") - .linebreak() - .list(formatEnumOption(WriteMode.APPEND_ONLY)) - .list(formatEnumOption(WriteMode.CHANGE_LOG)) - .build()); - - public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE = - ConfigOptions.key("source.split.target-size") - .memoryType() - .defaultValue(MemorySize.ofMebiBytes(128)) - .withDescription("Target size of a source split when scanning a bucket."); - - public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST = - ConfigOptions.key("source.split.open-file-cost") - .memoryType() - .defaultValue(MemorySize.ofMebiBytes(4)) - .withDescription( - "Open file cost of a source file. It is used to avoid reading" - + " too many files with a source split, which can be very slow."); - - private final Configuration options; - - public static Set<ConfigOption<?>> allOptions() { - Set<ConfigOption<?>> allOptions = new HashSet<>(); - allOptions.add(BUCKET); - allOptions.add(PATH); - allOptions.add(FILE_FORMAT); - allOptions.add(MANIFEST_FORMAT); - allOptions.add(MANIFEST_TARGET_FILE_SIZE); - allOptions.add(MANIFEST_MERGE_MIN_COUNT); - allOptions.add(PARTITION_DEFAULT_NAME); - allOptions.add(SNAPSHOT_NUM_RETAINED_MIN); - allOptions.add(SNAPSHOT_NUM_RETAINED_MAX); - allOptions.add(SNAPSHOT_TIME_RETAINED); - allOptions.add(CONTINUOUS_DISCOVERY_INTERVAL); - allOptions.add(MERGE_ENGINE); - allOptions.add(WRITE_MODE); - allOptions.add(SOURCE_SPLIT_TARGET_SIZE); - allOptions.add(SOURCE_SPLIT_OPEN_FILE_COST); - return allOptions; - } - - public FileStoreOptions(Map<String, String> options) { - this(Configuration.fromMap(options)); - } - - public FileStoreOptions(Configuration options) { - this.options = options; - // TODO validate all keys - Preconditions.checkArgument( - snapshotNumRetainMin() > 0, - SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1"); - Preconditions.checkArgument( - snapshotNumRetainMin() <= snapshotNumRetainMax(), - SNAPSHOT_NUM_RETAINED_MIN.key() - + " should not be larger than " - + SNAPSHOT_NUM_RETAINED_MAX.key()); - } - - public int bucket() { - return options.get(BUCKET); - } - - public Path path() { - return path(options.toMap()); - } - - public static Path path(Map<String, String> options) { - return new Path(options.get(PATH.key())); - } - - public static Path path(Configuration options) { - return new Path(options.get(PATH)); - } - - public static String relativeTablePath(ObjectIdentifier tableIdentifier) { - return String.format( - "%s.catalog/%s.db/%s", - tableIdentifier.getCatalogName(), - tableIdentifier.getDatabaseName(), - tableIdentifier.getObjectName()); - } - - public FileFormat fileFormat() { - return 
FileFormat.fromTableOptions(options, FILE_FORMAT); - } - - public FileFormat manifestFormat() { - return FileFormat.fromTableOptions(options, MANIFEST_FORMAT); - } - - public MemorySize manifestTargetSize() { - return options.get(MANIFEST_TARGET_FILE_SIZE); - } - - public String partitionDefaultName() { - return options.get(PARTITION_DEFAULT_NAME); - } - - public MergeTreeOptions mergeTreeOptions() { - return new MergeTreeOptions(options); - } - - public int snapshotNumRetainMin() { - return options.get(SNAPSHOT_NUM_RETAINED_MIN); - } - - public int snapshotNumRetainMax() { - return options.get(SNAPSHOT_NUM_RETAINED_MAX); - } - - public Duration snapshotTimeRetain() { - return options.get(SNAPSHOT_TIME_RETAINED); - } - - public int manifestMergeMinCount() { - return options.get(MANIFEST_MERGE_MIN_COUNT); - } - - public long splitTargetSize() { - return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes(); - } - - public long splitOpenFileCost() { - return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes(); - } - - /** Specifies the merge engine for table with primary key. */ - public enum MergeEngine implements DescribedEnum { - DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), - - PARTIAL_UPDATE("partial-update", "Partial update non-null fields."); - - private final String value; - private final String description; - - MergeEngine(String value, String description) { - this.value = value; - this.description = description; - } - - @Override - public String toString() { - return value; - } - - @Override - public InlineElement getDescription() { - return text(description); - } - } -} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java index fb751a73..9443ddd2 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java @@ -34,6 +34,8 @@ import java.util.function.Supplier; /** {@link FileStore} for querying and updating {@link KeyValue}s. */ public class KeyValueFileStore extends AbstractFileStore<KeyValue> { + private static final long serialVersionUID = 1L; + private final RowType keyType; private final RowType valueType; private final Supplier<Comparator<RowData>> keyComparatorSupplier; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java index 403c8be0..608cd266 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java @@ -18,8 +18,6 @@ package org.apache.flink.table.store.file.catalog; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.util.Preconditions; @@ -28,6 +26,9 @@ import java.util.List; import java.util.ServiceLoader; import java.util.stream.Collectors; +import static org.apache.flink.table.store.CatalogOptions.METASTORE; +import static org.apache.flink.table.store.CatalogOptions.WAREHOUSE; + /** Factory to create {@link Catalog}. Each factory should have a unique identifier. 
*/ public interface CatalogFactory { @@ -35,17 +36,6 @@ public interface CatalogFactory { Catalog create(String warehouse, ReadableConfig options); - ConfigOption<String> METASTORE = - ConfigOptions.key("metastore") - .stringType() - .defaultValue(FileSystemCatalogFactory.IDENTIFIER); - - ConfigOption<String> WAREHOUSE = - ConfigOptions.key("warehouse") - .stringType() - .noDefaultValue() - .withDescription("The warehouse root path of catalog."); - static Catalog createCatalog(ReadableConfig options) { // manual validation // because different catalog types may have different options diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java deleted file mode 100644 index 3f3bfa07..00000000 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.store.file.mergetree; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.ReadableConfig; - -import java.util.HashSet; -import java.util.Set; - -/** Options for merge tree. */ -public class MergeTreeOptions { - - public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE = - ConfigOptions.key("write-buffer-size") - .memoryType() - .defaultValue(MemorySize.parse("256 mb")) - .withDescription( - "Amount of data to build up in memory before converting to a sorted on-disk file."); - - public static final ConfigOption<MemorySize> PAGE_SIZE = - ConfigOptions.key("page-size") - .memoryType() - .defaultValue(MemorySize.parse("64 kb")) - .withDescription("Memory page size."); - - public static final ConfigOption<MemorySize> TARGET_FILE_SIZE = - ConfigOptions.key("target-file-size") - .memoryType() - .defaultValue(MemorySize.ofMebiBytes(128)) - .withDescription("Target size of a file."); - - public static final ConfigOption<Integer> NUM_SORTED_RUNS_COMPACTION_TRIGGER = - ConfigOptions.key("num-sorted-run.compaction-trigger") - .intType() - .defaultValue(5) - .withDescription( - "The sorted run number to trigger compaction. 
Includes level0 files (one file one sorted run) and " - + "high-level runs (one level one sorted run)."); - - public static final ConfigOption<Integer> NUM_SORTED_RUNS_STOP_TRIGGER = - ConfigOptions.key("num-sorted-run.stop-trigger") - .intType() - .defaultValue(10) - .withDescription( - "The number of sorted-runs that trigger the stopping of writes."); - - public static final ConfigOption<Integer> NUM_LEVELS = - ConfigOptions.key("num-levels") - .intType() - .noDefaultValue() - .withDescription( - "Total level number, for example, there are 3 levels, including 0,1,2 levels."); - - public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT = - ConfigOptions.key("commit.force-compact") - .booleanType() - .defaultValue(false) - .withDescription("Whether to force a compaction before commit."); - - public static final ConfigOption<Integer> COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT = - ConfigOptions.key("compaction.max-size-amplification-percent") - .intType() - .defaultValue(200) - .withDescription( - "The size amplification is defined as the amount (in percentage) of additional storage " - + "needed to store a single byte of data in the merge tree."); - - public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO = - ConfigOptions.key("compaction.size-ratio") - .intType() - .defaultValue(1) - .withDescription( - "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) " - + "size is 1% smaller than the next sorted run's size, then include next sorted run " - + "into this candidate set."); - - public static final ConfigOption<Boolean> CHANGELOG_FILE = - ConfigOptions.key("changelog-file") - .booleanType() - .defaultValue(false) - .withDescription( - "Whether to double write to a changelog file when flushing memory table. " - + "This changelog file keeps the order of data input and the details of data changes, " - + "it can be read directly during stream reads."); - - public final long writeBufferSize; - - public final int pageSize; - - public final long targetFileSize; - - public final int numSortedRunCompactionTrigger; - - public final int numSortedRunStopTrigger; - - public final int numLevels; - - public final boolean commitForceCompact; - - public final int maxSizeAmplificationPercent; - - public final int sizeRatio; - - public final boolean enableChangelogFile; - - public MergeTreeOptions( - long writeBufferSize, - int pageSize, - long targetFileSize, - int numSortedRunCompactionTrigger, - int numSortedRunStopTrigger, - Integer numLevels, - boolean commitForceCompact, - int maxSizeAmplificationPercent, - int sizeRatio, - boolean enableChangelogFile) { - this.writeBufferSize = writeBufferSize; - this.pageSize = pageSize; - this.targetFileSize = targetFileSize; - this.numSortedRunCompactionTrigger = numSortedRunCompactionTrigger; - this.numSortedRunStopTrigger = - Math.max(numSortedRunCompactionTrigger, numSortedRunStopTrigger); - // By default, this ensures that the compaction does not fall to level 0, but at least to - // level 1 - this.numLevels = numLevels == null ? 
numSortedRunCompactionTrigger + 1 : numLevels; - this.commitForceCompact = commitForceCompact; - this.maxSizeAmplificationPercent = maxSizeAmplificationPercent; - this.sizeRatio = sizeRatio; - this.enableChangelogFile = enableChangelogFile; - } - - public MergeTreeOptions(ReadableConfig config) { - this( - config.get(WRITE_BUFFER_SIZE).getBytes(), - (int) config.get(PAGE_SIZE).getBytes(), - config.get(TARGET_FILE_SIZE).getBytes(), - config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER), - config.get(NUM_SORTED_RUNS_STOP_TRIGGER), - config.get(NUM_LEVELS), - config.get(COMMIT_FORCE_COMPACT), - config.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT), - config.get(COMPACTION_SIZE_RATIO), - config.get(CHANGELOG_FILE)); - } - - public static Set<ConfigOption<?>> allOptions() { - Set<ConfigOption<?>> allOptions = new HashSet<>(); - allOptions.add(WRITE_BUFFER_SIZE); - allOptions.add(PAGE_SIZE); - allOptions.add(TARGET_FILE_SIZE); - allOptions.add(NUM_SORTED_RUNS_COMPACTION_TRIGGER); - allOptions.add(NUM_SORTED_RUNS_STOP_TRIGGER); - allOptions.add(NUM_LEVELS); - allOptions.add(COMMIT_FORCE_COMPACT); - allOptions.add(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT); - allOptions.add(COMPACTION_SIZE_RATIO); - return allOptions; - } -} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java index b06df389..09cd4604 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java @@ -85,7 +85,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { valueType, fileFormat, pathFactory, - options.targetFileSize); + options.targetFileSize()); this.keyComparatorSupplier = keyComparatorSupplier; this.mergeFunction = mergeFunction; this.options = options; @@ -112,10 +112,10 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { } Comparator<RowData> keyComparator = keyComparatorSupplier.get(); CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator); - Levels levels = new Levels(keyComparator, compactFiles, options.numLevels); + Levels levels = new Levels(keyComparator, compactFiles, options.numLevels()); CompactUnit unit = CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns()); - return new CompactTask(keyComparator, options.targetFileSize, rewriter, unit, true); + return new CompactTask(keyComparator, options.targetFileSize(), rewriter, unit, true); } private MergeTreeWriter createMergeTreeWriter( @@ -132,18 +132,18 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { partition, bucket, new UniversalCompaction( - options.maxSizeAmplificationPercent, - options.sizeRatio, - options.numSortedRunCompactionTrigger), + options.maxSizeAmplificationPercent(), + options.sizeRatio(), + options.numSortedRunCompactionTrigger()), compactExecutor), - new Levels(keyComparator, restoreFiles, options.numLevels), + new Levels(keyComparator, restoreFiles, options.numLevels()), getMaxSequenceNumber(restoreFiles), keyComparator, mergeFunction.copy(), dataFileWriter, - options.commitForceCompact, - options.numSortedRunStopTrigger, - options.enableChangelogFile); + options.commitForceCompact(), + options.numSortedRunStopTrigger(), + options.enableChangelogFile()); } private 
CompactManager createCompactManager( @@ -154,7 +154,11 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { Comparator<RowData> keyComparator = keyComparatorSupplier.get(); CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator); return new CompactManager( - compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter); + compactExecutor, + compactStrategy, + keyComparator, + options.targetFileSize(), + rewriter); } private CompactRewriter compactRewriter( diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java deleted file mode 100644 index f975233d..00000000 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.store.log; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.DescribedEnum; -import org.apache.flink.configuration.description.Description; -import org.apache.flink.configuration.description.InlineElement; - -import java.time.Duration; - -import static org.apache.flink.configuration.description.TextElement.text; -import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption; - -/** Options for log store. */ -public class LogOptions { - - public static final String LOG_PREFIX = "log."; - - public static final ConfigOption<LogStartupMode> SCAN = - ConfigOptions.key("scan") - .enumType(LogStartupMode.class) - .defaultValue(LogStartupMode.FULL) - .withDescription( - Description.builder() - .text("Specify the startup mode for log consumer.") - .linebreak() - .list(formatEnumOption(LogStartupMode.FULL)) - .list(formatEnumOption(LogStartupMode.LATEST)) - .list(formatEnumOption(LogStartupMode.FROM_TIMESTAMP)) - .build()); - - public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLS = - ConfigOptions.key("scan.timestamp-millis") - .longType() - .noDefaultValue() - .withDescription( - "Optional timestamp used in case of \"from-timestamp\" scan mode"); - - public static final ConfigOption<Duration> RETENTION = - ConfigOptions.key("retention") - .durationType() - .noDefaultValue() - .withDescription( - "It means how long changes log will be kept. 
The default value is from the log system cluster."); - - public static final ConfigOption<LogConsistency> CONSISTENCY = - ConfigOptions.key("consistency") - .enumType(LogConsistency.class) - .defaultValue(LogConsistency.TRANSACTIONAL) - .withDescription( - Description.builder() - .text("Specify the log consistency mode for table.") - .linebreak() - .list( - formatEnumOption(LogConsistency.TRANSACTIONAL), - formatEnumOption(LogConsistency.EVENTUAL)) - .build()); - - public static final ConfigOption<LogChangelogMode> CHANGELOG_MODE = - ConfigOptions.key("changelog-mode") - .enumType(LogChangelogMode.class) - .defaultValue(LogChangelogMode.AUTO) - .withDescription( - Description.builder() - .text("Specify the log changelog mode for table.") - .linebreak() - .list( - formatEnumOption(LogChangelogMode.AUTO), - formatEnumOption(LogChangelogMode.ALL), - formatEnumOption(LogChangelogMode.UPSERT)) - .build()); - - public static final ConfigOption<String> KEY_FORMAT = - ConfigOptions.key("key.format") - .stringType() - .defaultValue("json") - .withDescription( - "Specify the key message format of log system with primary key."); - - public static final ConfigOption<String> FORMAT = - ConfigOptions.key("format") - .stringType() - .defaultValue("debezium-json") - .withDescription("Specify the message format of log system."); - - /** Specifies the startup mode for log consumer. */ - public enum LogStartupMode implements DescribedEnum { - FULL( - "full", - "Perform a snapshot on the table upon first startup," - + " and continue to read the latest changes."), - - LATEST("latest", "Start from the latest."), - - FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp."); - - private final String value; - private final String description; - - LogStartupMode(String value, String description) { - this.value = value; - this.description = description; - } - - @Override - public String toString() { - return value; - } - - @Override - public InlineElement getDescription() { - return text(description); - } - } - - /** Specifies the log consistency mode for table. */ - public enum LogConsistency implements DescribedEnum { - TRANSACTIONAL( - "transactional", - "Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."), - - EVENTUAL( - "eventual", - "Immediate data visibility, you may see some intermediate states, " - + "but eventually the right results will be produced, only works for table with primary key."); - - private final String value; - private final String description; - - LogConsistency(String value, String description) { - this.value = value; - this.description = description; - } - - @Override - public String toString() { - return value; - } - - @Override - public InlineElement getDescription() { - return text(description); - } - } - - /** Specifies the log changelog mode for table. 
*/ - public enum LogChangelogMode implements DescribedEnum { - AUTO("auto", "Upsert for table with primary key, all for table without primary key.."), - - ALL("all", "The log system stores all changes including UPDATE_BEFORE."), - - UPSERT( - "upsert", - "The log system does not store the UPDATE_BEFORE changes, the log consumed job" - + " will automatically add the normalized node, relying on the state" - + " to generate the required update_before."); - - private final String value; - private final String description; - - LogChangelogMode(String value, String description) { - this.value = value; - this.description = description; - } - - @Override - public String toString() { - return value; - } - - @Override - public InlineElement getDescription() { - return text(description); - } - } -} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java index 4ae4db06..0df149ed 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java @@ -34,13 +34,15 @@ import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; import org.apache.flink.table.factories.SerializationFormatFactory; -import org.apache.flink.table.store.CoreOptions; import org.apache.flink.types.RowKind; import javax.annotation.Nullable; import java.util.Map; +import static org.apache.flink.table.store.CoreOptions.LOG_FORMAT; +import static org.apache.flink.table.store.CoreOptions.LOG_KEY_FORMAT; + /** * Base interface for configuring a default log table connector. The log table is used by managed * table factory. 
@@ -83,36 +85,32 @@ public interface LogStoreTableFactory extends DynamicTableFactory { static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat( TableFactoryHelper helper) { DecodingFormat<DeserializationSchema<RowData>> format = - helper.discoverDecodingFormat( - DeserializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT); - validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT)); + helper.discoverDecodingFormat(DeserializationFormatFactory.class, LOG_KEY_FORMAT); + validateKeyFormat(format, helper.getOptions().get(LOG_KEY_FORMAT)); return format; } static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat( TableFactoryHelper helper) { EncodingFormat<SerializationSchema<RowData>> format = - helper.discoverEncodingFormat( - SerializationFormatFactory.class, CoreOptions.LOG_KEY_FORMAT); - validateKeyFormat(format, helper.getOptions().get(CoreOptions.LOG_KEY_FORMAT)); + helper.discoverEncodingFormat(SerializationFormatFactory.class, LOG_KEY_FORMAT); + validateKeyFormat(format, helper.getOptions().get(LOG_KEY_FORMAT)); return format; } static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat( TableFactoryHelper helper) { DecodingFormat<DeserializationSchema<RowData>> format = - helper.discoverDecodingFormat( - DeserializationFormatFactory.class, CoreOptions.LOG_FORMAT); - validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT)); + helper.discoverDecodingFormat(DeserializationFormatFactory.class, LOG_FORMAT); + validateValueFormat(format, helper.getOptions().get(LOG_FORMAT)); return format; } static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat( TableFactoryHelper helper) { EncodingFormat<SerializationSchema<RowData>> format = - helper.discoverEncodingFormat( - SerializationFormatFactory.class, CoreOptions.LOG_FORMAT); - validateValueFormat(format, helper.getOptions().get(CoreOptions.LOG_FORMAT)); + helper.discoverEncodingFormat(SerializationFormatFactory.class, LOG_FORMAT); + validateValueFormat(format, helper.getOptions().get(LOG_FORMAT)); return format; } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java index 1e997477..1f49b1b7 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java @@ -42,7 +42,7 @@ public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> { super(write, recordConverter); HeapMemorySegmentPool memoryPool = - new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize); + new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()); this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners); } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java index 2de27d14..eecb0f30 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java @@ -122,7 +122,7 @@ public class MergeTreeTest { valueType, flushingAvro, pathFactory, - options.targetFileSize) + options.targetFileSize()) 
.create(BinaryRowDataUtil.EMPTY_ROW, 0); writer = createMergeTreeWriter(Collections.emptyList()); } @@ -271,15 +271,16 @@ public class MergeTreeTest { dataFileWriter.keyType(), dataFileWriter.valueType(), createCompactManager(dataFileWriter, service), - new Levels(comparator, files, options.numLevels), + new Levels(comparator, files, options.numLevels()), maxSequenceNumber, comparator, new DeduplicateMergeFunction(), dataFileWriter, - options.commitForceCompact, - options.numSortedRunStopTrigger, + options.commitForceCompact(), + options.numSortedRunStopTrigger(), false); - writer.setMemoryPool(new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize)); + writer.setMemoryPool( + new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return writer; } @@ -287,9 +288,9 @@ public class MergeTreeTest { DataFileWriter dataFileWriter, ExecutorService compactExecutor) { CompactStrategy compactStrategy = new UniversalCompaction( - options.maxSizeAmplificationPercent, - options.sizeRatio, - options.numSortedRunCompactionTrigger); + options.maxSizeAmplificationPercent(), + options.sizeRatio(), + options.numSortedRunCompactionTrigger()); CompactRewriter rewriter = (outputLevel, dropDelete, sections) -> dataFileWriter.write( @@ -302,7 +303,7 @@ public class MergeTreeTest { new DeduplicateMergeFunction())), outputLevel); return new CompactManager( - compactExecutor, compactStrategy, comparator, options.targetFileSize, rewriter); + compactExecutor, compactStrategy, comparator, options.targetFileSize(), rewriter); } private void mergeCompacted( diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java index da385956..487e3198 100644 --- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java +++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/LogStoreE2eTest.java @@ -74,7 +74,7 @@ public class LogStoreE2eTest extends E2eTestBase { + " 'root-path' = '%s',\n" + " 'log.consistency' = 'eventual',\n" + " 'log.system' = 'kafka',\n" - + " 'log.kafka.bootstrap.servers' = '%s'\n" + + " 'kafka.bootstrap.servers' = '%s'\n" + ");"; tableStoreStreamDdl = String.format( diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java index 1e2cbabb..ec0dc2d1 100644 --- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java +++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java @@ -18,24 +18,18 @@ package org.apache.flink.table.store.hive; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.store.file.catalog.Catalog; import org.apache.flink.table.store.file.catalog.CatalogFactory; import org.apache.flink.util.Preconditions; +import static org.apache.flink.table.store.CatalogOptions.URI; + /** Factory to create {@link HiveCatalog}. 
*/ public class HiveCatalogFactory implements CatalogFactory { private static final String IDENTIFIER = "hive"; - private static final ConfigOption<String> URI = - ConfigOptions.key("uri") - .stringType() - .noDefaultValue() - .withDescription("Uri of Hive metastore's thrift server."); - @Override public String identifier() { return IDENTIFIER; diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java index 8cdb6d0c..4b4527b8 100644 --- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java +++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java @@ -31,7 +31,7 @@ public class KafkaLogOptions { .withDescription("Required Kafka server connection string"); public static final ConfigOption<String> TOPIC = - ConfigOptions.key("topic") + ConfigOptions.key("kafka.topic") .stringType() .noDefaultValue() .withDescription("Topic of this kafka table."); diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java index 85870e9f..d05a06cc 100644 --- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java +++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java @@ -57,8 +57,6 @@ import java.util.concurrent.ExecutionException; import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper; import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY; -import static org.apache.flink.table.store.CoreOptions.LOG_FORMAT; -import static org.apache.flink.table.store.CoreOptions.LOG_KEY_FORMAT; import static org.apache.flink.table.store.CoreOptions.LOG_RETENTION; import static org.apache.flink.table.store.CoreOptions.LOG_SCAN; import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_TIMESTAMP_MILLS; @@ -88,16 +86,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory { @Override public Set<ConfigOption<?>> optionalOptions() { - Set<ConfigOption<?>> options = new HashSet<>(); - options.add(LOG_SCAN); - options.add(TOPIC); - options.add(LOG_SCAN_TIMESTAMP_MILLS); - options.add(LOG_RETENTION); - options.add(LOG_CONSISTENCY); - options.add(LOG_CHANGELOG_MODE); - options.add(LOG_KEY_FORMAT); - options.add(LOG_FORMAT); - return options; + return new HashSet<>(); } @Override @@ -119,12 +108,10 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory { @Override public void onCreateTable(Context context, int numBucket, boolean ignoreIfExists) { - FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context); - helper.validateExcept(KAFKA_PREFIX); - try (AdminClient adminClient = AdminClient.create(toKafkaProperties(helper.getOptions()))) { + Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions()); + try (AdminClient adminClient = AdminClient.create(toKafkaProperties(options))) { Map<String, String> configs = new HashMap<>(); - helper.getOptions() - .getOptional(LOG_RETENTION) + options.getOptional(LOG_RETENTION) .ifPresent( retention -> configs.put( diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java 
b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java index 343525e6..8c82a433 100644 --- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java +++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java @@ -35,6 +35,8 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.store.CoreOptions.LogChangelogMode; +import org.apache.flink.table.store.CoreOptions.LogConsistency; import org.apache.flink.table.store.log.LogStoreTableFactory; import org.apache.flink.table.store.table.sink.SinkRecord; import org.apache.flink.table.types.DataType; @@ -56,8 +58,6 @@ import java.util.stream.IntStream; import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW; import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY; -import static org.apache.flink.table.store.CoreOptions.LogChangelogMode; -import static org.apache.flink.table.store.CoreOptions.LogConsistency; import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS; import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
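For readers tracking the option consolidation above, here is a minimal usage sketch (not part of this commit): it builds a CoreOptions instance from a plain option map and reads the former MergeTreeOptions settings through the accessor methods introduced in the CoreOptions hunk. The class name and option values are illustrative, and it assumes CoreOptions keeps the key names declared in the removed MergeTreeOptions ("write-buffer-size", "num-sorted-run.compaction-trigger", ...).

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.store.CoreOptions;

public class CoreOptionsUsageSketch {
    public static void main(String[] args) {
        // Table options as they would appear in a catalog table's option map.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("bucket", "4");
        tableOptions.put("write-buffer-size", "128 mb");
        tableOptions.put("num-sorted-run.compaction-trigger", "5");

        // After FLINK-28387, merge-tree settings are read through accessors on
        // CoreOptions instead of public fields on the deleted MergeTreeOptions.
        CoreOptions options = new CoreOptions(tableOptions);
        long writeBufferSize = options.writeBufferSize(); // bytes parsed from "128 mb"
        int pageSize = options.pageSize();                 // default 64 kb
        int numLevels = options.numLevels();               // defaults to compaction trigger + 1
        long targetFileSize = options.targetFileSize();    // default 128 mb

        System.out.printf(
                "buckets=%d writeBuffer=%d pageSize=%d levels=%d targetFileSize=%d%n",
                options.bucket(), writeBufferSize, pageSize, numLevels, targetFileSize);
    }
}
```

Note that numSortedRunStopTrigger() and numLevels() preserve the old constructor defaults (the stop trigger is clamped to at least the compaction trigger, and the level count defaults to trigger + 1); the computation simply moved from the removed constructor into the accessors.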
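The key renames visible in the docs and E2E test hunks ('log.kafka.bootstrap.servers' to 'kafka.bootstrap.servers', 'log.topic' to 'kafka.topic', and the explicit 'log.' prefix on log option keys) change how a managed table is declared. The following is a hedged sketch only: the table name, path, and broker address are placeholders, and it assumes the table store managed factory is on the classpath, as in the ITCases touched by this commit.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ManagedTableDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Session-level options keep the "table-store." prefix, but the extra "log."
        // hop in front of Kafka keys is gone after this commit:
        // "table-store.log.kafka.bootstrap.servers" -> "table-store.kafka.bootstrap.servers".
        tEnv.getConfig().getConfiguration()
                .setString("table-store.root-path", "/tmp/table-store");
        tEnv.getConfig().getConfiguration()
                .setString("table-store.log.system", "kafka");
        tEnv.getConfig().getConfiguration()
                .setString("table-store.kafka.bootstrap.servers", "localhost:9092");

        // Per-table options use the reorganized keys as well: log options carry an
        // explicit "log." prefix in CoreOptions ("log.scan", "log.consistency", ...).
        tEnv.executeSql(
                "CREATE TABLE word_count (\n"
                        + "    word STRING,\n"
                        + "    cnt BIGINT,\n"
                        + "    PRIMARY KEY (word) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "    'log.consistency' = 'eventual',\n"
                        + "    'log.scan' = 'full'\n"
                        + ")");
    }
}
```

As the TableStoreManagedFactoryTest hunk shows, a user-supplied 'kafka.topic' is still rejected for managed tables; the topic is derived as 'catalog.database.table' during enrichOptions.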
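Finally, the new CatalogOptions class centralizes the catalog-level options that previously lived on CatalogFactory (WAREHOUSE, METASTORE) and HiveCatalogFactory (URI). A small sketch of how they could be passed to CatalogFactory.createCatalog follows; the warehouse path and metastore URI are placeholders, not values taken from this commit.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.CatalogOptions;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;

public class CatalogOptionsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(CatalogOptions.WAREHOUSE, "/tmp/warehouse");
        // METASTORE defaults to the filesystem catalog identifier; "hive" plus a
        // metastore URI selects the Hive catalog instead.
        conf.set(CatalogOptions.METASTORE, "hive");
        conf.set(CatalogOptions.URI, "thrift://localhost:9083");

        // createCatalog(ReadableConfig) performs the manual validation mentioned in
        // the CatalogFactory hunk, since different catalog types accept different options.
        Catalog catalog = CatalogFactory.createCatalog(conf);
        System.out.println(catalog.getClass().getName());
    }
}
```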