[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r286155068

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java ##

@@ -280,6 +315,142 @@
 */
 void sqlUpdate(String stmt, QueryConfig config);

+/**
+ * Gets the current default catalog name of the current session.
+ *
+ * @return The current default catalog name that is used for the path resolution.
+ * @see TableEnvironment#useCatalog(String)
+ */
+String getCurrentCatalog();
+
+/**
+ * Sets the current catalog to the given value. It also sets the default
+ * database to the catalog's default one. To assign both catalog and database explicitly,
+ * see {@link TableEnvironment#useDatabase(String, String)}.
+ *
+ * This is used during the resolution of object paths. Both the catalog and database are optional
+ * when referencing catalog objects (tables, views, etc.). The algorithm looks for requested objects
+ * in the following paths, in that order:
+ *
+ * {@code [current-catalog].[current-database].[requested-path]}
+ * {@code [current-catalog].[requested-path]}
+ * {@code [requested-path]}
+ *
+ * Example: given a structure with the default catalog set to {@code default-catalog} and the
+ * default database set to {@code default-database}:
+ *
+ * root:
+ *   |- default-catalog
+ *      |- default-database
+ *         |- tab1
+ *      |- db1
+ *         |- tab1
+ *   |- cat1
+ *      |- db1
+ *         |- tab1
+ *
+ * The following table describes the resolved paths:
+ *
+ * Requested path | Resolved path
+ * -------------- | -------------
+ * tab1           | default-catalog.default-database.tab1
+ * db1.tab1       | default-catalog.db1.tab1
+ * cat1.db1.tab1  | cat1.db1.tab1
+ *
+ * @param catalogName The name of the catalog to set as the current default catalog.
+ * @throws CatalogException thrown if a catalog with the given name could not be set as the default one
+ */
+void useCatalog(String catalogName);
+
+/**
+ * Gets the current default database name of the running session.
+ *
+ * @return The name of the current database of the current catalog.
+ * @see TableEnvironment#useDatabase(String, String)
+ */
+String getCurrentDatabase();
+
+/**
+ * Sets the current default catalog and database. That path will be used as the default one
+ * when looking for unqualified object names.
+ *
+ * This is used during the resolution of object paths. Both the catalog and database are optional
+ * when referencing catalog objects (tables, views, etc.). The algorithm looks for requested objects
+ * in the following paths, in that order:
+ *
+ * {@code [current-catalog].[current-database].[requested-path]}
+ * {@code [current-catalog].[requested-path]}
+ * {@code [requested-path]}
+ *
+ * Example: given a structure with the default catalog set to {@code default-catalog} and the
+ * default database set to {@code default-database}:
+ *
+ * root:
+ *   |- default-catalog
+ *      |- default-database
+ *         |- tab1
+ *      |- db1
+ *         |- tab1
+ *   |- cat1
+ *      |- db1
+ *         |- tab1
+ *
+ * The following table describes the resolved paths:
+ *
+ * Requested path | Resolved path
+ * -------------- | -------------
+ * tab1           | default-catalog.default-database.tab1
+ * db1.tab1       | default-catalog.db1.tab1
+ * cat1.db1.tab1  | cat1.db1.tab1
+ *
+ * @param catalogName The name of the catalog to set as the current catalog.
+ * @param databaseName The name of the database to set as the current database
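For illustration, the lookup order described in the javadoc above can be sketched in self-contained plain Java. This is a hypothetical demo class, not Flink's implementation; the registered paths mirror the example structure from the javadoc:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the path-resolution order: try
// [current-catalog].[current-database].[path], then
// [current-catalog].[path], then [path] itself.
public class PathResolutionDemo {
    static final Set<String> REGISTERED = new HashSet<>(Arrays.asList(
        "default-catalog.default-database.tab1",
        "default-catalog.db1.tab1",
        "cat1.db1.tab1"));

    public static String resolve(String currentCatalog, String currentDatabase, String requested) {
        String[] candidates = {
            currentCatalog + "." + currentDatabase + "." + requested,
            currentCatalog + "." + requested,
            requested};
        for (String candidate : candidates) {
            if (REGISTERED.contains(candidate)) {
                return candidate;
            }
        }
        return null; // not found in any of the three candidate paths
    }

    public static void main(String[] args) {
        System.out.println(resolve("default-catalog", "default-database", "tab1"));
        System.out.println(resolve("default-catalog", "default-database", "db1.tab1"));
        System.out.println(resolve("default-catalog", "default-database", "cat1.db1.tab1"));
    }
}
```

Running this reproduces the three rows of the resolution table in the javadoc.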
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r286155068

## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java ##

@@ -129,16 +142,48 @@
 public Context getContext() {
 }

 /**
-* Creates a configured {@link FrameworkConfig} for a planning session.
-*
-* @param defaultSchema the default schema to look for first during planning
-* @return configured framework config
+* Returns the SQL parser config for this environment including a custom Calcite configuration.
 */
-public FrameworkConfig createFrameworkConfig(SchemaPlus defaultSchema) {
+public SqlParser.Config getSqlParserConfig() {
+    return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(() ->
+        // we use Java lex because back ticks are easier than double quotes in programming
+        // and cases are preserved
+        SqlParser
+            .configBuilder()
+            .setLex(Lex.JAVA)
+            .build());
+}
+
+private CatalogReader createCatalogReader(
+        boolean lenientCaseSensitivity,
+        String currentCatalog,
+        String currentDatabase) {
+    SqlParser.Config sqlParserConfig = getSqlParserConfig();
+    final boolean caseSensitive;
+    if (lenientCaseSensitivity) {
+        caseSensitive = false;
+    } else {
+        caseSensitive = sqlParserConfig.caseSensitive();
+    }
+
+    SqlParser.Config parserConfig = SqlParser.configBuilder(sqlParserConfig)
+        .setCaseSensitive(caseSensitive)
+        .build();
+
+    return new CatalogReader(

Review comment: Great!

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284928417

## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java ##

@@ -129,16 +142,48 @@
 public Context getContext() {
 }

 /**
-* Creates a configured {@link FrameworkConfig} for a planning session.
-*
-* @param defaultSchema the default schema to look for first during planning
-* @return configured framework config
+* Returns the SQL parser config for this environment including a custom Calcite configuration.
 */
-public FrameworkConfig createFrameworkConfig(SchemaPlus defaultSchema) {
+public SqlParser.Config getSqlParserConfig() {
+    return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(() ->
+        // we use Java lex because back ticks are easier than double quotes in programming
+        // and cases are preserved
+        SqlParser
+            .configBuilder()
+            .setLex(Lex.JAVA)
+            .build());
+}
+
+private CatalogReader createCatalogReader(
+        boolean lenientCaseSensitivity,
+        String currentCatalog,
+        String currentDatabase) {
+    SqlParser.Config sqlParserConfig = getSqlParserConfig();
+    final boolean caseSensitive;
+    if (lenientCaseSensitivity) {
+        caseSensitive = false;
+    } else {
+        caseSensitive = sqlParserConfig.caseSensitive();
+    }
+
+    SqlParser.Config parserConfig = SqlParser.configBuilder(sqlParserConfig)
+        .setCaseSensitive(caseSensitive)
+        .build();
+
+    return new CatalogReader(

Review comment: Maybe this is taken care of somewhere else, but I'm wondering how we tell Calcite to look up in new catalogs that are registered later in a user's session.
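The `lenientCaseSensitivity` flag in the snippet above forces case-insensitive matching for certain lookups. What that means for identifier resolution can be illustrated with a self-contained sketch (hypothetical class and method names, not the Flink or Calcite API):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

// Illustration of "lenient" (case-insensitive) vs strict lookup:
// with caseSensitive == false, TAB1 still resolves to tab1.
public class CaseSensitivityDemo {
    public static Optional<String> lookup(Collection<String> names, String requested, boolean caseSensitive) {
        for (String name : names) {
            boolean matches = caseSensitive
                ? name.equals(requested)
                : name.equalsIgnoreCase(requested);
            if (matches) {
                return Optional.of(name);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList("tab1", "Orders");
        System.out.println(lookup(tables, "TAB1", false)); // lenient: matches tab1
        System.out.println(lookup(tables, "TAB1", true));  // strict: no match
    }
}
```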
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284923963

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java ##

@@ -57,6 +57,18 @@
 */
 private Integer maxGeneratedCodeLength = 64000; // just an estimate

+/**
+ * Specifies the name of the initial catalog to be created when instantiating
+ * TableEnvironment.
+ */
+private String bultinCatalogName = "default-catalog";
+
+/**
+ * Specifies the name of the default database in the initial catalog to be created when instantiating
+ * TableEnvironment.
+ */
+private String bultinDatabaseName = "default-database";

Review comment: Having '-' in the names might confuse the parser, as it can be read as a numeric minus operation.
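The concern above is that an unquoted `default-catalog` would parse as `default - catalog`. With a Java-style lexer, such names have to be wrapped in back ticks when they appear in SQL text. A small hypothetical helper (illustrative only, not part of the PR) makes the point:

```java
// Illustrative helper: quote any identifier that is not a plain
// Java-style identifier, since e.g. '-' would otherwise be lexed
// as a minus operator inside SQL text.
public class IdentifierQuoting {
    public static String quoteIfNeeded(String identifier) {
        // Simple heuristic: a plain identifier is letters, digits, underscores,
        // not starting with a digit.
        boolean plain = identifier.matches("[A-Za-z_][A-Za-z0-9_]*");
        return plain ? identifier : "`" + identifier + "`";
    }

    public static void main(String[] args) {
        System.out.println(quoteIfNeeded("default-catalog")); // gets back ticks
        System.out.println(quoteIfNeeded("tab1"));            // left as-is
    }
}
```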
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r284882487 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.calcite.FlinkTypeFactory; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.schema.Table; + +import java.util.Map; +import java.util.Optional; + +/** + * Thin wrapper around Calcite specific {@link Table}, this is a temporary solution + * that allows to register those tables in the {@link CatalogManager}. + * TODO remove once we decouple TableEnvironment from Calcite. 
+ */
+@Internal
+public class CalciteCatalogTable implements CatalogBaseTable {
+    private final Table table;
+    private final FlinkTypeFactory typeFactory;
+
+    public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
+        this.table = table;
+        this.typeFactory = typeFactory;
+    }
+
+    public Table getTable() {
+        return table;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties.");
+    }
+
+    @Override
+    public TableSchema getSchema() {
+        RelDataType relDataType = table.getRowType(typeFactory);
+
+        String[] fieldNames = relDataType.getFieldNames().toArray(new String[0]);
+        TypeInformation<?>[] fieldTypes = relDataType.getFieldList()
+            .stream()
+            .map(field -> FlinkTypeFactory.toTypeInfo(field.getType())).toArray(TypeInformation[]::new);
+
+        return new TableSchema(fieldNames, fieldTypes);
+    }
+
+    @Override
+    public String getComment() {
+        return null;
+    }
+
+    @Override
+    public CatalogBaseTable copy() {
+        return this;

Review comment: Never mind then.
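The `getSchema()` method above splits a Calcite row type into parallel arrays of field names and field types. The shape of that conversion can be sketched in self-contained plain Java, with an ordered map standing in for Calcite's `RelDataType` (all names here are illustrative):

```java
import java.util.LinkedHashMap;

// Stand-in for the RelDataType -> TableSchema conversion: an ordered
// "row type" (field name -> type name) is split into two parallel arrays.
public class SchemaConversionDemo {
    public static String[][] toSchema(LinkedHashMap<String, String> rowType) {
        String[] fieldNames = rowType.keySet().toArray(new String[0]);
        String[] fieldTypes = rowType.values().toArray(new String[0]);
        return new String[][] {fieldNames, fieldTypes};
    }

    public static void main(String[] args) {
        LinkedHashMap<String, String> rowType = new LinkedHashMap<>();
        rowType.put("id", "BIGINT");
        rowType.put("name", "VARCHAR");
        String[][] schema = toSchema(rowType);
        System.out.println(java.util.Arrays.toString(schema[0])); // field names
        System.out.println(java.util.Arrays.toString(schema[1])); // field types
    }
}
```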
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r284883458 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink's catalog and Calcite's schema. 
This enables looking up and accessing tables
 * in SQL queries without registering tables in advance. Databases are registered as sub-schemas in the schema.
 */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+    private final String catalogName;
+    private final Catalog catalog;
+
+    public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+        this.catalogName = catalogName;
+        this.catalog = catalog;
+    }
+
+    /**
+     * Looks up a sub-schema (database) by the given sub-schema name.
+     *
+     * @param schemaName name of the sub-schema to look up
+     * @return the sub-schema with the given dbName, or null
+     */
+    @Override
+    public Schema getSubSchema(String schemaName) {
+        if (catalog.databaseExists(schemaName)) {
+            return new DatabaseCalciteSchema(schemaName, catalog);
+        } else {
+            LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName));
+            throw new CatalogException(new DatabaseNotExistException(catalogName, schemaName));
+        }
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+        return new HashSet<>(catalog.listDatabases());
+    }
+
+    @Override
+    public Table getTable(String name) {
+        return null;
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public RelProtoDataType getType(String name) {
+        return null;

Review comment: I thought this was to resolve any type in the type system, and I saw that in Blink something other than null was returned. Anyway, I wasn't clear on what this API is for.
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r284883858 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink catalog's database and Calcite's schema. + * Tables are registered as tables in the schema. 
+ */
+class DatabaseCalciteSchema implements Schema {
+    private final String dbName;
+    private final Catalog catalog;
+
+    public DatabaseCalciteSchema(String dbName, Catalog catalog) {
+        this.dbName = dbName;
+        this.catalog = catalog;
+    }
+
+    @Override
+    public Table getTable(String tableName) {
+        ObjectPath tablePath = new ObjectPath(dbName, tableName);
+
+        try {
+            if (!catalog.tableExists(tablePath)) {
+                return null;
+            }
+
+            CatalogBaseTable table = catalog.getTable(tablePath);
+
+            if (table instanceof CalciteCatalogTable) {
+                return ((CalciteCatalogTable) table).getTable();
+            } else {
+                throw new TableException("Unsupported table type: " + table);

Review comment: Okay.
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283529723 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.table.api.ExternalCatalogAlreadyExistException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.CatalogTableOperation; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager that encapsulates all available catalogs. It also implements the logic of + * table path resolution. Supports both new API ({@link ReadableCatalog} as well as {@link ExternalCatalog}. + */ +@Internal +public class CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class); + + // A map between names and catalogs. 
+    private Map<String, Catalog> catalogs;
+
+    // TO BE REMOVED along with the ExternalCatalog API
+    private Map<String, ExternalCatalog> externalCatalogs;
+
+    // The names of the current default catalog and database
+    private String currentCatalogName;
+
+    private String currentDatabaseName;
+
+    public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) {
+        catalogs = new LinkedHashMap<>();
+        externalCatalogs = new LinkedHashMap<>();
+        catalogs.put(defaultCatalogName, defaultCatalog);
+        this.currentCatalogName = defaultCatalogName;
+        this.currentDatabaseName = defaultCatalog.getCurrentDatabase();
+    }
+
+    /**
+     * Registers a catalog under the given name. The catalog name must be unique across both
+     * {@link Catalog}s and {@link ExternalCatalog}s.
+     *
+     * @param catalogName name under which to register the given catalog
+     * @param catalog catalog to register
+     * @throws CatalogAlreadyExistsException thrown if the name is already taken
+     */
+    public void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty.");
+        checkNotNull(catalog, "Catalog cannot be null");
+
+        if (catalogs.containsKey(catalogName) || externalCatalogs.containsKey(catalogName)) {
+            throw new CatalogAlreadyExistsException(catalogName);
+        }
+
+        catalogs.put(catalogName, catalog);
+        catalog.open();
+    }
+
+    /**
+     * Gets a catalog by name.
+     *
+     * @param catalogName name of the catalog to retrieve
+     * @return the requested catalog
+     * @throws CatalogNotExistException thrown if the catalog doesn't exist
+     * @see CatalogManager#getExternalCatalog(String)
+     */
+    public Catalog getCatalog(String catalogName) throws CatalogNotExistException {
+        if (!catalogs.keySet().contains(catalogName)) {
+            throw new CatalogNotExistException(catalogName);
+        }
+
+        return catalogs.get(catalogName);
+    }
+
+    /**
+     * Registers an external catalog under the given name.
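The uniqueness check in `registerCatalog()` above spans both registries: a name must be free in the new `catalogs` map and the legacy `externalCatalogs` map. A self-contained sketch of that logic in plain Java (hypothetical class names, generic exception types instead of Flink's):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of CatalogManager's name-uniqueness rule: reject blank names,
// and reject names already taken in either registry.
public class CatalogRegistryDemo {
    private final Map<String, Object> catalogs = new LinkedHashMap<>();
    private final Map<String, Object> externalCatalogs = new LinkedHashMap<>();

    public void registerCatalog(String name, Object catalog) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Catalog name cannot be null or empty.");
        }
        if (catalogs.containsKey(name) || externalCatalogs.containsKey(name)) {
            throw new IllegalStateException("A catalog with name '" + name + "' already exists.");
        }
        catalogs.put(name, catalog);
    }

    public boolean contains(String name) {
        return catalogs.containsKey(name);
    }
}
```

Registering the same name twice throws, mirroring `CatalogAlreadyExistsException` in the PR.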
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs URL: https://github.com/apache/flink/pull/8404#discussion_r283025643 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala ## @@ -40,63 +40,67 @@ object ExternalTableUtil extends Logging { * @param externalTable the [[ExternalCatalogTable]] instance which to convert * @return converted [[TableSourceTable]] instance from the input catalog table */ - def fromExternalCatalogTable[T1, T2]( - tableEnv: TableEnvironment, - externalTable: ExternalCatalogTable) + def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: ExternalCatalogTable) : TableSourceSinkTable[T1, T2] = { val statistics = new FlinkStatistic(toScala(externalTable.getTableStats)) val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) { - Some(createTableSource(tableEnv, externalTable, statistics)) + Some(createTableSource(isBatch, externalTable, statistics)) } else { None } val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) { - Some(createTableSink(tableEnv, externalTable, statistics)) + Some(createTableSink(isBatch, externalTable, statistics)) } else { None } new TableSourceSinkTable[T1, T2](source, sink) } + def getTableSchema(externalTable: ExternalCatalogTable) : TableSchema = { +if (externalTable.isTableSource) { + TableFactoryUtil.findAndCreateTableSource[Any](externalTable).getTableSchema +} else { + val tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable) + new TableSchema(tableSink.getFieldNames, tableSink.getFieldTypes) +} + } + private def createTableSource[T]( - tableEnv: TableEnvironment, + isBatch: Boolean, externalTable: ExternalCatalogTable, statistics: FlinkStatistic) -: TableSourceTable[T] = tableEnv match { - -case _: BatchTableEnvImpl if externalTable.isBatchTable => +: TableSourceTable[T] = { +if (isBatch && externalTable.isBatchTable) { 
      val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
      new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics)
-
-    case _: StreamTableEnvImpl if externalTable.isStreamTable =>
+    } else if (!isBatch && externalTable.isStreamTable) {
      val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
      new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics)
-
-    case _ =>
+    } else {
      throw new ValidationException(
        "External catalog table does not support the current environment for a table source.")

Review comment: This message might need to change because it now covers two distinct cases: isBatch == true && externalTable.isStreamTable, and isBatch == false && externalTable.isBatchTable.
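One way to act on the review comment above is to produce a distinct message per mismatch case. The sketch below is hypothetical (written in Java for illustration, with invented names and wording, not taken from the PR):

```java
// Hypothetical split of the generic ValidationException message into the
// two concrete environment/table mismatch cases the reviewer identified.
public class MismatchMessages {
    public static String environmentMismatch(boolean isBatch, boolean isStreamTable) {
        if (isBatch && isStreamTable) {
            return "External catalog table is a stream table, but the environment is a batch environment.";
        } else {
            return "External catalog table is a batch table, but the environment is a streaming environment.";
        }
    }
}
```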
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

URL: https://github.com/apache/flink/pull/8404#discussion_r282981545

File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java

@@ -0,0 +1,122 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.catalog;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/**
 * A mapping between Flink's catalog and Calcite's schema. This enables looking up and
 * accessing tables in SQL queries without registering them in advance. Databases are
 * registered as sub-schemas in the schema.
 */
@Internal
public class CatalogCalciteSchema implements Schema {
	private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class);

	private final String catalogName;
	private final Catalog catalog;

	public CatalogCalciteSchema(String catalogName, Catalog catalog) {
		this.catalogName = catalogName;
		this.catalog = catalog;
	}

	/**
	 * Looks up a sub-schema (database) by the given sub-schema name.
	 *
	 * @param schemaName name of the sub-schema to look up
	 * @return the sub-schema with the given name, or null
	 */
	@Override
	public Schema getSubSchema(String schemaName) {
		if (catalog.databaseExists(schemaName)) {
			return new DatabaseCalciteSchema(schemaName, catalog);
		} else {
			LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName));
			throw new CatalogException(new DatabaseNotExistException(catalogName, schemaName));
		}
	}

	@Override
	public Set<String> getSubSchemaNames() {
		return new HashSet<>(catalog.listDatabases());
	}

	@Override
	public Table getTable(String name) {
		return null;
	}

	@Override
	public Set<String> getTableNames() {
		return new HashSet<>();
	}

	@Override
	public RelProtoDataType getType(String name) {
		return null;

Review comment: Returning null means that no type with the given name was found. I'm not sure whether that's desired, but feel free to add a TODO item for this if necessary.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
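The reviewer's observation about returning null can be illustrated with a minimal stand-in (the class and map names below are illustrative, not the Flink or Calcite API): in Calcite's Schema contract, a null from a lookup method such as getType signals "no such object in this schema", which lets the caller continue resolution elsewhere, whereas throwing aborts the lookup entirely.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for the "null means not found" lookup convention
// discussed in the review; not the actual Calcite Schema interface.
public class NullMeansNotFoundSketch {
    private static final Map<String, String> REGISTERED_TYPES = new HashMap<>();

    static String getType(String name) {
        // Returns null when the name is absent, signalling "not found"
        // so the caller can keep resolving in other schemas.
        return REGISTERED_TYPES.get(name);
    }

    public static void main(String[] args) {
        assert getType("missing") == null;
    }
}
```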
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

URL: https://github.com/apache/flink/pull/8404#discussion_r282976905

File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java

@@ -0,0 +1,87 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.catalog;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkTypeFactory;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.Table;

import java.util.Map;
import java.util.Optional;

/**
 * Thin wrapper around a Calcite-specific {@link Table}. This is a temporary solution
 * that allows registering those tables in the {@link CatalogManager}.
 * TODO remove once we decouple TableEnvironment from Calcite.
 */
@Internal
public class CalciteCatalogTable implements CatalogBaseTable {
	private final Table table;
	private final FlinkTypeFactory typeFactory;

	public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
		this.table = table;
		this.typeFactory = typeFactory;
	}

	public Table getTable() {
		return table;
	}

	@Override
	public Map<String, String> getProperties() {
		throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties.");
	}

	@Override
	public TableSchema getSchema() {
		RelDataType relDataType = table.getRowType(typeFactory);

		String[] fieldNames = relDataType.getFieldNames().toArray(new String[0]);
		TypeInformation<?>[] fieldTypes = relDataType.getFieldList()
			.stream()
			.map(field -> FlinkTypeFactory.toTypeInfo(field.getType()))
			.toArray(TypeInformation[]::new);

		return new TableSchema(fieldNames, fieldTypes);
	}

	@Override
	public String getComment() {
		return null;
	}

	@Override
	public CatalogBaseTable copy() {
		return this;

Review comment: Maybe we should throw an exception here, because we are not able to provide a copy. Returning just `this` might be dangerous, because the caller assumes a copy and may make changes to the returned object.
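The hazard the reviewer describes can be sketched with a minimal stand-in (the class and method names below are simplified illustrations, not the Flink API): when copy() returns `this`, a caller's mutation of the "copy" silently changes the original, while throwing makes the limitation explicit.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in illustrating the reviewer's point about copy().
class MutableTable {
    final Map<String, String> options = new HashMap<>();

    // Dangerous variant: callers assume an independent copy.
    MutableTable shallowCopy() {
        return this;
    }

    // Reviewer's suggestion: fail loudly when no real copy can be made.
    MutableTable safeCopy() {
        throw new UnsupportedOperationException("This table cannot be copied.");
    }
}

public class CopySketch {
    public static void main(String[] args) {
        MutableTable original = new MutableTable();
        MutableTable copy = original.shallowCopy();
        copy.options.put("k", "v");
        // The "copy" and the original are the same object, so the original changed too.
        assert original.options.containsKey("k");
    }
}
```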
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

URL: https://github.com/apache/flink/pull/8404#discussion_r282967862

File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java

@@ -0,0 +1,87 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.catalog;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.calcite.FlinkTypeFactory;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.Table;

import java.util.Map;
import java.util.Optional;

/**
 * Thin wrapper around a Calcite-specific {@link Table}. This is a temporary solution
 * that allows registering those tables in the {@link CatalogManager}.
 * TODO remove once we decouple TableEnvironment from Calcite.
 */
@Internal
public class CalciteCatalogTable implements CatalogBaseTable {
	private final Table table;
	private final FlinkTypeFactory typeFactory;

	public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
		this.table = table;
		this.typeFactory = typeFactory;
	}

	public Table getTable() {
		return table;
	}

	@Override
	public Map<String, String> getProperties() {
		throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties.");

Review comment: It might be better just to return an empty map instead of throwing an exception. By definition, getProperties() returns any additional properties a table might have; it doesn't mean the property form of the table.
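The reviewer's alternative, an empty map rather than an exception, can be sketched as follows (the interface and names here are simplified stand-ins, not the actual Flink CatalogBaseTable type): a Calcite-backed table simply has no additional properties, so an empty map satisfies the contract without surprising callers.

```java
import java.util.Collections;
import java.util.Map;

// Simplified stand-in for the getProperties() contract discussed in the review.
interface PropertyHolder {
    Map<String, String> getProperties();
}

public class EmptyPropertiesSketch {
    // An empty map means "no additional properties", which is accurate
    // for a table that is backed directly by a Calcite Table object.
    static final PropertyHolder CALCITE_BACKED = Collections::emptyMap;

    public static void main(String[] args) {
        assert CALCITE_BACKED.getProperties().isEmpty();
    }
}
```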
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

URL: https://github.com/apache/flink/pull/8404#discussion_r282961373

File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java

@@ -280,6 +316,55 @@

	 */
	void sqlUpdate(String stmt, QueryConfig config);

	/**
	 * Gets the current default catalog name of the current session.
	 *
	 * @return the current default catalog that is used for path resolution
	 * @see TableEnvironment#setCurrentCatalog(String)
	 */
	String getCurrentCatalogName();

	/**
	 * Sets the current catalog to the given value. It also sets the default
	 * database to the catalog's default one. To assign both catalog and database explicitly
	 * see {@link TableEnvironment#setCurrentDatabase(String, String)}.
	 *
	 * <p>This is used during resolution of object paths. The default path is constructed as
	 * {@code [current-catalog].[current.database]}. During the resolution, first we try to look for
	 * {@code [default-path].[object-path]}; if no object is found, we assume the object path is a fully
	 * qualified one and we look for {@code [object-path]}.
	 *
	 * @param name name of the catalog to set as the current default catalog
	 * @throws CatalogNotExistException thrown if the catalog doesn't exist
	 */
	void setCurrentCatalog(String name) throws CatalogNotExistException;

	/**
	 * Gets the current default database name of the running session.
	 *
	 * @return the current database of the current catalog
	 * @see TableEnvironment#setCurrentDatabase(String, String)
	 */
	String getCurrentDatabaseName();

	/**
	 * Sets the current default catalog and database. That path will be used as the default one
	 * when looking for unqualified object names.
	 *
	 * <p>This is used during resolution of object paths. The default path is constructed as
	 * {@code [current-catalog].[current.database]}. During the resolution, first we try to look for

Review comment: should it be [current-database]?
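The two-step lookup that the Javadoc describes (the default path is tried first, then the path is taken as fully qualified) can be sketched as a hypothetical helper; the name candidatePaths is illustrative and not part of the proposed API:

```java
import java.util.Arrays;
import java.util.List;

public class PathResolutionSketch {
    // Candidate lookup order for an object path, per the Javadoc:
    // 1. [current-catalog].[current-database].[object-path]
    // 2. [object-path], treated as a fully qualified name
    static List<String> candidatePaths(String currentCatalog, String currentDatabase, String objectPath) {
        return Arrays.asList(
            currentCatalog + "." + currentDatabase + "." + objectPath,
            objectPath);
    }

    public static void main(String[] args) {
        List<String> candidates = candidatePaths("cat1", "db1", "tab1");
        assert candidates.get(0).equals("cat1.db1.tab1");
        assert candidates.get(1).equals("tab1");
    }
}
```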
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

URL: https://github.com/apache/flink/pull/8404#discussion_r282893011

File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java

@@ -280,6 +316,55 @@

	 */
	void sqlUpdate(String stmt, QueryConfig config);

	/**
	 * Gets the current default catalog name of the current session.
	 *
	 * @return the current default catalog that is used for path resolution
	 * @see TableEnvironment#setCurrentCatalog(String)
	 */
	String getCurrentCatalogName();

Review comment: Nit: I wonder if we should just name it getCurrentCatalog(), which returns the name of the current catalog (the same applies to getting the current database name). I understand that the given name is more explicit, but getCurrentCatalog() mirrors the set method below and is more consistent with the catalog APIs, in which every get (table, database) is named getXxx() rather than getXxxName().