[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269042924
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableCatalog.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading database/table/views/UDFs from a 
registered catalog.
+ * It connects a registered catalog and Flink's Table API.
+ */
+public interface ReadableCatalog {
+
+   /**
+* Open the catalog. Used for any required preparation in 
initialization phase.
+*/
+   void open();
+
+   /**
+* Close the catalog when it is no longer needed and release any 
resource that it might be holding.
+*/
+   void close();
+
+   /**
+* Get the default database of this type of catalog. This is used when 
users refers an object in the catalog
+* without specifying a database. For example, the default db in a Hive 
Metastore is 'default' by default.
+*
+* @return the name of the default database
+*/
+   String getDefaultDatabaseName();
+
+   /**
+* Set the default database. A default database is used when users 
refers an object in the catalog
+* without specifying a database.
+*
+* @param databaseName  the name of the database
+*/
+   void setDefaultDatabaseName(String databaseName);
+
+   // -- databases --
+   /**
+* Get the names of all databases in this catalog.
+*
+* @return The list of the names of all databases
+*/
+   List listDatabases();
+
+   /**
+* Get a database from this catalog.
+*
+* @param dbNameName of the database
+* @return The requested database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   CatalogDatabase getDatabase(String dbName) throws 
DatabaseNotExistException;
+
+   /**
+* Check if a database exists in this catalog.
+*
+* @param dbNameName of the database
+*/
+   boolean databaseExists(String dbName);
+
+   /**
+* Gets paths of all tables and views under this database. An empty 
list is returned if none exists.
+*
+* @return A list of the names of all tables and views in this database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   List listTables(String dbName) throws DatabaseNotExistException;
 
 Review comment:
   How can we list all of the tables(not views)? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269043100
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableCatalog.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading database/table/views/UDFs from a 
registered catalog.
+ * It connects a registered catalog and Flink's Table API.
+ */
+public interface ReadableCatalog {
+
+   /**
+* Open the catalog. Used for any required preparation in 
initialization phase.
+*/
+   void open();
+
+   /**
+* Close the catalog when it is no longer needed and release any 
resource that it might be holding.
+*/
+   void close();
+
+   /**
+* Get the default database of this type of catalog. This is used when 
users refers an object in the catalog
+* without specifying a database. For example, the default db in a Hive 
Metastore is 'default' by default.
+*
+* @return the name of the default database
+*/
+   String getDefaultDatabaseName();
+
+   /**
+* Set the default database. A default database is used when users 
refers an object in the catalog
+* without specifying a database.
+*
+* @param databaseName  the name of the database
+*/
+   void setDefaultDatabaseName(String databaseName);
+
+   // -- databases --
+   /**
+* Get the names of all databases in this catalog.
+*
+* @return The list of the names of all databases
+*/
+   List listDatabases();
+
+   /**
+* Get a database from this catalog.
+*
+* @param dbNameName of the database
+* @return The requested database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   CatalogDatabase getDatabase(String dbName) throws 
DatabaseNotExistException;
+
+   /**
+* Check if a database exists in this catalog.
+*
+* @param dbNameName of the database
+*/
+   boolean databaseExists(String dbName);
+
+   /**
+* Gets paths of all tables and views under this database. An empty 
list is returned if none exists.
+*
+* @return A list of the names of all tables and views in this database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   List listTables(String dbName) throws DatabaseNotExistException;
+
+   /**
+* Gets a CatalogTable or CatalogView identified by objectPath.
+*
+* @param objectPathPath of the table or view
+* @throws TableNotExistException if the target does not exist
+* @return The requested table or view
+*/
+   CommonTable getTable(ObjectPath objectPath) throws 
TableNotExistException;
+
+   /**
+* Checks if a table or view exists in this catalog.
+*
+* @param objectPathPath of the table or view
+*/
+   boolean tableExists(ObjectPath objectPath);
+
+   /**
+* Gets paths of all views under this database. An empty list is 
returned if none exists.
+*
+* @param databaseName the name of the given database
+* @return the list of the names of all views in the given database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   List listViews(String databaseName) throws 
DatabaseNotExistException;
+
+
 
 Review comment:
   Remove blank line


This is an automated message from the Apache Git Service.
To respond to the message, please 

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269035636
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/CatalogTable.java
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.plan.stats.TableStats;
+
+/**
+ * Represents a table in a catalog.
+ */
+public interface CatalogTable extends CommonTable {
+   TableStats getStatistics();
 
 Review comment:
   Please add the java doc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269036107
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/CatalogView.java
 ##
 @@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+/**
+ * Represents a view in a catalog.
+ */
+public interface CatalogView extends CommonTable {
+
+   /**
+* Original text of the view definition.
+*
+* @return the original string literal provided by the user.
+*/
+   String getOriginalQuery();
+
+   /**
+* Expanded text of the original view definition
+* This is needed because the context such as current DB is
+* lost after the session, in which view is defined, is gone.
+* Expanded query text takes care of this, as an example.
+*
+* For example, for a view that is defined in the context of 
"default" database with a query "select * from test1",
+* the expanded query text might become "select `test1`.`name`, 
`test1`.`value` from `default`.`test1`", where
 
 Review comment:
   We can use @code if there are some sql code. what do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269035396
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/CatalogDatabase.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Represents a database object in a catalog.
+ */
+public class CatalogDatabase {
+   // Property map for the database.
+   private final Map properties;
+
+   public CatalogDatabase() {
 
 Review comment:
   Do we need a `name` member, something like `databaseName `?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269045279
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableWritableCatalog.java
 ##
 @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import 
org.apache.flink.table.api.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import 
org.apache.flink.table.api.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+   // -- databases --
+
+   /**
+* Create a database.
+*
+* @param name   Name of the database to be created
+* @param database   The database definition
+* @param ignoreIfExists Flag to specify behavior when a database with 
the given name already exists:
+*   if set to false, throw a 
DatabaseAlreadyExistException,
+*   if set to true, do nothing.
+* @throws DatabaseAlreadyExistException if the given database already 
exists and ignoreIfExists is false
+*/
+   void createDatabase(String name, CatalogDatabase database, boolean 
ignoreIfExists)
+   throws DatabaseAlreadyExistException;
+
+   /**
+* Drop a database.
+*
+* @param name  Name of the database to be dropped.
+* @param ignoreIfNotExists Flag to specify behavior when the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the given database does not 
exist
+*/
+   void dropDatabase(String name, boolean ignoreIfNotExists) throws 
DatabaseNotExistException;
+
+   /**
+* Modify an existing database.
+*
+* @param nameName of the database to be modified
+* @param newDatabaseThe new database definition
+* @param ignoreIfNotExists Flag to specify behavior when the given 
database does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the given database does not 
exist
+*/
+   void alterDatabase(String name, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
+   throws DatabaseNotExistException;
+
+   /**
+* Rename an existing database.
+*
+* @param nameName of the database to be renamed
+* @param newName the new name of the database
+* @param ignoreIfNotExists Flag to specify behavior when the given 
database does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   void renameDatabase(String name, String newName, boolean 
ignoreIfNotExists)
+   throws DatabaseNotExistException;
+
+   // -- tables and views --
+
+   /**
+* Drop a table or view.
+*
+* @param tablePath Path of the table or view to be dropped
+* @param ignoreIfNotExists Flag to specify behavior when the table or 
view does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws TableNotExistException if the table or view does not exist
+*/
+   void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws 
TableNotExistException;
+
+   // -- tables --
+

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269039472
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ObjectPath.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A database name and object (table/view/function) name combo in a catalog.
+ */
+public class ObjectPath {
+   private final String databaseName;
+   private final String objectName;
+
+   public ObjectPath(String databaseName, String objectName) {
+   checkNotNull(databaseName, "databaseName cannot be null");
+   checkNotNull(objectName, "objectName cannot be null");
+
+   this.databaseName = databaseName;
+   this.objectName = objectName;
+   }
+
+   public String getDatabaseName() {
+   return databaseName;
+   }
+
+   public String getObjectName() {
+   return objectName;
+   }
+
+   public String getFullName() {
+   return String.format("%s.%s", databaseName, objectName);
+   }
+
+   public static ObjectPath fromString(String fullName) {
+   String[] paths = fullName.split("\\.");
 
 Review comment:
   Please check null case for fullname?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269034814
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/CatalogDatabase.java
 ##
 @@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Represents a database object in a catalog.
+ */
+public class CatalogDatabase {
+   // Property map for the database.
 
 Review comment:
   How about using the interface?  we may have `HiveCatalogDatabase`, 
`MySqlCatalogDatabase` etc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269044922
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableWritableCatalog.java
 ##
 @@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import 
org.apache.flink.table.api.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import 
org.apache.flink.table.api.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+   // -- databases --
 
 Review comment:
   Same as above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269036820
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/CommonTable.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Map;
+
+/**
+ * CommonTable is the common parent of table and view. It has a map of
+ * key-value pairs defining the properties of the table.
+ */
+public interface CommonTable {
+   /**
 
 Review comment:
   Do we need to add a `getName` method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269044748
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableCatalog.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading database/table/views/UDFs from a 
registered catalog.
+ * It connects a registered catalog and Flink's Table API.
+ */
+public interface ReadableCatalog {
+
+   /**
+* Open the catalog. Used for any required preparation in 
initialization phase.
+*/
+   void open();
+
+   /**
+* Close the catalog when it is no longer needed and release any 
resource that it might be holding.
+*/
+   void close();
+
+   /**
+* Get the default database of this type of catalog. This is used when 
users refers an object in the catalog
+* without specifying a database. For example, the default db in a Hive 
Metastore is 'default' by default.
+*
+* @return the name of the default database
+*/
+   String getDefaultDatabaseName();
+
+   /**
+* Set the default database. A default database is used when users 
refers an object in the catalog
+* without specifying a database.
+*
+* @param databaseName  the name of the database
+*/
+   void setDefaultDatabaseName(String databaseName);
+
+   // -- databases --
+   /**
+* Get the names of all databases in this catalog.
+*
+* @return The list of the names of all databases
+*/
+   List listDatabases();
+
+   /**
+* Get a database from this catalog.
+*
+* @param dbNameName of the database
+* @return The requested database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   CatalogDatabase getDatabase(String dbName) throws 
DatabaseNotExistException;
+
+   /**
+* Check if a database exists in this catalog.
+*
+* @param dbNameName of the database
+*/
+   boolean databaseExists(String dbName);
+
+   /**
+* Gets paths of all tables and views under this database. An empty 
list is returned if none exists.
+*
+* @return A list of the names of all tables and views in this database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   List listTables(String dbName) throws DatabaseNotExistException;
+
+   /**
+* Gets a CatalogTable or CatalogView identified by objectPath.
+*
+* @param objectPathPath of the table or view
+* @throws TableNotExistException if the target does not exist
+* @return The requested table or view
+*/
+   CommonTable getTable(ObjectPath objectPath) throws 
TableNotExistException;
+
+   /**
+* Checks if a table or view exists in this catalog.
+*
+* @param objectPathPath of the table or view
+*/
+   boolean tableExists(ObjectPath objectPath);
+
+   /**
+* Gets paths of all views under this database. An empty list is 
returned if none exists.
+*
+* @param databaseName the name of the given database
+* @return the list of the names of all views in the given database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   List listViews(String databaseName) throws 
DatabaseNotExistException;
+
+
+   /**
+* Gets the statistics of a table.
+*
+* @param tablePath Path of the table
+* @return The statistics of the given table
+* @throws Ta

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269040649
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableCatalog.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading database/table/views/UDFs from a 
registered catalog.
+ * It connects a registered catalog and Flink's Table API.
+ */
+public interface ReadableCatalog {
+
+   /**
+* Open the catalog. Used for any required preparation in 
initialization phase.
+*/
+   void open();
+
+   /**
+* Close the catalog when it is no longer needed and release any 
resource that it might be holding.
+*/
+   void close();
+
+   /**
+* Get the default database of this type of catalog. This is used when 
users refers an object in the catalog
+* without specifying a database. For example, the default db in a Hive 
Metastore is 'default' by default.
+*
+* @return the name of the default database
+*/
+   String getDefaultDatabaseName();
+
+   /**
+* Set the default database. A default database is used when users 
refers an object in the catalog
+* without specifying a database.
+*
+* @param databaseName  the name of the database
+*/
+   void setDefaultDatabaseName(String databaseName);
+
+   // -- databases --
 
 Review comment:
   Do we need this comment? if so, I suggest moving it to 
getDefaultDatabaseName part, What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269038738
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ObjectPath.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A database name and object (table/view/function) name combo in a catalog.
+ */
+public class ObjectPath {
+   private final String databaseName;
+   private final String objectName;
+
+   public ObjectPath(String databaseName, String objectName) {
+   checkNotNull(databaseName, "databaseName cannot be null");
 
 Review comment:
   What are the format restrictions for `dataBaseName` and `objectName`, such 
as special characters? e.g.: '.', duo to we using `fullName.split("\\.")`. 
right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-03-27 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r269538937
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableCatalog.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading database/table/views/UDFs from a 
registered catalog.
+ * It connects a registered catalog and Flink's Table API.
+ */
+public interface ReadableCatalog {
+
+   /**
+* Open the catalog. Used for any required preparation in 
initialization phase.
+*/
+   void open();
+
+   /**
+* Close the catalog when it is no longer needed and release any 
resource that it might be holding.
+*/
+   void close();
+
+   /**
+* Get the default database of this type of catalog. This is used when 
users refers an object in the catalog
+* without specifying a database. For example, the default db in a Hive 
Metastore is 'default' by default.
+*
+* @return the name of the default database
+*/
+   String getDefaultDatabaseName();
+
+   /**
+* Set the default database. A default database is used when users 
refers an object in the catalog
+* without specifying a database.
+*
+* @param databaseName  the name of the database
+*/
+   void setDefaultDatabaseName(String databaseName);
+
+   // -- databases --
+   /**
+* Get the names of all databases in this catalog.
+*
+* @return The list of the names of all databases
+*/
+   List listDatabases();
+
+   /**
+* Get a database from this catalog.
+*
+* @param dbNameName of the database
+* @return The requested database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   CatalogDatabase getDatabase(String dbName) throws 
DatabaseNotExistException;
+
+   /**
+* Check if a database exists in this catalog.
+*
+* @param dbNameName of the database
+*/
+   boolean databaseExists(String dbName);
+
+   /**
+* Gets paths of all tables and views under this database. An empty 
list is returned if none exists.
+*
+* @return A list of the names of all tables and views in this database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   List listTables(String dbName) throws DatabaseNotExistException;
 
 Review comment:
   I think we can add a TableType, such as `BASE_TABLE`, `VIEW`, and SQL server 
did in this way, please see the detail: 
https://chartio.com/resources/tutorials/sql-server-list-tables-how-to-show-all-tables/
   welcome any feedback!
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271987200
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/catalog/ReadableCatalog.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.catalog;
+
+import org.apache.flink.table.api.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.api.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading database/table/views/UDFs from a 
registered catalog.
+ * It connects a registered catalog and Flink's Table API.
+ */
+public interface ReadableCatalog {
+
+   /**
+* Open the catalog. Used for any required preparation in 
initialization phase.
+*/
+   void open();
+
+   /**
+* Close the catalog when it is no longer needed and release any 
resource that it might be holding.
+*/
+   void close();
+
+   /**
+* Get the default database of this type of catalog. This is used when 
users refers an object in the catalog
+* without specifying a database. For example, the default db in a Hive 
Metastore is 'default' by default.
+*
+* @return the name of the default database
+*/
+   String getDefaultDatabaseName();
+
+   /**
+* Set the default database. A default database is used when users 
refers an object in the catalog
+* without specifying a database.
+*
+* @param databaseName  the name of the database
+*/
+   void setDefaultDatabaseName(String databaseName);
+
+   // -- databases --
+   /**
+* Get the names of all databases in this catalog.
+*
+* @return The list of the names of all databases
+*/
+   List listDatabases();
+
+   /**
+* Get a database from this catalog.
+*
+* @param dbNameName of the database
+* @return The requested database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   CatalogDatabase getDatabase(String dbName) throws 
DatabaseNotExistException;
+
+   /**
+* Check if a database exists in this catalog.
+*
+* @param dbNameName of the database
+*/
+   boolean databaseExists(String dbName);
+
+   /**
+* Gets paths of all tables and views under this database. An empty 
list is returned if none exists.
+*
+* @return A list of the names of all tables and views in this database
+* @throws DatabaseNotExistException if the database does not exist
+*/
+   List listTables(String dbName) throws DatabaseNotExistException;
 
 Review comment:
   Yes, but do you mind telling me the real reason why we do not need let user 
list all of the tables(not views) directly?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271995866
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A database name and object (table/view/function) name combo in a catalog.
+ */
+public class ObjectPath {
+   private final String databaseName;
+   private final String objectName;
+
+   public ObjectPath(String databaseName, String objectName) {
+   checkNotNull(databaseName, "databaseName cannot be null");
+   checkNotNull(objectName, "objectName cannot be null");
+
+   this.databaseName = databaseName;
+   this.objectName = objectName;
+   }
+
+   public String getDatabaseName() {
+   return databaseName;
+   }
+
+   public String getObjectName() {
+   return objectName;
+   }
+
+   public String getFullName() {
+   return String.format("%s.%s", databaseName, objectName);
+   }
+
+   public static ObjectPath fromString(String fullName) {
+   String[] paths = fullName.split("\\.");
 
 Review comment:
   Please check the null case for `fullname`? 
   Which I had left the comment in pre-review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271990551
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic catalog database implementation.
+ */
+public class GenericCatalogDatabase implements CatalogDatabase {
+   private final Map properties;
+   private String comment;
+
+   public GenericCatalogDatabase() {
+   this(new HashMap<>(), null);
 
 Review comment:
   Do we need to add the default comment, something like:  `This is a generic 
catalog database.`? then make it consistent with the comment in 
GenericCatalogTable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271997186
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String defaultDatabaseName = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase());
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getDefaultDatabaseName() {
+   return defaultDatabaseName;
+   }
+
+   @Override
+   public void setDefaultDatabaseName(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   defaultDatabaseName = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String dbName, boolean ignoreIfNotExists) 
throws DatabaseNotExistException {
+   if (databases.containsKey(dbName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(dbName)) {
+   databases.remove(dbName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, dbName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
dbName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   return tables.keySet().stream().noneMatch(op -> 
op.getDatabaseName().equals(databaseName));
+   // TODO: also check function when function is added.
+   }
+
+   @Override
+   public void alterDatabase(String databaseName, CatalogDatabase 
newDatabase, boolean ignoreIfNotExists)
+   throws DatabaseNotExistException {
+   if (databaseExists(databaseName)) {
+   databases.put(databaseName, newDataba

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271996645
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
+   private static final String IS_STREAMING = "is_streaming";
+
+   private final String testCatalogName = "test-catalog";
+   private final String db1 = "db1";
+   private final String db2 = "db2";
+
+   private final String t1 = "t1";
+   private final String t2 = "t2";
+   private final ObjectPath path1 = new ObjectPath(db1, t1);
+   private final ObjectPath path2 = new ObjectPath(db2, t2);
+   private final ObjectPath path3 = new ObjectPath(db1, t2);
+   private final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
+   private final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
+
+   private static final String TEST_COMMENT = "test comment";
+
+   private static ReadableWritableCatalog catalog;
+
+   @Before
+   public void setUp() {
+   catalog = new GenericInMemoryCatalog(db1);
+   }
+
+   @Rule
+   public ExpectedException exception = ExpectedException.none();
+
+   @After
+   public void close() {
+   if (catalog.tableExists(path1)) {
+   catalog.dropTable(path1, true);
+   }
+   if (catalog.tableExists(path2)) {
+   catalog.dropTable(path2, true);
+   }
+   if (catalog.tableExists(path3)) {
+   catalog.dropTable(path3, true);
+   }
+   if (catalog.databaseExists(db1)) {
+   catalog.dropDatabase(db1, true);
+   }
+   if (catalog.databaseExists(db2)) {
+   catalog.dropDatabase(db2, true);
+   }
+   }
+
+   @AfterClass
 
 Review comment:
   Add @BeforeClass for  `open` logic?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271990685
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.Map;
+
+/**
+ * A generic catalog table implementation.
+ */
+public class GenericCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the table
+   private final Map properties;
+   // Statistics of the table
+   private TableStats tableStats = null;
 
 Review comment:
   private final ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271999138
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 Review comment:
   Do we need `Generic` prefix? How about using `InMemoryCatalog` directly?  Or 
you already see the requirement for many kinds of `InMemoryCatalog`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271991218
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Map;
+
+/**
+ * A generic catalog view implementation.
+ */
+public class GenericCatalogView implements CatalogView {
+   // Original text of the view definition.
+   private final String originalQuery;
+
+   // Expanded text of the original view definition
+   // This is needed because the context such as current DB is
+   // lost after the session, in which view is defined, is gone.
+   // Expanded query text takes care of the this, as an example.
+   private final String expandedQuery;
+
+   private TableSchema schema;
 
 Review comment:
   private final ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271995584
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A database name and object (table/view/function) name combo in a catalog.
+ */
+public class ObjectPath {
+   private final String databaseName;
+   private final String objectName;
+
+   public ObjectPath(String databaseName, String objectName) {
+   checkNotNull(databaseName, "databaseName cannot be null");
+   checkNotNull(objectName, "objectName cannot be null");
 
 Review comment:
   What are the format restrictions for dataBaseName and objectName, such as 
special characters? e.g.: '.', duo to we using fullName.split("\\."). right?  
So, it is better to add some check logic.
   I had left the comment in pre-review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271996542
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
+   private static final String IS_STREAMING = "is_streaming";
+
+   private final String testCatalogName = "test-catalog";
+   private final String db1 = "db1";
+   private final String db2 = "db2";
+
+   private final String t1 = "t1";
+   private final String t2 = "t2";
+   private final ObjectPath path1 = new ObjectPath(db1, t1);
+   private final ObjectPath path2 = new ObjectPath(db2, t2);
+   private final ObjectPath path3 = new ObjectPath(db1, t2);
+   private final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
+   private final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
+
+   private static final String TEST_COMMENT = "test comment";
+
+   private static ReadableWritableCatalog catalog;
+
+   @Before
+   public void setUp() {
+   catalog = new GenericInMemoryCatalog(db1);
 
 Review comment:
   You want to set `catalogName` right? So, we should change `db1` to 
`testCatalogName`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271988554
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CommonTable.java
 ##
 @@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Map;
+
+/**
+ * CommonTable is the common parent of table and view. It has a map of
+ * key-value pairs defining the properties of the table.
+ */
+public interface CommonTable {
+   /**
+* Get the properties of the table.
+* @return table property map
+*/
+   Map getProperties();
+
+   /**
+* Get the schema of the table.
+* @return schema of the table
+*/
+   TableSchema getSchema();
+
+
 
 Review comment:
   Remove empty line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271994232
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utility class for catalog testing.
+ */
+public class CatalogTestUtil {
+
+   public static GenericCatalogTable createTable() {
+   TableSchema tableSchema = 
TableSchema.fromTypeInfo(getRowTypeInfo());
+   return createTable(tableSchema, createTableStats(), new 
HashMap());
+   }
+
+   public static RowTypeInfo getRowTypeInfo() {
+   return new RowTypeInfo(
+   new TypeInformation[] {BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO},
+   new String[] {"a", "b"});
+   }
+
+   public static TableStats createTableStats() {
+   return new TableStats(2);
+   }
+
+   public static GenericCatalogTable createTable(TableSchema schema, 
Map tableProperties) {
+   return createTable(schema, new TableStats(0), tableProperties);
+   }
+
+   public static GenericCatalogTable createTable(TableSchema schema, 
TableStats stats,
+   Map tableProperties) {
+   return new GenericCatalogTable(schema, stats, tableProperties);
+   }
+
+   public static void compare(GenericCatalogTable t1, GenericCatalogTable 
t2) {
 
 Review comment:
   Maybe change `compare` to `assertEquals` is more accurate? due to `compare` 
in general will return an INT.  What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271991296
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Map;
+
+/**
+ * A generic catalog view implementation.
+ */
+public class GenericCatalogView implements CatalogView {
+   // Original text of the view definition.
+   private final String originalQuery;
+
+   // Expanded text of the original view definition
+   // This is needed because the context such as current DB is
+   // lost after the session, in which view is defined, is gone.
+   // Expanded query text takes care of the this, as an example.
+   private final String expandedQuery;
+
+   private TableSchema schema;
+   private Map properties;
+   private String comment;
+
+   public GenericCatalogView(String originalQuery, String expandedQuery, 
TableSchema schema,
+   Map properties, String comment) {
+   this.originalQuery = originalQuery;
+   this.expandedQuery = expandedQuery;
+   this.schema = schema;
+   this.properties = properties;
+   this.comment = comment;
+   }
+
+   @Override
+   public String getOriginalQuery() {
+   return this.originalQuery;
+   }
+
+   @Override
+   public String getExpandedQuery() {
+   return this.expandedQuery;
+   }
+
+   @Override
+   public Map getProperties() {
+   return properties;
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   return schema;
+   }
+
+   @Override
+   public GenericCatalogView copy() {
+   return new GenericCatalogView(this.originalQuery, 
this.expandedQuery, schema,
 
 Review comment:
   I think we should perform a deep copy. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271993087
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String defaultDatabaseName = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase());
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getDefaultDatabaseName() {
+   return defaultDatabaseName;
+   }
+
+   @Override
+   public void setDefaultDatabaseName(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   defaultDatabaseName = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String dbName, boolean ignoreIfNotExists) 
throws DatabaseNotExistException {
+   if (databases.containsKey(dbName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(dbName)) {
+   databases.remove(dbName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, dbName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
dbName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   return tables.keySet().stream().noneMatch(op -> 
op.getDatabaseName().equals(databaseName));
+   // TODO: also check function when function is added.
+   }
+
+   @Override
+   public void alterDatabase(String databaseName, CatalogDatabase 
newDatabase, boolean ignoreIfNotExists)
+   throws DatabaseNotExistException {
+   if (databaseExists(databaseName)) {
+   databases.put(databaseName, newDataba

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271990956
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.Map;
+
+/**
+ * A generic catalog table implementation.
+ */
+public class GenericCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the table
+   private final Map properties;
+   // Statistics of the table
+   private TableStats tableStats = null;
+   // Comment of the table
+   private String comment = "This is a generic catalog table.";
+
+   public GenericCatalogTable(TableSchema tableSchema, TableStats 
tableStats, Map properties) {
+   this.tableSchema = tableSchema;
+   this.tableStats = tableStats;
+   this.properties = properties;
+   }
+
+   @Override
+   public TableStats getStatistics() {
+   return this.tableStats;
+   }
+
+   @Override
+   public Map getProperties() {
+   return properties;
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   return this.tableSchema;
+   }
+
+   @Override
+   public GenericCatalogTable copy() {
+   return new GenericCatalogTable(this.tableSchema, 
this.tableStats, this.properties);
 
 Review comment:
   I think we should perform a deep copy as it is described in the interface?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271991771
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String defaultDatabaseName = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase());
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getDefaultDatabaseName() {
+   return defaultDatabaseName;
+   }
+
+   @Override
+   public void setDefaultDatabaseName(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   defaultDatabaseName = databaseName;
 
 Review comment:
   We should also change the name of the default database in this.databases. 
otherwise, will get empty when call database.get(defaultDatabaseName), right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271996701
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for GenericInMemoryCatalog.
+ */
+public class GenericInMemoryCatalogTest {
+   private static final String IS_STREAMING = "is_streaming";
+
+   private final String testCatalogName = "test-catalog";
+   private final String db1 = "db1";
+   private final String db2 = "db2";
+
+   private final String t1 = "t1";
+   private final String t2 = "t2";
+   private final ObjectPath path1 = new ObjectPath(db1, t1);
+   private final ObjectPath path2 = new ObjectPath(db2, t2);
+   private final ObjectPath path3 = new ObjectPath(db1, t2);
+   private final ObjectPath nonExistDbPath = 
ObjectPath.fromString("non.exist");
+   private final ObjectPath nonExistObjectPath = 
ObjectPath.fromString("db1.nonexist");
+
+   private static final String TEST_COMMENT = "test comment";
+
+   private static ReadableWritableCatalog catalog;
+
+   @Before
+   public void setUp() {
+   catalog = new GenericInMemoryCatalog(db1);
+   }
+
+   @Rule
+   public ExpectedException exception = ExpectedException.none();
+
+   @After
+   public void close() {
+   if (catalog.tableExists(path1)) {
+   catalog.dropTable(path1, true);
+   }
+   if (catalog.tableExists(path2)) {
+   catalog.dropTable(path2, true);
+   }
+   if (catalog.tableExists(path3)) {
+   catalog.dropTable(path3, true);
+   }
+   if (catalog.databaseExists(db1)) {
+   catalog.dropDatabase(db1, true);
+   }
+   if (catalog.databaseExists(db2)) {
+   catalog.dropDatabase(db2, true);
+   }
+   }
+
+   @AfterClass
+   public static void clean() throws IOException {
+   catalog.close();
+   }
+
+   // -- tables --
+
+   @Test
+   public void testCreateTable_Streaming() {
+   catalog.createDatabase(db1, createDb(), false);
+   GenericCatalogTable table = createStreamingTable();
+   catalog.createTable(path1, table, false);
+
+   CatalogTestUtil.compare(table, (GenericCatalogTable) 
catalog.getTable(path1));
+   }
+
+   @Test
+   public void testCreateTable_Batch() {
+   catalog.createDatabase(db1, createDb(), false);
+
+   GenericCatalogTable table = createTable();
+   catalog.createTable(path1, table, false);
+
+   CatalogTestUtil.compare(table, (GenericCatalogTable) 
catalog.getTable(path1));
+
+   List tables = cat

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-03 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r271990155
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic catalog database implementation.
+ */
+public class GenericCatalogDatabase implements CatalogDatabase {
 
 Review comment:
   We also need to override equals, hashCode and toString(Also for other 
generic classes).
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273318161
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogDatabase.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A generic catalog database implementation.
+ */
+public class GenericCatalogDatabase implements CatalogDatabase {
 
 Review comment:
   I think it's better to overwrite. But I am fine if you do not want to do it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-08 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273327251
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
 
 Review comment:
   Make sense to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273762199
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.Map;
+
+/**
+ * A generic catalog table implementation.
+ */
+public class GenericCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Properties of the table
+   private final Map properties;
+   // Statistics of the table
+   private TableStats tableStats = null;
 
 Review comment:
   I meant that we should add  `final`.  due to we only init it in the 
construct method. right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273781418
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
 ##
 @@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import java.util.List;
+
+/**
+ * This interface is responsible for reading database/table/views/UDFs from a 
registered catalog.
+ * It connects a registered catalog and Flink's Table API.
+ */
+public interface ReadableCatalog {
+
+   /**
+* Open the catalog. Used for any required preparation in 
initialization phase.
+*/
+   void open();
+
+   /**
+* Close the catalog when it is no longer needed and release any 
resource that it might be holding.
+*/
+   void close();
+
+   /**
+* Get the current database of this type of catalog. This is used when 
users refers an object in the catalog
+* without specifying a database. For example, the current db in a Hive 
Metastore is 'default' by default.
+*
+* @return the name of the current database
+*/
+   String getCurrentDatabase();
+
+   /**
+* Set the current database. A current database is used when users 
refers an object in the catalog
+* without specifying a database.
+*
+* @param databaseName  the name of the database
+*/
+   void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException;
+
+   // -- databases --
 
 Review comment:
   Move // -- databases -- before the getCurrentDatabase method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273786560
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String currentDatabase = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getCurrentDatabase() {
+   return currentDatabase;
+   }
+
+   @Override
+   public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (!databaseExists(databaseName)) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+
+   currentDatabase = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+   checkArgument(db != null);
+
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String databaseName, boolean 
ignoreIfNotExists) throws DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (databases.containsKey(databaseName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(databaseName)) {
+   databases.remove(databaseName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, databaseName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseN

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273784986
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+   // -- databases --
+
+   /**
+* Create a database.
+*
+* @param name   Name of the database to be created
+* @param database   The database definition
+* @param ignoreIfExists Flag to specify behavior when a database with 
the given name already exists:
+*   if set to false, throw a 
DatabaseAlreadyExistException,
+*   if set to true, do nothing.
+* @throws DatabaseAlreadyExistException if the given database already 
exists and ignoreIfExists is false
+*/
+   void createDatabase(String name, CatalogDatabase database, boolean 
ignoreIfExists)
+   throws DatabaseAlreadyExistException;
+
+   /**
+* Drop a database.
+*
+* @param name  Name of the database to be dropped.
+* @param ignoreIfNotExists Flag to specify behavior when the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the given database does not 
exist
+*/
+   void dropDatabase(String name, boolean ignoreIfNotExists) throws 
DatabaseNotExistException;
+
+   /**
+* Modify an existing database.
+*
+* @param nameName of the database to be modified
+* @param newDatabaseThe new database definition
+* @param ignoreIfNotExists Flag to specify behavior when the given 
database does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the given database does not 
exist
+*/
+   void alterDatabase(String name, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
+   throws DatabaseNotExistException;
+
+   // -- tables and views --
+
+   /**
+* Drop a table or view.
+*
+* @param tablePath Path of the table or view to be dropped
+* @param ignoreIfNotExists Flag to specify behavior when the table or 
view does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws TableNotExistException if the table or view does not exist
+*/
+   void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws 
TableNotExistException;
+
+   /**
+* Rename an existing table or view.
+*
+* @param tablePath   Path of the table or view to rename
+* @param newTableName the new name of the table or view
+* @param ignoreIfNotExists Flag to specify behavior when the table or 
view does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws TableNotExistException if the table does not exist
+* @throws DatabaseNotExistException if the database in tablePath to 
doesn't exist
+*/
+   void renameTable(ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExis

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273785242
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String currentDatabase = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getCurrentDatabase() {
+   return currentDatabase;
+   }
+
+   @Override
+   public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (!databaseExists(databaseName)) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+
+   currentDatabase = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+   checkArgument(db != null);
+
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String databaseName, boolean 
ignoreIfNotExists) throws DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (databases.containsKey(databaseName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(databaseName)) {
+   databases.remove(databaseName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, databaseName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseN

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273781229
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String currentDatabase = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getCurrentDatabase() {
+   return currentDatabase;
+   }
+
+   @Override
+   public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (!databaseExists(databaseName)) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+
+   currentDatabase = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
 
 Review comment:
   Move // -- databases -- before the getCurrentDatabase method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273786622
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String currentDatabase = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getCurrentDatabase() {
+   return currentDatabase;
+   }
+
+   @Override
+   public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (!databaseExists(databaseName)) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+
+   currentDatabase = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+   checkArgument(db != null);
+
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String databaseName, boolean 
ignoreIfNotExists) throws DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (databases.containsKey(databaseName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(databaseName)) {
+   databases.remove(databaseName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, databaseName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseN

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273762720
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericCatalogView.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.util.Map;
+
+/**
+ * A generic catalog view implementation.
+ */
+public class GenericCatalogView implements CatalogView {
+   // Original text of the view definition.
+   private final String originalQuery;
+
+   // Expanded text of the original view definition
+   // This is needed because the context such as current DB is
+   // lost after the session, in which view is defined, is gone.
+   // Expanded query text takes care of the this, as an example.
+   private final String expandedQuery;
+
+   private TableSchema schema;
 
 Review comment:
   `private TableSchema schema` -> `private final TableSchema schema`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-09 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r273804286
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+/**
+ * An interface responsible for manipulating catalog metadata.
+ */
+public interface ReadableWritableCatalog extends ReadableCatalog {
+
+   // -- databases --
+
+   /**
+* Create a database.
+*
+* @param name   Name of the database to be created
+* @param database   The database definition
+* @param ignoreIfExists Flag to specify behavior when a database with 
the given name already exists:
+*   if set to false, throw a 
DatabaseAlreadyExistException,
+*   if set to true, do nothing.
+* @throws DatabaseAlreadyExistException if the given database already 
exists and ignoreIfExists is false
+*/
+   void createDatabase(String name, CatalogDatabase database, boolean 
ignoreIfExists)
+   throws DatabaseAlreadyExistException;
+
+   /**
+* Drop a database.
+*
+* @param name  Name of the database to be dropped.
+* @param ignoreIfNotExists Flag to specify behavior when the database 
does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the given database does not 
exist
+*/
+   void dropDatabase(String name, boolean ignoreIfNotExists) throws 
DatabaseNotExistException;
+
+   /**
+* Modify an existing database.
+*
+* @param nameName of the database to be modified
+* @param newDatabaseThe new database definition
+* @param ignoreIfNotExists Flag to specify behavior when the given 
database does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws DatabaseNotExistException if the given database does not 
exist
+*/
+   void alterDatabase(String name, CatalogDatabase newDatabase, boolean 
ignoreIfNotExists)
+   throws DatabaseNotExistException;
+
+   // -- tables and views --
+
+   /**
+* Drop a table or view.
+*
+* @param tablePath Path of the table or view to be dropped
+* @param ignoreIfNotExists Flag to specify behavior when the table or 
view does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws TableNotExistException if the table or view does not exist
+*/
+   void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws 
TableNotExistException;
+
+   /**
+* Rename an existing table or view.
+*
+* @param tablePath   Path of the table or view to rename
+* @param newTableName the new name of the table or view
+* @param ignoreIfNotExists Flag to specify behavior when the table or 
view does not exist:
+*  if set to false, throw an exception,
+*  if set to true, do nothing.
+* @throws TableNotExistException if the table does not exist
+* @throws DatabaseNotExistException if the database in tablePath to 
doesn't exist
+*/
+   void renameTable(ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExis

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-10 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274267157
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String currentDatabase = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getCurrentDatabase() {
+   return currentDatabase;
+   }
+
+   @Override
+   public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (!databaseExists(databaseName)) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+
+   currentDatabase = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+   checkArgument(db != null);
+
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String databaseName, boolean 
ignoreIfNotExists) throws DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (databases.containsKey(databaseName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(databaseName)) {
+   databases.remove(databaseName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, databaseName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseN

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-10 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274267713
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A generic catalog implementation that holds all meta objects in memory.
+ */
+public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+
+   public static final String DEFAULT_DB = "default";
+
+   private String currentDatabase = DEFAULT_DB;
+
+   private final String catalogName;
+   private final Map databases;
+   private final Map tables;
+
+   public GenericInMemoryCatalog(String name) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
+
+   this.catalogName = name;
+   this.databases = new LinkedHashMap<>();
+   this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
+   this.tables = new LinkedHashMap<>();
+   }
+
+   @Override
+   public String getCurrentDatabase() {
+   return currentDatabase;
+   }
+
+   @Override
+   public void setCurrentDatabase(String databaseName) throws 
DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (!databaseExists(databaseName)) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+
+   currentDatabase = databaseName;
+   }
+
+   @Override
+   public void open() {
+
+   }
+
+   @Override
+   public void close() {
+
+   }
+
+   // -- databases --
+
+   @Override
+   public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
+   throws DatabaseAlreadyExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+   checkArgument(db != null);
+
+   if (databaseExists(databaseName)) {
+   if (!ignoreIfExists) {
+   throw new 
DatabaseAlreadyExistException(catalogName, databaseName);
+   }
+   } else {
+   databases.put(databaseName, db.copy());
+   }
+   }
+
+   @Override
+   public void dropDatabase(String databaseName, boolean 
ignoreIfNotExists) throws DatabaseNotExistException {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+   if (databases.containsKey(databaseName)) {
+
+   // Make sure the database is empty
+   if (isDatabaseEmpty(databaseName)) {
+   databases.remove(databaseName);
+   } else {
+   throw new 
DatabaseNotEmptyException(catalogName, databaseName);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new DatabaseNotExistException(catalogName, 
databaseName);
+   }
+   }
+
+   private boolean isDatabaseEmpty(String databaseName) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseN

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-12 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274821607
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+
+/**
+ * Represents a database object in a catalog.
+ */
+public interface CatalogDatabase {
 
 Review comment:
   In the Catalog, we can have multiple `CatalogDatabases`. There are multiple 
`CatalogTables` and `CatalogViews` in each `CatalogDatabase`. I think the 
meaning of the `CatalogDatabase` is the management of the CommonTable. The 
management relationship here is a bit special. For example, the 
`CatalogDatabase` and the CommonTable are the members of XXXReadableCatalog, 
and members are flat managed. They are dependent on `ObjectPath`. For example, 
`ObjectPath` contains the databaseName attribute. This management method is 
similar to the way that spark integrates hive, and it is flat management. 
Please see: 
https://github.com/apache/spark/blob/bbbe54aa79c0f6b66e3f3ac34515cc096beb5730/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala#L237
   
   Feel free to correct me if there is anything I misunderstood. :-)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-12 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274831488
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+
+/**
+ * Represents a database object in a catalog.
+ */
+public interface CatalogDatabase {
 
 Review comment:
   From the points of view, the user may want to see all databases in the 
catalog by `listDatabases`. and if user wants to see the detail for database 
info, such as properties, will call `getDatabase(..)`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8007: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-04-12 Thread GitBox
sunjincheng121 commented on a change in pull request #8007: 
[FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/8007#discussion_r274831488
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogDatabase.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+
+/**
+ * Represents a database object in a catalog.
+ */
+public interface CatalogDatabase {
 
 Review comment:
   From the points of view, the user may want to see all databases in the 
catalog by `listDatabases`. and if user wants to see the detail of database 
info, such as properties, will call `getDatabase(..)`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services