[jira] [Commented] (FLINK-12308) Support python language in Flink Table API

2019-04-23 Thread Dian Fu (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824834#comment-16824834 ]

Dian Fu commented on FLINK-12308:
-

Thanks [~sunjincheng121] for creating this ticket. A big +1 on this feature.

> Support python language in Flink Table API
> --
>
> Key: FLINK-12308
> URL: https://issues.apache.org/jira/browse/FLINK-12308
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>
> At the API level, Flink has the DataStream API, the DataSet API, and the 
> Table API, and the Table API will become a first-class citizen. The Table 
> API is declarative and can be automatically optimized, as Stephan mentioned 
> in the Flink mid-term roadmap. So we first consider supporting Python at 
> the Table API level, to cater to the large number of existing analytics 
> users. Flink's goals for the Python Table API are as follows:
>  * Users can write Flink Table API jobs in Python, and the Python API 
> should mirror the Java/Scala Table API
>  * Users can submit Python Table API jobs in the following ways:
>  ** Submit a job as a Python script, integrated with `flink run`
>  ** Submit a job as a Python script via the REST service
>  ** Submit a job interactively, similar to `scala-shell`
>  ** Debug locally in an IDE
>  * Users can write custom functions (UDF, UDTF, UDAF)
>  * Pandas functions can be used in the Flink Python Table API
> A more detailed description can be found in FLIP-38 (to be published soon).
> At the API level, we plan as follows:
>  * Short term:
>  We may initially go with a simple approach that maps the Python Table API 
> to the Java Table API via Py4J.
>  * Long term:
>  We may need to create a Python API that follows the same structure as 
> Flink's Table API and produces the language-independent DAG. (As Stephan 
> already mentioned on the [mailing 
> thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096])





[GitHub] [flink] wuchong commented on issue #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner

2019-04-23 Thread GitBox
wuchong commented on issue #8234: [FLINK-12288] [table-planner-blink] Bump 
Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#issuecomment-486071536
 
 
   Thanks @godfreyhe.
   
   Hi @walterddr @twalthr @KurtYoung, I will merge this if you have no other 
comments.




[GitHub] [flink] godfreyhe commented on issue #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner

2019-04-23 Thread GitBox
godfreyhe commented on issue #8234: [FLINK-12288] [table-planner-blink] Bump 
Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#issuecomment-486071031
 
 
   LGTM, +1 to merge




[GitHub] [flink] xuefuz commented on issue #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
xuefuz commented on issue #8212: [FLINK-11519][table] Add function related 
catalog APIs
URL: https://github.com/apache/flink/pull/8212#issuecomment-486070424
 
 
   Pushed more updates based on review feedback here.




[jira] [Created] (FLINK-12308) Support python language in Flink Table API

2019-04-23 Thread sunjincheng (JIRA)
sunjincheng created FLINK-12308:
---

 Summary: Support python language in Flink Table API
 Key: FLINK-12308
 URL: https://issues.apache.org/jira/browse/FLINK-12308
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: sunjincheng
Assignee: sunjincheng


At the API level, Flink has the DataStream API, the DataSet API, and the 
Table API, and the Table API will become a first-class citizen. The Table API 
is declarative and can be automatically optimized, as Stephan mentioned in the 
Flink mid-term roadmap. So we first consider supporting Python at the Table 
API level, to cater to the large number of existing analytics users. Flink's 
goals for the Python Table API are as follows:
 * Users can write Flink Table API jobs in Python, and the Python API should 
mirror the Java/Scala Table API
 * Users can submit Python Table API jobs in the following ways:
 ** Submit a job as a Python script, integrated with `flink run`
 ** Submit a job as a Python script via the REST service
 ** Submit a job interactively, similar to `scala-shell`
 ** Debug locally in an IDE
 * Users can write custom functions (UDF, UDTF, UDAF)
 * Pandas functions can be used in the Flink Python Table API

A more detailed description can be found in FLIP-38 (to be published soon).

At the API level, we plan as follows:
 * Short term:
 We may initially go with a simple approach that maps the Python Table API to 
the Java Table API via Py4J (sketched below).

 * Long term:
 We may need to create a Python API that follows the same structure as Flink's 
Table API and produces the language-independent DAG. (As Stephan already 
mentioned on the [mailing 
thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096])
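
To make the short-term plan concrete, here is a minimal sketch of the Java 
side of such a Py4J bridge. Only `py4j.GatewayServer` is real Py4J API; 
`PythonTableEntryPoint` and its method are hypothetical stand-ins, not Flink 
code:

```java
import py4j.GatewayServer;

// Hypothetical entry point whose methods a Python process can invoke through
// the gateway as if they were local Python calls.
public class PythonTableEntryPoint {

    // Stand-in for handing out a TableEnvironment; returns a plain String so
    // the sketch stays self-contained.
    public String createTableEnvironment(String planner) {
        return "TableEnvironment(" + planner + ")";
    }

    public static void main(String[] args) {
        // The Python side connects to this server; each call travels over a
        // local socket and executes on the JVM.
        GatewayServer server = new GatewayServer(new PythonTableEntryPoint());
        server.start();
        System.out.println("Py4J gateway listening on port " + server.getListeningPort());
    }
}
```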





[GitHub] [flink] Kejian-Li closed pull request #8247: Change the name of variable "log" into "LOG" to correspond to other class files.

2019-04-23 Thread GitBox
Kejian-Li closed pull request #8247: Change the name of variable "log" into 
"LOG" to correspond to other class files.
URL: https://github.com/apache/flink/pull/8247
 
 
   




[GitHub] [flink] Kejian-Li opened a new pull request #8247: Change the name of variable "log" into "LOG" to correspond to other class files.

2019-04-23 Thread GitBox
Kejian-Li opened a new pull request #8247: Change the name of variable "log" 
into "LOG" to correspond to other class files.
URL: https://github.com/apache/flink/pull/8247
 
 
   
   
   ## What is the purpose of the change
   
   Rename the variable "log" to "LOG" for consistency with other class files.
   
   
   ## Brief change log
   
   log  ->  LOG
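   
   For illustration only (this is not code from the PR; `ExampleTask` is a 
hypothetical class), the convention being applied is the upper-case SLF4J 
logger field used elsewhere in Flink:
   
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleTask {

    // Upper-case LOG, matching the naming used in other Flink classes.
    private static final Logger LOG = LoggerFactory.getLogger(ExampleTask.class);

    public void run() {
        LOG.info("Task started.");
    }
}
```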
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277954981
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
 
 Review comment:
   close() will be called after each unit test finishes to remove side effects 
between tests. Leaving dropFunction() inside unit tests, as here, may mislead 
people into thinking we are intentionally testing it, though it is only used 
for cleanup.
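   
   A sketch of the teardown pattern being described, under the assumption that 
the catalog exposes a close() that wipes its state; `CatalogStandIn` is a 
placeholder so the example compiles on its own:
   
```java
import org.junit.After;
import org.junit.Test;

public class CatalogCleanupSketch {

    private final CatalogStandIn catalog = new CatalogStandIn();

    @After
    public void cleanup() {
        // Runs after every test and removes whatever the test created, so
        // test bodies need no trailing dropFunction()/dropDatabase() calls.
        catalog.close();
    }

    @Test
    public void testCreateFunction() {
        // ... exercise createFunction() and assert; no manual cleanup here.
    }

    // Minimal stand-in for the catalog under test.
    static class CatalogStandIn {
        void close() { /* drop all databases, tables, and functions */ }
    }
}
```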




[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277953303
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
 
 Review comment:
   I think having it here is okay so that it doesn't pass the side-effect to 
other test cases. It's test code and harmless.




[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277954019
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
 ##
 @@ -132,4 +133,36 @@
 */
boolean tableExists(ObjectPath objectPath) throws CatalogException;
 
+   // -- functions --
+
+   /**
+* List the names of all functions in the given database. An empty list 
is returned if none is registered.
+*
+* @param dbName Name of the database.
 
 Review comment:
   I'd prefer reformatting to happen in a separate PR, as it would otherwise 
pollute the review here. However, I have reformatted the newly added methods.




[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277953303
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
 
 Review comment:
   I think having it here is okay so that it doesn't pass the side-effect to 
other test cases.




[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277953161
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a function that already exists.
+ */
+public class FunctionAlreadyExistException extends Exception {
 
 Review comment:
   The current name is consistent with other exceptions. Exception renaming 
will be handled by a separate JIRA.




[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277952965
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
+   return Source.CLASS;
+   }
+
+   /**
+* Get the full name of the class backing the function.
+* @return the full name of the class
+*/
+   String getClazzName();
+
+   /**
+* Get the properties of the function.
+* @return the properties of the function
+*/
+   Map<String, String> getProperties();
 
 Review comment:
   Properties store anything that is not captured in the class definition; a 
Hive jar URI would be a good example. Other candidates are 
function_creation_time, last_modified_time, etc. Since this is just the API, 
each catalog can carry whatever catalog-specific properties it needs to make a 
function work. I'm not sure what you meant by "customize JarResource".
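   
   To make that concrete, a hedged sketch of an implementation, assuming the 
interface consists of the default getSource() plus the getClazzName(), 
getProperties(), and copy() methods shown in this thread's diffs; the class 
name and property keys are illustrative, not Flink code:
   
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SimpleCatalogFunction implements CatalogFunction {

    private final String className;
    private final Map<String, String> properties;

    // properties might carry e.g. {"jar.uri": "hdfs:///udfs/my-udf.jar",
    // "function_creation_time": "..."} -- hypothetical keys per the examples above.
    public SimpleCatalogFunction(String className, Map<String, String> properties) {
        this.className = className;
        this.properties = new HashMap<>(properties);
    }

    @Override
    public String getClazzName() {
        return className;
    }

    @Override
    public Map<String, String> getProperties() {
        return Collections.unmodifiableMap(properties);
    }

    @Override
    public CatalogFunction copy() {
        // The constructor already copies the map, giving a deep copy.
        return new SimpleCatalogFunction(className, properties);
    }
}
```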




[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277950610
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
 
 Review comment:
   Yes, this is what we have in the FLIP, but I think it's okay to leave this out.




[jira] [Closed] (FLINK-4653) Refactor JobClientActor to adapt to the new Rpc framework and new cluster management

2019-04-23 Thread Jing Zhang (JIRA)


 [ https://issues.apache.org/jira/browse/FLINK-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Zhang closed FLINK-4653.
-
Resolution: Invalid

> Refactor JobClientActor to adapt to the new Rpc framework and new cluster 
> management
> -
>
> Key: FLINK-4653
> URL: https://issues.apache.org/jira/browse/FLINK-4653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Command Line Client
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
>
> 1. Create an RpcEndpoint (temporarily named JobInfoTracker) and an 
> RpcGateway (temporarily named JobInfoTrackerGateway) to replace the old 
> JobClientActor (see the sketch below). 
> 2. Change the RPC message communication in JobClientActor to RPC method 
> calls, fitting the new RPC framework. 
> 3. JobInfoTracker is responsible for waiting for job state changes and the 
> job result until the job completes, but not for submitting new jobs, because 
> job submission behaves differently on different clusters.
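
As a rough illustration of point 1: under the new framework, actor messages 
become asynchronous method calls on a gateway. The interface name below is the 
ticket's tentative name; the method signatures (with String stand-ins for 
JobID/JobResult) are assumptions, not actual Flink code:

```java
import java.util.concurrent.CompletableFuture;

public interface JobInfoTrackerGateway {

    // Callers await job state changes instead of exchanging actor messages.
    CompletableFuture<String> requestJobStatus(String jobId);

    // Completes once the job finishes. Job submission is deliberately out of
    // scope, since it differs across cluster types (point 3).
    CompletableFuture<String> requestJobResult(String jobId);
}
```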





[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r277950481
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import java.util.Set;
+
+/**
+ * CatalogManager manages all the registered ReadableCatalog instances with 
unique names.
+ * It has a concept of current catalog, which will be used when it is not 
given when referencing meta-objects.
+ */
+@PublicEvolving
+public interface CatalogManager {
+
+   /**
+* Register a catalog with a unique name.
+*
+* @param catalogName catalog name to register
+* @param catalog catalog to register
+* @throws CatalogAlreadyExistsException thrown if the name is already 
taken
+*/
+   void registerCatalog(String catalogName, ReadableCatalog catalog) 
throws CatalogAlreadyExistsException;
+
+   /**
+* Get a catalog by name.
+*
+* @param catalogName catalog name
+* @return the requested catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+*/
+   ReadableCatalog getCatalog(String catalogName) throws 
CatalogNotExistException;
+
+   /**
+* Get names of all registered catalogs.
+*
+* @return a set of names of registered catalogs
+*/
+   Set<String> getCatalogNames();
+
+   /**
+* Get the current catalog.
+*
+* @return the current catalog
+*/
+   ReadableCatalog getCurrentCatalog();
+
+   /**
+* Get name of the current database.
+*
+* @return name of the current database
+*/
+   String getCurrentDatabaseName();
 
 Review comment:
   agree
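   
   For reference, a hedged usage sketch of the CatalogManager interface from 
this diff; only the method names come from the PR, while the helper method and 
catalog name are illustrative:
   
```java
public final class CatalogManagerUsage {

    // Registers one catalog and prints what the manager then resolves against.
    static void describeCatalogs(CatalogManager manager, ReadableCatalog hive) throws Exception {
        // Names must be unique; registering a duplicate name throws
        // CatalogAlreadyExistsException per the contract above.
        manager.registerCatalog("hive", hive);

        // Unqualified meta-object references resolve against the current
        // catalog and its current database.
        System.out.println("Catalogs: " + manager.getCatalogNames()
            + ", current catalog: " + manager.getCurrentCatalog()
            + ", current database: " + manager.getCurrentDatabaseName());
    }
}
```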




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277948967
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   CatalogFunction newFunc = createAnotherFunction();
+   catalog.alterFunction(path1, newFunc, false);
+
+   assertNotEquals(func, catalog.getFunction(path1));
+   CatalogTestUtil.checkEquals(newFunc, 
catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExistException() throws 
Exception {
+   exception.expect(FunctionNotExistException.class);
+   exception.expectMessage("Function db1.nonexist does not exist 
in Catalog");
 
 Review comment:
   missing catalog name in the message




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949377
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   CatalogFunction newFunc = createAnotherFunction();
+   catalog.alterFunction(path1, newFunc, false);
+
+   assertNotEquals(func, catalog.getFunction(path1));
+   CatalogTestUtil.checkEquals(newFunc, 
catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExistException() throws 
Exception {
+   exception.expect(FunctionNotExistException.class);
+   exception.expectMessage("Function db1.nonexist does not exist 
in Catalog");
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
true);
+
+   assertFalse(catalog.functionExists(nonExistObjectPath));
+
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   assertEquals(path1.getObjectName(), 
catalog.listFunctions(db1).get(0));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions_DatabaseNotExistException() throws 
Exception{
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.listFunctions(db1);
+   }
+
+   @Test
+   public void testGetFunction_FunctionNotExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   exception.expect(FunctionNotExistException.class);
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277948682
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
 
 Review comment:
   ditto




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949973
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
 
 Review comment:
   add an empty line between comment and `@return`




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r27795
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
+   return Source.CLASS;
+   }
+
+   /**
+* Get the full name of the class backing the function.
+* @return the full name of the class
 
 Review comment:
   empty line




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949417
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   CatalogFunction newFunc = createAnotherFunction();
+   catalog.alterFunction(path1, newFunc, false);
+
+   assertNotEquals(func, catalog.getFunction(path1));
+   CatalogTestUtil.checkEquals(newFunc, 
catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExistException() throws 
Exception {
+   exception.expect(FunctionNotExistException.class);
+   exception.expectMessage("Function db1.nonexist does not exist 
in Catalog");
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
true);
+
+   assertFalse(catalog.functionExists(nonExistObjectPath));
+
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   assertEquals(path1.getObjectName(), 
catalog.listFunctions(db1).get(0));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions_DatabaseNotExistException() throws 
Exception{
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.listFunctions(db1);
+   }
+
+   @Test
+   public void testGetFunction_FunctionNotExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   exception.expect(FunctionNotExistException.class);
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277950050
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
+   return Source.CLASS;
+   }
+
+   /**
+* Get the full name of the class backing the function.
+* @return the full name of the class
+*/
+   String getClazzName();
+
+   /**
+* Get the properties of the function.
+* @return the properties of the function
+*/
+   Map<String, String> getProperties();
+
+   /**
+* Create a deep copy of the function.
+* @return a deep copy of "this" instance
 
 Review comment:
   empty line




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277948110
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a function that already exists.
+ */
+public class FunctionAlreadyExistException extends Exception {
 
 Review comment:
   ```suggestion
   public class FunctionAlreadyExistsException extends Exception {
   ```
   




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949318
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   CatalogFunction newFunc = createAnotherFunction();
+   catalog.alterFunction(path1, newFunc, false);
+
+   assertNotEquals(func, catalog.getFunction(path1));
+   CatalogTestUtil.checkEquals(newFunc, 
catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExistException() throws 
Exception {
+   exception.expect(FunctionNotExistException.class);
+   exception.expectMessage("Function db1.nonexist does not exist 
in Catalog");
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
true);
+
+   assertFalse(catalog.functionExists(nonExistObjectPath));
+
+   catalog.dropDatabase(db1, false);
 
 Review comment:
   remove this




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949341
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   CatalogFunction newFunc = createAnotherFunction();
+   catalog.alterFunction(path1, newFunc, false);
+
+   assertNotEquals(func, catalog.getFunction(path1));
+   CatalogTestUtil.checkEquals(newFunc, 
catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExistException() throws 
Exception {
+   exception.expect(FunctionNotExistException.class);
+   exception.expectMessage("Function db1.nonexist does not exist 
in Catalog");
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
true);
+
+   assertFalse(catalog.functionExists(nonExistObjectPath));
+
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   assertEquals(path1.getObjectName(), 
catalog.listFunctions(db1).get(0));
+
+   catalog.dropFunction(path1, false);
 
 Review comment:
   remove these




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949437
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   CatalogFunction newFunc = createAnotherFunction();
+   catalog.alterFunction(path1, newFunc, false);
+
+   assertNotEquals(func, catalog.getFunction(path1));
+   CatalogTestUtil.checkEquals(newFunc, 
catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExistException() throws 
Exception {
+   exception.expect(FunctionNotExistException.class);
+   exception.expectMessage("Function db1.nonexist does not exist 
in Catalog");
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
true);
+
+   assertFalse(catalog.functionExists(nonExistObjectPath));
+
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   assertEquals(path1.getObjectName(), 
catalog.listFunctions(db1).get(0));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions_DatabaseNotExistException() throws 
Exception{
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.listFunctions(db1);
+   }
+
+   @Test
+   public void testGetFunction_FunctionNotExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   exception.expect(FunctionNotExistException.class);
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277950075
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
 ##
 @@ -132,4 +133,36 @@
 */
boolean tableExists(ObjectPath objectPath) throws CatalogException;
 
+   // -- functions --
+
+   /**
+* List the names of all functions in the given database. An empty list 
is returned if none is registered.
+*
+* @param dbNameName of the database.
 
 Review comment:
   replace the tab with a space; also, the description should start with a 
lower-case character, like "name of the database"
   
   all of the javadoc in this file needs some reformatting
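   
   For illustration, a sketch of what the reformatted entry could look like (the exact `@return`/`@throws` wording is an assumption, not the actual patch):
   
   ```java
   /**
    * List the names of all functions in the given database. An empty list is
    * returned if none is registered.
    *
    * @param dbName name of the database
    * @return a list of function names, or an empty list if none is registered
    * @throws DatabaseNotExistException if the database does not exist
    * @throws CatalogException in case of any runtime failure
    */
   List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException;
   ```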




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949134
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
 
 Review comment:
   ```suggestion
exception.expectMessage("Database db1 does not exist in 
catalog");
   ```
   
   there seems to be no need for an upper-case "C". Change the "C" to "c" in 
DatabaseNotExistException?
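   
   For illustration, a hypothetical sketch of the lower-cased message; the constructor shape is assumed, not the actual DatabaseNotExistException source:
   
   ```java
   public DatabaseNotExistException(String catalogName, String databaseName) {
       // Lower-case "catalog", as suggested above; the message format is illustrative only.
       super(String.format("Database %s does not exist in catalog %s.", databaseName, catalogName));
   }
   ```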




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277950023
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
+   return Source.CLASS;
+   }
+
+   /**
+* Get the full name of the class backing the function.
+* @return the full name of the class
+*/
+   String getClazzName();
+
+   /**
+* Get the properties of the function.
+* @return the properties of the function
 
 Review comment:
   empty line




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277947738
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A generic catalog function implementation.
+ */
+public class GenericCatalogFunction implements CatalogFunction {
+
+   private final String clazzName; // Fully qualified class name of the 
function
 
 Review comment:
   className?




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277948736
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   CatalogFunction newFunc = createAnotherFunction();
+   catalog.alterFunction(path1, newFunc, false);
+
+   assertNotEquals(func, catalog.getFunction(path1));
+   CatalogTestUtil.checkEquals(newFunc, 
catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
 
 Review comment:
   ditto




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277948627
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
 
 Review comment:
   we don't need these two drop commands, as they are already taken care of in 
close()
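   
   A minimal sketch of the kind of teardown meant here, assuming a JUnit `@After` hook on the test class (the names mirror the test code above, but the hook itself is illustrative, not the actual GenericInMemoryCatalogTest code):
   
   ```java
   @After
   public void close() throws Exception {
       // Best-effort cleanup so individual tests can skip explicit drops.
       if (catalog.functionExists(path1)) {
           catalog.dropFunction(path1, true);
       }
       if (catalog.databaseExists(db1)) {
           catalog.dropDatabase(db1, true);
       }
   }
   ```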




[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add 
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949400
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 ##
 @@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
catalog.dropTable(viewPath2, false);
}
 
+   // -- functions --
+
+   @Test
+   public void testCreateFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   assertFalse(catalog.functionExists(path1));
+
+   catalog.createFunction(path1, createFunction(), false);
+
+   assertTrue(catalog.functionExists(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testCreateFunction_DatabaseNotExistException() throws 
Exception {
+   assertFalse(catalog.databaseExists(db1));
+
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.createFunction(path1, createFunction(), false);
+
+   exception.expect(FunctionAlreadyExistException.class);
+   exception.expectMessage("Function db1.t1 already exists in 
Catalog");
+   catalog.createFunction(path1, createFunction(), false);
+   }
+
+   @Test
+   public void testCreateFunction_FunctionAlreadyExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.createFunction(path1, createAnotherFunction(), true);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+   CatalogFunction newFunc = createAnotherFunction();
+   catalog.alterFunction(path1, newFunc, false);
+
+   assertNotEquals(func, catalog.getFunction(path1));
+   CatalogTestUtil.checkEquals(newFunc, 
catalog.getFunction(path1));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExistException() throws 
Exception {
+   exception.expect(FunctionNotExistException.class);
+   exception.expectMessage("Function db1.nonexist does not exist 
in Catalog");
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
false);
+   }
+
+   @Test
+   public void testAlterFunction_FunctionNotExist_ignored() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+   catalog.alterFunction(nonExistObjectPath, createFunction(), 
true);
+
+   assertFalse(catalog.functionExists(nonExistObjectPath));
+
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions() throws Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   CatalogFunction func = createFunction();
+   catalog.createFunction(path1, func, false);
+
+   assertEquals(path1.getObjectName(), 
catalog.listFunctions(db1).get(0));
+
+   catalog.dropFunction(path1, false);
+   catalog.dropDatabase(db1, false);
+   }
+
+   @Test
+   public void testListFunctions_DatabaseNotExistException() throws 
Exception{
+   exception.expect(DatabaseNotExistException.class);
+   exception.expectMessage("Database db1 does not exist in 
Catalog");
+   catalog.listFunctions(db1);
+   }
+
+   @Test
+   public void testGetFunction_FunctionNotExistException() throws 
Exception {
+   catalog.createDatabase(db1, createDb(), false);
+
+   exception.expect(FunctionNotExistException.class);
+   

[jira] [Created] (FLINK-12307) Support translation from StreamExecWindowJoin to StreamTransformation.

2019-04-23 Thread Jing Zhang (JIRA)
Jing Zhang created FLINK-12307:
--

 Summary: Support translation from StreamExecWindowJoin to 
StreamTransformation.
 Key: FLINK-12307
 URL: https://issues.apache.org/jira/browse/FLINK-12307
 Project: Flink
  Issue Type: Task
  Components: Table SQL / API
Reporter: Jing Zhang








[GitHub] [flink] walterddr commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner

2019-04-23 Thread GitBox
walterddr commented on a change in pull request #8234: [FLINK-12288] 
[table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#discussion_r277949412
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
 ##
 @@ -85,7 +85,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface

2019-04-23 Thread GitBox
eaglewatcherwb commented on a change in pull request #8233: [FLINK-12227] 
[runtime] introduce SchedulingStrategy interface
URL: https://github.com/apache/flink/pull/8233#discussion_r277948095
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Objects;
+
+/**
+ * Id identifying {@link ExecutionVertex}.
+ */
+public class ExecutionVertexID {
 
 Review comment:
   Shall we use `SchedulingExecutionVertexID` to make it clearer that this id 
is used in the scheduler?




[GitHub] [flink] JingsongLi commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner

2019-04-23 Thread GitBox
JingsongLi commented on a change in pull request #8234: [FLINK-12288] 
[table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#discussion_r277948264
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
 ##
 @@ -85,7 +85,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])

[GitHub] [flink] wuchong commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner

2019-04-23 Thread GitBox
wuchong commented on a change in pull request #8234: [FLINK-12288] 
[table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#discussion_r277944655
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
 ##
 @@ -85,7 +85,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])

[jira] [Commented] (FLINK-11848) Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION

2019-04-23 Thread Shengnan YU (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824781#comment-16824781
 ] 

Shengnan YU commented on FLINK-11848:
-

It does not work; I have looked up the source code. Flink's partition discovery 
gets the topic list from the consumer metadata. However, the warning occurs when 
the consumer fetches metadata from the cluster. It is an issue with kafka-client, 
not Flink, actually. Thank you very much for the help.

> Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION
> 
>
> Key: FLINK-11848
> URL: https://issues.apache.org/jira/browse/FLINK-11848
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4
>Reporter: Shengnan YU
>Assignee: frank wang
>Priority: Major
>
> Recently we have been running some streaming jobs with Apache Flink. There are 
> multiple Kafka topics with a format like xx_yy-mm-dd. We used a topic regex 
> pattern to let a consumer consume those topics. However, if we delete some 
> older topics, it seems that the metadata in the consumer does not update 
> properly, so it still remembers those outdated topics in its topic list, which 
> leads to *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to 
> recover. It seems to occur in the producer as well. Any idea how to solve this 
> problem? Thank you very much!
>  
> Example to reproduce the problem:
> There are multiple Kafka topics, for instance "test20190310", "test20190311" 
> and "test20190312". I run the job and everything is OK. Then, if I delete 
> topic "test20190310", the consumer does not notice that the topic has been 
> deleted; it keeps fetching metadata for that topic, and unknown-topic errors 
> show up in the taskmanager's log. 
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>         "120");
>     Pattern topics = Pattern.compile("^test.*$");
>     FlinkKafkaConsumer011<String> consumer =
>         new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
>     DataStream<String> stream = env.addSource(consumer);
>     stream.writeToSocket("localhost", 4, new SimpleStringSchema());
>     env.execute("test");
> }
> {code}
>   
>   





[GitHub] [flink] link3280 edited a comment on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem

2019-04-23 Thread GitBox
link3280 edited a comment on issue #8068: [FLINK-12042][StateBackends] Fix 
RocksDBStateBackend's mistaken usage of default filesystem
URL: https://github.com/apache/flink/pull/8068#issuecomment-486042889
 
 
   > Moreover, I was wondering whether `SnapshotDirectory.temporary` should 
always return a local `SnapshotDirectory`. If this should be the case, then one 
could change the signature to `SnapshotDirectory.temporary(File)` in order to 
avoid the wrong usage. Maybe @StefanRRichter could help to answer this question.
   
   IMHO, we should only use the local file system to store task-local snapshots 
for efficiency and simplicity, and I'm not sure whether RocksDB can checkpoint 
to other file systems. Another alternative to avoid a wrong `FileSystem` could 
be to use `SnapshotDirectory(Path, FileSystem)` instead when creating temporary 
and permanent snapshot directories.
   
   I've updated the code and added a test based on the points above, and will 
update the PR if @StefanRRichter confirms.
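   
   For reference, a minimal sketch of the `SnapshotDirectory.temporary(File)` signature discussed above; the constructor call is an assumption about SnapshotDirectory's internals, not the actual Flink code:
   
   ```java
   // Accepting a java.io.File makes it impossible to pass a non-local path, so
   // the temporary snapshot directory always lives on the local file system.
   public static SnapshotDirectory temporary(File directory) throws IOException {
       return new SnapshotDirectory(new Path(directory.toURI()), FileSystem.getLocalFileSystem());
   }
   ```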




[jira] [Updated] (FLINK-12296) Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task

2019-04-23 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-12296:
--
Description: 
As the mailing list thread [1] describes, there may be a problem when more than 
one operator is chained in a single task and all of the operators have state: we 
can silently lose data.

Currently, the local directory we use looks like this:

../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state)

If more than one operator is chained in a single task and all of the operators 
have state, then all of the operators will share the same local directory 
(because the vertex_id is the same), which leads to data loss.

The path generation logic is as follows:
{code:java}
// LocalRecoveryDirectoryProviderImpl.java

@Override
public File subtaskSpecificCheckpointDirectory(long checkpointId) {
   return new File(subtaskBaseDirectory(checkpointId), 
checkpointDirString(checkpointId));
}


@VisibleForTesting
String subtaskDirString() {
   return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + 
subtaskIndex).toString();
}

@VisibleForTesting
String checkpointDirString(long checkpointId) {
   return "chk_" + checkpointId;
}
{code}
[1] 
[http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E]

  was:
As the mailing list thread [1] describes, there may be a problem when more than 
one operator is chained in a single task and all of the operators have state: we 
can silently lose data.

Currently, the local directory we use looks like this:

../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state)

If more than one operator is chained in a single task and all of the operators 
have state, then all of the operators will share the same local directory 
(because the vertex_id is the same), which leads to data loss.

The path generation logic is as follows:
{code:java}
// LocalRecoveryDirectoryProviderImpl.java

@Override
public File subtaskSpecificCheckpointDirectory(long checkpointId) {
   return new File(subtaskBaseDirectory(checkpointId), 
checkpointDirString(checkpointId));
}


@VisibleForTesting
String subtaskDirString() {
   return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + 
subtaskIndex).toString();
}

@VisibleForTesting
String checkpointDirString(long checkpointId) {
   return "chk_" + checkpointId;
}
{code}
[1] 
[https://app.smartmailcloud.com/web-share/MDkE4DArUT2eoSv86xq772I1HDgMNTVhLEmsnbQ7]


> Data loss silently in RocksDBStateBackend when more than one operator(has 
> states) chained in a single task 
> ---
>
> Key: FLINK-12296
> URL: https://issues.apache.org/jira/browse/FLINK-12296
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Critical
>
> As the mailing list thread [1] describes, there may be a problem when more than 
> one operator is chained in a single task and all of the operators have state: 
> we can silently lose data.
> Currently, the local directory we use looks like this:
> ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state)
> If more than one operator is chained in a single task and all of the operators 
> have state, then all of the operators will share the same local directory 
> (because the vertex_id is the same), which leads to data loss.
> The path generation logic is as follows:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>return new File(subtaskBaseDirectory(checkpointId), 
> checkpointDirString(checkpointId));
> }
> @VisibleForTesting
> String subtaskDirString() {
>return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + 
> subtaskIndex).toString();
> }
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>return "chk_" + checkpointId;
> }
> {code}
> [1] 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E]
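
A hypothetical sketch of how the subtask directory could disambiguate chained operators; the operator-id parameter is illustrative only and is not the committed fix:

{code:java}
@VisibleForTesting
String subtaskDirString(OperatorID operatorID) {
   // Including the operator id keeps chained stateful operators from sharing
   // (and overwriting) the same local recovery directory.
   return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_op_" + operatorID + "_sti_" + subtaskIndex).toString();
}
{code}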





[jira] [Commented] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese

2019-04-23 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824772#comment-16824772
 ] 

Jark Wu commented on FLINK-11614:
-

I have left comments in your pull request. [~yangfei]

> Translate the "Configuring Dependencies" page into Chinese
> --
>
> Key: FLINK-11614
> URL: https://issues.apache.org/jira/browse/FLINK-11614
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: YangFei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html
> The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md
> The markdown file will be created once FLINK-11529 is merged.





[GitHub] [flink] wuchong commented on issue #8200: [FLINK-11614] [docs-zh] Translate the "Configuring Dependencies" page into Chinese

2019-04-23 Thread GitBox
wuchong commented on issue #8200: [FLINK-11614] [docs-zh] Translate the 
"Configuring Dependencies" page into Chinese
URL: https://github.com/apache/flink/pull/8200#issuecomment-486050262
 
 
   Hi @Leev, please do not merge master into your branch. You can use the 
`IDEA > VCS > Git > Rebase` tool to rebase commits if you are using IntelliJ 
IDEA. 
   
   You can also create a new clean branch, copy the changes onto it, and then 
force-update your remote Leev:FLINK-11614 branch.




[jira] [Commented] (FLINK-11636) Translate "State Schema Evolution" into Chinese

2019-04-23 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824762#comment-16824762
 ] 

Congxian Qiu(klion26) commented on FLINK-11636:
---

[~yangfei] I'm not working on it at the moment; please assign it to yourself if you want.

> Translate "State Schema Evolution" into Chinese
> ---
>
> Key: FLINK-11636
> URL: https://issues.apache.org/jira/browse/FLINK-11636
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> The doc is located in flink/docs/dev/stream/state/schema_evolution.md





[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] 
Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277934970
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
+   return Source.CLASS;
+   }
+
+   /**
+* Get the full name of the class backing the function.
+* @return the full name of the class
+*/
+   String getClazzName();
+
+   /**
+* Get the properties of the function.
+* @return the properties of the function
+*/
+   Map getProperties();
+
+   /**
+* Create a deep copy of the function.
+* @return a deep copy of "this" instance
+*/
+   CatalogFunction copy();
+
+   /**
+* Get a brief description of the table or view.
+*
+* @return an optional short description of the table/view
+*/
+   Optional getDescription();
+
+   /**
+* Get a detailed description of the table or view.
 
 Review comment:
   table or view => function?




[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] 
Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277936243
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
+   return Source.CLASS;
+   }
+
+   /**
+* Get the full name of the class backing the function.
+* @return the full name of the class
+*/
+   String getClazzName();
+
+   /**
+* Get the properties of the function.
+* @return the properties of the function
+*/
+   Map getProperties();
 
 Review comment:
   What are the properties?
   Is there any way to customize a jar resource, like `HiveFunction.resourceUris`?
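   
   For illustration, a hypothetical use of the properties map; the key and the GenericCatalogFunction constructor shape are made up, and the actual key set is exactly what this question is about:
   
   ```java
   Map<String, String> properties = new HashMap<>();
   properties.put("jar.resource", "hdfs:///udfs/my-udf.jar"); // hypothetical key
   CatalogFunction function = new GenericCatalogFunction("com.example.MyUdf", properties); // constructor shape assumed
   ```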




[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] 
Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277932067
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
+   return Source.CLASS;
+   }
+
+   /**
+* Get the full name of the class backing the function.
+* @return the full name of the class
+*/
+   String getClazzName();
 
 Review comment:
   Why not ClassName?




[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] 
Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277937225
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 ##
 @@ -131,4 +133,47 @@ void createTable(ObjectPath tablePath, CatalogBaseTable 
table, boolean ignoreIfE
void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;
 
+   // -- functions --
+
+   /**
+* Create a function.
+*
+* @param functionPath  Path of the function
+* @param function  The function to be created
 
 Review comment:
   Consider aligning the parameter comments.
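   
   For example, the aligned form could look like this (the wording of the third parameter is an assumption):
   
   ```java
   /**
    * Create a function.
    *
    * @param functionPath   path of the function
    * @param function       the function to be created
    * @param ignoreIfExists flag to specify behavior if a function already exists at the given path
    */
   ```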




[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] 
Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277934671
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
 
 Review comment:
   Any other source?
   I don't quite understand the point of adding this; the method doesn't seem 
to do any real work yet. Should we wait until we add other Sources?




[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] 
Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277936838
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -269,4 +273,79 @@ public boolean tableExists(ObjectPath tablePath) {
return tablePath != null && 
databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
}
 
+   // -- functions --
+
+   @Override
+   public void createFunction(ObjectPath functionPath, CatalogFunction 
function, boolean ignoreIfExists)
+   throws FunctionAlreadyExistException, DatabaseNotExistException 
{
+   checkArgument(functionPath != null);
 
 Review comment:
   prefer `checkNotNull` to these `checkArgument` calls
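   
   A minimal sketch of the suggested style, using Flink's Preconditions (the message strings are illustrative):
   
   ```java
   import static org.apache.flink.util.Preconditions.checkNotNull;
   
   // checkNotNull throws NullPointerException with the given message if the
   // reference is null, and returns the reference otherwise.
   checkNotNull(functionPath, "functionPath cannot be null");
   checkNotNull(function, "function cannot be null");
   ```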




[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs

2019-04-23 Thread GitBox
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] 
Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277934944
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+   /**
+* Get the source of the function. Right now, CLASS only.
+* @return the source of the function
+*/
+   default Source getSource() {
+   return Source.CLASS;
+   }
+
+   /**
+* Get the full name of the class backing the function.
+* @return the full name of the class
+*/
+   String getClazzName();
+
+   /**
+* Get the properties of the function.
+* @return the properties of the function
+*/
+   Map getProperties();
+
+   /**
+* Create a deep copy of the function.
+* @return a deep copy of "this" instance
+*/
+   CatalogFunction copy();
+
+   /**
+* Get a brief description of the table or view.
 
 Review comment:
   table or view => function?




[jira] [Commented] (FLINK-11636) Translate "State Schema Evolution" into Chinese

2019-04-23 Thread YangFei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824761#comment-16824761
 ] 

YangFei commented on FLINK-11636:
-

Hi [~klion26], I want to know whether you are working on this issue. If not, may 
I assign it to myself? Thanks.

> Translate "State Schema Evolution" into Chinese
> ---
>
> Key: FLINK-11636
> URL: https://issues.apache.org/jira/browse/FLINK-11636
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> The doc is located in flink/docs/dev/stream/state/schema_evolution.md





[jira] [Commented] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese

2019-04-23 Thread YangFei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824755#comment-16824755
 ] 

YangFei commented on FLINK-11614:
-

Hi [~jark],

I have made some changes after reading the [“Flink Translation 
Specifications”|https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications].

Waiting for review. Thanks.

> Translate the "Configuring Dependencies" page into Chinese
> --
>
> Key: FLINK-11614
> URL: https://issues.apache.org/jira/browse/FLINK-11614
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: YangFei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html
> The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md
> The markdown file will be created once FLINK-11529 is merged.





[jira] [Issue Comment Deleted] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese

2019-04-23 Thread YangFei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YangFei updated FLINK-11614:

Comment: was deleted

(was: Hi JarkWu . I have made some changes after reading [“Flink Translation 
Specifications”|https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications].
  

waiting to review. thanks)

> Translate the "Configuring Dependencies" page into Chinese
> --
>
> Key: FLINK-11614
> URL: https://issues.apache.org/jira/browse/FLINK-11614
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: YangFei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html
> The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md
> The markdown file will be created once FLINK-11529 is merged.





[jira] [Commented] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese

2019-04-23 Thread YangFei (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824754#comment-16824754
 ] 

YangFei commented on FLINK-11614:
-

Hi JarkWu, I have made some changes after reading the [“Flink Translation 
Specifications”|https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications].

Waiting for review. Thanks.

> Translate the "Configuring Dependencies" page into Chinese
> --
>
> Key: FLINK-11614
> URL: https://issues.apache.org/jira/browse/FLINK-11614
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: YangFei
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The page url is 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html
> The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md
> The markdown file will be created once FLINK-11529 is merged.





[GitHub] [flink] link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem

2019-04-23 Thread GitBox
link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix 
RocksDBStateBackend's mistaken usage of default filesystem
URL: https://github.com/apache/flink/pull/8068#issuecomment-486042889
 
 
   > Moreover, I was wondering whether `SnapshotDirectory.temporary` should 
always return a local `SnapshotDirectory`. If this should be the case, then one 
could change the signature to `SnapshotDirectory.temporary(File)` in order to 
avoid the wrong usage. Maybe @StefanRRichter could help to answer this question.
   
   IMHO, we should only use the local file system to store task-local snapshots 
for efficiency and simplicity, and I'm not sure whether RocksDB can checkpoint 
to other file systems. Another alternative to avoid a wrong `FileSystem` could 
be to use `SnapshotDirectory(Path, FileSystem)` instead when creating temporary 
and permanent snapshot directories.




[GitHub] [flink] liyafan82 commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager

2019-04-23 Thread GitBox
liyafan82 commented on a change in pull request #8236: 
[FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager
URL: https://github.com/apache/flink/pull/8236#discussion_r277935627
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -387,11 +387,11 @@ public void release(MemorySegment segment) {
 *
 * @param segments The segments to be released.
 * @throws NullPointerException Thrown, if the given collection is null.
-* @throws IllegalArgumentException Thrown, id the segments are of an 
incompatible type.
+* @throws IllegalArgumentException Thrown, if the segments are of an 
incompatible type.
 */
public void release(Collection<MemorySegment> segments) {
if (segments == null) {
-   return;
+   throw new NullPointerException();
 
 Review comment:
   That sounds reasonable. Thank you for your comments. Hi @StephanEwen, would 
you please give some comments?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11945) Support over aggregation for blink streaming runtime

2019-04-23 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-11945:

Summary: Support over aggregation for blink streaming runtime  (was: 
Support Over aggregation for SQL)

> Support over aggregation for blink streaming runtime
> 
>
> Key: FLINK-11945
> URL: https://issues.apache.org/jira/browse/FLINK-11945
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Jark Wu
>Assignee: Forward Xu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This is a simple port the over aggregation implementation from 
> {{flink-table-planner}} to {{flink-table-planner-blink}}.
> Note: please also convert the over function implementation from Scala to Java.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Aitozi commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager

2019-04-23 Thread GitBox
Aitozi commented on a change in pull request #8236: 
[FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager
URL: https://github.com/apache/flink/pull/8236#discussion_r277933794
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -387,11 +387,11 @@ public void release(MemorySegment segment) {
 *
 * @param segments The segments to be released.
 * @throws NullPointerException Thrown, if the given collection is null.
-* @throws IllegalArgumentException Thrown, id the segments are of an 
incompatible type.
+* @throws IllegalArgumentException Thrown, if the segments are of an 
incompatible type.
 */
public void release(Collection<MemorySegment> segments) {
if (segments == null) {
-   return;
+   throw new NullPointerException();
 
 Review comment:
   It's truly inconsistent, but this fix will change the default behaviour of 
the release operation, and `release(MemorySegment segment)` also skips 
null... I think it may need @StephanEwen, the author of `MemoryManager.java`, 
to confirm. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12304) AvroInputFormat should support schema evolution

2019-04-23 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-12304:


Assignee: vinoyang

> AvroInputFormat should support schema evolution
> ---
>
> Key: FLINK-12304
> URL: https://issues.apache.org/jira/browse/FLINK-12304
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.8.0
>Reporter: John
>Assignee: vinoyang
>Priority: Major
>
> From the avro spec:
> _A reader of Avro data, whether from an RPC or a file, can always parse that 
> data because its schema is provided. But that schema may not be exactly the 
> schema that was expected. For example, if the data was written with a 
> different version of the software than it is read, then records may have had 
> fields added or removed._
> The AvroInputFormat should allow the application to supply a reader's schema 
> to support cases where data was written with an old version of a schema and 
> needs to be read with a newer version.  The reader's schema can have additional 
> fields with defaults so that the old schema can be adapted to the new.  The 
> underlying Avro Java library supports schema resolution, so adding support in 
> AvroInputFormat should be straightforward.
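For illustration, a hedged sketch of the plain Avro reader-schema resolution the 
description refers to (file names are made up):

{code:java}
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReaderSchemaExample {
    public static void main(String[] args) throws Exception {
        // Reader's schema: the writer's schema plus new fields with defaults,
        // so records written with the old schema can still be resolved.
        Schema readerSchema = new Schema.Parser().parse(new File("user-v2.avsc"));

        // Passing (writer, reader) schemas enables Avro schema resolution;
        // the writer's schema is taken from the file header at read time.
        GenericDatumReader<GenericRecord> datumReader =
            new GenericDatumReader<>(null, readerSchema);

        try (DataFileReader<GenericRecord> fileReader =
                new DataFileReader<>(new File("users.avro"), datumReader)) {
            for (GenericRecord record : fileReader) {
                System.out.println(record);
            }
        }
    }
}
{code}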



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8246: [hotfix] Add error message to precondition in HeapPriorityQueueSet

2019-04-23 Thread GitBox
flinkbot commented on issue #8246: [hotfix] Add error message to precondition 
in HeapPriorityQueueSet
URL: https://github.com/apache/flink/pull/8246#issuecomment-486039200
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem

2019-04-23 Thread GitBox
link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix 
RocksDBStateBackend's mistaken usage of default filesystem
URL: https://github.com/apache/flink/pull/8068#issuecomment-486039068
 
 
   Thanks for the review @tillrohrmann!
To answer the first three questions: the `IllegalStateException` should be 
thrown by the following lines in 1.6.4, 1.7.2 and 1.8.0 respectively.
   
https://github.com/apache/flink/blob/6f4148180ba372a2c12c1d54bea8579350af6c98/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L2568
   
https://github.com/apache/flink/blob/ceba8af39b28b91ae6f2d5cbdb1b99258a73e742/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L411
   
https://github.com/apache/flink/blob/4caec0d4bab497d7f9a8d9fec4680089117593df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L410
   
   > Independent of the question whether to store the data locally or not, I 
would like to understand where the problem comes from because the 
SnapshotDirectory seems to use the right FileSystem implementation to check the 
existence.
   
   Now we are using `SnapshotDirectory(Path)` to create the instance, which 
will infer its scheme from the Path URI, and get the corresponding `FileSystem` 
based on that.
   
   
https://github.com/apache/flink/blob/4caec0d4bab497d7f9a8d9fec4680089117593df/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java#L66
   
   If the scheme is missing in the URI, we get the default `FileSystem` which 
can be set by `fs.default-scheme`.
   
   
https://github.com/apache/flink/blob/4caec0d4bab497d7f9a8d9fec4680089117593df/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L334
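   A small hedged example of that fallback (assuming `fs.default-scheme` points 
at a non-local file system such as HDFS):
   
   ```java
   import org.apache.flink.core.fs.FileSystem;
   import org.apache.flink.core.fs.Path;
   
   public class DefaultSchemeExample {
       public static void main(String[] args) throws Exception {
           // No scheme: resolution falls back to fs.default-scheme, which may
           // name a distributed file system rather than the local one.
           FileSystem implicitFs = new Path("/tmp/localState").getFileSystem();
   
           // Explicit scheme: always the local file system.
           FileSystem localFs = new Path("file:///tmp/localState").getFileSystem();
   
           System.out.println(implicitFs.getClass() + " vs " + localFs.getClass());
       }
   }
   ```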
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sjwiesman opened a new pull request #8246: [hotfix] Add error message to precondition in HeapPriorityQueueSet

2019-04-23 Thread GitBox
sjwiesman opened a new pull request #8246: [hotfix] Add error message to 
precondition in HeapPriorityQueueSet
URL: https://github.com/apache/flink/pull/8246
 
 
   ## What is the purpose of the change
   Add error message to precondition in HeapPriorityQueueSet
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager

2019-04-23 Thread GitBox
liyafan82 commented on a change in pull request #8236: 
[FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager
URL: https://github.com/apache/flink/pull/8236#discussion_r277931471
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -387,11 +387,11 @@ public void release(MemorySegment segment) {
 *
 * @param segments The segments to be released.
 * @throws NullPointerException Thrown, if the given collection is null.
-* @throws IllegalArgumentException Thrown, id the segments are of an 
incompatible type.
+* @throws IllegalArgumentException Thrown, if the segments are of an 
incompatible type.
 */
public void release(Collection<MemorySegment> segments) {
if (segments == null) {
-   return;
+   throw new NullPointerException();
 
 Review comment:
   > If we get a `null` to release, why can't we just skip it? I think it's a 
defensive code not a bug. Or can you post some bad situation about this?
   
   Hi Aitozi, thanks a lot for your comment. The JavaDoc for this method 
declares "@throws NullPointerException Thrown, if the given collection is 
null." So we should either throw an NPE, or change the JavaDoc, to make it 
consistent. 
   Judging by the other methods in this class, I think it is preferable to make 
this method throw an NPE, so that all the methods follow a consistent style. 
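   For illustration, a minimal sketch of the fail-fast variant using Flink's 
existing `Preconditions` helper (the PR itself throws a bare 
`NullPointerException`; the release logic is elided):
   
   ```java
   import java.util.Collection;
   
   import org.apache.flink.core.memory.MemorySegment;
   
   import static org.apache.flink.util.Preconditions.checkNotNull;
   
   // Sketch only: the real MemoryManager.release(Collection) performs the
   // actual de-allocation after the null check.
   public void release(Collection<MemorySegment> segments) {
       // Fail fast with an NPE, matching the documented
       // "@throws NullPointerException" contract.
       checkNotNull(segments, "The collection of memory segments must not be null.");
       // ... release each segment ...
   }
   ```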


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi closed pull request #4935: [Flink-7945][Metrics]Fix per partition-lag metric lost in kafka connector

2019-04-23 Thread GitBox
Aitozi closed pull request #4935: [Flink-7945][Metrics]Fix per 
partition-lag metric lost in kafka connector 
URL: https://github.com/apache/flink/pull/4935
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi closed pull request #7543: [FLINK-10996]Fix the possible state leak with CEP processing time model

2019-04-23 Thread GitBox
Aitozi closed pull request #7543: [FLINK-10996]Fix the possible state leak with 
CEP processing time model
URL: https://github.com/apache/flink/pull/7543
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10972) Enhancements to Flink Table API

2019-04-23 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-10972:

Affects Version/s: 1.9.0

> Enhancements to Flink Table API
> ---
>
> Key: FLINK-10972
> URL: https://issues.apache.org/jira/browse/FLINK-10972
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>
> With the continuous efforts from the 
> community, the Flink system has been continuously improved, which has 
> attracted more and more users. Flink SQL is a canonical, widely used 
> relational query language. However, there are still some scenarios where 
> Flink SQL failed to meet user needs in terms of functionality and ease of 
> use, such as:
>  * In terms of functionality
> Iteration, user-defined window, user-defined join, user-defined GroupReduce, 
> etc. Users cannot express them with SQL;
>  * In terms of ease of use
>  * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), 
> udf2(), udf3())” can be used to accomplish the same function, with a 
> map() function returning 100 columns one has to define or call 100 UDFs when 
> using SQL, which is quite involved.
>  * FlatMap - e.g. “dataStream.flatMap(flatMapFun)”. Similarly, it can be 
> implemented with “table.join(udtf).select()”. However, it is obvious that 
> DataStream is easier to use than SQL.
> For the above two reasons, in this group of JIRAs we will enhance the 
> Table API in stages.
> ---
> In the first stage, we seek to support (details will be described in the sub 
> issues):
>  * Table.map()
>  * Table.flatMap()
>  * GroupedTable.aggregate()
>  * GroupedTable.flatAggregate()
> The FLIP can be find here: 
> [FLIP-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739]
>  
> The second part is about column operator/operations:
> 1)   Table(schema) operators
>  * Add columns
>  * Replace columns
>  * Drop columns
>  * Rename columns
> 2)Fine-grained column/row operations
>  * Column selection
>  * Row package and flatten
> See [google 
> doc|https://docs.google.com/document/d/1tryl6swt1K1pw7yvv5pdvFXSxfrBZ3_OkOObymis2ck/edit]
>  
>  
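To ground the ease-of-use argument above, a hedged sketch against the Java Table 
API as it exists today (the UDF is made up); the proposed map() call is shown 
only in a comment, since it does not exist yet:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

// Made-up scalar UDF used for illustration.
public class ToUpper extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}

// Today, every output column needs its own expression:
//   tableEnv.registerFunction("toUpper", new ToUpper());
//   Table result = input.select("toUpper(name), toUpper(city), toUpper(street)");
//
// Proposed (FLIP-29, not yet available): a single call that mirrors
// dataStream.map(mapFun) and emits all output columns at once:
//   Table result = input.map(mapFun);
{code}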



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12306) Change the name of variable "log" to Upper case "LOG"

2019-04-23 Thread Kejian Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kejian Li updated FLINK-12306:
--
External issue URL:   (was: https://github.com/apache/flink/pull/8245)

> Change the name of variable "log" to Upper case "LOG"
> -
>
> Key: FLINK-12306
> URL: https://issues.apache.org/jira/browse/FLINK-12306
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Kejian Li
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>
> Change the name of variable "log" from lower case to upper case "LOG" to 
> correspond to other class files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Kejian-Li closed pull request #8245: Change the name of variable "log" to upper case "LOG"

2019-04-23 Thread GitBox
Kejian-Li closed pull request #8245: Change the name of variable "log" to upper 
case "LOG"
URL: https://github.com/apache/flink/pull/8245
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277851836
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
 
 Review comment:
   good point. However, as far as I can tell, the catalog APIs don't care about 
the actual location beyond displaying it as a property in SQL. What really 
cares about the location is the data connector, but it will retrieve 
that information directly from the Hive metastore.
   
   I think we can start with what we have now and extend it if necessary later


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12306) Change the name of variable "log" to Upper case "LOG"

2019-04-23 Thread Kejian Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kejian Li updated FLINK-12306:
--
External issue URL: https://github.com/apache/flink/pull/8245

> Change the name of variable "log" to Upper case "LOG"
> -
>
> Key: FLINK-12306
> URL: https://issues.apache.org/jira/browse/FLINK-12306
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Kejian Li
>Priority: Trivial
>  Labels: pull-request-available
> Fix For: 1.9.0, 1.8.1
>
>
> Change the name of variable "log" from lower case to upper case "LOG" to 
> correspond to other class files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8245: Change the name of variable "log" to upper case "LOG"

2019-04-23 Thread GitBox
flinkbot commented on issue #8245: Change the name of variable "log" to upper 
case "LOG"
URL: https://github.com/apache/flink/pull/8245#issuecomment-486030536
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Kejian-Li opened a new pull request #8245: Change the name of variable "log" to upper case "LOG"

2019-04-23 Thread GitBox
Kejian-Li opened a new pull request #8245: Change the name of variable "log" to 
upper case "LOG"
URL: https://github.com/apache/flink/pull/8245
 
 
   
   
   ## What is the purpose of the change
   
   
   Change the name of variable "log" to upper case "LOG" to correspond to other 
class files.
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Kejian-Li closed pull request #8241: Update MainThreadValidatorUtil.java

2019-04-23 Thread GitBox
Kejian-Li closed pull request #8241: Update MainThreadValidatorUtil.java
URL: https://github.com/apache/flink/pull/8241
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12306) Change the name of variable "log" to Upper case "LOG"

2019-04-23 Thread Kejian Li (JIRA)
Kejian Li created FLINK-12306:
-

 Summary: Change the name of variable "log" to Upper case "LOG"
 Key: FLINK-12306
 URL: https://issues.apache.org/jira/browse/FLINK-12306
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.8.0, 1.7.2
Reporter: Kejian Li
 Fix For: 1.9.0, 1.8.1


Change the name of variable "log" from lower case to upper case "LOG" to 
correspond to other class files.
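For context, a before/after sketch of the convention in question (class name is 
arbitrary):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SomeClass {
    // Before: lower-case field name, inconsistent with most Flink classes.
    // private static final Logger log = LoggerFactory.getLogger(SomeClass.class);

    // After: constant-style upper-case name, since the field is static final.
    private static final Logger LOG = LoggerFactory.getLogger(SomeClass.class);
}
{code}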



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Kejian-Li commented on issue #8241: Update MainThreadValidatorUtil.java

2019-04-23 Thread GitBox
Kejian-Li commented on issue #8241: Update MainThreadValidatorUtil.java
URL: https://github.com/apache/flink/pull/8241#issuecomment-486024371
 
 
   > @Kejian-Li thanks for your contribution, I think the target branch should 
be `master` and please update the title and description according to the the 
[doc](https://flink.apache.org/contribute-code.html)
   
   thanks very much


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner

2019-04-23 Thread GitBox
walterddr commented on a change in pull request #8234: [FLINK-12288] 
[table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#discussion_r277912372
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml
 ##
 @@ -85,7 +85,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
 
 
   

[GitHub] [flink] xuefuz commented on a change in pull request #8205: [FLINK-12238] [hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8205: [FLINK-12238] [hive] 
Support database related operations in GenericHiveMetastoreCatalog and setup 
flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r277893627
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A catalog that persists all Flink streaming and batch metadata by using 
Hive metastore as a persistent storage.
+ */
+public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
+   private static final Logger LOG = 
LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
+
+   public static final String DEFAULT_DB = "default";
+
+   private final String catalogName;
+   private final HiveConf hiveConf;
+
+   private String currentDatabase = DEFAULT_DB;
+   private IMetaStoreClient client;
+
+   public GenericHiveMetastoreCatalog(String catalogName, String 
hivemetastoreURI) {
+   this(catalogName, getHiveConf(hivemetastoreURI));
+   }
+
+   public GenericHiveMetastoreCatalog(String catalogName, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   this.catalogName = catalogName;
+
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
+   LOG.info("Created GenericHiveMetastoreCatalog '{}'", 
catalogName);
+   }
+
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+
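For orientation, a hedged usage sketch of the two constructors shown in the diff 
above (catalog name and metastore URI are placeholders; `hiveConf` is assumed to 
exist):

```java
// Either pass a metastore URI and let the catalog build the HiveConf ...
GenericHiveMetastoreCatalog byUri =
    new GenericHiveMetastoreCatalog("myCatalog", "thrift://metastore-host:9083");

// ... or hand over a pre-configured HiveConf directly.
GenericHiveMetastoreCatalog byConf =
    new GenericHiveMetastoreCatalog("myCatalog", hiveConf);
```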

[jira] [Created] (FLINK-12305) Table API Clarification

2019-04-23 Thread Alex Barnes (JIRA)
Alex Barnes created FLINK-12305:
---

 Summary: Table API Clarification
 Key: FLINK-12305
 URL: https://issues.apache.org/jira/browse/FLINK-12305
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Affects Versions: 1.8.0
Reporter: Alex Barnes


It is not clear from the documentation if late arriving data is correctly 
handled in the Flink Table/SQL APIs. The documentation makes passing reference 
to recognizing late arriving data, but does not go into depth as to what kind 
of triggering/processing can be performed on it:

[https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time]

This old email thread on the apache-flink-users mailing list tells a different 
story - specifically that late arriving data is not supported and DataStream 
APIs need to be used instead:

[http://osdir.com/apache-flink-users/msg08110.html]

Has support been added since that email correspondence? Please consider 
reducing ambiguity in the documentation and update it to better reflect the 
current/planned state of support for late arriving data in the Table API.

Thanks,
Alex



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277853190
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 ##
 @@ -29,4 +32,15 @@
 * @return table statistics
 */
TableStats getStatistics();
+
+   /**
+* Check if the table is partitioned or not.
+*/
+   boolean isPartitioned();
+
+   /**
+* Get the partition keys of the table. This will be an empty set if 
the table is not partitioned.
+* @return partition keys of the table.
+*/
+   LinkedHashSet<String> getPartitionKeys() throws 
TableNotPartitionedException;
 
 Review comment:
   I don't really understand the question. What do you mean restriction? Can 
you elaborate? 
   
   You are right that Hive's 
[Table](https://hive.apache.org/javadocs/r2.3.4/api/org/apache/hadoop/hive/metastore/api/Table.html)
 uses a separate field dedicated for partition keys.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277853190
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 ##
 @@ -29,4 +32,15 @@
 * @return table statistics
 */
TableStats getStatistics();
+
+   /**
+* Check if the table is partitioned or not.
+*/
+   boolean isPartitioned();
+
+   /**
+* Get the partition keys of the table. This will be an empty set if 
the table is not partitioned.
+* @return partition keys of the table.
+*/
+   LinkedHashSet<String> getPartitionKeys() throws 
TableNotPartitionedException;
 
 Review comment:
   I don't really understand your question. Can you elaborate? 
   
   You are right that Hive's 
[Table](https://hive.apache.org/javadocs/r2.3.4/api/org/apache/hadoop/hive/metastore/api/Table.html)
 uses a separate field dedicated for partition keys.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12296) Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task

2019-04-23 Thread seed Z (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824491#comment-16824491
 ] 

seed Z commented on FLINK-12296:


Unchaining the operators allows the new job to save its state via 
checkpoints.

Unfortunately, it is not able to resume from the old checkpoint (a RocksDB-backed 
incremental checkpoint), likely due to the way the path is composed.

> Data loss silently in RocksDBStateBackend when more than one operator(has 
> states) chained in a single task 
> ---
>
> Key: FLINK-12296
> URL: https://issues.apache.org/jira/browse/FLINK-12296
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Critical
>
> As the mailing list said[1], there may be a problem when more than one operator 
> is chained in a single task and all the operators have state: we'll encounter 
> a silent data loss problem.
> Currently, the local directory we use looks like
> ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state).
>  
> If more than one operator is chained in a single task and all the operators 
> have state, then all the operators will share the same local 
> directory (because the vertex_id is the same), and this will lead to data 
> loss. 
>  
> The path generation logic is below:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>return new File(subtaskBaseDirectory(checkpointId), 
> checkpointDirString(checkpointId));
> }
> @VisibleForTesting
> String subtaskDirString() {
>return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + 
> subtaskIndex).toString();
> }
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>return "chk_" + checkpointId;
> }
> {code}
> [1] 
> [https://app.smartmailcloud.com/web-share/MDkE4DArUT2eoSv86xq772I1HDgMNTVhLEmsnbQ7]
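To make the collision concrete, a hedged illustration with invented IDs: because 
subtaskDirString() encodes only the job vertex and subtask index, two chained 
stateful operators in the same subtask resolve to the identical local directory:

{code}
jid_ab12cd/vtx_ffee99_sti_0/chk_1/   <- operator A writes its local state here
jid_ab12cd/vtx_ffee99_sti_0/chk_1/   <- operator B resolves to the very same path
{code}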



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277851836
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
 
 Review comment:
   good point. How about we start with what we have now and extend it if 
necessary later


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277851366
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
+
+   /**
+* Get the PartitionSpec of the partition.
+*
+* @return the PartitionSpec of the partition.
+*/
+   PartitionSpec getPartitionSpec();
+
+   /**
+* Get a map of properties associated with the partition.
+*
+* @return a map of properties with the partition
+*/
+   Map<String, String> getProperties();
+
+   /**
+* Get a deep copy of the CatalogPartition instance.
+*
+* @return a copy of CatalogPartition instance
+*/
+   CatalogPartition copy();
+
+   /**
+* Get a brief description of the partition.
+*
+* @return an optional short description of the partition
+*/
+   Optional<String> getDescription();
+
+   /**
+* Get a detailed description of the partition.
+*
+* @return an optional long description of the partition
+*/
+   Optional<String> getDetailedDescription();
+
+   /**
+* Represents a partition spec object.
+* Partition columns and values are NOT of strict order, and they need 
to be re-arranged to the correct order
+* by comparing with a list of strictly ordered partition keys.
+*/
+   interface PartitionSpec {
+   /**
+* Check if the current PartitionSpec contains all the 
partition columns and values of another PartitionSpec.
+*/
+   boolean contains(PartitionSpec another);
 
 Review comment:
   good point. checked Blink and it's only used in in-memory catalog impl. Will 
remove from API
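   For reference, a minimal sketch of such a containment check when a spec is 
modeled as a key-to-value map (made-up helper; the Blink implementation may 
differ):
   
   ```java
   import java.util.Map;
   
   // Spec A "contains" spec B if every partition column/value pair of B
   // also appears in A, regardless of declaration order.
   public static boolean contains(Map<String, String> spec, Map<String, String> another) {
       return spec.entrySet().containsAll(another.entrySet());
   }
   ```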


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277851183
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
 ##
 @@ -132,4 +134,58 @@
 */
boolean tableExists(ObjectPath objectPath) throws CatalogException;
 
+   // -- partitions --
+
+   /**
+* Gets PartitionSpec of all partitions of the table.
+*
+* @param tablePath Path of the table.
+* @return  A list of PartitionSpec of the table.
+*
+* @throws TableNotExistException thrown if the table does not exist 
in the catalog.
+* @throws TableNotPartitionedException thrown if the table is 
not partitioned.
+* @throws CatalogException in case of any runtime exception
+*/
+   List<CatalogPartition.PartitionSpec> listPartitions(ObjectPath 
tablePath)
+   throws TableNotExistException, TableNotPartitionedException, 
CatalogException;
+
+   /**
+* Gets the PartitionSpec of all partitions that are under the given 
PartitionSpec in the table.
+*
+* @param tablePath Path of the table
+* @param partitionSpecsThe partition spec to list
+* @return A list of PartitionSpec that are under the given PartitionSpec 
in the table.
+*
+* @throws TableNotExistException thrown if the table does not exist 
in the catalog.
+* @throws TableNotPartitionedException thrown if the table is 
not partitioned.
+* @throws CatalogException in case of any runtime exception
+*/
+   List<CatalogPartition.PartitionSpec> listPartitions(ObjectPath 
tablePath, CatalogPartition.PartitionSpec partitionSpecs)
+   throws TableNotExistException, TableNotPartitionedException, 
CatalogException;
+
+   /**
+* Gets a partition of the given table.
+*
+* @param tablePath Path of the table
+* @param partitionSpecsPartition spec of partition to get
+* @return The requested partition.
+*
+* @throws TableNotExistException thrown if the table does not exist 
in the catalog.
+* @throws TableNotPartitionedException thrown if the table is 
not partitioned.
+* @throws PartitionNotExistException   thrown if the partition 
is not partitioned.
+* @throws CatalogException in case of any runtime exception
+*/
+   CatalogPartition getPartition(ObjectPath tablePath, 
CatalogPartition.PartitionSpec partitionSpecs)
 
 Review comment:
   1. it will throw PartitionNotFoundException. I added additional javadoc for 
this behavior.
   2. will rename it


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277849954
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
+
+   /**
+* Get the PartitionSpec of the partition.
+*
+* @return the PartitionSpec of the partition.
+*/
+   PartitionSpec getPartitionSpec();
 
 Review comment:
   sounds good


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ijuma commented on issue #7156: [FLINK-10967] Update kafka dependency to 2.2.0

2019-04-23 Thread GitBox
ijuma commented on issue #7156: [FLINK-10967] Update kafka dependency to 2.2.0
URL: https://github.com/apache/flink/pull/7156#issuecomment-485949235
 
 
   Thanks for the reference. I'll close this PR. Hopefully #8055 gets reviewed 
and merged more quickly than this one. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r277833211
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import java.util.Set;
+
+/**
+ * CatalogManager manages all the registered ReadableCatalog instances with 
unique names.
+ * It has a concept of current catalog, which will be used when it is not 
given when referencing meta-objects.
+ */
+@PublicEvolving
+public interface CatalogManager {
+
+   /**
+* Register a catalog with a unique name.
+*
+* @param catalogName catalog name to register
+* @param catalog catalog to register
+* @throws CatalogAlreadyExistsException thrown if the name is already 
take
+*/
+   void registerCatalog(String catalogName, ReadableCatalog catalog) 
throws CatalogAlreadyExistsException;
+
+   /**
+* Get a catalog by name.
+*
+* @param catalogName catalog name
+* @return the requested catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+*/
+   ReadableCatalog getCatalog(String catalogName) throws 
CatalogNotExistException;
+
+   /**
+* Get names of all registered catalog.
+*
+* @return a set of names of registered catalogs
+*/
+   Set<String> getCatalogNames();
+
+   /**
+* Get the current catalog.
+*
+* @return the current catalog
+*/
+   ReadableCatalog getCurrentCatalog();
+
+   /**
+* Get name of the current database.
+*
+* @return name of the current database
+*/
+   String getCurrentDatabaseName();
 
 Review comment:
   At the CatalogManager level, it seems we only need get/setCurrentCatalog(), 
because get/setCurrentDatabase() is already defined in the Catalog API, 
specifically in the ReadableCatalog class. Thus, we might want to remove 
getCurrentDatabaseName() and setCurrentCatalogAndDatabase(), and only keep 
get/setCurrentCatalog() in CatalogManager.
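   A hedged sketch of the slimmed-down surface this suggests (illustration only, 
not the final API; exceptions omitted):
   
   ```java
   import java.util.Set;
   
   // Only catalog-level "currentness" stays on the manager; the current
   // database is delegated to ReadableCatalog itself.
   public interface CatalogManager {
       void registerCatalog(String catalogName, ReadableCatalog catalog);
       ReadableCatalog getCatalog(String catalogName);
       Set<String> getCatalogNames();
       ReadableCatalog getCurrentCatalog();
       void setCurrentCatalog(String catalogName);
   }
   ```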


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add 
partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277790658
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
 ##
 @@ -132,4 +134,58 @@
 */
boolean tableExists(ObjectPath objectPath) throws CatalogException;
 
+   // -- partitions --
+
+   /**
+* Gets PartitionSpec of all partitions of the table.
+*
+* @param tablePath Path of the table.
+* @return  A list of PartitionSpec of the table.
+*
+* @throws TableNotExistException thrown if the table does not exist 
in the catalog.
+* @throws TableNotPartitionedException thrown if the table is 
not partitioned.
+* @throws CatalogException in case of any runtime exception
+*/
+   List<CatalogPartition.PartitionSpec> listPartitions(ObjectPath 
tablePath)
+   throws TableNotExistException, TableNotPartitionedException, 
CatalogException;
+
+   /**
+* Gets the PartitionSpec of all partitions that are under the given 
PartitionSpec in the table.
+*
+* @param tablePath Path of the table
+* @param partitionSpecsThe partition spec to list
+* @return A list of PartitionSpec that are under the given PartitionSpec 
in the table.
+*
+* @throws TableNotExistException thrown if the table does not exist 
in the catalog.
+* @throws TableNotPartitionedException thrown if the table is 
not partitioned.
+* @throws CatalogException in case of any runtime exception
+*/
+   List<CatalogPartition.PartitionSpec> listPartitions(ObjectPath 
tablePath, CatalogPartition.PartitionSpec partitionSpecs)
 
 Review comment:
   1. partitionSpecs -> partitionSpec?
   2. Do we need two listPartitions()? The first version seemingly can be 
replaced by the second one when partitionSpec is null or empty.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add 
partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277804076
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
+
+   /**
+* Get the PartitionSpec of the partition.
+*
+* @return the PartitionSpec of the partition.
+*/
+   PartitionSpec getPartitionSpec();
 
 Review comment:
   PartitionSpec is more like the name of a partition. Just as the name of a table or function isn't included in that class's definition, PartitionSpec should stand outside the Partition class for consistency. The Partition class should care mostly about things other than naming, such as schema, stats, properties, etc.
   
   With this change, some of the added partition-related APIs may be subject to change as well.
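
   If PartitionSpec is pulled out of CatalogPartition, the partition APIs might take the spec as a separate argument, roughly along these lines (a sketch of the proposal; the method names and the assumed PartitionNotExistException are illustrative, not the merged code):

       CatalogPartition getPartition(ObjectPath tablePath, PartitionSpec partitionSpec)
               throws PartitionNotExistException, CatalogException;

       void createPartition(ObjectPath tablePath, PartitionSpec partitionSpec, CatalogPartition partition)
               throws TableNotExistException, TableNotPartitionedException,
                       PartitionAlreadyExistException, CatalogException;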


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277803504
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 ##
 @@ -29,4 +32,15 @@
 * @return table statistics
 */
TableStats getStatistics();
+
+   /**
+    * Check if the table is partitioned or not.
+*/
+   boolean isPartitioned();
+
+   /**
+* Get the partition keys of the table. This will be an empty set if 
the table is not partitioned.
+* @return partition keys of the table.
+*/
 
 Review comment:
   good catch


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] 
Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277801129
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
+
+   /**
+* Get the PartitionSpec of the partition.
+*
+* @return the PartitionSpec of the partition.
+*/
+   PartitionSpec getPartitionSpec();
+
+   /**
+* Get a map of properties associated with the partition.
+*
+* @return a map of properties with the partition
+*/
+   Map<String, String> getProperties();
+
+   /**
+* Get a deep copy of the CatalogPartition instance.
+*
+* @return a copy of CatalogPartition instance
+*/
+   CatalogPartition copy();
+
+   /**
+    * Get a brief description of the partition.
+    *
+    * @return an optional short description of the partition
+    */
+   Optional<String> getDescription();
+
+   /**
+    * Get a detailed description of the partition.
+    *
+    * @return an optional long description of the partition
+    */
+   Optional<String> getDetailedDescription();
+
+   /**
+* Represents a partition spec object.
+* Partition columns and values are NOT of strict order, and they need 
to be re-arranged to the correct order
+* by comparing with a list of strictly ordered partition keys.
+*/
+   interface PartitionSpec {
 
 Review comment:
   Static vs. dynamic partition is more of a concept of how to insert/load data. Please see 
[here](https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions)
   
   Their logical representation should be the same.
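
   The re-ordering described in the PartitionSpec javadoc could look roughly like this (a sketch, assuming the spec exposes its values as a Map<String, String> and holds a value for every declared key):

       import java.util.List;
       import java.util.Map;
       import java.util.stream.Collectors;

       /** Re-arranges unordered spec values into the table's strict partition-key order. */
       static List<String> orderPartitionValues(Map<String, String> specValues, List<String> partitionKeys) {
           return partitionKeys.stream()
                   .map(specValues::get)  // each key's value, in declared key order
                   .collect(Collectors.toList());
       }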


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r277796313
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables looking up and accessing tables
+ * in SQL queries without registering tables in advance. Databases are 
registered as sub-schemas in the schema.
+ * This mapping is modeled as a strict two-level reference structure for Flink 
in Calcite,
+ * the full path of tables and views is of format 
[catalog_name].[db_name].[meta-object_name].
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+   private final String catalogName;
+   private final ReadableCatalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, ReadableCatalog 
catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Looks up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName Name of sub-schema to look up.
+* @return Sub-schema with a given dbName, or null.
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
+   if (catalog.databaseExists(schemaName)) {
+   return new DatabaseCalciteSchema(schemaName, catalog);
+   } else {
+   LOGGER.error(String.format("Schema %s does not exist in 
catalog %s", schemaName, catalogName));
+   throw new CatalogException(
+   new DatabaseNotExistException(catalogName, 
schemaName));
+   }
+   }
+
+   @Override
+   public Set<String> getSubSchemaNames() {
+   return new HashSet<>(catalog.listDatabases());
+   }
+
+   @Override
+   public Table getTable(String name) {
+   throw new UnsupportedOperationException(
+   "CatalogCalciteSchema does not support getTable()");
+   }
+
+   @Override
+   public Set<String> getTableNames() {
+   throw new UnsupportedOperationException(
 
 Review comment:
   ok, sounds good


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-04-23 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r277796069
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables looking up and accessing tables
+ * in SQL queries without registering tables in advance. Databases are 
registered as sub-schemas in the schema.
+ * This mapping is modeled as a strict two-level reference structure for Flink 
in Calcite,
+ * the full path of tables and views is of format 
[catalog_name].[db_name].[meta-object_name].
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+   private final String catalogName;
+   private final ReadableCatalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, ReadableCatalog 
catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Looks up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName Name of sub-schema to look up.
+* @return Sub-schema with a given dbName, or null.
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
+   if (catalog.databaseExists(schemaName)) {
+   return new DatabaseCalciteSchema(schemaName, catalog);
+   } else {
+   LOGGER.error(String.format("Schema %s does not exist in 
catalog %s", schemaName, catalogName));
+   throw new CatalogException(
+   new DatabaseNotExistException(catalogName, 
schemaName));
+   }
+   }
+
+   @Override
+   public Set<String> getSubSchemaNames() {
+   return new HashSet<>(catalog.listDatabases());
+   }
+
+   @Override
+   public Table getTable(String name) {
+   throw new UnsupportedOperationException(
+   "CatalogCalciteSchema does not support getTable()");
+   }
+
+   @Override
+   public Set<String> getTableNames() {
+   throw new UnsupportedOperationException(
+   "CatalogCalciteSchema does not support 
getTableNames()");
+   }
+
+   @Override
+   public RelProtoDataType getType(String name) {
+   return new RelProtoDataType() {
+   @Override
+   public RelDataType apply(RelDataTypeFactory 
relDataTypeFactory) {
 
 Review comment:
   ok, sounds good


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add 
partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277794804
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistException.java
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for adding an already existed partition.
 
 Review comment:
   This document can be made consistent with other similar exception classes.
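
   For illustration, a shape consistent with sibling exceptions might look like the following (hypothetical wording; ObjectPath#getFullName is assumed, and this is not necessarily the merged code):

       public class PartitionAlreadyExistException extends Exception {

           private static final String MSG = "Partition %s of table %s in catalog %s already exists.";

           public PartitionAlreadyExistException(
                   String catalogName, ObjectPath tablePath, CatalogPartition.PartitionSpec partitionSpec) {
               // Message format mirrors the other *AlreadyExistException classes.
               super(String.format(MSG, partitionSpec, tablePath.getFullName(), catalogName));
           }
       }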


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-04-23 Thread GitBox
dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r277795351
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables looking up and accessing tables
+ * in SQL queries without registering tables in advance. Databases are 
registered as sub-schemas in the schema.
+ * This mapping is modeled as a strict two-level reference structure for Flink 
in Calcite,
+ * the full path of tables and views is of format 
[catalog_name].[db_name].[meta-object_name].
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+   private final String catalogName;
+   private final ReadableCatalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, ReadableCatalog 
catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Looks up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName Name of sub-schema to look up.
+* @return Sub-schema with a given dbName, or null.
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
+   if (catalog.databaseExists(schemaName)) {
+   return new DatabaseCalciteSchema(schemaName, catalog);
+   } else {
+   LOGGER.error(String.format("Schema %s does not exist in 
catalog %s", schemaName, catalogName));
+   throw new CatalogException(
+   new DatabaseNotExistException(catalogName, 
schemaName));
+   }
+   }
+
+   @Override
+   public Set<String> getSubSchemaNames() {
+   return new HashSet<>(catalog.listDatabases());
+   }
+
+   @Override
+   public Table getTable(String name) {
+   throw new UnsupportedOperationException(
+   "CatalogCalciteSchema does not support getTable()");
+   }
+
+   @Override
+   public Set<String> getTableNames() {
+   throw new UnsupportedOperationException(
 
 Review comment:
   I agree that we should throw an exception when the `getTable` method is called. I see no reason, though, why we can't return an empty table set here. I am not sure where Calcite may call this method, since we do not fully control its scope. I think it doesn't harm us to return an `emptySet` here.
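
   The alternative described above would reduce the override to something like this (a sketch, assuming java.util.Collections is imported and the class otherwise stays as quoted):

       @Override
       public Set<String> getTableNames() {
           // Tolerate Calcite-internal probes at the catalog level, where no
           // tables live directly; tables exist under database sub-schemas.
           return Collections.emptySet();
       }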


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog

2019-04-23 Thread GitBox
xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add 
partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277795191
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/TableNotPartitionedException.java
 ##
 @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to operate partition against a non-partitioned table.
 
 Review comment:
   against -> on


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-04-23 Thread GitBox
dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r277793581
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables looking up and accessing tables
+ * in SQL queries without registering tables in advance. Databases are 
registered as sub-schemas in the schema.
+ * This mapping is modeled as a strict two-level reference structure for Flink 
in Calcite,
+ * the full path of tables and views is of format 
[catalog_name].[db_name].[meta-object_name].
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+   private final String catalogName;
+   private final ReadableCatalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, ReadableCatalog 
catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Looks up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName Name of sub-schema to look up.
+* @return Sub-schema with a given dbName, or null.
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
+   if (catalog.databaseExists(schemaName)) {
+   return new DatabaseCalciteSchema(schemaName, catalog);
+   } else {
+   LOGGER.error(String.format("Schema %s does not exist in 
catalog %s", schemaName, catalogName));
+   throw new CatalogException(
+   new DatabaseNotExistException(catalogName, 
schemaName));
+   }
+   }
+
+   @Override
+   public Set<String> getSubSchemaNames() {
+   return new HashSet<>(catalog.listDatabases());
+   }
+
+   @Override
+   public Table getTable(String name) {
+   throw new UnsupportedOperationException(
+   "CatalogCalciteSchema does not support getTable()");
+   }
+
+   @Override
+   public Set<String> getTableNames() {
+   throw new UnsupportedOperationException(
+   "CatalogCalciteSchema does not support 
getTableNames()");
+   }
+
+   @Override
+   public RelProtoDataType getType(String name) {
+   return new RelProtoDataType() {
+   @Override
+   public RelDataType apply(RelDataTypeFactory 
relDataTypeFactory) {
 
 Review comment:
   I would prefer returning `null`; otherwise it is a bit misleading to me, because as far as I understand the `Schema` interface, this method is only for user/catalog-specific types.
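
   Returning `null` as preferred here would reduce the override to (a sketch):

       @Override
       public RelProtoDataType getType(String name) {
           // No user/catalog-specific types are registered at this level.
           return null;
       }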


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the

URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services