This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 306a89a3556ca3fbab0306301f56972ccf11641b
Author: JingsongLi <[email protected]>
AuthorDate: Tue Feb 11 15:39:59 2020 +0800

    [FLINK-15912][table] Support create table source/sink by context in sql-cli
---
 .../client/gateway/local/ExecutionContext.java     | 37 +++++++++++++++-------
 .../gateway/utils/TestTableSinkFactoryBase.java    | 11 +++----
 .../gateway/utils/TestTableSourceFactoryBase.java  | 10 +++---
 .../flink/table/catalog/CatalogTableImpl.java      | 15 +++++++++
 .../table/descriptors/ConnectTableDescriptor.java  | 15 +--------
 .../table/descriptors/DescriptorProperties.java    | 19 +++++++++++
 6 files changed, 71 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index ba69ebd..8bad114 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -48,11 +48,12 @@ import org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl;
 import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
-import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.SinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceTableEntry;
@@ -72,7 +73,9 @@ import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionService;
@@ -374,12 +377,18 @@ public class ExecutionContext<ClusterID> {
                return factory.createCatalog(name, catalogProperties);
        }
 
-       private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
-               if (execution.isStreamingPlanner()) {
+       private TableSource<?> createTableSource(String name, Map<String, String> sourceProperties) {
+               if (environment.getExecution().isStreamingPlanner()) {
                        final TableSourceFactory<?> factory = (TableSourceFactory<?>)
                                TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
-                       return factory.createTableSource(sourceProperties);
-                       } else if (execution.isBatchPlanner()) {
+                       return factory.createTableSource(new TableSourceFactoryContextImpl(
+                                       ObjectIdentifier.of(
+                                                       tableEnv.getCurrentCatalog(),
+                                                       tableEnv.getCurrentDatabase(),
+                                                       name),
+                                       CatalogTableImpl.fromProperties(sourceProperties),
+                                       tableEnv.getConfig().getConfiguration()));
+               } else if (environment.getExecution().isBatchPlanner()) {
                        final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
                                TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
                        return factory.createBatchTableSource(sourceProperties);
@@ -387,12 +396,18 @@ public class ExecutionContext<ClusterID> {
                throw new SqlExecutionException("Unsupported execution type for sources.");
        }
 
-       private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) {
-               if (execution.isStreamingPlanner()) {
+       private TableSink<?> createTableSink(String name, Map<String, String> sinkProperties) {
+               if (environment.getExecution().isStreamingPlanner()) {
                        final TableSinkFactory<?> factory = (TableSinkFactory<?>)
                                TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
-                       return factory.createTableSink(sinkProperties);
-                       } else if (execution.isBatchPlanner()) {
+                       return factory.createTableSink(new TableSinkFactoryContextImpl(
+                                       ObjectIdentifier.of(
+                                                       tableEnv.getCurrentCatalog(),
+                                                       tableEnv.getCurrentDatabase(),
+                                                       name),
+                                       CatalogTableImpl.fromProperties(sinkProperties),
+                                       tableEnv.getConfig().getConfiguration()));
+               } else if (environment.getExecution().isBatchPlanner()) {
                        final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
                                TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
                        return factory.createBatchTableSink(sinkProperties);
@@ -567,10 +582,10 @@ public class ExecutionContext<ClusterID> {
                Map<String, TableSink<?>> tableSinks = new HashMap<>();
                environment.getTables().forEach((name, entry) -> {
                        if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
-                               tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader));
+                               tableSources.put(name, createTableSource(name, entry.asMap()));
                        }
                        if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) {
-                               tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader));
+                               tableSinks.put(name, createTableSink(name, entry.asMap()));
                        }
                });
                // register table sources
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
index 408bf24..e934959 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
@@ -22,9 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
@@ -93,12 +92,10 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
        }
 
        @Override
-       public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
-               final DescriptorProperties params = new DescriptorProperties(true);
-               params.putProperties(properties);
+       public StreamTableSink<Row> createTableSink(TableSinkFactory.Context context) {
                return new TestTableSink(
-                               SchemaValidator.deriveTableSinkSchema(params),
-                               properties.get(testProperty));
+                               context.getTable().getSchema(),
+                               context.getTable().getProperties().get(testProperty));
        }
 
        // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
index e64ae51..f305b90 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.Types;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -96,14 +97,15 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
        }
 
        @Override
-       public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+       public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
+               TableSchema schema = context.getTable().getSchema();
                final DescriptorProperties params = new DescriptorProperties(true);
-               params.putProperties(properties);
+               params.putProperties(context.getTable().getProperties());
                final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
                final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
                return new TestTableSource(
-                       TableSchemaUtils.getPhysicalSchema(params.getTableSchema(SCHEMA)),
-                       properties.get(testProperty),
+                       schema,
+                       context.getTable().getProperties().get(testProperty),
                        proctime.orElse(null),
                        rowtime);
        }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index 5566fc3..9808b32 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -81,4 +81,19 @@ public class CatalogTableImpl extends AbstractCatalogTable {
 
                return descriptor.asMap();
        }
+
+       /**
+        * Construct a {@link CatalogTableImpl} from complete properties that contains table schema.
+        */
+       public static CatalogTableImpl fromProperties(Map<String, String> properties) {
+               DescriptorProperties descriptorProperties = new DescriptorProperties();
+               descriptorProperties.putProperties(properties);
+               TableSchema tableSchema = descriptorProperties.getTableSchema(Schema.SCHEMA);
+               descriptorProperties.removeKeyPrefix(Schema.SCHEMA);
+               return new CatalogTableImpl(
+                               tableSchema,
+                               descriptorProperties.asMap(),
+                               ""
+               );
+       }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
index fb63382..a7c54c7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -80,19 +79,7 @@ public abstract class ConnectTableDescriptor
                                        " use 
registerTableSource/registerTableSink/registerTableSourceAndSink.");
                }
 
-               Map<String, String> schemaProperties = schemaDescriptor.toProperties();
-               TableSchema tableSchema = getTableSchema(schemaProperties);
-
-               Map<String, String> properties = new HashMap<>(toProperties());
-               schemaProperties.keySet().forEach(properties::remove);
-
-               CatalogTableImpl catalogTable = new CatalogTableImpl(
-                       tableSchema,
-                       properties,
-                       ""
-               );
-
-               registration.createTemporaryTable(path, catalogTable);
+               registration.createTemporaryTable(path, CatalogTableImpl.fromProperties(toProperties()));
        }
 
        private TableSchema getTableSchema(Map<String, String> schemaProperties) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index f299db4..1f17519 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -44,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -144,6 +145,24 @@ public class DescriptorProperties {
        }
 
        /**
+        * Removes the mapping for a key prefix from this properties if it is present.
+        *
+        * <p>For example: for prefix "flink", the kvs in properties like key "flink.k" and
+        * value "v" will be removed.
+        */
+       public void removeKeyPrefix(String prefix) {
+               checkNotNull(prefix);
+
+               Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator();
+               while (iterator.hasNext()) {
+                       String key = iterator.next().getKey();
+                       if (key.startsWith(prefix)) {
+                               iterator.remove();
+                       }
+               }
+       }
+
+       /**
         * Adds a class under the given key.
         */
        public void putClass(String key, Class<?> clazz) {

Reply via email to