This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 306a89a3556ca3fbab0306301f56972ccf11641b Author: JingsongLi <[email protected]> AuthorDate: Tue Feb 11 15:39:59 2020 +0800 [FLINK-15912][table] Support create table source/sink by context in sql-cli --- .../client/gateway/local/ExecutionContext.java | 37 +++++++++++++++------- .../gateway/utils/TestTableSinkFactoryBase.java | 11 +++---- .../gateway/utils/TestTableSourceFactoryBase.java | 10 +++--- .../flink/table/catalog/CatalogTableImpl.java | 15 +++++++++ .../table/descriptors/ConnectTableDescriptor.java | 15 +-------- .../table/descriptors/DescriptorProperties.java | 19 +++++++++++ 6 files changed, 71 insertions(+), 36 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index ba69ebd..8bad114 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -48,11 +48,12 @@ import org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl; import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.config.entries.DeploymentEntry; -import org.apache.flink.table.client.config.entries.ExecutionEntry; import org.apache.flink.table.client.config.entries.SinkTableEntry; import org.apache.flink.table.client.config.entries.SourceSinkTableEntry; import org.apache.flink.table.client.config.entries.SourceTableEntry; @@ -72,7 +73,9 @@ import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.factories.ModuleFactory; import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.factories.TableSinkFactory; +import org.apache.flink.table.factories.TableSinkFactoryContextImpl; import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.factories.TableSourceFactoryContextImpl; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionService; @@ -374,12 +377,18 @@ public class ExecutionContext<ClusterID> { return factory.createCatalog(name, catalogProperties); } - private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) { - if (execution.isStreamingPlanner()) { + private TableSource<?> createTableSource(String name, Map<String, String> sourceProperties) { + if (environment.getExecution().isStreamingPlanner()) { final TableSourceFactory<?> factory = (TableSourceFactory<?>) TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader); - return factory.createTableSource(sourceProperties); - } else if (execution.isBatchPlanner()) { + return factory.createTableSource(new TableSourceFactoryContextImpl( + ObjectIdentifier.of( + tableEnv.getCurrentCatalog(), + tableEnv.getCurrentDatabase(), + name), + CatalogTableImpl.fromProperties(sourceProperties), + tableEnv.getConfig().getConfiguration())); + } else if (environment.getExecution().isBatchPlanner()) { final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>) TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader); return factory.createBatchTableSource(sourceProperties); @@ -387,12 +396,18 @@ public class ExecutionContext<ClusterID> { throw new SqlExecutionException("Unsupported execution type for sources."); } - private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) { - if (execution.isStreamingPlanner()) { + private TableSink<?> createTableSink(String name, Map<String, String> sinkProperties) { + if (environment.getExecution().isStreamingPlanner()) { final TableSinkFactory<?> factory = (TableSinkFactory<?>) TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader); - return factory.createTableSink(sinkProperties); - } else if (execution.isBatchPlanner()) { + return factory.createTableSink(new TableSinkFactoryContextImpl( + ObjectIdentifier.of( + tableEnv.getCurrentCatalog(), + tableEnv.getCurrentDatabase(), + name), + CatalogTableImpl.fromProperties(sinkProperties), + tableEnv.getConfig().getConfiguration())); + } else if (environment.getExecution().isBatchPlanner()) { final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>) TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader); return factory.createBatchTableSink(sinkProperties); @@ -567,10 +582,10 @@ public class ExecutionContext<ClusterID> { Map<String, TableSink<?>> tableSinks = new HashMap<>(); environment.getTables().forEach((name, entry) -> { if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) { - tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader)); + tableSources.put(name, createTableSource(name, entry.asMap())); } if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) { - tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader)); + tableSinks.put(name, createTableSink(name, entry.asMap())); } }); // register table sources diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java index 408bf24..e934959 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java @@ -22,9 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.SchemaValidator; import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.factories.TableSinkFactory; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.StreamTableSink; import org.apache.flink.table.sinks.TableSink; @@ -93,12 +92,10 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory } @Override - public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) { - final DescriptorProperties params = new DescriptorProperties(true); - params.putProperties(properties); + public StreamTableSink<Row> createTableSink(TableSinkFactory.Context context) { return new TestTableSink( - SchemaValidator.deriveTableSinkSchema(params), - properties.get(testProperty)); + context.getTable().getSchema(), + context.getTable().getProperties().get(testProperty)); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java index e64ae51..f305b90 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java @@ -26,6 +26,7 @@ import org.apache.flink.table.api.Types; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.SchemaValidator; import org.apache.flink.table.factories.StreamTableSourceFactory; +import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.table.sources.DefinedProctimeAttribute; import org.apache.flink.table.sources.DefinedRowtimeAttributes; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; @@ -96,14 +97,15 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac } @Override - public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) { + public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) { + TableSchema schema = context.getTable().getSchema(); final DescriptorProperties params = new DescriptorProperties(true); - params.putProperties(properties); + params.putProperties(context.getTable().getProperties()); final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params); final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params); return new TestTableSource( - TableSchemaUtils.getPhysicalSchema(params.getTableSchema(SCHEMA)), - properties.get(testProperty), + schema, + context.getTable().getProperties().get(testProperty), proctime.orElse(null), rowtime); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java index 5566fc3..9808b32 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java @@ -81,4 +81,19 @@ public class CatalogTableImpl extends AbstractCatalogTable { return descriptor.asMap(); } + + /** + * Construct a {@link CatalogTableImpl} from complete properties that contains table schema. + */ + public static CatalogTableImpl fromProperties(Map<String, String> properties) { + DescriptorProperties descriptorProperties = new DescriptorProperties(); + descriptorProperties.putProperties(properties); + TableSchema tableSchema = descriptorProperties.getTableSchema(Schema.SCHEMA); + descriptorProperties.removeKeyPrefix(Schema.SCHEMA); + return new CatalogTableImpl( + tableSchema, + descriptorProperties.asMap(), + "" + ); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java index fb63382..a7c54c7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java @@ -29,7 +29,6 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.util.Collections; -import java.util.HashMap; import java.util.Map; /** @@ -80,19 +79,7 @@ public abstract class ConnectTableDescriptor " use registerTableSource/registerTableSink/registerTableSourceAndSink."); } - Map<String, String> schemaProperties = schemaDescriptor.toProperties(); - TableSchema tableSchema = getTableSchema(schemaProperties); - - Map<String, String> properties = new HashMap<>(toProperties()); - schemaProperties.keySet().forEach(properties::remove); - - CatalogTableImpl catalogTable = new CatalogTableImpl( - tableSchema, - properties, - "" - ); - - registration.createTemporaryTable(path, catalogTable); + registration.createTemporaryTable(path, CatalogTableImpl.fromProperties(toProperties())); } private TableSchema getTableSchema(Map<String, String> schemaProperties) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java index f299db4..1f17519 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java @@ -44,6 +44,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -144,6 +145,24 @@ public class DescriptorProperties { } /** + * Removes the mapping for a key prefix from this properties if it is present. + * + * <p>For example: for prefix "flink", the kvs in properties like key "flink.k" and + * value "v" will be removed. + */ + public void removeKeyPrefix(String prefix) { + checkNotNull(prefix); + + Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator(); + while (iterator.hasNext()) { + String key = iterator.next().getKey(); + if (key.startsWith(prefix)) { + iterator.remove(); + } + } + } + + /** * Adds a class under the given key. */ public void putClass(String key, Class<?> clazz) {
