This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4e92bdb186e6abc1d5e033ebfb4978e94af20cc7
Author: JingsongLi <[email protected]>
AuthorDate: Tue Feb 11 16:50:49 2020 +0800

    [FLINK-15912][table] Support create table source/sink by context in hive connector
---
 .../flink/connectors/hive/HiveTableFactory.java    | 26 +++++++---------------
 .../connectors/hive/HiveTableFactoryTest.java      | 16 +++++++++----
 .../flink/connectors/hive/HiveTableSourceTest.java |  3 ++-
 .../flink/table/factories/TableFactoryUtil.java    | 14 ------------
 4 files changed, 22 insertions(+), 37 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index dc70ec2..9a17b71 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -64,26 +64,16 @@ public class HiveTableFactory
        }
 
        @Override
-       public TableSink<Row> createTableSink(Map<String, String> properties) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public TableSource<BaseRow> createTableSource(Map properties) {
-               throw new UnsupportedOperationException();
-       }
-
-       @Override
-       public TableSource<BaseRow> createTableSource(ObjectPath tablePath, CatalogTable table) {
-               Preconditions.checkNotNull(table);
+       public TableSource<BaseRow> createTableSource(TableSourceFactory.Context context) {
+               CatalogTable table = checkNotNull(context.getTable());
                Preconditions.checkArgument(table instanceof CatalogTableImpl);
 
                boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
 
                if (!isGeneric) {
-                       return createHiveTableSource(tablePath, table);
+                       return createHiveTableSource(context.getObjectIdentifier().toObjectPath(), table);
                } else {
-                       return TableFactoryUtil.findAndCreateTableSource(table);
+                       return TableFactoryUtil.findAndCreateTableSource(context);
                }
        }
 
@@ -95,16 +85,16 @@ public class HiveTableFactory
        }
 
        @Override
-       public TableSink<Row> createTableSink(ObjectPath tablePath, CatalogTable table) {
-               Preconditions.checkNotNull(table);
+       public TableSink<Row> createTableSink(TableSinkFactory.Context context) {
+               CatalogTable table = checkNotNull(context.getTable());
                Preconditions.checkArgument(table instanceof CatalogTableImpl);
 
                boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
 
                if (!isGeneric) {
-                       return createOutputFormatTableSink(tablePath, table);
+                       return createOutputFormatTableSink(context.getObjectIdentifier().toObjectPath(), table);
                } else {
-                       return TableFactoryUtil.findAndCreateTableSink(table);
+                       return TableFactoryUtil.findAndCreateTableSink(context);
                }
        }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
index f04d496..88dc191 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -78,9 +82,11 @@ public class HiveTableFactoryTest {
                Optional<TableFactory> opt = catalog.getTableFactory();
                assertTrue(opt.isPresent());
                HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
-               TableSource tableSource = tableFactory.createTableSource(path, table);
+               TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
+                               ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
                assertTrue(tableSource instanceof StreamTableSource);
-               TableSink tableSink = tableFactory.createTableSink(path, table);
+               TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
+                               ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
                assertTrue(tableSink instanceof StreamTableSink);
        }
 
@@ -100,9 +106,11 @@ public class HiveTableFactoryTest {
                Optional<TableFactory> opt = catalog.getTableFactory();
                assertTrue(opt.isPresent());
                HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
-               TableSink tableSink = tableFactory.createTableSink(path, table);
+               TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl(
+                               ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
                assertTrue(tableSink instanceof HiveTableSink);
-               TableSource tableSource = tableFactory.createTableSource(path, table);
+               TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl(
+                               ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration()));
                assertTrue(tableSource instanceof HiveTableSource);
        }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 18cdea2..e5afef5 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.utils.TableTestUtil;
@@ -447,7 +448,7 @@ public class HiveTableSourceTest {
 
                HiveTableFactory tableFactorySpy = spy((HiveTableFactory) hiveCatalog.getTableFactory().get());
                doReturn(new TestVectorReaderSource(new JobConf(hiveCatalog.getHiveConf()), tablePath, catalogTable))
-                               .when(tableFactorySpy).createTableSource(any(ObjectPath.class), any(CatalogTable.class));
+                               .when(tableFactorySpy).createTableSource(any(TableSourceFactory.Context.class));
                HiveCatalog catalogSpy = spy(hiveCatalog);
                
doReturn(Optional.of(tableFactorySpy)).when(catalogSpy).getTableFactory();
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index bea892e..beca21d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -89,20 +89,6 @@ public class TableFactoryUtil {
        }
 
        /**
-        * Returns a table sink matching the {@link org.apache.flink.table.catalog.CatalogTable}.
-        */
-       public static <T> TableSink<T> findAndCreateTableSink(CatalogTable table) {
-               return findAndCreateTableSink(table.toProperties());
-       }
-
-       /**
-        * Returns a table source matching the {@link org.apache.flink.table.catalog.CatalogTable}.
-        */
-       public static <T> TableSource<T> findAndCreateTableSource(CatalogTable table) {
-               return findAndCreateTableSource(table.toProperties());
-       }
-
-       /**
         * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
         */
        public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {

Reply via email to