[jira] [Commented] (FLINK-12308) Support python language in Flink Table API
[ https://issues.apache.org/jira/browse/FLINK-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16824834#comment-16824834 ]

Dian Fu commented on FLINK-12308:
---------------------------------

Thanks [~sunjincheng121] for creating this ticket. A big +1 on this feature.

> Support python language in Flink Table API
> ------------------------------------------
>
>                 Key: FLINK-12308
>                 URL: https://issues.apache.org/jira/browse/FLINK-12308
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.9.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] [flink] wuchong commented on issue #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
wuchong commented on issue #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#issuecomment-486071536

Thanks @godfreyhe. Hi @walterddr, @twalthr, @KurtYoung, I will merge this if you have no other comments.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] godfreyhe commented on issue #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
godfreyhe commented on issue #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#issuecomment-486071031

LGTM, +1 to merge
[GitHub] [flink] xuefuz commented on issue #8212: [FLINK-11519][table] Add function related catalog APIs
xuefuz commented on issue #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#issuecomment-486070424

Pushed more updates based on review feedback here.
[jira] [Created] (FLINK-12308) Support python language in Flink Table API
sunjincheng created FLINK-12308:
-----------------------------------

             Summary: Support python language in Flink Table API
                 Key: FLINK-12308
                 URL: https://issues.apache.org/jira/browse/FLINK-12308
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / API
    Affects Versions: 1.9.0
            Reporter: sunjincheng
            Assignee: sunjincheng

At the Flink API level, we have the DataStream API, DataSet API, and Table API; the Table API will become a first-class citizen. The Table API is declarative and can be automatically optimized, as mentioned in the Flink mid-term roadmap by Stephan. So we first consider supporting Python at the Table API level, to cater to the current large number of analytics users. Flink's goals for the Python Table API are as follows:

* Users can write Flink Table API jobs in Python, and the API should mirror the Java/Scala Table API.
* Users can submit Python Table API jobs in the following ways:
** Submit a job with a Python script, integrated with `flink run`
** Submit a job with a Python script via the REST service
** Submit a job in an interactive way, similar to `scala-shell`
** Local debugging in an IDE
* Users can write custom functions (UDF, UDTF, UDAF).
* Pandas functions can be used in the Flink Python Table API.

A more detailed description can be found in FLIP-38 (will be done soon). For the API level, we make the following plan:

* The short term: we may initially go with a simple approach that maps the Python Table API to the Java Table API via Py4J.
* The long term: we may need to create a Python API that follows the same structure as Flink's Table API and produces the language-independent DAG. (As Stephan already mentioned on the [mailing thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096])
[GitHub] [flink] Kejian-Li closed pull request #8247: Change the name of variable "log" into "LOG" to correspond to other class files.
Kejian-Li closed pull request #8247: Change the name of variable "log" into "LOG" to correspond to other class files.
URL: https://github.com/apache/flink/pull/8247
[GitHub] [flink] Kejian-Li opened a new pull request #8247: Change the name of variable "log" into "LOG" to correspond to other class files.
Kejian-Li opened a new pull request #8247: Change the name of variable "log" into "LOG" to correspond to other class files.
URL: https://github.com/apache/flink/pull/8247

## What is the purpose of the change

Change the name of the variable "log" into "LOG" to correspond to other class files.

## Brief change log

log -> LOG

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277954981

File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java

@@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
 		catalog.dropTable(viewPath2, false);
 	}

+	// -- functions --
+
+	@Test
+	public void testCreateFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertFalse(catalog.functionExists(path1));
+
+		catalog.createFunction(path1, createFunction(), false);
+
+		assertTrue(catalog.functionExists(path1));
+
+		catalog.dropFunction(path1, false);

Review comment: close() will be called after each unit test finishes to remove side effects between tests. Leaving dropFunction() within unit tests, for example here, may mislead people into thinking we are intentionally testing it, though it is only used for cleanup purposes.
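The convention argued for here, centralizing cleanup in a per-test close() instead of scattering drop calls through test bodies, is the standard setUp/tearDown pattern. A minimal sketch in Python's unittest for illustration, with a hypothetical FakeCatalog standing in for the real in-memory catalog:

```python
import unittest


class FakeCatalog:
    """Hypothetical stand-in for an in-memory catalog (illustration only)."""

    def __init__(self):
        self.functions = {}

    def create_function(self, path, func):
        self.functions[path] = func

    def function_exists(self, path):
        return path in self.functions

    def close(self):
        # Drop all state, removing side effects between tests.
        self.functions.clear()


class CatalogFunctionTest(unittest.TestCase):

    def setUp(self):
        self.catalog = FakeCatalog()

    def tearDown(self):
        # Centralized cleanup: test bodies need no explicit drop calls,
        # so a reader is not misled into thinking drops are under test.
        self.catalog.close()

    def test_create_function(self):
        self.assertFalse(self.catalog.function_exists("db1.f1"))
        self.catalog.create_function("db1.f1", object())
        self.assertTrue(self.catalog.function_exists("db1.f1"))


# Run one test manually: setUp -> test body -> tearDown.
result = CatalogFunctionTest("test_create_function").run()
print(result.wasSuccessful())  # True
```

The trade-off discussed in the thread is exactly this: inline drops are harmless but redundant once teardown guarantees isolation.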
[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277953303

File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java

@@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
 		catalog.dropTable(viewPath2, false);
 	}

+	// -- functions --
+
+	@Test
+	public void testCreateFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertFalse(catalog.functionExists(path1));
+
+		catalog.createFunction(path1, createFunction(), false);
+
+		assertTrue(catalog.functionExists(path1));
+
+		catalog.dropFunction(path1, false);

Review comment: I think having it here is okay so that it doesn't pass side effects to other test cases. It's test code and harmless.
[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277954019

File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java

@@ -132,4 +133,36 @@
 	 */
 	boolean tableExists(ObjectPath objectPath) throws CatalogException;

+	// -- functions --
+
+	/**
+	 * List the names of all functions in the given database. An empty list is returned if none is registered.
+	 *
+	 * @param dbName Name of the database.

Review comment: I prefer that reformatting happen in a separate PR, as it would otherwise pollute the review here. However, I have reformatted the newly added methods.
[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277953161

File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a function that already exists.
+ */
+public class FunctionAlreadyExistException extends Exception {

Review comment: The current name is consistent with other exceptions. Exception renaming will be handled by a separate JIRA.
[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277952965

File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java

@@ -0,0 +1,76 @@
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+	/**
+	 * Get the source of the function. Right now, CLASS only.
+	 * @return the source of the function
+	 */
+	default Source getSource() {
+		return Source.CLASS;
+	}
+
+	/**
+	 * Get the full name of the class backing the function.
+	 * @return the full name of the class
+	 */
+	String getClazzName();
+
+	/**
+	 * Get the properties of the function.
+	 * @return the properties of the function
+	 */
+	Map<String, String> getProperties();

Review comment: Properties store any property that is not captured in the class definition. A Hive jar URI would be a very good example. Other things could be function_creation_time, last_modified_time, etc. Since this is just the API, each catalog can have the specific properties it needs in order to make a function work. Not sure what you meant by "customize JarResource".
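As the comment explains, the properties map is a catch-all for catalog-specific metadata the class definition cannot carry (a jar location, timestamps, and so on). A minimal illustration; the class shape and the property keys are hypothetical, not Flink's actual interface:

```python
# Hypothetical model of a function definition: the backing class name plus
# a free-form properties map for metadata outside the class definition.
class CatalogFunction:

    def __init__(self, clazz_name, properties=None):
        self.clazz_name = clazz_name
        # Catalog-specific extras live here, e.g. where the jar providing
        # the class can be fetched from, or bookkeeping timestamps.
        self.properties = dict(properties or {})


func = CatalogFunction(
    "com.example.udf.MyUpper",  # hypothetical UDF class
    {
        "jar.uri": "hdfs:///udfs/my-upper.jar",  # e.g. a Hive-style jar location
        "creation.time": "2019-04-23T00:00:00Z",
    },
)
print(func.clazz_name)             # com.example.udf.MyUpper
print(func.properties["jar.uri"])  # hdfs:///udfs/my-upper.jar
```

Keeping such metadata in an open map, rather than as dedicated fields, is what lets each catalog implementation attach whatever it needs without changing the interface.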
[GitHub] [flink] xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
xuefuz commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277950610

File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java

@@ -0,0 +1,76 @@
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+	/**
+	 * Get the source of the function. Right now, CLASS only.
+	 * @return the source of the function
+	 */
+	default Source getSource() {

Review comment: Yes, this is what we have in the FLIP, but I think it's okay to leave this out.
[jira] [Closed] (FLINK-4653) Refactor JobClientActor to adapt to the new Rpc framework and new cluster management
[ https://issues.apache.org/jira/browse/FLINK-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Zhang closed FLINK-4653.
-----------------------------
    Resolution: Invalid

> Refactor JobClientActor to adapt to the new Rpc framework and new cluster management
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-4653
>                 URL: https://issues.apache.org/jira/browse/FLINK-4653
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Command Line Client
>            Reporter: Jing Zhang
>            Assignee: Jing Zhang
>            Priority: Major
>
> 1. Create a RpcEndpoint (temporarily named JobInfoTracker) and RpcGateway (temporarily named JobInfoTrackerGateway) to replace the old JobClientActor.
> 2. Change rpc message communication in JobClientActor to rpc method calls, to fit the new rpc framework.
> 3. JobInfoTracker is responsible for waiting for the job state changes and the job result until the job completes. It is not responsible for submitting new jobs, because job submission behavior differs between clusters.
[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r277950481

File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogManager.java

@@ -0,0 +1,91 @@
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import java.util.Set;
+
+/**
+ * CatalogManager manages all the registered ReadableCatalog instances with unique names.
+ * It has a concept of current catalog, which will be used when a catalog is not given when referencing meta-objects.
+ */
+@PublicEvolving
+public interface CatalogManager {
+
+	/**
+	 * Register a catalog with a unique name.
+	 *
+	 * @param catalogName catalog name to register
+	 * @param catalog catalog to register
+	 * @throws CatalogAlreadyExistsException thrown if the name is already taken
+	 */
+	void registerCatalog(String catalogName, ReadableCatalog catalog) throws CatalogAlreadyExistsException;
+
+	/**
+	 * Get a catalog by name.
+	 *
+	 * @param catalogName catalog name
+	 * @return the requested catalog
+	 * @throws CatalogNotExistException thrown if the catalog doesn't exist
+	 */
+	ReadableCatalog getCatalog(String catalogName) throws CatalogNotExistException;
+
+	/**
+	 * Get names of all registered catalogs.
+	 *
+	 * @return a set of names of registered catalogs
+	 */
+	Set<String> getCatalogNames();
+
+	/**
+	 * Get the current catalog.
+	 *
+	 * @return the current catalog
+	 */
+	ReadableCatalog getCurrentCatalog();
+
+	/**
+	 * Get name of the current database.
+	 *
+	 * @return name of the current database
+	 */
+	String getCurrentDatabaseName();

Review comment: Agree.
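The contract quoted above can be made concrete with a small in-memory model: unique registration, lookups that raise the corresponding exceptions, and a notion of current catalog. This is an illustration of the semantics only, not Flink's implementation; the Python names simply mirror the Java ones, and the database-level methods are omitted.

```python
class CatalogAlreadyExistsException(Exception):
    pass


class CatalogNotExistException(Exception):
    pass


class InMemoryCatalogManager:
    """Toy realization of the CatalogManager contract (illustration only)."""

    def __init__(self, default_name, default_catalog):
        self._catalogs = {default_name: default_catalog}
        self._current = default_name

    def register_catalog(self, catalog_name, catalog):
        # Catalog names must be unique across the manager.
        if catalog_name in self._catalogs:
            raise CatalogAlreadyExistsException(catalog_name)
        self._catalogs[catalog_name] = catalog

    def get_catalog(self, catalog_name):
        if catalog_name not in self._catalogs:
            raise CatalogNotExistException(catalog_name)
        return self._catalogs[catalog_name]

    def get_catalog_names(self):
        return set(self._catalogs)

    def get_current_catalog(self):
        # The current catalog is consulted when a meta-object reference
        # does not name a catalog explicitly.
        return self._catalogs[self._current]


mgr = InMemoryCatalogManager("default", object())
mgr.register_catalog("hive", object())
print(sorted(mgr.get_catalog_names()))  # ['default', 'hive']
```

Registering a second catalog under an existing name raises CatalogAlreadyExistsException, which is the uniqueness guarantee the interface's javadoc calls out.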
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277948967

File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java

@@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
 		catalog.dropTable(viewPath2, false);
 	}

+	// -- functions --
+
+	@Test
+	public void testCreateFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertFalse(catalog.functionExists(path1));
+
+		catalog.createFunction(path1, createFunction(), false);
+
+		assertTrue(catalog.functionExists(path1));
+
+		catalog.dropFunction(path1, false);
+		catalog.dropDatabase(db1, false);
+	}
+
+	@Test
+	public void testCreateFunction_DatabaseNotExistException() throws Exception {
+		assertFalse(catalog.databaseExists(db1));
+
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database db1 does not exist in Catalog");
+		catalog.createFunction(path1, createFunction(), false);
+	}
+
+	@Test
+	public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createFunction(path1, createFunction(), false);
+
+		exception.expect(FunctionAlreadyExistException.class);
+		exception.expectMessage("Function db1.t1 already exists in Catalog");
+		catalog.createFunction(path1, createFunction(), false);
+	}
+
+	@Test
+	public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogFunction func = createFunction();
+		catalog.createFunction(path1, func, false);
+
+		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+		catalog.createFunction(path1, createAnotherFunction(), true);
+
+		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+		catalog.dropFunction(path1, false);
+		catalog.dropDatabase(db1, false);
+	}
+
+	@Test
+	public void testAlterFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogFunction func = createFunction();
+		catalog.createFunction(path1, func, false);
+
+		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+		CatalogFunction newFunc = createAnotherFunction();
+		catalog.alterFunction(path1, newFunc, false);
+
+		assertNotEquals(func, catalog.getFunction(path1));
+		CatalogTestUtil.checkEquals(newFunc, catalog.getFunction(path1));
+
+		catalog.dropFunction(path1, false);
+		catalog.dropDatabase(db1, false);
+	}
+
+	@Test
+	public void testAlterFunction_FunctionNotExistException() throws Exception {
+		exception.expect(FunctionNotExistException.class);
+		exception.expectMessage("Function db1.nonexist does not exist in Catalog");

Review comment: missing catalog name in the message
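The review point is that expected exception messages should include the catalog name, so a failure identifies which catalog was involved. A small sketch of a message-asserting test, in Python's unittest for illustration; the exception class and message format here are hypothetical analogues, not Flink's:

```python
import unittest


# Hypothetical analogue of the catalog exception, with the catalog name
# baked into the message so failures identify the catalog involved.
class FunctionNotExistException(Exception):
    def __init__(self, catalog_name, path):
        super().__init__(
            "Function %s does not exist in Catalog %s." % (path, catalog_name))


class MessageTest(unittest.TestCase):
    def test_message_contains_catalog_name(self):
        with self.assertRaises(FunctionNotExistException) as ctx:
            raise FunctionNotExistException("myCatalog", "db1.nonexist")
        # Asserting on the message pins down both the object path and
        # the catalog name, which is what the review asks for.
        self.assertIn("Catalog myCatalog", str(ctx.exception))


result = MessageTest("test_message_contains_catalog_name").run()
print(result.wasSuccessful())  # True
```

Without the catalog name in the message, the expectMessage assertions in the quoted diff would pass even if the wrong catalog raised the error.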
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r277949377

File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java

@@ -562,6 +568,158 @@ public void testRenameView() throws Exception {
 		catalog.dropTable(viewPath2, false);
 	}

+	// -- functions --
+
+	@Test
+	public void testCreateFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		assertFalse(catalog.functionExists(path1));
+
+		catalog.createFunction(path1, createFunction(), false);
+
+		assertTrue(catalog.functionExists(path1));
+
+		catalog.dropFunction(path1, false);
+		catalog.dropDatabase(db1, false);
+	}
+
+	@Test
+	public void testCreateFunction_DatabaseNotExistException() throws Exception {
+		assertFalse(catalog.databaseExists(db1));
+
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database db1 does not exist in Catalog");
+		catalog.createFunction(path1, createFunction(), false);
+	}
+
+	@Test
+	public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.createFunction(path1, createFunction(), false);
+
+		exception.expect(FunctionAlreadyExistException.class);
+		exception.expectMessage("Function db1.t1 already exists in Catalog");
+		catalog.createFunction(path1, createFunction(), false);
+	}
+
+	@Test
+	public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogFunction func = createFunction();
+		catalog.createFunction(path1, func, false);
+
+		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+		catalog.createFunction(path1, createAnotherFunction(), true);
+
+		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+		catalog.dropFunction(path1, false);
+		catalog.dropDatabase(db1, false);
+	}
+
+	@Test
+	public void testAlterFunction() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogFunction func = createFunction();
+		catalog.createFunction(path1, func, false);
+
+		CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+		CatalogFunction newFunc = createAnotherFunction();
+		catalog.alterFunction(path1, newFunc, false);
+
+		assertNotEquals(func, catalog.getFunction(path1));
+		CatalogTestUtil.checkEquals(newFunc, catalog.getFunction(path1));
+
+		catalog.dropFunction(path1, false);
+		catalog.dropDatabase(db1, false);
+	}
+
+	@Test
+	public void testAlterFunction_FunctionNotExistException() throws Exception {
+		exception.expect(FunctionNotExistException.class);
+		exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+		catalog.alterFunction(nonExistObjectPath, createFunction(), false);
+	}
+
+	@Test
+	public void testAlterFunction_FunctionNotExist_ignored() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+		catalog.alterFunction(nonExistObjectPath, createFunction(), true);
+
+		assertFalse(catalog.functionExists(nonExistObjectPath));
+
+		catalog.dropDatabase(db1, false);
+	}
+
+	@Test
+	public void testListFunctions() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		CatalogFunction func = createFunction();
+		catalog.createFunction(path1, func, false);
+
+		assertEquals(path1.getObjectName(), catalog.listFunctions(db1).get(0));
+
+		catalog.dropFunction(path1, false);
+		catalog.dropDatabase(db1, false);
+	}
+
+	@Test
+	public void testListFunctions_DatabaseNotExistException() throws Exception {
+		exception.expect(DatabaseNotExistException.class);
+		exception.expectMessage("Database db1 does not exist in Catalog");
+		catalog.listFunctions(db1);
+	}
+
+	@Test
+	public void testGetFunction_FunctionNotExistException() throws Exception {
+		catalog.createDatabase(db1, createDb(), false);
+
+		exception.expect(FunctionNotExistException.class);
+
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277948682 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ## Commented line: `catalog.dropFunction(path1, false);` (cleanup in testCreateFunction_FunctionAlreadyExist_ignored). Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277949973 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.Map; +import java.util.Optional; + +/** + * Interface for a function in a catalog. + */ +public interface CatalogFunction { + + /** +* Get the source of the function. Right now, CLASS only. +* @return the source of the function Review comment: add an empty line between comment and `@return` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
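The convention the reviewer asks for, applied to the interface quoted above, could look like the following sketch. The `Source` enum and the reduced method set are simplifications for illustration, not the final Flink API; only `getSource()`, `getClazzName()`, and `Source.CLASS` appear in the quoted diff.

```java
// Minimal sketch of the CatalogFunction interface from the diff above, with
// the blank line the reviewer requests between the Javadoc description and
// the @return tag. Simplified: not the real Flink API surface.
public class CatalogFunctionSketch {

    /** Where a function's implementation comes from; CLASS is the only value for now. */
    enum Source { CLASS }

    interface CatalogFunction {
        /**
         * Get the source of the function. Right now, CLASS only.
         *
         * @return the source of the function
         */
        default Source getSource() {
            return Source.CLASS;
        }

        /**
         * Get the full name of the class backing the function.
         *
         * @return the full name of the class
         */
        String getClazzName();
    }

    public static void main(String[] args) {
        // One abstract method plus a default makes this a functional interface,
        // so a lambda can stand in for an implementation here.
        CatalogFunction f = () -> "org.example.MyScalarFunction";
        System.out.println(f.getSource());    // CLASS
        System.out.println(f.getClazzName()); // org.example.MyScalarFunction
    }
}
```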
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r27795 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## Commented line: `* @return the full name of the class` (Javadoc of getClazzName()). Review comment: empty line
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277949417 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ##
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277950050 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## Commented line: `* @return a deep copy of "this" instance` (Javadoc of copy()). Review comment: empty line
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277948110 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.exceptions; + +import org.apache.flink.table.catalog.ObjectPath; + +/** + * Exception for trying to create a function that already exists. + */ +public class FunctionAlreadyExistException extends Exception { Review comment: ```suggestion public class FunctionAlreadyExistsException extends Exception { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
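The rename the reviewer suggests could look like the sketch below. The constructor and message format are assumptions inferred from the expected message quoted in the tests in this thread ("Function db1.t1 already exists in Catalog"); Flink's actual class likely takes an ObjectPath rather than a plain String.

```java
// Sketch of the suggested rename: FunctionAlreadyExistException ->
// FunctionAlreadyExistsException. Constructor shape and message format are
// assumptions based on the expectMessage(...) calls in the quoted tests.
public class FunctionAlreadyExistsException extends Exception {

    public FunctionAlreadyExistsException(String functionPath) {
        super(String.format("Function %s already exists in Catalog", functionPath));
    }

    public static void main(String[] args) {
        try {
            throw new FunctionAlreadyExistsException("db1.t1");
        } catch (FunctionAlreadyExistsException e) {
            System.out.println(e.getMessage()); // Function db1.t1 already exists in Catalog
        }
    }
}
```

Baking the message format into the exception keeps every call site consistent with what the tests assert.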
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277949318 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ## Commented line: `catalog.dropDatabase(db1, false);` (cleanup in testAlterFunction_FunctionNotExist_ignored). Review comment: remove this
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277949341 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ## Commented line: `catalog.dropFunction(path1, false);` (cleanup in testListFunctions). Review comment: remove these
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277949437 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ##
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277950075 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java ## @@ -132,4 +133,36 @@ */ boolean tableExists(ObjectPath objectPath) throws CatalogException; + // -- functions -- + + /** +* List the names of all functions in the given database. An empty list is returned if none is registered. +* +* @param dbName	Name of the database. Review comment: replace tab with space, also should use lower case for starting character, like "name of the database"; all javadoc in this file needs some reformatting
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277949134 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ## Commented line: `exception.expectMessage("Database db1 does not exist in Catalog");` (in testCreateFunction_DatabaseNotExistException). Review comment: ```suggestion exception.expectMessage("Database db1 does not exist in catalog"); ``` seems no need to have an upper "C". Change the "C" to "c" in DatabaseNotExistException?
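The casing change the reviewer proposes could be made in the exception itself, so every call site and every expectMessage(...) in the tests stays consistent. The constructor shape below is an assumption for illustration; Flink's actual class likely also carries the catalog name and takes structured arguments rather than a plain String.

```java
// Sketch of the suggested message casing: build the message with a
// lower-case "catalog" inside DatabaseNotExistException itself.
// Constructor shape is an assumption, not Flink's actual signature.
public class DatabaseNotExistException extends Exception {

    public DatabaseNotExistException(String databaseName) {
        super(String.format("Database %s does not exist in catalog", databaseName));
    }

    public static void main(String[] args) {
        System.out.println(new DatabaseNotExistException("db1").getMessage());
        // Database db1 does not exist in catalog
    }
}
```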
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277950023 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## Commented line: `* @return the properties of the function` (Javadoc of getProperties()). Review comment: empty line
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277947738 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * A generic catalog function implementation. + */ +public class GenericCatalogFunction implements CatalogFunction { + + private final String clazzName; // Fully qualified class name of the function Review comment: className? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
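The reviewer's rename (clazzName to className), together with the deep-copy contract stated in the CatalogFunction Javadoc quoted earlier, could be sketched as below. Only the two fields visible in the diff are modeled; the constructor signature and the rest of the class are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of GenericCatalogFunction with the suggested field rename
// (clazzName -> className) and a copy() that deep-copies the mutable
// properties map, per the "deep copy" contract in CatalogFunction's Javadoc.
public class GenericCatalogFunction {

    private final String className; // fully qualified class name of the function
    private final Map<String, String> properties;

    public GenericCatalogFunction(String className, Map<String, String> properties) {
        this.className = className;
        this.properties = properties;
    }

    public String getClassName() {
        return className;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    /** Deep copy: mutating the copy's properties must not affect this instance. */
    public GenericCatalogFunction copy() {
        return new GenericCatalogFunction(className, new HashMap<>(properties));
    }

    public static void main(String[] args) {
        GenericCatalogFunction f =
            new GenericCatalogFunction("org.example.MyFunc", new HashMap<>());
        GenericCatalogFunction c = f.copy();
        c.getProperties().put("k", "v");
        System.out.println(f.getProperties().isEmpty()); // true
    }
}
```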
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277948736 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ## @@ -562,6 +568,158 @@ public void testRenameView() throws Exception { catalog.dropTable(viewPath2, false); } + // -- functions -- + + @Test + public void testCreateFunction() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + assertFalse(catalog.functionExists(path1)); + + catalog.createFunction(path1, createFunction(), false); + + assertTrue(catalog.functionExists(path1)); + + catalog.dropFunction(path1, false); + catalog.dropDatabase(db1, false); + } + + @Test + public void testCreateFunction_DatabaseNotExistException() throws Exception { + assertFalse(catalog.databaseExists(db1)); + + exception.expect(DatabaseNotExistException.class); + exception.expectMessage("Database db1 does not exist in Catalog"); + catalog.createFunction(path1, createFunction(), false); + } + + @Test + public void testCreateFunction_FunctionAlreadyExistException() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.createFunction(path1, createFunction(), false); + + exception.expect(FunctionAlreadyExistException.class); + exception.expectMessage("Function db1.t1 already exists in Catalog"); + catalog.createFunction(path1, createFunction(), false); + } + + @Test + public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + CatalogFunction func = createFunction(); + catalog.createFunction(path1, func, false); + + CatalogTestUtil.checkEquals(func, catalog.getFunction(path1)); + + catalog.createFunction(path1, createAnotherFunction(), true); + + CatalogTestUtil.checkEquals(func, catalog.getFunction(path1)); + + catalog.dropFunction(path1, false); + 
catalog.dropDatabase(db1, false); + } + + @Test + public void testAlterFunction() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + CatalogFunction func = createFunction(); + catalog.createFunction(path1, func, false); + + CatalogTestUtil.checkEquals(func, catalog.getFunction(path1)); + + CatalogFunction newFunc = createAnotherFunction(); + catalog.alterFunction(path1, newFunc, false); + + assertNotEquals(func, catalog.getFunction(path1)); + CatalogTestUtil.checkEquals(newFunc, catalog.getFunction(path1)); + + catalog.dropFunction(path1, false); Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277948627 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ## @@ -562,6 +568,158 @@ public void testRenameView() throws Exception { catalog.dropTable(viewPath2, false); } + // -- functions -- + + @Test + public void testCreateFunction() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + assertFalse(catalog.functionExists(path1)); + + catalog.createFunction(path1, createFunction(), false); + + assertTrue(catalog.functionExists(path1)); + + catalog.dropFunction(path1, false); Review comment: we don't need these two drop commands, as they are already taken care in close() This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
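The reviewer's point, that explicit dropFunction/dropDatabase calls are redundant because close() already cleans up, can be sketched with a minimal, hypothetical in-memory catalog (class and method names below are illustrative stand-ins, not Flink's actual GenericInMemoryCatalog):

```java
import java.util.HashMap;
import java.util.Map;

public class CatalogCleanupSketch {
    // Hypothetical in-memory catalog: all state lives in a map,
    // so close() can wipe everything a test created.
    static class InMemoryCatalog implements AutoCloseable {
        final Map<String, String> functions = new HashMap<>();

        void createFunction(String path, String className) {
            functions.put(path, className);
        }

        boolean functionExists(String path) {
            return functions.containsKey(path);
        }

        @Override
        public void close() {
            functions.clear(); // drops all databases/functions in one place
        }
    }

    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        catalog.createFunction("db1.f1", "com.example.MyFunc");
        // No per-test dropFunction/dropDatabase needed: close() clears all state.
        catalog.close();
        System.out.println(catalog.functionExists("db1.f1")); // prints "false"
    }
}
```

With a shared teardown like this, each test body only needs the create/assert steps it is actually exercising.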
[GitHub] [flink] bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277949400 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java ## @@ -562,6 +568,158 @@ public void testRenameView() throws Exception { catalog.dropTable(viewPath2, false); } + // -- functions -- + + @Test + public void testCreateFunction() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + assertFalse(catalog.functionExists(path1)); + + catalog.createFunction(path1, createFunction(), false); + + assertTrue(catalog.functionExists(path1)); + + catalog.dropFunction(path1, false); + catalog.dropDatabase(db1, false); + } + + @Test + public void testCreateFunction_DatabaseNotExistException() throws Exception { + assertFalse(catalog.databaseExists(db1)); + + exception.expect(DatabaseNotExistException.class); + exception.expectMessage("Database db1 does not exist in Catalog"); + catalog.createFunction(path1, createFunction(), false); + } + + @Test + public void testCreateFunction_FunctionAlreadyExistException() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.createFunction(path1, createFunction(), false); + + exception.expect(FunctionAlreadyExistException.class); + exception.expectMessage("Function db1.t1 already exists in Catalog"); + catalog.createFunction(path1, createFunction(), false); + } + + @Test + public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + CatalogFunction func = createFunction(); + catalog.createFunction(path1, func, false); + + CatalogTestUtil.checkEquals(func, catalog.getFunction(path1)); + + catalog.createFunction(path1, createAnotherFunction(), true); + + CatalogTestUtil.checkEquals(func, catalog.getFunction(path1)); + + catalog.dropFunction(path1, false); + 
catalog.dropDatabase(db1, false); + } + + @Test + public void testAlterFunction() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + CatalogFunction func = createFunction(); + catalog.createFunction(path1, func, false); + + CatalogTestUtil.checkEquals(func, catalog.getFunction(path1)); + + CatalogFunction newFunc = createAnotherFunction(); + catalog.alterFunction(path1, newFunc, false); + + assertNotEquals(func, catalog.getFunction(path1)); + CatalogTestUtil.checkEquals(newFunc, catalog.getFunction(path1)); + + catalog.dropFunction(path1, false); + catalog.dropDatabase(db1, false); + } + + @Test + public void testAlterFunction_FunctionNotExistException() throws Exception { + exception.expect(FunctionNotExistException.class); + exception.expectMessage("Function db1.nonexist does not exist in Catalog"); + catalog.alterFunction(nonExistObjectPath, createFunction(), false); + } + + @Test + public void testAlterFunction_FunctionNotExist_ignored() throws Exception { + catalog.createDatabase(db1, createDb(), false); + catalog.alterFunction(nonExistObjectPath, createFunction(), true); + + assertFalse(catalog.functionExists(nonExistObjectPath)); + + catalog.dropDatabase(db1, false); + } + + @Test + public void testListFunctions() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + CatalogFunction func = createFunction(); + catalog.createFunction(path1, func, false); + + assertEquals(path1.getObjectName(), catalog.listFunctions(db1).get(0)); + + catalog.dropFunction(path1, false); + catalog.dropDatabase(db1, false); + } + + @Test + public void testListFunctions_DatabaseNotExistException() throws Exception{ + exception.expect(DatabaseNotExistException.class); + exception.expectMessage("Database db1 does not exist in Catalog"); + catalog.listFunctions(db1); + } + + @Test + public void testGetFunction_FunctionNotExistException() throws Exception { + catalog.createDatabase(db1, createDb(), false); + + 
exception.expect(FunctionNotExistException.class); +
[jira] [Created] (FLINK-12307) Support translation from StreamExecWindowJoin to StreamTransformation.
Jing Zhang created FLINK-12307: -- Summary: Support translation from StreamExecWindowJoin to StreamTransformation. Key: FLINK-12307 URL: https://issues.apache.org/jira/browse/FLINK-12307 Project: Flink Issue Type: Task Components: Table SQL / API Reporter: Jing Zhang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] walterddr commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
walterddr commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner URL: https://github.com/apache/flink/pull/8234#discussion_r277949412 ## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml ## @@ -85,7 +85,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
eaglewatcherwb commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r277948095 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Objects; + +/** + * Id identifying {@link ExecutionVertex}. + */ +public class ExecutionVertexID { Review comment: Shall we use `SchedulingExecutionVertexID` to identify this id is used in scheduler more clearly? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
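For context, a scheduler-side vertex id like the one under review is essentially a value object over a (JobVertexID, subtaskIndex) pair. A minimal hypothetical sketch (the field names are assumptions based on the imports shown in the diff, and a String stands in for JobVertexID):

```java
import java.util.Objects;

// Hedged sketch of a composite execution-vertex id with value semantics:
// two ids are equal iff their vertex id and subtask index match.
public class ExecutionVertexIdSketch {
    private final String jobVertexId; // stand-in for JobVertexID
    private final int subtaskIndex;

    ExecutionVertexIdSketch(String jobVertexId, int subtaskIndex) {
        this.jobVertexId = jobVertexId;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ExecutionVertexIdSketch)) return false;
        ExecutionVertexIdSketch that = (ExecutionVertexIdSketch) o;
        return subtaskIndex == that.subtaskIndex
            && jobVertexId.equals(that.jobVertexId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobVertexId, subtaskIndex);
    }

    public static void main(String[] args) {
        System.out.println(new ExecutionVertexIdSketch("v1", 0)
            .equals(new ExecutionVertexIdSketch("v1", 0))); // prints "true"
    }
}
```

Because such ids are used as map keys inside a scheduler, the equals/hashCode contract is the essential part, whatever the class ends up being named.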
[GitHub] [flink] JingsongLi commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
JingsongLi commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner URL: https://github.com/apache/flink/pull/8234#discussion_r277948264 ## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml ## @@ -85,7 +85,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
[GitHub] [flink] wuchong commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
wuchong commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner URL: https://github.com/apache/flink/pull/8234#discussion_r277944655 ## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml ## @@ -85,7 +85,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
[jira] [Commented] (FLINK-11848) Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION
[ https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824781#comment-16824781 ] Shengnan YU commented on FLINK-11848: - It does not work; I have looked at the source code. Flink's partition discovery gets the topic list from the consumer metadata. However, the warning occurs when the consumer fetches metadata from the cluster. It is an issue with the kafka-client, not Flink, actually. Thank you very much for the help.
> Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION
>
> Key: FLINK-11848
> URL: https://issues.apache.org/jira/browse/FLINK-11848
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.6.4
> Reporter: Shengnan YU
> Assignee: frank wang
> Priority: Major
>
> Recently we are doing some streaming jobs with Apache Flink. There are multiple Kafka topics with a format such as xx_yy-mm-dd. We used a topic regex pattern to let a consumer consume those topics. However, if we delete some older topics, it seems that the metadata in the consumer does not update properly, so it still remembers those outdated topics in its topic list, which leads to *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recover. It seems to occur in the producer as well. Any idea to solve this problem? Thank you very much!
>
> Example to reproduce the problem:
> There are multiple Kafka topics, for instance "test20190310", "test20190311" and "test20190312". I run the job and everything is ok. Then if I delete topic "test20190310", the consumer does not perceive that the topic is deleted; it will still go fetching metadata of that topic. In the taskmanager's log, unknown-topic errors are displayed.
> {code:java}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092\n");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.rest", "earliest");
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "120");
>     Pattern topics = Pattern.compile("^test.*$");
>     FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topics, new SimpleStringSchema(), props);
>     DataStream<String> stream = env.addSource(consumer);
>     stream.writeToSocket("localhost", 4, new SimpleStringSchema());
> }
> }
> {code}
>
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
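The staleness the reporter describes can be illustrated with a hypothetical sketch: a regex-matched topic list cached at discovery time keeps a deleted topic until it is refreshed against the live cluster. All names below are illustrative; this is not Kafka's or Flink's actual API:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class StaleTopicSketch {
    // Match the cluster's topic names against the subscription pattern,
    // as a regex-based partition discoverer conceptually does.
    static List<String> discover(Pattern pattern, Collection<String> clusterTopics) {
        return clusterTopics.stream()
            .filter(t -> pattern.matcher(t).matches())
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern topics = Pattern.compile("^test.*$");
        // Discovery ran while "test20190310" still existed; the result is cached.
        List<String> cached = discover(topics,
            Arrays.asList("test20190310", "test20190311", "other"));
        // The topic is then deleted on the broker; a fresh discovery differs.
        List<String> live = discover(topics,
            Arrays.asList("test20190311", "other"));
        System.out.println(cached); // [test20190310, test20190311]
        System.out.println(live);   // [test20190311]
        // Fetches driven by the cached list hit the deleted topic until the
        // cached metadata is refreshed, matching the warnings reported above.
    }
}
```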
[GitHub] [flink] link3280 edited a comment on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem
link3280 edited a comment on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem URL: https://github.com/apache/flink/pull/8068#issuecomment-486042889 > Moreover, I was wondering whether `SnapshotDirectory.temporary` should always return a local `SnapshotDirectory`. If this should be the case, then one could change the signature to `SnapshotDirectory.temporary(File)` in order to avoid the wrong usage. Maybe @StefanRRichter could help to answer this question. IMHO, we should only use the local file system to store task-local snapshots, for efficiency and simplicity, and I'm not sure whether RocksDB can checkpoint to other file systems. Another alternative to avoid a wrong `FileSystem` could be to use `SnapshotDirectory(Path, FileSystem)` instead when creating temporary and permanent snapshot directories. I've updated the code and added a test based on the above points, and will update the PR if @StefanRRichter confirms that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
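The signature-narrowing idea in the quoted suggestion, `SnapshotDirectory.temporary(File)`, can be sketched as follows. The class is a stand-in, not Flink's actual SnapshotDirectory; the point is that accepting `File` instead of a filesystem-agnostic path type makes it impossible to hand the factory a non-local (e.g. HDFS) location:

```java
import java.io.File;
import java.nio.file.Path;

public class SnapshotDirectorySketch {
    private final Path directory;

    private SnapshotDirectorySketch(Path directory) {
        this.directory = directory;
    }

    // Narrowed factory signature: java.io.File can only name a local path,
    // so the wrong-FileSystem misuse is ruled out at compile time.
    static SnapshotDirectorySketch temporary(File localDir) {
        return new SnapshotDirectorySketch(localDir.toPath());
    }

    public static void main(String[] args) {
        SnapshotDirectorySketch snap = SnapshotDirectorySketch.temporary(new File("/tmp/snap"));
        System.out.println(snap.directory); // "/tmp/snap" on Unix-style paths
    }
}
```

The alternative mentioned in the comment, passing an explicit `FileSystem` alongside the path, achieves the same goal at runtime rather than in the type signature.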
[jira] [Updated] (FLINK-12296) Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task
[ https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-12296: -- Description: As the mail list said[1], there may be a problem when more than one operator chained in a single task, and all the operators have states, we'll encounter data loss silently problem. Currently, the local directory we used is like below ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state), if more than one operator chained in a single task, and all the operators have states, then all the operators will share the same local directory(because the vertext_id is the same), this will lead a data loss problem. The path generation logic is below: {code:java} // LocalRecoveryDirectoryProviderImpl.java @Override public File subtaskSpecificCheckpointDirectory(long checkpointId) { return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId)); } @VisibleForTesting String subtaskDirString() { return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString(); } @VisibleForTesting String checkpointDirString(long checkpointId) { return "chk_" + checkpointId; } {code} [1] [http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E] was: As the mail list said[1], there may be a problem when more than one operator chained in a single task, and all the operators have states, we'll encounter data loss silently problem. Currently, the local directory we used is like below ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state), if more than one operator chained in a single task, and all the operators have states, then all the operators will share the same local directory(because the vertext_id is the same), this will lead a data loss problem. 
The path generation logic is below: {code:java} // LocalRecoveryDirectoryProviderImpl.java @Override public File subtaskSpecificCheckpointDirectory(long checkpointId) { return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId)); } @VisibleForTesting String subtaskDirString() { return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString(); } @VisibleForTesting String checkpointDirString(long checkpointId) { return "chk_" + checkpointId; } {code} [1] [https://app.smartmailcloud.com/web-share/MDkE4DArUT2eoSv86xq772I1HDgMNTVhLEmsnbQ7] > Data loss silently in RocksDBStateBackend when more than one operator(has > states) chained in a single task > --- > > Key: FLINK-12296 > URL: https://issues.apache.org/jira/browse/FLINK-12296 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0 >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Critical > > As the mail list said[1], there may be a problem when more than one operator > chained in a single task, and all the operators have states, we'll encounter > data loss silently problem. > Currently, the local directory we used is like below > ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state), > > if more than one operator chained in a single task, and all the operators > have states, then all the operators will share the same local > directory(because the vertext_id is the same), this will lead a data loss > problem. 
>
> The path generation logic is below:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>     return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId));
> }
>
> @VisibleForTesting
> String subtaskDirString() {
>     return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString();
> }
>
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>     return "chk_" + checkpointId;
> }
> {code}
> [1]
> [http://mail-archives.apache.org/mod_mbox/flink-user/201904.mbox/%3cm2ef5tpfwy.wl-nings...@gmail.com%3E]
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
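A hypothetical sketch of the collision described above: two operators chained into one task share the same jobVertexID and subtaskIndex, so the path logic quoted from LocalRecoveryDirectoryProviderImpl yields identical directories for both. The helper below simplifies that logic and is not Flink's code:

```java
import java.nio.file.Paths;

public class PathCollisionSketch {
    // Simplified version of the quoted subtaskDirString/checkpointDirString
    // logic: the path depends only on job id, vertex id, subtask index and
    // checkpoint id, not on which operator in the chain is writing.
    static String subtaskDir(String jobId, String vertexId, int subtaskIndex, long checkpointId) {
        return Paths.get(
            "jid_" + jobId,
            "vtx_" + vertexId + "_sti_" + subtaskIndex,
            "chk_" + checkpointId).toString();
    }

    public static void main(String[] args) {
        // Both chained operators live in the same task: same vertex id, same subtask.
        String operatorA = subtaskDir("job1", "vtx1", 0, 1);
        String operatorB = subtaskDir("job1", "vtx1", 0, 1);
        // Identical paths: the second operator's local state silently
        // overwrites the first's, which is the data-loss scenario above.
        System.out.println(operatorA.equals(operatorB)); // prints "true"
    }
}
```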
[jira] [Commented] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824772#comment-16824772 ] Jark Wu commented on FLINK-11614: - I have left comments in your pull request. [~yangfei] > Translate the "Configuring Dependencies" page into Chinese > -- > > Key: FLINK-11614 > URL: https://issues.apache.org/jira/browse/FLINK-11614 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: YangFei >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html > The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md > The markdown file will be created once FLINK-11529 is merged. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wuchong commented on issue #8200: [FLINK-11614] [docs-zh] Translate the "Configuring Dependencies" page into Chinese
wuchong commented on issue #8200: [FLINK-11614] [docs-zh] Translate the "Configuring Dependencies" page into Chinese URL: https://github.com/apache/flink/pull/8200#issuecomment-486050262 Hi @Leev, please do not MERGE master. You can use the `IDEA > VCS > Git > Rebase` tool to rebase commits if you are using IntelliJ IDEA. You can also create a new clean branch, copy the changes to the new branch, and then force-update your remote Leev:FLINK-11614 branch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11636) Translate "State Schema Evolution" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824762#comment-16824762 ] Congxian Qiu(klion26) commented on FLINK-11636: --- [~yangfei] Now I'm not working on it, please assign it to yourself if you want. > Translate "State Schema Evolution" into Chinese > --- > > Key: FLINK-11636 > URL: https://issues.apache.org/jira/browse/FLINK-11636 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Priority: Major > > doc locates in flink/docs/dev/stream/state/schema_evolution.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277934970 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.Map; +import java.util.Optional; + +/** + * Interface for a function in a catalog. + */ +public interface CatalogFunction { + + /** +* Get the source of the function. Right now, CLASS only. +* @return the source of the function +*/ + default Source getSource() { + return Source.CLASS; + } + + /** +* Get the full name of the class backing the function. +* @return the full name of the class +*/ + String getClazzName(); + + /** +* Get the properties of the function. +* @return the properties of the function +*/ + Map getProperties(); + + /** +* Create a deep copy of the function. +* @return a deep copy of "this" instance +*/ + CatalogFunction copy(); + + /** +* Get a brief description of the table or view. 
+* +* @return an optional short description of the table/view +*/ + Optional getDescription(); + + /** +* Get a detailed description of the table or view. Review comment: table or view => function? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277936243 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.Map; +import java.util.Optional; + +/** + * Interface for a function in a catalog. + */ +public interface CatalogFunction { + + /** +* Get the source of the function. Right now, CLASS only. +* @return the source of the function +*/ + default Source getSource() { + return Source.CLASS; + } + + /** +* Get the full name of the class backing the function. +* @return the full name of the class +*/ + String getClazzName(); + + /** +* Get the properties of the function. +* @return the properties of the function +*/ + Map getProperties(); Review comment: What are the properties? Any choice to customize JarResource? like `HiveFunction.resourceUris` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277932067 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.Map; +import java.util.Optional; + +/** + * Interface for a function in a catalog. + */ +public interface CatalogFunction { + + /** +* Get the source of the function. Right now, CLASS only. +* @return the source of the function +*/ + default Source getSource() { + return Source.CLASS; + } + + /** +* Get the full name of the class backing the function. +* @return the full name of the class +*/ + String getClazzName(); Review comment: Why not ClassName? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277937225 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java ## @@ -131,4 +133,47 @@ void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfE void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException; + // -- functions -- + + /** +* Create a function. +* +* @param functionPath Path of the function +* @param function The function to be created Review comment: Comments can be considered for alignment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277934671 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.Map; +import java.util.Optional; + +/** + * Interface for a function in a catalog. + */ +public interface CatalogFunction { + + /** +* Get the source of the function. Right now, CLASS only. +* @return the source of the function +*/ + default Source getSource() { Review comment: Any other source? I don't quite understand the meaning of adding this. This method doesn't seem to work. Wait until we add other Sources? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277936838 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -269,4 +273,79 @@ public boolean tableExists(ObjectPath tablePath) { return tablePath != null && databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath); } + // -- functions -- + + @Override + public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException { + checkArgument(functionPath != null); Review comment: prefer `checkNotNull` to these `checkArgument`
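The reviewer's preference can be illustrated with a minimal, self-contained sketch. The class below is a hypothetical stand-in (the real utility is `org.apache.flink.util.Preconditions`, and `createFunction` here only mirrors the signature under review): `checkNotNull` documents the intent and throws `NullPointerException`, whereas `checkArgument(x != null)` would report an `IllegalArgumentException` and obscure that the failure is a null reference.

```java
// Self-contained sketch of checkNotNull vs. checkArgument semantics.
// Stand-in for org.apache.flink.util.Preconditions, not the actual class.
public class PreconditionsDemo {

    static <T> T checkNotNull(T reference, String message) {
        if (reference == null) {
            throw new NullPointerException(message);
        }
        return reference;
    }

    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Hypothetical method mirroring the createFunction signature under review.
    static void createFunction(Object functionPath, Object function) {
        // checkNotNull states that the argument must not be null and throws NPE;
        // checkArgument(functionPath != null) would instead throw
        // IllegalArgumentException, hiding the actual cause of the failure.
        checkNotNull(functionPath, "functionPath cannot be null");
        checkNotNull(function, "function cannot be null");
    }

    public static void main(String[] args) {
        try {
            createFunction(null, new Object());
        } catch (NullPointerException e) {
            System.out.println("caught NPE: " + e.getMessage());
        }
    }
}
```

The message passed to `checkNotNull` also ends up in the exception, which makes the failure self-explanatory in logs.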
[GitHub] [flink] JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs
JingsongLi commented on a change in pull request #8212: [FLINK-11519][table] Add function related catalog APIs URL: https://github.com/apache/flink/pull/8212#discussion_r277934944 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.Map; +import java.util.Optional; + +/** + * Interface for a function in a catalog. + */ +public interface CatalogFunction { + + /** +* Get the source of the function. Right now, CLASS only. +* @return the source of the function +*/ + default Source getSource() { + return Source.CLASS; + } + + /** +* Get the full name of the class backing the function. +* @return the full name of the class +*/ + String getClazzName(); + + /** +* Get the properties of the function. +* @return the properties of the function +*/ + Map<String, String> getProperties(); + + /** +* Create a deep copy of the function. +* @return a deep copy of "this" instance +*/ + CatalogFunction copy(); + + /** +* Get a brief description of the table or view. Review comment: table or view => function?
[jira] [Commented] (FLINK-11636) Translate "State Schema Evolution" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824761#comment-16824761 ] YangFei commented on FLINK-11636: - Hi [~klion26], I want to know if you are working on this issue. If not, can I assign it to myself? Thanks. > Translate "State Schema Evolution" into Chinese > --- > > Key: FLINK-11636 > URL: https://issues.apache.org/jira/browse/FLINK-11636 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Priority: Major > > The doc is located at flink/docs/dev/stream/state/schema_evolution.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824755#comment-16824755 ] YangFei commented on FLINK-11614: - Hi [~jark]. I have made some changes after reading [“Flink Translation Specifications”|https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications]. Waiting for review. Thanks. > Translate the "Configuring Dependencies" page into Chinese > -- > > Key: FLINK-11614 > URL: https://issues.apache.org/jira/browse/FLINK-11614 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: YangFei >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html > The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md > The markdown file will be created once FLINK-11529 is merged.
[jira] [Issue Comment Deleted] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YangFei updated FLINK-11614: Comment: was deleted (was: Hi JarkWu . I have made some changes after reading [“Flink Translation Specifications”|https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications]. waiting to review. thanks) > Translate the "Configuring Dependencies" page into Chinese > -- > > Key: FLINK-11614 > URL: https://issues.apache.org/jira/browse/FLINK-11614 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: YangFei >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html > The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md > The markdown file will be created once FLINK-11529 is merged.
[jira] [Commented] (FLINK-11614) Translate the "Configuring Dependencies" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824754#comment-16824754 ] YangFei commented on FLINK-11614: - Hi JarkWu. I have made some changes after reading [“Flink Translation Specifications”|https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications]. Waiting for review. Thanks. > Translate the "Configuring Dependencies" page into Chinese > -- > > Key: FLINK-11614 > URL: https://issues.apache.org/jira/browse/FLINK-11614 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Jark Wu >Assignee: YangFei >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/dev/projectsetup/dependencies.html > The markdown file is located in flink/docs/dev/projectsetup/dependencies.zh.md > The markdown file will be created once FLINK-11529 is merged.
[GitHub] [flink] link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem
link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem URL: https://github.com/apache/flink/pull/8068#issuecomment-486042889 > Moreover, I was wondering whether `SnapshotDirectory.temporary` should always return a local `SnapshotDirectory`. If this should be the case, then one could change the signature to `SnapshotDirectory.temporary(File)` in order to avoid the wrong usage. Maybe @StefanRRichter could help to answer this question. IMHO, we should only use the local file system to store task local snapshots for efficiency and simplicity, and I'm not sure whether RocksDB can checkpoint to other file systems. Another way to avoid the wrong `FileSystem` could be to use `SnapshotDirectory(Path, FileSystem)` instead when creating temporary and permanent snapshot directories.
[GitHub] [flink] liyafan82 commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager
liyafan82 commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager URL: https://github.com/apache/flink/pull/8236#discussion_r277935627 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -387,11 +387,11 @@ public void release(MemorySegment segment) { * * @param segments The segments to be released. * @throws NullPointerException Thrown, if the given collection is null. -* @throws IllegalArgumentException Thrown, id the segments are of an incompatible type. +* @throws IllegalArgumentException Thrown, if the segments are of an incompatible type. */ public void release(Collection segments) { if (segments == null) { - return; + throw new NullPointerException(); Review comment: That sounds reasonable. Thank you for your comments. Hi @StephanEwen, would you please give some comments?
[jira] [Updated] (FLINK-11945) Support over aggregation for blink streaming runtime
[ https://issues.apache.org/jira/browse/FLINK-11945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-11945: Summary: Support over aggregation for blink streaming runtime (was: Support Over aggregation for SQL) > Support over aggregation for blink streaming runtime > > > Key: FLINK-11945 > URL: https://issues.apache.org/jira/browse/FLINK-11945 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Jark Wu >Assignee: Forward Xu >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > This is a simple port of the over aggregation implementation from > {{flink-table-planner}} to {{flink-table-planner-blink}}. > Note: please also convert the over function implementation from Scala to Java.
[GitHub] [flink] Aitozi commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager
Aitozi commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager URL: https://github.com/apache/flink/pull/8236#discussion_r277933794 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -387,11 +387,11 @@ public void release(MemorySegment segment) { * * @param segments The segments to be released. * @throws NullPointerException Thrown, if the given collection is null. -* @throws IllegalArgumentException Thrown, id the segments are of an incompatible type. +* @throws IllegalArgumentException Thrown, if the segments are of an incompatible type. */ public void release(Collection segments) { if (segments == null) { - return; + throw new NullPointerException(); Review comment: It's truly inconsistent, but this fix will change the default behaviour of the release operation, and `release(MemorySegment segment)` also skips null... I think it may need @StephanEwen, the author of `MemoryManager.java`, to confirm. Thanks.
[jira] [Assigned] (FLINK-12304) AvroInputFormat should support schema evolution
[ https://issues.apache.org/jira/browse/FLINK-12304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-12304: Assignee: vinoyang > AvroInputFormat should support schema evolution > --- > > Key: FLINK-12304 > URL: https://issues.apache.org/jira/browse/FLINK-12304 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.8.0 >Reporter: John >Assignee: vinoyang >Priority: Major > > From the avro spec: > _A reader of Avro data, whether from an RPC or a file, can always parse that > data because its schema is provided. But that schema may not be exactly the > schema that was expected. For example, if the data was written with a > different version of the software than it is read, then records may have had > fields added or removed._ > The AvroInputFormat should allow the application to supply a reader's schema > to support cases where data was written with an old version of a schema and > needs to be read with a newer version. The reader's schema can have additional > fields with defaults so that the old schema can be adapted to the new. The > underlying avro java library supports schema resolution, so adding support in > AvroInputFormat should be straightforward.
[GitHub] [flink] flinkbot commented on issue #8246: [hotfix] Add error message to precondition in HeapPriorityQueueSet
flinkbot commented on issue #8246: [hotfix] Add error message to precondition in HeapPriorityQueueSet URL: https://github.com/apache/flink/pull/8246#issuecomment-486039200 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem
link3280 commented on issue #8068: [FLINK-12042][StateBackends] Fix RocksDBStateBackend's mistaken usage of default filesystem URL: https://github.com/apache/flink/pull/8068#issuecomment-486039068 Thanks for the review @tillrohrmann! To answer the first three questions: the `IllegalStateException` should be thrown by the following lines in 1.6.4, 1.7.2 and 1.8.0 respectively. https://github.com/apache/flink/blob/6f4148180ba372a2c12c1d54bea8579350af6c98/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L2568 https://github.com/apache/flink/blob/ceba8af39b28b91ae6f2d5cbdb1b99258a73e742/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L411 https://github.com/apache/flink/blob/4caec0d4bab497d7f9a8d9fec4680089117593df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L410 > Independent of the question whether to store the data locally or not, I would like to understand where the problem comes from because the SnapshotDirectory seems to use the right FileSystem implementation to check the existence. Now we are using `SnapshotDirectory(Path)` to create the instance, which will infer its scheme from the Path URI, and get the corresponding `FileSystem` based on that. https://github.com/apache/flink/blob/4caec0d4bab497d7f9a8d9fec4680089117593df/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java#L66 If the scheme is missing in the URI, we get the default `FileSystem` which can be set by `fs.default-scheme`.
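The scheme-inference behaviour described above can be illustrated with a small self-contained sketch (a simplified stand-in, not the actual Flink `FileSystem` resolution logic): a path URI without a scheme falls back to the configured default, which is how `fs.default-scheme` can end up selecting a remote `FileSystem` for what was meant to be a local snapshot directory.

```java
import java.net.URI;

// Simplified stand-in for Flink's FileSystem scheme resolution: when a path
// URI carries no scheme, the configured default scheme (fs.default-scheme)
// is used instead.
public class SchemeDemo {

    static String resolveScheme(String path, String defaultScheme) {
        URI uri = URI.create(path);
        return uri.getScheme() != null ? uri.getScheme() : defaultScheme;
    }

    public static void main(String[] args) {
        // Explicit scheme wins.
        System.out.println(resolveScheme("file:/tmp/localsnapshots", "hdfs"));
        // No scheme: falls back to the default, possibly a remote filesystem.
        System.out.println(resolveScheme("/tmp/localsnapshots", "hdfs"));
    }
}
```

Under this fallback, constructing a `SnapshotDirectory` from a bare `Path` inherits whatever `fs.default-scheme` is configured; passing the intended `FileSystem` explicitly, as suggested above, sidesteps the inference entirely.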
[GitHub] [flink] sjwiesman opened a new pull request #8246: [hotfix] Add error message to precondition in HeapPriorityQueueSet
sjwiesman opened a new pull request #8246: [hotfix] Add error message to precondition in HeapPriorityQueueSet URL: https://github.com/apache/flink/pull/8246 ## What is the purpose of the change Add error message to precondition in HeapPriorityQueueSet ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
[GitHub] [flink] liyafan82 commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager
liyafan82 commented on a change in pull request #8236: [FLINK-12289][flink-runtime]Fix bugs and typos in Memory manager URL: https://github.com/apache/flink/pull/8236#discussion_r277931471 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -387,11 +387,11 @@ public void release(MemorySegment segment) { * * @param segments The segments to be released. * @throws NullPointerException Thrown, if the given collection is null. -* @throws IllegalArgumentException Thrown, id the segments are of an incompatible type. +* @throws IllegalArgumentException Thrown, if the segments are of an incompatible type. */ public void release(Collection segments) { if (segments == null) { - return; + throw new NullPointerException(); Review comment: > If we get a `null` to release, why can't we just skip it? I think it's a defensive code not a bug. Or can you post some bad situation about this? Hi Aitozi, thanks a lot for your comment. The JavaDoc for this method declares "@throws NullPointerException Thrown, if the given collection is null." So we should either throw an NPE, or change the JavaDoc, to make it consistent. By referencing other methods in this class, I think it is preferable to make the method throw NPE, because the methods should follow a consistent style.
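The fix under discussion can be sketched as a minimal, self-contained stand-in (a simplified hypothetical version of `MemoryManager#release(Collection)`, not the actual Flink code): throwing `NullPointerException` on a null collection makes the behaviour match the JavaDoc quoted in the diff above.

```java
import java.util.Collection;
import java.util.List;

// Simplified, hypothetical stand-in for MemoryManager#release(Collection):
// throw NPE on null input so the behaviour matches the documented contract.
public class MemoryManagerSketch {

    private int releasedCount = 0;

    /**
     * Releases the given segments.
     *
     * @throws NullPointerException Thrown, if the given collection is null
     *         (now actually enforced, consistent with the JavaDoc).
     */
    public void release(Collection<Object> segments) {
        if (segments == null) {
            throw new NullPointerException("segments must not be null");
        }
        for (Object segment : segments) {
            releasedCount++; // stand-in for returning the segment to the pool
        }
    }

    public int getReleasedCount() {
        return releasedCount;
    }

    public static void main(String[] args) {
        MemoryManagerSketch mm = new MemoryManagerSketch();
        mm.release(List.of(new Object(), new Object()));
        System.out.println("released: " + mm.getReleasedCount());
        try {
            mm.release(null);
        } catch (NullPointerException e) {
            System.out.println("NPE on null collection, as documented");
        }
    }
}
```

The alternative raised in the thread, silently returning on null, would instead require weakening the JavaDoc; either way, the documentation and the behaviour should agree.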
[GitHub] [flink] Aitozi closed pull request #4935: [Flink-7945][Metrics]Fix per partition-lag metric lost in kafka connector
Aitozi closed pull request #4935: [Flink-7945][Metrics]Fix per partition-lag metric lost in kafka connector URL: https://github.com/apache/flink/pull/4935
[GitHub] [flink] Aitozi closed pull request #7543: [FLINK-10996]Fix the possible state leak with CEP processing time model
Aitozi closed pull request #7543: [FLINK-10996]Fix the possible state leak with CEP processing time model URL: https://github.com/apache/flink/pull/7543
[jira] [Updated] (FLINK-10972) Enhancements to Flink Table API
[ https://issues.apache.org/jira/browse/FLINK-10972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-10972: Affects Version/s: 1.9.0 > Enhancements to Flink Table API > --- > > Key: FLINK-10972 > URL: https://issues.apache.org/jira/browse/FLINK-10972 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > > With the continuous efforts from the > community, the Flink system has been continuously improved, which has > attracted more and more users. Flink SQL is a canonical, widely used > relational query language. However, there are still some scenarios where > Flink SQL fails to meet user needs in terms of functionality and ease of > use, such as: > * In terms of functionality > Iteration, user-defined window, user-defined join, user-defined GroupReduce, > etc. Users cannot express them with SQL; > * In terms of ease of use > * Map - e.g. “dataStream.map(mapFun)”. Although “table.select(udf1(), > udf2(), udf3())” can be used to accomplish the same function, with a > map() function returning 100 columns, one has to define or call 100 UDFs when > using SQL, which is quite involved. > * FlatMap - e.g. “dataStream.flatmap(flatMapFun)”. Similarly, it can be > implemented with “table.join(udtf).select()”. However, it is obvious that > datastream is easier to use than SQL. > Due to the above two reasons, in this group of JIRAs, we will enhance the > Table API in stages.
> --- > The first stage we seek to support (details will be described in the sub-issues): > * Table.map() > * Table.flatMap() > * GroupedTable.aggregate() > * GroupedTable.flatAggregate() > The FLIP can be found here: > [FLIP-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739] > > The second part is about column operators/operations: > 1) Table(schema) operators > * Add columns > * Replace columns > * Drop columns > * Rename columns > 2) Fine-grained column/row operations > * Column selection > * Row package and flatten > See [google > doc|https://docs.google.com/document/d/1tryl6swt1K1pw7yvv5pdvFXSxfrBZ3_OkOObymis2ck/edit]
[jira] [Updated] (FLINK-12306) Change the name of variable "log" to Upper case "LOG"
[ https://issues.apache.org/jira/browse/FLINK-12306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kejian Li updated FLINK-12306: -- External issue URL: (was: https://github.com/apache/flink/pull/8245) > Change the name of variable "log" to Upper case "LOG" > - > > Key: FLINK-12306 > URL: https://issues.apache.org/jira/browse/FLINK-12306 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.7.2, 1.8.0 >Reporter: Kejian Li >Priority: Trivial > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > > Change the name of variable "log" from lower case to upper case "LOG" to > correspond to other class files.
[GitHub] [flink] Kejian-Li closed pull request #8245: Change the name of variable "log" to upper case "LOG"
Kejian-Li closed pull request #8245: Change the name of variable "log" to upper case "LOG" URL: https://github.com/apache/flink/pull/8245
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r277851836 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.Map; +import java.util.Optional; + +/** + * Represents a partition object in catalog. + */ +public interface CatalogPartition { Review comment: Good point. However, as far as I can tell, catalog APIs do not care about the actual location beyond displaying it as properties in SQL. What really cares about the location is the data connector, but it will retrieve that information directly from the Hive metastore. I think we can start with what we have now and extend it if necessary later.
[jira] [Updated] (FLINK-12306) Change the name of variable "log" to Upper case "LOG"
[ https://issues.apache.org/jira/browse/FLINK-12306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kejian Li updated FLINK-12306: -- External issue URL: https://github.com/apache/flink/pull/8245 > Change the name of variable "log" to Upper case "LOG" > - > > Key: FLINK-12306 > URL: https://issues.apache.org/jira/browse/FLINK-12306 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.7.2, 1.8.0 >Reporter: Kejian Li >Priority: Trivial > Labels: pull-request-available > Fix For: 1.9.0, 1.8.1 > > > Change the name of variable "log" from lower case to upper case "LOG" to > correspond to other class files.
[GitHub] [flink] flinkbot commented on issue #8245: Change the name of variable "log" to upper case "LOG"
flinkbot commented on issue #8245: Change the name of variable "log" to upper case "LOG" URL: https://github.com/apache/flink/pull/8245#issuecomment-486030536 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] Kejian-Li opened a new pull request #8245: Change the name of variable "log" to upper case "LOG"
Kejian-Li opened a new pull request #8245: Change the name of variable "log" to upper case "LOG"
URL: https://github.com/apache/flink/pull/8245

## What is the purpose of the change

Change the name of the variable "log" to upper case "LOG" to be consistent with other class files.

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Kejian-Li closed pull request #8241: Update MainThreadValidatorUtil.java
Kejian-Li closed pull request #8241: Update MainThreadValidatorUtil.java URL: https://github.com/apache/flink/pull/8241
[jira] [Created] (FLINK-12306) Change the name of variable "log" to Upper case "LOG"
Kejian Li created FLINK-12306:
Summary: Change the name of variable "log" to upper case "LOG"
Key: FLINK-12306
URL: https://issues.apache.org/jira/browse/FLINK-12306
Project: Flink
Issue Type: Improvement
Affects Versions: 1.8.0, 1.7.2
Reporter: Kejian Li
Fix For: 1.9.0, 1.8.1

Change the name of the variable "log" from lower case to upper case "LOG" to be consistent with other class files.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
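The rename requested in FLINK-12306 is purely cosmetic: by the common Java/Flink convention, `static final` fields use UPPER_CASE names, so a class-level logger should be `LOG` rather than `log`. A minimal sketch of the convention; `LoggerStub` is a hypothetical stand-in for `org.slf4j.Logger` so the example compiles without SLF4J on the classpath (in Flink the field would be created via `LoggerFactory.getLogger`):

```java
public class Main {

    // Hypothetical stand-in for org.slf4j.Logger, only so this sketch is self-contained.
    static class LoggerStub {
        final String name;
        LoggerStub(String name) { this.name = name; }
        void info(String msg) { System.out.println(name + " INFO " + msg); }
    }

    // Before the change the field would be named "log"; after it, "LOG",
    // matching the UPPER_CASE convention for static final constants.
    // In Flink this would typically be declared private.
    static final LoggerStub LOG = new LoggerStub(Main.class.getSimpleName());

    public static void main(String[] args) {
        LOG.info("constant-style name for a static final logger");
    }
}
```

Only the identifier changes; logging behavior is untouched, which is why the PR needs no test coverage.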
[GitHub] [flink] Kejian-Li commented on issue #8241: Update MainThreadValidatorUtil.java
Kejian-Li commented on issue #8241: Update MainThreadValidatorUtil.java URL: https://github.com/apache/flink/pull/8241#issuecomment-486024371 > @Kejian-Li thanks for your contribution. I think the target branch should be `master`; please update the title and description according to the [doc](https://flink.apache.org/contribute-code.html). Thanks very much.
[GitHub] [flink] walterddr commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
walterddr commented on a change in pull request #8234: [FLINK-12288] [table-planner-blink] Bump Calcite dependency to 1.19.0 in blink planner
URL: https://github.com/apache/flink/pull/8234#discussion_r277912372

## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/CalcTest.xml

@@ -85,7 +85,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
[GitHub] [flink] xuefuz commented on a change in pull request #8205: [FLINK-12238] [hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
xuefuz commented on a change in pull request #8205: [FLINK-12238] [hive] Support database related operations in GenericHiveMetastoreCatalog and setup flink-connector-hive module
URL: https://github.com/apache/flink/pull/8205#discussion_r277893627

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java

@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage.
+ */
+public class GenericHiveMetastoreCatalog implements ReadableWritableCatalog {
+	private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
+
+	public static final String DEFAULT_DB = "default";
+
+	private final String catalogName;
+	private final HiveConf hiveConf;
+
+	private String currentDatabase = DEFAULT_DB;
+	private IMetaStoreClient client;
+
+	public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
+		this(catalogName, getHiveConf(hivemetastoreURI));
+	}
+
+	public GenericHiveMetastoreCatalog(String catalogName, HiveConf hiveConf) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+		this.catalogName = catalogName;
+
+		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
+		LOG.info("Created GenericHiveMetastoreCatalog '{}'", catalogName);
+	}
+
+	private static HiveConf getHiveConf(String hiveMetastoreURI) {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
+
+		HiveConf hiveConf = new HiveConf();
+		hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+		return hiveConf;
+	}
+
+	private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+		try {
+			return RetryingMetaStoreClient.getProxy(
+				hiveConf,
+				null,
+				null,
+				HiveMetaStoreClient.class.getName(),
+				true);
+		} catch (MetaException e) {
+			throw new CatalogException("Failed to create Hive metastore client", e);
+
[jira] [Created] (FLINK-12305) Table API Clarification
Alex Barnes created FLINK-12305:
Summary: Table API Clarification
Key: FLINK-12305
URL: https://issues.apache.org/jira/browse/FLINK-12305
Project: Flink
Issue Type: Improvement
Components: Project Website
Affects Versions: 1.8.0
Reporter: Alex Barnes

It is not clear from the documentation whether late-arriving data is correctly handled in the Flink Table/SQL APIs. The documentation makes passing reference to recognizing late-arriving data, but does not go into depth as to what kind of triggering/processing can be performed on it: [https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time] This old thread on the apache-flink-users mailing list tells a different story, specifically that late-arriving data is not supported and the DataStream APIs need to be used instead: [http://osdir.com/apache-flink-users/msg08110.html] Has support been added since that email correspondence? Please consider reducing ambiguity in the documentation and updating it to better reflect the current/planned state of support for late-arriving data in the Table API. Thanks, Alex
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277853190

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java

@@ -29,4 +32,15 @@
 * @return table statistics
 */
TableStats getStatistics();
+
+/**
+ * Check if the table is partitioned or not.
+ */
+boolean isPartitioned();
+
+/**
+ * Get the partition keys of the table. This will be an empty set if the table is not partitioned.
+ * @return partition keys of the table.
+ */
+LinkedHashSet getPartitionKeys() throws TableNotPartitionedException;

Review comment: I don't really understand the question. What do you mean by restriction? Can you elaborate? You are right that Hive's [Table](https://hive.apache.org/javadocs/r2.3.4/api/org/apache/hadoop/hive/metastore/api/Table.html) uses a separate field dedicated to partition keys.
[jira] [Commented] (FLINK-12296) Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task
[ https://issues.apache.org/jira/browse/FLINK-12296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824491#comment-16824491 ] seed Z commented on FLINK-12296:

Unchaining the operators makes the new job able to save its state via checkpoints. Unfortunately, it cannot resume from the old checkpoint (a RocksDB-backed incremental checkpoint), likely due to the way the path is composed.

> Data loss silently in RocksDBStateBackend when more than one operator (has states) chained in a single task
>
> Key: FLINK-12296
> URL: https://issues.apache.org/jira/browse/FLINK-12296
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.6.3, 1.6.4, 1.7.2, 1.8.0
> Reporter: Congxian Qiu(klion26)
> Assignee: Congxian Qiu(klion26)
> Priority: Critical
>
> As the mailing-list thread said [1], there may be a problem when more than one operator is chained in a single task and all of the operators have state: data can be lost silently.
> Currently, the local directory we use looks like
> ../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state),
> so if more than one operator is chained in a single task and all of the operators have state, then all of the operators share the same local directory (because the vertex_id is the same), which leads to data loss.
>
> The path generation logic is below:
> {code:java}
> // LocalRecoveryDirectoryProviderImpl.java
> @Override
> public File subtaskSpecificCheckpointDirectory(long checkpointId) {
>    return new File(subtaskBaseDirectory(checkpointId), checkpointDirString(checkpointId));
> }
>
> @VisibleForTesting
> String subtaskDirString() {
>    return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" + subtaskIndex).toString();
> }
>
> @VisibleForTesting
> String checkpointDirString(long checkpointId) {
>    return "chk_" + checkpointId;
> }
> {code}
> [1] [https://app.smartmailcloud.com/web-share/MDkE4DArUT2eoSv86xq772I1HDgMNTVhLEmsnbQ7]
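The collision in the quoted path logic can be sketched as follows. `subtaskDirString()` mirrors the `LocalRecoveryDirectoryProviderImpl` code above: the path depends only on the job id, the job vertex id, and the subtask index. Two stateful operators chained into the same task share one job vertex, so they resolve to the same local directory. The ids below are made up for illustration:

```java
import java.nio.file.Paths;

public class Main {

    // Mirrors the quoted subtaskDirString() logic: the directory is derived
    // from job id, vertex id and subtask index only.
    static String subtaskDirString(String jobId, String jobVertexId, int subtaskIndex) {
        return Paths.get("jid_" + jobId, "vtx_" + jobVertexId + "_sti_" + subtaskIndex).toString();
    }

    public static void main(String[] args) {
        // Operator A and operator B live in the same chain, hence the same vertex id
        // and the same subtask index -- their local state directories collide.
        String dirA = subtaskDirString("7f3a", "vtx1", 0);
        String dirB = subtaskDirString("7f3a", "vtx1", 0);
        if (!dirA.equals(dirB)) {
            throw new AssertionError("expected identical directories");
        }
        System.out.println("both chained operators write state under: " + dirA);
    }
}
```

Nothing in the path distinguishes the two operators within the chain, so whichever operator writes last silently overwrites the other's local state.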
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277851836

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java

@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {

Review comment: good point. How about we start with what we have now and extend it if necessary later?
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277851366

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java

@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
+
+	/**
+	 * Get the PartitionSpec of the partition.
+	 *
+	 * @return the PartitionSpec of the partition.
+	 */
+	PartitionSpec getPartitionSpec();
+
+	/**
+	 * Get a map of properties associated with the partition.
+	 *
+	 * @return a map of properties with the partition
+	 */
+	Map getProperties();
+
+	/**
+	 * Get a deep copy of the CatalogPartition instance.
+	 *
+	 * @return a copy of CatalogPartition instance
+	 */
+	CatalogPartition copy();
+
+	/**
+	 * Get a brief description of the database.
+	 *
+	 * @return an optional short description of the database
	 */
+	Optional getDescription();
+
+	/**
+	 * Get a detailed description of the database.
+	 *
+	 * @return an optional long description of the database
+	 */
+	Optional getDetailedDescription();
+
+	/**
+	 * Represents a partition spec object.
+	 * Partition columns and values are NOT of strict order, and they need to be re-arranged to the correct order
+	 * by comparing with a list of strictly ordered partition keys.
+	 */
+	interface PartitionSpec {
+		/**
+		 * Check if the current PartitionSpec contains all the partition columns and values of another PartitionSpec.
+		 */
+		boolean contains(PartitionSpec another);

Review comment: good point. Checked Blink and it's only used in the in-memory catalog impl. Will remove it from the API.
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277851183

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java

@@ -132,4 +134,58 @@
 */
boolean tableExists(ObjectPath objectPath) throws CatalogException;

+	// -- partitions --
+
+	/**
+	 * Gets PartitionSpec of all partitions of the table.
+	 *
+	 * @param tablePath Path of the table.
+	 * @return A list of PartitionSpec of the table.
+	 *
+	 * @throws TableNotExistException thrown if the table does not exist in the catalog.
+	 * @throws TableNotPartitionedException thrown if the table is not partitioned.
+	 * @throws CatalogException in case of any runtime exception
+	 */
+	List listPartitions(ObjectPath tablePath)
+		throws TableNotExistException, TableNotPartitionedException, CatalogException;
+
+	/**
+	 * Gets PartitionSpec of all partitions that are under the given PartitionSpec in the table.
+	 *
+	 * @param tablePath Path of the table
+	 * @param partitionSpecs The partition spec to list
+	 * @return A list of PartitionSpec that are under the given PartitionSpec in the table.
+	 *
+	 * @throws TableNotExistException thrown if the table does not exist in the catalog.
+	 * @throws TableNotPartitionedException thrown if the table is not partitioned.
+	 * @throws CatalogException in case of any runtime exception
+	 */
+	List listPartitions(ObjectPath tablePath, CatalogPartition.PartitionSpec partitionSpecs)
+		throws TableNotExistException, TableNotPartitionedException, CatalogException;
+
+	/**
+	 * Gets a partition of the given table.
+	 *
+	 * @param tablePath Path of the table
+	 * @param partitionSpecs Partition spec of partition to get
+	 * @return The requested partition.
+	 *
+	 * @throws TableNotExistException thrown if the table does not exist in the catalog.
+	 * @throws TableNotPartitionedException thrown if the table is not partitioned.
+	 * @throws PartitionNotExistException thrown if the partition does not exist.
+	 * @throws CatalogException in case of any runtime exception
+	 */
+	CatalogPartition getPartition(ObjectPath tablePath, CatalogPartition.PartitionSpec partitionSpecs)

Review comment: 1. it will throw PartitionNotFoundException. I added additional javadoc for this behavior. 2. will rename it
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277849954

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java

@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
+
+	/**
+	 * Get the PartitionSpec of the partition.
+	 *
+	 * @return the PartitionSpec of the partition.
+	 */
+	PartitionSpec getPartitionSpec();

Review comment: sounds good
[GitHub] [flink] ijuma commented on issue #7156: [FLINK-10967] Update kafka dependency to 2.2.0
ijuma commented on issue #7156: [FLINK-10967] Update kafka dependency to 2.2.0 URL: https://github.com/apache/flink/pull/7156#issuecomment-485949235 Thanks for the reference. I'll close this PR. Hopefully #8055 gets reviewed and merged more quickly than this one. :)
[GitHub] [flink] xuefuz commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
xuefuz commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r277833211

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogManager.java

@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import java.util.Set;
+
+/**
+ * CatalogManager manages all the registered ReadableCatalog instances with unique names.
+ * It has a concept of current catalog, which will be used when it is not given when referencing meta-objects.
+ */
+@PublicEvolving
+public interface CatalogManager {
+
+	/**
+	 * Register a catalog with a unique name.
+	 *
+	 * @param catalogName catalog name to register
+	 * @param catalog catalog to register
+	 * @throws CatalogAlreadyExistsException thrown if the name is already taken
+	 */
+	void registerCatalog(String catalogName, ReadableCatalog catalog) throws CatalogAlreadyExistsException;
+
+	/**
+	 * Get a catalog by name.
+	 *
+	 * @param catalogName catalog name
+	 * @return the requested catalog
+	 * @throws CatalogNotExistException thrown if the catalog doesn't exist
+	 */
+	ReadableCatalog getCatalog(String catalogName) throws CatalogNotExistException;
+
+	/**
+	 * Get names of all registered catalogs.
+	 *
+	 * @return a set of names of registered catalogs
+	 */
+	Set getCatalogNames();
+
+	/**
+	 * Get the current catalog.
+	 *
+	 * @return the current catalog
+	 */
+	ReadableCatalog getCurrentCatalog();
+
+	/**
+	 * Get name of the current database.
+	 *
+	 * @return name of the current database
+	 */
+	String getCurrentDatabaseName();

Review comment: At the CatalogManager level, it seems we only need get/setCurrentCatalog(), because get/setCurrentDatabase() are already defined in the Catalog API, specifically in the ReadableCatalog class. Thus, we might want to remove getCurrentDatabaseName() and setCurrentCatalogAndDatabase(), and only keep get/setCurrentCatalog() in CatalogManager.
[GitHub] [flink] xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277790658

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java

@@ -132,4 +134,58 @@
 */
boolean tableExists(ObjectPath objectPath) throws CatalogException;

+	// -- partitions --
+
+	/**
+	 * Gets PartitionSpec of all partitions of the table.
+	 *
+	 * @param tablePath Path of the table.
+	 * @return A list of PartitionSpec of the table.
+	 *
+	 * @throws TableNotExistException thrown if the table does not exist in the catalog.
+	 * @throws TableNotPartitionedException thrown if the table is not partitioned.
+	 * @throws CatalogException in case of any runtime exception
+	 */
+	List listPartitions(ObjectPath tablePath)
+		throws TableNotExistException, TableNotPartitionedException, CatalogException;
+
+	/**
+	 * Gets PartitionSpec of all partitions that are under the given PartitionSpec in the table.
+	 *
+	 * @param tablePath Path of the table
+	 * @param partitionSpecs The partition spec to list
+	 * @return A list of PartitionSpec that are under the given PartitionSpec in the table.
+	 *
+	 * @throws TableNotExistException thrown if the table does not exist in the catalog.
+	 * @throws TableNotPartitionedException thrown if the table is not partitioned.
+	 * @throws CatalogException in case of any runtime exception
+	 */
+	List listPartitions(ObjectPath tablePath, CatalogPartition.PartitionSpec partitionSpecs)

Review comment: 1. partitionSpecs -> partitionSpec? 2. Do we need two listPartitions()? The first version seemingly can be replaced by the second one when partitionSpec is null or empty.
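xuefuz's second point, that the unfiltered `listPartitions(tablePath)` is just the filtered overload called with an empty spec, follows from `contains()` semantics: an empty spec is contained by every partition. A minimal sketch under that assumption; `PartitionSpec` here is a hypothetical stand-in for the `CatalogPartition.PartitionSpec` under discussion, and the partition values are made up:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class Main {

    // Hypothetical stand-in for CatalogPartition.PartitionSpec.
    static class PartitionSpec {
        final Map<String, String> values;
        PartitionSpec(Map<String, String> values) { this.values = values; }

        // True when this spec contains all the columns/values of 'another'.
        boolean contains(PartitionSpec another) {
            return values.entrySet().containsAll(another.values.entrySet());
        }
    }

    // A toy partitioned table with two partitions.
    static final List<PartitionSpec> PARTITIONS = List.of(
        new PartitionSpec(Map.of("dt", "2019-04-22", "region", "us")),
        new PartitionSpec(Map.of("dt", "2019-04-23", "region", "us")));

    // The single filtered form; the unfiltered listPartitions(tablePath)
    // would simply delegate to this with an empty spec.
    static List<PartitionSpec> listPartitions(PartitionSpec filter) {
        List<PartitionSpec> result = new ArrayList<>();
        for (PartitionSpec p : PARTITIONS) {
            if (p.contains(filter)) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Empty spec matches everything; a dt filter narrows to one partition.
        if (listPartitions(new PartitionSpec(Map.of())).size() != 2) {
            throw new AssertionError();
        }
        if (listPartitions(new PartitionSpec(Map.of("dt", "2019-04-23"))).size() != 1) {
            throw new AssertionError();
        }
        System.out.println("empty spec lists all partitions");
    }
}
```

Whether the API keeps two overloads for readability or collapses them into one is the design choice being debated in the thread.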
[GitHub] [flink] xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r277804076 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import java.util.Map; +import java.util.Optional; + +/** + * Represents a partition object in catalog. + */ +public interface CatalogPartition { + + /** +* Get the PartitionSpec of the partition. +* +* @return the PartitionSpec of the partition. +*/ + PartitionSpec getPartitionSpec(); Review comment: PartitionSpec is more like the name of a partition. Similar to the name of a table or function, which isn't included in the class definition, I think PartitionSpec should also stand out of Partition class for consistency. Partition class should care mostly about things other than naming, such as schema, stats, properties, etc. With this change, some of the added, partition-related APIs may subject to changes as well. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
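The refactoring xuefuz suggests can be sketched with stand-in types (all names below are hypothetical, not the PR's actual API): the spec becomes a stand-alone class, and partition-level catalog calls take it alongside the partition object, the same way a table name travels next to the table definition rather than inside it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the reviewer's suggestion: the partition "name"
// (its spec) lives outside the partition object itself.
class PartitionSpecSketch {

	/** Stand-alone partition spec: an ordered map of partition column -> value. */
	static final class CatalogPartitionSpec {
		private final Map<String, String> spec;

		CatalogPartitionSpec(Map<String, String> spec) {
			this.spec = new LinkedHashMap<>(spec);
		}

		Map<String, String> getSpec() {
			return spec;
		}
	}

	/** The partition object now carries only non-naming metadata (properties, stats, ...). */
	interface CatalogPartition {
		Map<String, String> getProperties();
	}

	/** Catalog APIs would then take the spec and the partition separately. */
	interface PartitionedCatalog {
		void createPartition(String tablePath, CatalogPartitionSpec spec, CatalogPartition partition);
	}

	public static void main(String[] args) {
		Map<String, String> m = new LinkedHashMap<>();
		m.put("dt", "2019-04-24");
		CatalogPartitionSpec spec = new CatalogPartitionSpec(m);
		System.out.println(spec.getSpec().get("dt"));
	}
}
```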
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277803504

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
## @@ -29,4 +32,15 @@
 	 * @return table statistics
 	 */
 	TableStats getStatistics();
+
+	/**
+	 * Check if the table is partitioned or not.
+	 */
+	boolean isPartitioned();
+
+	/**
+	 * Get the partition keys of the table. This will be an empty set if the table is not partitioned.
+	 *
+	 * @return partition keys of the table.
+	 */

Review comment:
   good catch

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
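For illustration, here is a self-contained sketch of the two additions quoted above, with an assumed `List<String>` return type (the PR's actual signatures may differ); it also demonstrates the documented invariant that `getPartitionKeys()` is empty exactly when the table is not partitioned.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Minimal, hypothetical sketch of the partition-related additions to
// CatalogTable discussed above (names taken from the quoted diff).
class CatalogTableSketch {

	interface CatalogTable {
		boolean isPartitioned();

		/** Empty if the table is not partitioned. */
		List<String> getPartitionKeys();
	}

	static CatalogTable tableWithKeys(List<String> keys) {
		return new CatalogTable() {
			public boolean isPartitioned() {
				return !keys.isEmpty();
			}

			public List<String> getPartitionKeys() {
				return keys;
			}
		};
	}

	public static void main(String[] args) {
		CatalogTable partitioned = tableWithKeys(Arrays.asList("dt", "region"));
		CatalogTable plain = tableWithKeys(Collections.emptyList());
		System.out.println(partitioned.isPartitioned());
		System.out.println(plain.getPartitionKeys().size());
	}
}
```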
[GitHub] [flink] bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
bowenli86 commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
URL: https://github.com/apache/flink/pull/8222#discussion_r277801129

## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
## @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Represents a partition object in catalog.
+ */
+public interface CatalogPartition {
+
+	/**
+	 * Get the PartitionSpec of the partition.
+	 *
+	 * @return the PartitionSpec of the partition.
+	 */
+	PartitionSpec getPartitionSpec();
+
+	/**
+	 * Get a map of properties associated with the partition.
+	 *
+	 * @return a map of properties with the partition
+	 */
+	Map<String, String> getProperties();
+
+	/**
+	 * Get a deep copy of the CatalogPartition instance.
+	 *
+	 * @return a copy of CatalogPartition instance
+	 */
+	CatalogPartition copy();
+
+	/**
+	 * Get a brief description of the database.
+	 *
+	 * @return an optional short description of the database
+	 */
+	Optional<String> getDescription();
+
+	/**
+	 * Get a detailed description of the database.
+	 *
+	 * @return an optional long description of the database
+	 */
+	Optional<String> getDetailedDescription();
+
+	/**
+	 * Represents a partition spec object.
+	 * Partition columns and values are NOT of strict order, and they need to be re-arranged to the correct order
+	 * by comparing with a list of strictly ordered partition keys.
+	 */
+	interface PartitionSpec {

Review comment:
   Static vs. dynamic partition is more of a concept of how data is inserted/loaded. Please see [here](https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions). Their logical representation should be the same.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
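The re-arrangement described in the PartitionSpec Javadoc can be illustrated with a small stand-alone helper (hypothetical, not part of the PR): values are pulled out of an unordered spec map by walking the table's strictly ordered partition keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the Javadoc above: spec entries arrive in no
// particular order and are re-arranged against the ordered partition keys.
class SpecOrderingSketch {

	static List<String> orderedValues(Map<String, String> spec, List<String> partitionKeys) {
		List<String> values = new ArrayList<>();
		for (String key : partitionKeys) {
			String v = spec.get(key);
			if (v == null) {
				throw new IllegalArgumentException("Spec is missing partition key: " + key);
			}
			values.add(v);
		}
		return values;
	}

	public static void main(String[] args) {
		Map<String, String> spec = new HashMap<>();
		spec.put("hour", "08");
		spec.put("dt", "2019-04-24"); // inserted out of key order on purpose
		System.out.println(orderedValues(spec, Arrays.asList("dt", "hour")));
	}
}
```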
[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#discussion_r277796313 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.type.SqlTypeName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink's catalog and Calcite's schema. This enables to look up and access tables + * in SQL queries without registering tables in advance. Databases are registered as sub-schemas in the schema. + * This mapping is modeled as a strict two-level reference structure for Flink in Calcite, + * the full path of tables and views is of format [catalog_name].[db_name].[meta-object_name]. + */ +@Internal +public class CatalogCalciteSchema implements Schema { + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class); + + private final String catalogName; + private final ReadableCatalog catalog; + + public CatalogCalciteSchema(String catalogName, ReadableCatalog catalog) { + this.catalogName = catalogName; + this.catalog = catalog; + } + + /** +* Looks up a sub-schema (database) by the given sub-schema name. +* +* @param schemaName Name of sub-schema to look up. +* @return Sub-schema with a given dbName, or null. 
+*/ + @Override + public Schema getSubSchema(String schemaName) { + + if (catalog.databaseExists(schemaName)) { + return new DatabaseCalciteSchema(schemaName, catalog); + } else { + LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName)); + throw new CatalogException( + new DatabaseNotExistException(catalogName, schemaName)); + } + } + + @Override + public Set getSubSchemaNames() { + return new HashSet<>(catalog.listDatabases()); + } + + @Override + public Table getTable(String name) { + throw new UnsupportedOperationException( + "CatalogCalciteSchema does not support getTable()"); + } + + @Override + public Set getTableNames() { + throw new UnsupportedOperationException( Review comment: ok, sounds good This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#discussion_r277796069 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.type.SqlTypeName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink's catalog and Calcite's schema. This enables to look up and access tables + * in SQL queries without registering tables in advance. Databases are registered as sub-schemas in the schema. + * This mapping is modeled as a strict two-level reference structure for Flink in Calcite, + * the full path of tables and views is of format [catalog_name].[db_name].[meta-object_name]. + */ +@Internal +public class CatalogCalciteSchema implements Schema { + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class); + + private final String catalogName; + private final ReadableCatalog catalog; + + public CatalogCalciteSchema(String catalogName, ReadableCatalog catalog) { + this.catalogName = catalogName; + this.catalog = catalog; + } + + /** +* Looks up a sub-schema (database) by the given sub-schema name. +* +* @param schemaName Name of sub-schema to look up. +* @return Sub-schema with a given dbName, or null. 
+	 */
+	@Override
+	public Schema getSubSchema(String schemaName) {
+
+		if (catalog.databaseExists(schemaName)) {
+			return new DatabaseCalciteSchema(schemaName, catalog);
+		} else {
+			LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName));
+			throw new CatalogException(
+				new DatabaseNotExistException(catalogName, schemaName));
+		}
+	}
+
+	@Override
+	public Set<String> getSubSchemaNames() {
+		return new HashSet<>(catalog.listDatabases());
+	}
+
+	@Override
+	public Table getTable(String name) {
+		throw new UnsupportedOperationException(
+			"CatalogCalciteSchema does not support getTable()");
+	}
+
+	@Override
+	public Set<String> getTableNames() {
+		throw new UnsupportedOperationException(
+			"CatalogCalciteSchema does not support getTableNames()");
+	}
+
+	@Override
+	public RelProtoDataType getType(String name) {
+		return new RelProtoDataType() {
+			@Override
+			public RelDataType apply(RelDataTypeFactory relDataTypeFactory) {

Review comment:
   ok, sounds good

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r277794804 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistException.java ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.exceptions; + +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.ObjectPath; + +/** + * Exception for adding an already existed partition. Review comment: This document can be made consistent with other similar exception classes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
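As an illustration of the consistency xuefuz asks for, here is a stand-alone sketch of the exception with its Javadoc worded like the sibling `*AlreadyExistException` classes; the message format and constructor arguments are assumptions, not the PR's actual code.

```java
// Hypothetical sketch: Javadoc and message aligned with sibling exception
// classes such as TableAlreadyExistException (exact wording in the PR may differ).
class PartitionAlreadyExistExceptionSketch {

	/** Exception for trying to create a partition that already exists. */
	static class PartitionAlreadyExistException extends Exception {
		PartitionAlreadyExistException(String catalogName, String partition) {
			super(String.format("Partition %s already exists in catalog %s.", partition, catalogName));
		}
	}

	public static void main(String[] args) throws Exception {
		PartitionAlreadyExistException e =
			new PartitionAlreadyExistException("myCatalog", "dt=2019-04-24");
		System.out.println(e.getMessage());
	}
}
```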
[GitHub] [flink] dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#discussion_r277795351 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.type.SqlTypeName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink's catalog and Calcite's schema. This enables to look up and access tables + * in SQL queries without registering tables in advance. Databases are registered as sub-schemas in the schema. + * This mapping is modeled as a strict two-level reference structure for Flink in Calcite, + * the full path of tables and views is of format [catalog_name].[db_name].[meta-object_name]. + */ +@Internal +public class CatalogCalciteSchema implements Schema { + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class); + + private final String catalogName; + private final ReadableCatalog catalog; + + public CatalogCalciteSchema(String catalogName, ReadableCatalog catalog) { + this.catalogName = catalogName; + this.catalog = catalog; + } + + /** +* Looks up a sub-schema (database) by the given sub-schema name. +* +* @param schemaName Name of sub-schema to look up. +* @return Sub-schema with a given dbName, or null. 
+	 */
+	@Override
+	public Schema getSubSchema(String schemaName) {
+
+		if (catalog.databaseExists(schemaName)) {
+			return new DatabaseCalciteSchema(schemaName, catalog);
+		} else {
+			LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName));
+			throw new CatalogException(
+				new DatabaseNotExistException(catalogName, schemaName));
+		}
+	}
+
+	@Override
+	public Set<String> getSubSchemaNames() {
+		return new HashSet<>(catalog.listDatabases());
+	}
+
+	@Override
+	public Table getTable(String name) {
+		throw new UnsupportedOperationException(
+			"CatalogCalciteSchema does not support getTable()");
+	}
+
+	@Override
+	public Set<String> getTableNames() {
+		throw new UnsupportedOperationException(

Review comment:
   I agree that we should throw an exception when the `getTable` method is called. I see no reason, though, why we can't return an empty table set here. I am not sure from where Calcite may call this method; we do not fully control its scope. I think it doesn't harm us to return an `emptySet` here.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
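The alternative dawidwys proposes, returning an empty set instead of throwing, looks like this against a minimal stand-in interface (the real class implements Calcite's `Schema`, which is not reproduced here):

```java
import java.util.Collections;
import java.util.Set;

// Sketch of the reviewer's alternative: since Calcite may call getTableNames()
// internally, returning an empty set is safer than throwing.
class EmptyTableNamesSketch {

	interface SchemaLike {
		Set<String> getTableNames();
	}

	static final SchemaLike CATALOG_SCHEMA = new SchemaLike() {
		@Override
		public Set<String> getTableNames() {
			// Tables live one level down (per database), so this level has none.
			return Collections.emptySet();
		}
	};

	public static void main(String[] args) {
		System.out.println(CATALOG_SCHEMA.getTableNames().isEmpty());
	}
}
```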
[GitHub] [flink] xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog
xuefuz commented on a change in pull request #8222: [FLINK-11518] [table] Add partition related catalog APIs and implement them in GenericInMemoryCatalog URL: https://github.com/apache/flink/pull/8222#discussion_r277795191 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/TableNotPartitionedException.java ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.exceptions; + +import org.apache.flink.table.catalog.ObjectPath; + +/** + * Exception for trying to operate partition against a non-partitioned table. Review comment: against -> on This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#discussion_r277793581 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.type.SqlTypeName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * A mapping between Flink's catalog and Calcite's schema. This enables to look up and access tables + * in SQL queries without registering tables in advance. Databases are registered as sub-schemas in the schema. + * This mapping is modeled as a strict two-level reference structure for Flink in Calcite, + * the full path of tables and views is of format [catalog_name].[db_name].[meta-object_name]. + */ +@Internal +public class CatalogCalciteSchema implements Schema { + private static final Logger LOGGER = LoggerFactory.getLogger(CatalogCalciteSchema.class); + + private final String catalogName; + private final ReadableCatalog catalog; + + public CatalogCalciteSchema(String catalogName, ReadableCatalog catalog) { + this.catalogName = catalogName; + this.catalog = catalog; + } + + /** +* Looks up a sub-schema (database) by the given sub-schema name. +* +* @param schemaName Name of sub-schema to look up. +* @return Sub-schema with a given dbName, or null. 
+	 */
+	@Override
+	public Schema getSubSchema(String schemaName) {
+
+		if (catalog.databaseExists(schemaName)) {
+			return new DatabaseCalciteSchema(schemaName, catalog);
+		} else {
+			LOGGER.error(String.format("Schema %s does not exist in catalog %s", schemaName, catalogName));
+			throw new CatalogException(
+				new DatabaseNotExistException(catalogName, schemaName));
+		}
+	}
+
+	@Override
+	public Set<String> getSubSchemaNames() {
+		return new HashSet<>(catalog.listDatabases());
+	}
+
+	@Override
+	public Table getTable(String name) {
+		throw new UnsupportedOperationException(
+			"CatalogCalciteSchema does not support getTable()");
+	}
+
+	@Override
+	public Set<String> getTableNames() {
+		throw new UnsupportedOperationException(
+			"CatalogCalciteSchema does not support getTableNames()");
+	}
+
+	@Override
+	public RelProtoDataType getType(String name) {
+		return new RelProtoDataType() {
+			@Override
+			public RelDataType apply(RelDataTypeFactory relDataTypeFactory) {

Review comment:
   I would prefer returning `null`; otherwise it is a bit misleading for me, because as far as I understand the `Schema` interface, it is only for user/catalog-specific types.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
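The `null`-returning variant dawidwys prefers can be sketched against a stand-in type lookup (hypothetical; the real method overrides Calcite's `Schema.getType` and returns a `RelProtoDataType`):

```java
// Sketch of the reviewer's preference: getType(name) returns null rather than
// a dummy type object, signalling that this catalog defines no custom types.
class GetTypeSketch {

	interface TypeLookup {
		// Calcite treats a null result as "type not found".
		Object getType(String name);
	}

	static final TypeLookup LOOKUP = name -> null;

	public static void main(String[] args) {
		System.out.println(LOOKUP.getType("MY_TYPE") == null);
	}
}
```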