This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 40a2128b2144ae47ba6a60b96e1a10719a94ef6f Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Thu Jul 18 14:44:52 2019 +0200 [hotfix][table] Unify default catalog & builtin catalog naming --- .../java/org/apache/flink/table/api/Table.java | 2 +- .../table/api/internal/TableEnvironmentImpl.java | 21 +++++++--------- .../apache/flink/table/catalog/CatalogManager.java | 28 ++++++++++++++++------ .../flink/table/catalog/FunctionCatalog.java | 13 +++++----- .../table/api/scala/BatchTableEnvironment.scala | 4 ++-- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 5 ++-- .../flink/table/api/internal/TableEnvImpl.scala | 27 +++++++++++---------- 7 files changed, 56 insertions(+), 44 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index 0087f94..70350fa 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -830,7 +830,7 @@ public interface Table { /** * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name - * in the initial default catalog. + * in the built-in catalog. * * <p>A batch {@link Table} can only be written to a * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 97d27f0..0b5d5fe 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -84,9 +84,6 @@ public class TableEnvironmentImpl implements TableEnvironment { // and this should always be true. This avoids too many hard code. private static final boolean IS_STREAM_TABLE = true; private final CatalogManager catalogManager; - - private final String builtinCatalogName; - private final String builtinDatabaseName; private final OperationTreeBuilder operationTreeBuilder; private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>(); @@ -106,10 +103,6 @@ public class TableEnvironmentImpl implements TableEnvironment { this.execEnv = executor; this.tableConfig = tableConfig; - // The current catalog and database are definitely builtin, - // see #create(EnvironmentSettings) - this.builtinCatalogName = catalogManager.getCurrentCatalog(); - this.builtinDatabaseName = catalogManager.getCurrentDatabase(); this.functionCatalog = functionCatalog; this.planner = planner; @@ -485,8 +478,8 @@ public class TableEnvironmentImpl implements TableEnvironment { protected void registerTableInternal(String name, CatalogBaseTable table) { try { checkValidTableName(name); - ObjectPath path = new ObjectPath(builtinDatabaseName, name); - Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName); + ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name); + Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName()); if (catalog.isPresent()) { catalog.get().createTable( path, @@ -500,8 +493,8 @@ public class TableEnvironmentImpl implements TableEnvironment { private void replaceTableInternal(String name, CatalogBaseTable table) { try { - ObjectPath path = new ObjectPath(builtinDatabaseName, name); - Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName); + ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name); + Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName()); if (catalog.isPresent()) { catalog.get().alterTable( path, @@ -521,7 +514,8 @@ public class TableEnvironmentImpl implements TableEnvironment { private void registerTableSourceInternal(String name, TableSource<?> tableSource) { validateTableSource(tableSource); - Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name); + Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(), + catalogManager.getBuiltInDatabaseName(), name); if (table.isPresent()) { if (table.get() instanceof ConnectorCatalogTable<?, ?>) { @@ -546,7 +540,8 @@ public class TableEnvironmentImpl implements TableEnvironment { } private void registerTableSinkInternal(String name, TableSink<?> tableSink) { - Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name); + Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(), + catalogManager.getBuiltInDatabaseName(), name); if (table.isPresent()) { if (table.get() instanceof ConnectorCatalogTable<?, ?>) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index c5d0bc7..5933487 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -64,8 +64,8 @@ public class CatalogManager { private String currentDatabaseName; - // The name of the default catalog - private final String defaultCatalogName; + // The name of the built-in catalog + private final String builtInCatalogName; /** * Temporary solution to handle both {@link CatalogBaseTable} and @@ -128,7 +128,9 @@ public class CatalogManager { catalogs.put(defaultCatalogName, defaultCatalog); this.currentCatalogName = defaultCatalogName; this.currentDatabaseName = defaultCatalog.getDefaultDatabase(); - this.defaultCatalogName = defaultCatalogName; + + // right now the default catalog is always the built-in one + this.builtInCatalogName = defaultCatalogName; } /** @@ -298,12 +300,24 @@ public class CatalogManager { } /** - * Gets the default catalog name. + * Gets the built-in catalog name. The built-in catalog is used for storing all non-serializable + * transient meta-objects. + * + * @return the built-in catalog name + */ + public String getBuiltInCatalogName() { + return builtInCatalogName; + } + + /** + * Gets the built-in database name in the built-in catalog. The built-in database is used for storing + * all non-serializable transient meta-objects. * - * @return the default catalog + * @return the built-in database name */ - public String getDefaultCatalogName() { - return defaultCatalogName; + public String getBuiltInDatabaseName() { + // The default database of the built-in catalog is also the built-in database. + return catalogs.get(getBuiltInCatalogName()).getDefaultDatabase(); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index 01639b8..1faffde 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -150,9 +150,7 @@ public class FunctionCatalog implements FunctionLookup { .map(FunctionDefinition::toString) .collect(Collectors.toList())); - return result.stream() - .collect(Collectors.toList()) - .toArray(new String[0]); + return result.toArray(new String[0]); } @Override @@ -180,7 +178,7 @@ public class FunctionCatalog implements FunctionLookup { userCandidate) ); } else { - // TODO: should go thru function definition discover service + // TODO: should go through function definition discover service } } catch (FunctionNotExistException e) { // Ignore @@ -206,10 +204,11 @@ public class FunctionCatalog implements FunctionLookup { .map(Function.identity()); } - String defaultCatalogName = catalogManager.getDefaultCatalogName(); - return foundDefinition.map(definition -> new FunctionLookup.Result( - ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name), + ObjectIdentifier.of( + catalogManager.getBuiltInCatalogName(), + catalogManager.getBuiltInDatabaseName(), + name), definition) ); } diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala index 6b3ecbc..516e6b1 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala @@ -295,11 +295,11 @@ object BatchTableEnvironment { classOf[ExecutionEnvironment], classOf[TableConfig], classOf[CatalogManager]) - val defaultCatalog = "default_catalog" + val builtInCatalog = "default_catalog" val catalogManager = new CatalogManager( "default_catalog", new GenericInMemoryCatalog( - defaultCatalog, + builtInCatalog, "default_database") ) const.newInstance(executionEnvironment, tableConfig, catalogManager) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index 79631f3..0923bba 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -76,9 +76,10 @@ class FlinkRelMdHandlerTestBase { val tableConfig = new TableConfig() val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema() - val defaultCatalog = "default_catalog" + val builtinCatalog = "default_catalog" + val builtinDatabase = "default_database" val catalogManager = new CatalogManager( - defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database")) + builtinCatalog, new GenericInMemoryCatalog(builtinCatalog, builtinDatabase)) // TODO batch RelNode and stream RelNode should have different PlannerContext // and RelOptCluster due to they have different trait definitions. diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 021a732..b75426c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -60,10 +60,6 @@ abstract class TableEnvImpl( private val catalogManager: CatalogManager) extends TableEnvironment { - // The current catalog and database are definitely builtin. - protected val builtinCatalogName: String = catalogManager.getCurrentCatalog - protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase - // Table API/SQL function catalog private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager) @@ -266,7 +262,9 @@ abstract class TableEnvImpl( tableSource: TableSource[_]) : Unit = { // register - getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match { + getCatalogTable( + catalogManager.getBuiltInCatalogName, + catalogManager.getBuiltInDatabaseName, name) match { // check if a table (source or sink) is registered case Some(table: ConnectorCatalogTable[_, _]) => @@ -293,7 +291,10 @@ abstract class TableEnvImpl( tableSink: TableSink[_]) : Unit = { // check if a table (source or sink) is registered - getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match { + getCatalogTable( + catalogManager.getBuiltInCatalogName, + catalogManager.getBuiltInDatabaseName, + name) match { // table source and/or sink is registered case Some(table: ConnectorCatalogTable[_, _]) => @@ -352,27 +353,29 @@ abstract class TableEnvImpl( protected def registerTableInternal(name: String, table: CatalogBaseTable): Unit = { checkValidTableName(name) - val path = new ObjectPath(builtinDatabaseName, name) - JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match { + val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name) + JavaScalaConversionUtil.toScala( + catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match { case Some(catalog) => catalog.createTable( path, table, false) - case None => throw new TableException("The default catalog does not exist.") + case None => throw new TableException("The built-in catalog does not exist.") } } protected def replaceTableInternal(name: String, table: CatalogBaseTable): Unit = { checkValidTableName(name) - val path = new ObjectPath(builtinDatabaseName, name) - JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match { + val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name) + JavaScalaConversionUtil.toScala( + catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match { case Some(catalog) => catalog.alterTable( path, table, false) - case None => throw new TableException("The default catalog does not exist.") + case None => throw new TableException("The built-in catalog does not exist.") } }