This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4e92bdb186e6abc1d5e033ebfb4978e94af20cc7 Author: JingsongLi <[email protected]> AuthorDate: Tue Feb 11 16:50:49 2020 +0800 [FLINK-15912][table] Support create table source/sink by context in hive connector --- .../flink/connectors/hive/HiveTableFactory.java | 26 +++++++--------------- .../connectors/hive/HiveTableFactoryTest.java | 16 +++++++++---- .../flink/connectors/hive/HiveTableSourceTest.java | 3 ++- .../flink/table/factories/TableFactoryUtil.java | 14 ------------ 4 files changed, 22 insertions(+), 37 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java index dc70ec2..9a17b71 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java @@ -64,26 +64,16 @@ public class HiveTableFactory } @Override - public TableSink<Row> createTableSink(Map<String, String> properties) { - throw new UnsupportedOperationException(); - } - - @Override - public TableSource<BaseRow> createTableSource(Map properties) { - throw new UnsupportedOperationException(); - } - - @Override - public TableSource<BaseRow> createTableSource(ObjectPath tablePath, CatalogTable table) { - Preconditions.checkNotNull(table); + public TableSource<BaseRow> createTableSource(TableSourceFactory.Context context) { + CatalogTable table = checkNotNull(context.getTable()); Preconditions.checkArgument(table instanceof CatalogTableImpl); boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); if (!isGeneric) { - return createHiveTableSource(tablePath, table); + return createHiveTableSource(context.getObjectIdentifier().toObjectPath(), table); } else { - return TableFactoryUtil.findAndCreateTableSource(table); + return TableFactoryUtil.findAndCreateTableSource(context); } } @@ -95,16 +85,16 @@ public class HiveTableFactory } @Override - public TableSink<Row> createTableSink(ObjectPath tablePath, CatalogTable table) { - Preconditions.checkNotNull(table); + public TableSink<Row> createTableSink(TableSinkFactory.Context context) { + CatalogTable table = checkNotNull(context.getTable()); Preconditions.checkArgument(table instanceof CatalogTableImpl); boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC)); if (!isGeneric) { - return createOutputFormatTableSink(tablePath, table); + return createOutputFormatTableSink(context.getObjectIdentifier().toObjectPath(), table); } else { - return TableFactoryUtil.findAndCreateTableSink(table); + return TableFactoryUtil.findAndCreateTableSink(context); } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java index f04d496..88dc191 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableFactoryTest.java @@ -18,16 +18,20 @@ package org.apache.flink.connectors.hive; +import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.config.CatalogConfig; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.factories.TableFactory; +import org.apache.flink.table.factories.TableSinkFactoryContextImpl; +import org.apache.flink.table.factories.TableSourceFactoryContextImpl; import org.apache.flink.table.sinks.StreamTableSink; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.StreamTableSource; @@ -78,9 +82,11 @@ public class HiveTableFactoryTest { Optional<TableFactory> opt = catalog.getTableFactory(); assertTrue(opt.isPresent()); HiveTableFactory tableFactory = (HiveTableFactory) opt.get(); - TableSource tableSource = tableFactory.createTableSource(path, table); + TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl( + ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration())); assertTrue(tableSource instanceof StreamTableSource); - TableSink tableSink = tableFactory.createTableSink(path, table); + TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl( + ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration())); assertTrue(tableSink instanceof StreamTableSink); } @@ -100,9 +106,11 @@ public class HiveTableFactoryTest { Optional<TableFactory> opt = catalog.getTableFactory(); assertTrue(opt.isPresent()); HiveTableFactory tableFactory = (HiveTableFactory) opt.get(); - TableSink tableSink = tableFactory.createTableSink(path, table); + TableSink tableSink = tableFactory.createTableSink(new TableSinkFactoryContextImpl( + ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration())); assertTrue(tableSink instanceof HiveTableSink); - TableSource tableSource = tableFactory.createTableSource(path, table); + TableSource tableSource = tableFactory.createTableSource(new TableSourceFactoryContextImpl( + ObjectIdentifier.of("mycatalog", "mydb", "mytable"), table, new Configuration())); assertTrue(tableSource instanceof HiveTableSource); } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java index 18cdea2..e5afef5 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java @@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; +import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.utils.TableTestUtil; @@ -447,7 +448,7 @@ public class HiveTableSourceTest { HiveTableFactory tableFactorySpy = spy((HiveTableFactory) hiveCatalog.getTableFactory().get()); doReturn(new TestVectorReaderSource(new JobConf(hiveCatalog.getHiveConf()), tablePath, catalogTable)) - .when(tableFactorySpy).createTableSource(any(ObjectPath.class), any(CatalogTable.class)); + .when(tableFactorySpy).createTableSource(any(TableSourceFactory.Context.class)); HiveCatalog catalogSpy = spy(hiveCatalog); doReturn(Optional.of(tableFactorySpy)).when(catalogSpy).getTableFactory(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index bea892e..beca21d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -89,20 +89,6 @@ public class TableFactoryUtil { } /** - * Returns a table sink matching the {@link org.apache.flink.table.catalog.CatalogTable}. - */ - public static <T> TableSink<T> findAndCreateTableSink(CatalogTable table) { - return findAndCreateTableSink(table.toProperties()); - } - - /** - * Returns a table source matching the {@link org.apache.flink.table.catalog.CatalogTable}. - */ - public static <T> TableSource<T> findAndCreateTableSource(CatalogTable table) { - return findAndCreateTableSource(table.toProperties()); - } - - /** * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog. */ public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) {
