[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-21 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r286155068
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -280,6 +315,142 @@
 */
void sqlUpdate(String stmt, QueryConfig config);
 
+   /**
+    * Gets the current default catalog name of the current session.
+    *
+    * @return The current default catalog name that is used for the path resolution.
+    * @see TableEnvironment#useCatalog(String)
+    */
+   String getCurrentCatalog();
+
+   /**
+    * Sets the current catalog to the given value. It also sets the default
+    * database to the catalog's default one. To assign both catalog and database explicitly
+    * see {@link TableEnvironment#useDatabase(String, String)}.
+    *
+    * <p>This is used during the resolution of object paths. Both the catalog and database
+    * are optional when referencing catalog objects (tables, views etc.). The algorithm
+    * looks for requested objects in the following paths in that order:
+    * <ol>
+    *     <li>{@code [current-catalog].[current-database].[requested-path]}</li>
+    *     <li>{@code [current-catalog].[requested-path]}</li>
+    *     <li>{@code [requested-path]}</li>
+    * </ol>
+    *
+    * <p>Example:
+    *
+    * <p>Given structure with default catalog set to {@code default-catalog} and default
+    * database set to {@code default-database}.
+    * <pre>
+    * root:
+    *   |- default-catalog
+    *       |- default-database
+    *           |- tab1
+    *       |- db1
+    *           |- tab1
+    *   |- cat1
+    *       |- db1
+    *           |- tab1
+    * </pre>
+    *
+    * <p>The following table describes resolved paths:
+    * <table>
+    *     <tr>
+    *         <th>Requested path</th>
+    *         <th>Resolved path</th>
+    *     </tr>
+    *     <tr>
+    *         <td>tab1</td>
+    *         <td>default-catalog.default-database.tab1</td>
+    *     </tr>
+    *     <tr>
+    *         <td>db1.tab1</td>
+    *         <td>default-catalog.db1.tab1</td>
+    *     </tr>
+    *     <tr>
+    *         <td>cat1.db1.tab1</td>
+    *         <td>cat1.db1.tab1</td>
+    *     </tr>
+    * </table>
+    *
+    * @param catalogName The name of the catalog to set as the current default catalog.
+    * @throws CatalogException thrown if a catalog with given name could not be set as the default one
+    */
+   void useCatalog(String catalogName);
+
+   /**
+    * Gets the current default database name of the running session.
+    *
+    * @return The name of the current database of the current catalog.
+    * @see TableEnvironment#useDatabase(String, String)
+    */
+   String getCurrentDatabase();
+
+   /**
+    * Sets the current default catalog and database. That path will be used as the default one
+    * when looking for unqualified object names.
+    *
+    * <p>This is used during the resolution of object paths. Both the catalog and database
+    * are optional when referencing catalog objects (tables, views etc.). The algorithm
+    * looks for requested objects in the following paths in that order:
+    * <ol>
+    *     <li>{@code [current-catalog].[current-database].[requested-path]}</li>
+    *     <li>{@code [current-catalog].[requested-path]}</li>
+    *     <li>{@code [requested-path]}</li>
+    * </ol>
+    *
+    * <p>Example:
+    *
+    * <p>Given structure with default catalog set to {@code default-catalog} and default
+    * database set to {@code default-database}.
+    * <pre>
+    * root:
+    *   |- default-catalog
+    *       |- default-database
+    *           |- tab1
+    *       |- db1
+    *           |- tab1
+    *   |- cat1
+    *       |- db1
+    *           |- tab1
+    * </pre>
+    *
+    * <p>The following table describes resolved paths:
+    * <table>
+    *     <tr>
+    *         <th>Requested path</th>
+    *         <th>Resolved path</th>
+    *     </tr>
+    *     <tr>
+    *         <td>tab1</td>
+    *         <td>default-catalog.default-database.tab1</td>
+    *     </tr>
+    *     <tr>
+    *         <td>db1.tab1</td>
+    *         <td>default-catalog.db1.tab1</td>
+    *     </tr>
+    *     <tr>
+    *         <td>cat1.db1.tab1</td>
+    *         <td>cat1.db1.tab1</td>
+    *     </tr>
+    * </table>
+    *
+    * @param catalogName The name of the catalog to set as the current catalog.
+    * @param databaseName The name of the database to set as the current database.
+    */
+   void useDatabase(String catalogName, String databaseName);
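The resolution order described in the Javadoc above can be sketched as a stand-alone helper. `candidatePaths` below is a hypothetical illustration, not part of the Flink API; it only enumerates the candidate fully-qualified paths in the order the algorithm tries them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the lookup order: for a requested path, candidate fully-qualified
// paths are produced in the fixed order the Javadoc describes.
public class PathResolutionSketch {
    public static List<List<String>> candidatePaths(
            String currentCatalog, String currentDatabase, List<String> requestedPath) {
        List<List<String>> candidates = new ArrayList<>();
        // 1. [current-catalog].[current-database].[requested-path]
        List<String> first = new ArrayList<>(Arrays.asList(currentCatalog, currentDatabase));
        first.addAll(requestedPath);
        candidates.add(first);
        // 2. [current-catalog].[requested-path]
        List<String> second = new ArrayList<>(Arrays.asList(currentCatalog));
        second.addAll(requestedPath);
        candidates.add(second);
        // 3. [requested-path] taken as already fully qualified
        candidates.add(new ArrayList<>(requestedPath));
        return candidates;
    }
}
```

With the example structure from the Javadoc, `candidatePaths("default-catalog", "default-database", Arrays.asList("tab1"))` yields the three rows of the resolution table, tried first to last.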

[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-20 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r285694599
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
 ##
 @@ -129,16 +142,48 @@ public Context getContext() {
}
 
/**
-    * Creates a configured {@link FrameworkConfig} for a planning session.
-    *
-    * @param defaultSchema the default schema to look for first during planning
-    * @return configured framework config
+    * Returns the SQL parser config for this environment including a custom Calcite configuration.
 */
-   public FrameworkConfig createFrameworkConfig(SchemaPlus defaultSchema) {
+   public SqlParser.Config getSqlParserConfig() {
+       return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(() ->
+           // we use Java lex because back ticks are easier than double quotes in programming
+           // and cases are preserved
+           SqlParser
+               .configBuilder()
+               .setLex(Lex.JAVA)
+               .build());
+   }
+
+   private CatalogReader createCatalogReader(
+           boolean lenientCaseSensitivity,
+           String currentCatalog,
+           String currentDatabase) {
+       SqlParser.Config sqlParserConfig = getSqlParserConfig();
+       final boolean caseSensitive;
+       if (lenientCaseSensitivity) {
+           caseSensitive = false;
+       } else {
+           caseSensitive = sqlParserConfig.caseSensitive();
+       }
+
+       SqlParser.Config parserConfig = SqlParser.configBuilder(sqlParserConfig)
+           .setCaseSensitive(caseSensitive)
+           .build();
+
+       return new CatalogReader(
 
 Review comment:
   Great!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284928417
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
 ##
 @@ -129,16 +142,48 @@ public Context getContext() {
}
 
/**
-    * Creates a configured {@link FrameworkConfig} for a planning session.
-    *
-    * @param defaultSchema the default schema to look for first during planning
-    * @return configured framework config
+    * Returns the SQL parser config for this environment including a custom Calcite configuration.
 */
-   public FrameworkConfig createFrameworkConfig(SchemaPlus defaultSchema) {
+   public SqlParser.Config getSqlParserConfig() {
+       return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(() ->
+           // we use Java lex because back ticks are easier than double quotes in programming
+           // and cases are preserved
+           SqlParser
+               .configBuilder()
+               .setLex(Lex.JAVA)
+               .build());
+   }
+
+   private CatalogReader createCatalogReader(
+           boolean lenientCaseSensitivity,
+           String currentCatalog,
+           String currentDatabase) {
+       SqlParser.Config sqlParserConfig = getSqlParserConfig();
+       final boolean caseSensitive;
+       if (lenientCaseSensitivity) {
+           caseSensitive = false;
+       } else {
+           caseSensitive = sqlParserConfig.caseSensitive();
+       }
+
+       SqlParser.Config parserConfig = SqlParser.configBuilder(sqlParserConfig)
+           .setCaseSensitive(caseSensitive)
+           .build();
+
+       return new CatalogReader(
 
 Review comment:
  Maybe this is taken care of somewhere else, but I'm wondering how we tell Calcite to look up objects in new catalogs that are registered later in the user's session.
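The lenient case-sensitivity decision in `createCatalogReader` above boils down to a small function; here is a minimal stand-alone sketch, with plain booleans standing in for Calcite's `SqlParser.Config`:

```java
// Lenient lookups are always case-insensitive; otherwise the parser
// configuration's own setting decides.
public class CaseSensitivitySketch {
    public static boolean effectiveCaseSensitivity(boolean lenientCaseSensitivity, boolean parserCaseSensitive) {
        if (lenientCaseSensitivity) {
            return false;
        }
        return parserCaseSensitive;
    }
}
```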




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284923963
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
 ##
 @@ -57,6 +57,18 @@
 */
private Integer maxGeneratedCodeLength = 64000; // just an estimate
 
+   /**
+    * Specifies the name of the initial catalog to be created when instantiating
+    * TableEnvironment.
+    */
+   private String builtinCatalogName = "default-catalog";
+
+   /**
+    * Specifies the name of the default database in the initial catalog to be created
+    * when instantiating TableEnvironment.
+    */
+   private String builtinDatabaseName = "default-database";
 
 Review comment:
  Having '-' in the names might confuse the parser, as it can be read as a numeric minus operation.
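One way to avoid the ambiguity without renaming: with Java lex, identifiers are escaped with backticks, so a name like `default-catalog` can be quoted wherever it appears in SQL text. A hypothetical quoting helper (not part of this PR) might look like:

```java
// With Lex.JAVA, identifiers are quoted with backticks; a name such as
// "default-catalog" must be quoted or the '-' parses as a minus operator.
// quoteIdentifier is a hypothetical helper, not part of the Flink API.
public class IdentifierQuotingSketch {
    public static String quoteIdentifier(String name) {
        // double any embedded backtick, then wrap the whole name in backticks
        return "`" + name.replace("`", "``") + "`";
    }
}
```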




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284882487
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.Table;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Thin wrapper around the Calcite specific {@link Table}; this is a temporary solution
+ * that allows registering those tables in the {@link CatalogManager}.
+ * TODO remove once we decouple TableEnvironment from Calcite.
+ */
+@Internal
+public class CalciteCatalogTable implements CatalogBaseTable {
+   private final Table table;
+   private final FlinkTypeFactory typeFactory;
+
+   public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
+   this.table = table;
+   this.typeFactory = typeFactory;
+   }
+
+   public Table getTable() {
+   return table;
+   }
+
+   @Override
+   public Map<String, String> getProperties() {
+       throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties.");
+   }
+
+   @Override
+   public TableSchema getSchema() {
+       RelDataType relDataType = table.getRowType(typeFactory);
+
+       String[] fieldNames = relDataType.getFieldNames().toArray(new String[0]);
+       TypeInformation<?>[] fieldTypes = relDataType.getFieldList()
+           .stream()
+           .map(field -> FlinkTypeFactory.toTypeInfo(field.getType()))
+           .toArray(TypeInformation[]::new);
+
+       return new TableSchema(fieldNames, fieldTypes);
+   }
+
+   @Override
+   public String getComment() {
+   return null;
+   }
+
+   @Override
+   public CatalogBaseTable copy() {
+   return this;
 
 Review comment:
   Never mind then.
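The `getSchema()` method in the diff above follows a common pattern: split an ordered field list into the parallel name and type arrays a schema constructor expects. A plain-Java analog of that pattern, with `String` type names standing in for `TypeInformation`:

```java
import java.util.Map;

// Plain-Java analog of CalciteCatalogTable#getSchema(): split an ordered
// field-name -> type mapping into two parallel arrays. The String "type
// names" here are stand-ins for Flink's TypeInformation.
public class SchemaArraysSketch {
    public static String[] names(Map<String, String> fields) {
        return fields.keySet().toArray(new String[0]);
    }

    public static String[] types(Map<String, String> fields) {
        return fields.values().toArray(new String[0]);
    }
}
```

An insertion-ordered map such as `LinkedHashMap` is needed so the two arrays stay aligned by position.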




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284883458
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables looking up and
+ * accessing tables in SQL queries without registering tables in advance. Databases are
+ * registered as sub-schemas in the schema.
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+   private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+   private final String catalogName;
+   private final Catalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+    * Looks up a sub-schema (database) by the given sub-schema name.
+    *
+    * @param schemaName name of the sub-schema to look up
+    * @return the sub-schema with the given schemaName, or null
+    */
+   @Override
+   public Schema getSubSchema(String schemaName) {
+       if (catalog.databaseExists(schemaName)) {
+           return new DatabaseCalciteSchema(schemaName, catalog);
+       } else {
+           LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName));
+           throw new CatalogException(new DatabaseNotExistException(catalogName, schemaName));
+       }
+   }
+
+   @Override
+   public Set<String> getSubSchemaNames() {
+       return new HashSet<>(catalog.listDatabases());
+   }
+
+   @Override
+   public Table getTable(String name) {
+       return null;
+   }
+
+   @Override
+   public Set<String> getTableNames() {
+       return new HashSet<>();
+   }
+
+   @Override
+   public RelProtoDataType getType(String name) {
+       return null;
 
 Review comment:
   I thought this was to resolve any type in the type system and I saw in Blink 
something other than null was returned. Anyway, I wasn't clear what this API is 
for.




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284883858
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between a Flink catalog's database and Calcite's schema.
+ * Tables are registered as tables in the schema.
+ */
+class DatabaseCalciteSchema implements Schema {
+   private final String dbName;
+   private final Catalog catalog;
+
+   public DatabaseCalciteSchema(String dbName, Catalog catalog) {
+       this.dbName = dbName;
+       this.catalog = catalog;
+   }
+
+   @Override
+   public Table getTable(String tableName) {
+       ObjectPath tablePath = new ObjectPath(dbName, tableName);
+
+       try {
+           if (!catalog.tableExists(tablePath)) {
+               return null;
+           }
+
+           CatalogBaseTable table = catalog.getTable(tablePath);
+
+           if (table instanceof CalciteCatalogTable) {
+               return ((CalciteCatalogTable) table).getTable();
+           } else {
+               throw new TableException("Unsupported table type: " + table);
 
 Review comment:
   Okay.




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-13 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283529723
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also implements the logic of
+ * table path resolution. Supports both the new API ({@link ReadableCatalog}) as well as
+ * {@link ExternalCatalog}.
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map<String, Catalog> catalogs;
+
+   // TO BE REMOVED along with the ExternalCatalog API
+   private Map<String, ExternalCatalog> externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog defaultCatalog) {
+       catalogs = new LinkedHashMap<>();
+       externalCatalogs = new LinkedHashMap<>();
+       catalogs.put(defaultCatalogName, defaultCatalog);
+       this.currentCatalogName = defaultCatalogName;
+       this.currentDatabaseName = defaultCatalog.getCurrentDatabase();
+   }
+
+   /**
+    * Registers a catalog under the given name. The catalog name must be unique across both
+    * {@link Catalog}s and {@link ExternalCatalog}s.
+    *
+    * @param catalogName name under which to register the given catalog
+    * @param catalog catalog to register
+    * @throws CatalogAlreadyExistsException thrown if the name is already taken
+    */
+   public void registerCatalog(String catalogName, Catalog catalog) throws CatalogAlreadyExistsException {
+       checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be null or empty.");
+       checkNotNull(catalog, "Catalog cannot be null");
+
+       if (catalogs.containsKey(catalogName) || externalCatalogs.containsKey(catalogName)) {
+           throw new CatalogAlreadyExistsException(catalogName);
+       }
+
+       catalogs.put(catalogName, catalog);
+       catalog.open();
+   }
+
+   /**
+    * Gets a catalog by name.
+    *
+    * @param catalogName name of the catalog to retrieve
+    * @return the requested catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    * @see CatalogManager#getExternalCatalog(String)
+    */
+   public Catalog getCatalog(String catalogName) throws CatalogNotExistException {
+       if (!catalogs.containsKey(catalogName)) {
+           throw new CatalogNotExistException(catalogName);
+       }
+
+       return catalogs.get(catalogName);
+   }
+
+   /**
+    * Registers an external catalog under the given name.
[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283025643
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ##
 @@ -40,63 +40,67 @@ object ExternalTableUtil extends Logging {
 * @param externalTable the [[ExternalCatalogTable]] instance which to convert
 * @return converted [[TableSourceTable]] instance from the input catalog table
 */
-  def fromExternalCatalogTable[T1, T2](
-      tableEnv: TableEnvironment,
-      externalTable: ExternalCatalogTable)
+  def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: ExternalCatalogTable)
    : TableSourceSinkTable[T1, T2] = {

    val statistics = new FlinkStatistic(toScala(externalTable.getTableStats))

    val source: Option[TableSourceTable[T1]] = if (externalTable.isTableSource) {
-      Some(createTableSource(tableEnv, externalTable, statistics))
+      Some(createTableSource(isBatch, externalTable, statistics))
    } else {
      None
    }

    val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) {
-      Some(createTableSink(tableEnv, externalTable, statistics))
+      Some(createTableSink(isBatch, externalTable, statistics))
    } else {
      None
    }

    new TableSourceSinkTable[T1, T2](source, sink)
  }

+  def getTableSchema(externalTable: ExternalCatalogTable): TableSchema = {
+    if (externalTable.isTableSource) {
+      TableFactoryUtil.findAndCreateTableSource[Any](externalTable).getTableSchema
+    } else {
+      val tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable)
+      new TableSchema(tableSink.getFieldNames, tableSink.getFieldTypes)
+    }
+  }
+
  private def createTableSource[T](
-      tableEnv: TableEnvironment,
+      isBatch: Boolean,
      externalTable: ExternalCatalogTable,
      statistics: FlinkStatistic)
-    : TableSourceTable[T] = tableEnv match {
-
-    case _: BatchTableEnvImpl if externalTable.isBatchTable =>
+    : TableSourceTable[T] = {
+    if (isBatch && externalTable.isBatchTable) {
      val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
      new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], statistics)
-
-    case _: StreamTableEnvImpl if externalTable.isStreamTable =>
+    } else if (!isBatch && externalTable.isStreamTable) {
      val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
      new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], statistics)
-
-    case _ =>
+    } else {
      throw new ValidationException(
        "External catalog table does not support the current environment for a table source.")
 
 Review comment:
   This message might need to change because it covers two cases: isBatch == true && externalTable.isStreamTable, and isBatch == false && externalTable.isBatchTable.
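A hypothetical sketch of the refinement the reviewer asks for (in Java for illustration; the method name and message strings are assumptions, not the PR's actual code), emitting a distinct message for each environment/table mismatch instead of one generic string:

```java
// Hypothetical sketch: produce a mismatch-specific error message for the two
// cases the reviewer names, instead of one generic message covering both.
class EnvironmentMismatch {
    static String describeMismatch(boolean isBatch, boolean tableIsBatch, boolean tableIsStream) {
        if (isBatch && tableIsStream) {
            // isBatch == true && externalTable.isStreamTable
            return "Stream-only external catalog table cannot be used in a batch environment.";
        } else if (!isBatch && tableIsBatch) {
            // isBatch == false && externalTable.isBatchTable
            return "Batch-only external catalog table cannot be used in a streaming environment.";
        }
        return "External catalog table does not support the current environment.";
    }
}
```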


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282981545
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables looking up
+ * and accessing tables in SQL queries without registering tables in advance.
+ * Databases are registered as sub-schemas in the schema.
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+   private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+   private final String catalogName;
+   private final Catalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Look up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName name of sub-schema to look up
+* @return the sub-schema with a given dbName, or null
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
+   if (catalog.databaseExists(schemaName)) {
+   return new DatabaseCalciteSchema(schemaName, catalog);
+   } else {
+                   LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName));
+                   throw new CatalogException(new DatabaseNotExistException(catalogName, schemaName));
+   }
+   }
+
+   @Override
+   public Set<String> getSubSchemaNames() {
+   return new HashSet<>(catalog.listDatabases());
+   }
+
+   @Override
+   public Table getTable(String name) {
+   return null;
+   }
+
+   @Override
+   public Set<String> getTableNames() {
+   return new HashSet<>();
+   }
+
+   @Override
+   public RelProtoDataType getType(String name) {
+   return null;
 
 Review comment:
   Returning "null" means no type with the given name is found. I'm not sure if that's desired, but feel free to add a TODO item for this if necessary.
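A plain-Java analogue of the contract being questioned (class and method names here are illustrative, not Calcite's `Schema#getType` signature): null is the "not found" signal, which is exactly the behavior the reviewer suggests flagging with a TODO:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical analogue of a schema's type lookup: null signals
// "no type with this name". Names and the backing map are assumptions
// for illustration, not Flink/Calcite code.
class TypeLookup {
    private final Map<String, String> registeredTypes = new HashMap<>();

    void register(String name, String type) {
        registeredTypes.put(name, type);
    }

    String getType(String name) {
        // TODO: decide whether "not found" should remain null or become an
        // exception, per the review discussion.
        return registeredTypes.get(name); // null when absent
    }
}
```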




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282976905
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.Table;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Thin wrapper around a Calcite specific {@link Table}. This is a temporary solution
+ * that allows registering those tables in the {@link CatalogManager}.
+ * TODO remove once we decouple TableEnvironment from Calcite.
+ */
+@Internal
+public class CalciteCatalogTable implements CatalogBaseTable {
+   private final Table table;
+   private final FlinkTypeFactory typeFactory;
+
+   public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
+   this.table = table;
+   this.typeFactory = typeFactory;
+   }
+
+   public Table getTable() {
+   return table;
+   }
+
+   @Override
+   public Map<String, String> getProperties() {
+           throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties.");
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   RelDataType relDataType = table.getRowType(typeFactory);
+
+           String[] fieldNames = relDataType.getFieldNames().toArray(new String[0]);
+   TypeInformation[] fieldTypes = relDataType.getFieldList()
+   .stream()
+                   .map(field -> FlinkTypeFactory.toTypeInfo(field.getType())).toArray(TypeInformation[]::new);
+
+   return new TableSchema(fieldNames, fieldTypes);
+   }
+
+   @Override
+   public String getComment() {
+   return null;
+   }
+
+   @Override
+   public CatalogBaseTable copy() {
+   return this;
 
 Review comment:
   Maybe we should throw an exception here because we are not able to provide a copy. Returning just "this" might be dangerous because the caller assumes a copy and may make changes to the returned object.
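A minimal sketch of the reviewer's suggestion (class name and message are illustrative, not the PR's code): fail loudly rather than hand back the same instance that callers would then mutate:

```java
// Hypothetical sketch: a wrapper that cannot produce a real copy throws
// instead of returning itself, so callers cannot mutate a shared instance
// while believing they hold an independent copy.
class UncopyableTable {
    Object copy() {
        throw new UnsupportedOperationException(
            "This table cannot be copied; it wraps a live Calcite Table.");
    }
}
```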




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282967862
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.Table;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Thin wrapper around a Calcite specific {@link Table}. This is a temporary solution
+ * that allows registering those tables in the {@link CatalogManager}.
+ * TODO remove once we decouple TableEnvironment from Calcite.
+ */
+@Internal
+public class CalciteCatalogTable implements CatalogBaseTable {
+   private final Table table;
+   private final FlinkTypeFactory typeFactory;
+
+   public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
+   this.table = table;
+   this.typeFactory = typeFactory;
+   }
+
+   public Table getTable() {
+   return table;
+   }
+
+   @Override
+   public Map<String, String> getProperties() {
+           throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties.");
 
 Review comment:
   It might be better just to return an empty map instead of throwing an exception. By definition, getProperties() returns any additional properties a table might have; it doesn't mean the property form of the table.
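A sketch of the reviewer's alternative (class name is illustrative, not the PR's code): a table with no additional properties reports an empty map rather than throwing:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: "no additional properties" is represented as an
// empty, immutable map instead of an UnsupportedOperationException.
class NoPropertiesTable {
    Map<String, String> getProperties() {
        return Collections.emptyMap();
    }
}
```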




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282961373
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -280,6 +316,55 @@
 */
void sqlUpdate(String stmt, QueryConfig config);
 
+   /**
+* Gets the current default catalog name of the current session.
+*
+* @return the current default catalog that is used for path resolution
+* @see TableEnvironment#setCurrentCatalog(String)
+*/
+   String getCurrentCatalogName();
+
+   /**
+* Sets the current catalog to the given value. It also sets the default
+* database to the catalog's default one. To assign both catalog and 
database explicitly
+* see {@link TableEnvironment#setCurrentDatabase(String, String)}.
+*
+* This is used during resolution of object paths. The default path is constructed as
+* {@code [current-catalog].[current.database]}. During the resolution, first we try to look for
+* {@code [default-path].[object-path]}; if no object is found we assume the object path is a fully
+* qualified one and we look for {@code [object-path]}.
+*
+* @param name name of the catalog to set as current default catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+*/
+   void setCurrentCatalog(String name) throws CatalogNotExistException;
+
+   /**
+* Gets the current default database name of the running session.
+*
+* @return the current database of the current catalog
+* @see TableEnvironment#setCurrentDatabase(String, String)
+*/
+   String getCurrentDatabaseName();
+
+   /**
+* Sets the current default catalog and database. That path will be used as the default one
+* when looking for unqualified object names.
+*
+* This is used during resolution of object paths. The default path 
is constructed as
+* {@code [current-catalog].[current.database]}. During the resolution, first we try to look for
 
 Review comment:
   should it be [current-database]?




[GitHub] [flink] xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-10 Thread GitBox
xuefuz commented on a change in pull request #8404: [FLINK-11476][table] Create 
CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r282893011
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -280,6 +316,55 @@
 */
void sqlUpdate(String stmt, QueryConfig config);
 
+   /**
+* Gets the current default catalog name of the current session.
+*
+* @return the current default catalog that is used for path resolution
+* @see TableEnvironment#setCurrentCatalog(String)
+*/
+   String getCurrentCatalogName();
 
 Review comment:
   Nit: I wonder if we should just name it getCurrentCatalog(), which returns the name of the current catalog (same for the current database name getter). I understand that the given name is more explicit, but getCurrentCatalog() mirrors the set method below and is more consistent with the catalog APIs, in which every getter (table, database) is named getXxx() instead of getXxxName().
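The naming the reviewer suggests can be sketched as a mirrored getter/setter pair (interface, implementation, and the "default_catalog" value below are illustrative assumptions, not Flink's actual API):

```java
// Hypothetical sketch: the getter mirrors the setter name
// (getCurrentCatalog/setCurrentCatalog) even though it returns a name,
// matching catalog APIs where getters are getXxx() rather than getXxxName().
interface CatalogSession {
    String getCurrentCatalog();
    void setCurrentCatalog(String name);
}

class SimpleCatalogSession implements CatalogSession {
    private String currentCatalog = "default_catalog";

    @Override
    public String getCurrentCatalog() {
        return currentCatalog;
    }

    @Override
    public void setCurrentCatalog(String name) {
        this.currentCatalog = name;
    }
}
```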

