[GitHub] [flink] dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
dianfu commented on a change in pull request #11768:
URL: https://github.com/apache/flink/pull/11768#discussion_r411828146

## File path: docs/dev/table/python/dependency_management.md

@@ -22,7 +22,24 @@ specific language governing permissions and limitations under the License.
 -->
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management

Review comment:
What about `Java Dependency`?

## File path: docs/dev/table/python/dependency_management.md

@@ -22,7 +22,24 @@ specific language governing permissions and limitations under the License.
 -->
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management
+
+If third-party Java dependencies are used, you can using following code to add jars for your Python job.
+
+{% highlight python %}
+# Set jar urls in "pipeline.jars". The jars will be uploaded to the cluster.
+# NOTE: Only local file urls (start with "file://") are supported.
+table_env.get_config.set_configuration("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+
+# Set jar urls in "pipeline.classpaths". The jars will be added to the classpath of the cluster.
+# Users should ensure the urls are accessible on both the local client and the cluster.
+# NOTE: The supported schemes includes: file,ftp,http,https,jar. "hdfs" is not supported by default.
+table_env.get_config.set_configuration("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+{% endhighlight %}
+
+# Python Dependency Management

Review comment:
What about `Python Dependency`?

## File path: docs/dev/table/python/dependency_management.md

@@ -22,7 +22,24 @@ specific language governing permissions and limitations under the License.
 -->
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management
+
+If third-party Java dependencies are used, you can using following code to add jars for your Python job.

Review comment:
Users could also specify the Java dependencies via command line arguments, could we add a link for that?

## File path: docs/dev/table/python/dependency_management.md

@@ -22,7 +22,24 @@ specific language governing permissions and limitations under the License.
 -->
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management
+
+If third-party Java dependencies are used, you can using following code to add jars for your Python job.
+
+{% highlight python %}
+# Set jar urls in "pipeline.jars". The jars will be uploaded to the cluster.
+# NOTE: Only local file urls (start with "file://") are supported.
+table_env.get_config.set_configuration("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+
+# Set jar urls in "pipeline.classpaths". The jars will be added to the classpath of the cluster.

Review comment:
`Set jar urls in "pipeline.classpaths"` -> `Specify a list of jar URLs via "pipeline.classpaths"`

## File path: docs/dev/table/python/dependency_management.md

@@ -22,7 +22,24 @@ specific language governing permissions and limitations under the License.
 -->
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management
+
+If third-party Java dependencies are used, you can using following code to add jars for your Python job.
+
+{% highlight python %}
+# Set jar urls in "pipeline.jars". The jars will be uploaded to the cluster.
+# NOTE: Only local file urls (start with "file://") are supported.

Review comment:
`Set jar urls in "pipeline.jars".` -> `Specify a list of jar URLs via "pipeline.jars"`
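The snippet under review calls `table_env.get_config.set_configuration(...)`, which is not a working PyFlink call; the test code quoted later in this review uses `table_env.get_config().get_configuration().set_string(...)`. A minimal sketch of preparing a valid `pipeline.jars` value before handing it to that call (the helper name `build_pipeline_jars_value` is made up for illustration):

```python
from urllib.parse import urlparse


def build_pipeline_jars_value(jar_urls):
    """Join jar URLs with ';' as expected by the "pipeline.jars" option,
    rejecting anything that is not a local file:// URL (the only scheme
    the documented option supports)."""
    for url in jar_urls:
        if urlparse(url).scheme != "file":
            raise ValueError("only local file:// urls are supported, got: %s" % url)
    return ";".join(jar_urls)


# With a TableEnvironment (API as used in this PR's tests):
# table_env.get_config().get_configuration().set_string(
#     "pipeline.jars",
#     build_pipeline_jars_value(["file:///my/jar/path/connector.jar",
#                                "file:///my/jar/path/udf.jar"]))
```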
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409497954

## File path: flink-python/pyflink/util/utils.py

@@ -98,3 +98,30 @@ def is_local_deployment(j_configuration):
     JDeploymentOptions = jvm.org.apache.flink.configuration.DeploymentOptions
     return j_configuration.containsKey(JDeploymentOptions.TARGET.key()) \
         and j_configuration.getString(JDeploymentOptions.TARGET.key(), None) == "local"
+
+
+def add_jars_to_context_class_loader(jar_urls):
+    """
+    Add jars to Python gateway server for local compilation and local execution (i.e. minicluster).
+    There are many component in Flink which won't be added to classpath by default. e.g. Kafka
+    connector, JDBC connector, CSV format etc. This utility function can be used to hot load the
+    jars.
+
+    :param jar_urls: The list of jar urls.
+    """
+    gateway = get_gateway()
+    # validate and normalize
+    jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]
+    context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
+    existed_urls = []

Review comment:
existing_urls

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
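The `add_jars_to_context_class_loader` utility quoted above deduplicates against the URLs already on the context class loader before building a new one. A pure-Python sketch of that dedup step, with the `existing_urls` naming the reviewer suggests (the helper name `urls_to_add` is hypothetical; the py4j calls are shown only as comments):

```python
def urls_to_add(existing_urls, jar_urls):
    """Return the jar URLs not already present on the context class
    loader, preserving the caller's order."""
    existing = set(existing_urls)
    return [url for url in jar_urls if url not in existing]


# In the real utility the existing URLs come from the JVM side, roughly:
# context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
# existing_urls = [url.toString() for url in context_classloader.getURLs()]
# (getURLs() is only available when the context class loader is a URLClassLoader)
```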
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409517022

## File path: tools/travis_controller.sh

@@ -148,6 +148,8 @@ if [ $STAGE == "$STAGE_COMPILE" ]; then
     ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-python*.jar" \
     ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-elasticsearch-base/target/flink-*.jar" \
     ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-kafka-base/target/flink-*.jar" \
+    ! -path "$CACHE_FLINK_DIR/flink-python/target/flink-python*-tests.jar" \

Review comment:
What about generating two jars for this test purpose instead of reusing the existing test jars? It would be small and also could avoid potential problems.
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409496472

## File path: flink-python/pyflink/table/table_environment.py

@@ -1095,6 +1096,19 @@ def _set_python_executable_for_local_executor(self):
                 and is_local_deployment(j_config):
             j_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), sys.executable)
+
+    def _write_pipeline_jars_to_j_env(self):

Review comment:
Rename to _add_pipeline_jars_to_j_env_config
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409503068

## File path: flink-python/pyflink/table/tests/test_table_environment_api.py

@@ -329,6 +334,54 @@ def test_table_environment_with_blink_planner(self):
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
+
+    def test_set_jars(self):
+        jar_urls = []
+        func1_class_name = "org.apache.flink.table.planner.plan.stream.sql.Func1"
+        func2_class_name = "org.apache.flink.python.util.TestScalarFunction"
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func1_class_name,
+            "flink-table/flink-table-planner-blink/target/flink-table-planner-blink*-tests.jar"))
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func2_class_name,
+            "flink-python/target/flink-python*-tests.jar"))
+
+        # test set the "pipeline.jars" multiple times
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", jar_urls[0])
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        third_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()

Review comment:
third_class_loader -> second_class_loader
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409515839

## File path: flink-python/pyflink/table/tests/test_table_environment_api.py

@@ -329,6 +334,54 @@ def test_table_environment_with_blink_planner(self):
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
+
+    def test_set_jars(self):
+        jar_urls = []
+        func1_class_name = "org.apache.flink.table.planner.plan.stream.sql.Func1"
+        func2_class_name = "org.apache.flink.python.util.TestScalarFunction"
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func1_class_name,
+            "flink-table/flink-table-planner-blink/target/flink-table-planner-blink*-tests.jar"))
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func2_class_name,
+            "flink-python/target/flink-python*-tests.jar"))
+
+        # test set the "pipeline.jars" multiple times
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", jar_urls[0])
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        third_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.assertEqual(first_class_loader, third_class_loader)
+
+        source = self.t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        self.t_env.register_java_function("func1", func1_class_name)
+        self.t_env.register_java_function("func2", func2_class_name)
+        table_sink = source_sink_utils.TestAppendSink(
+            ["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
+        self.t_env.register_table_sink("sink", table_sink)
+        source.select("func1(a.cast(int), b), func2(a.cast(int), b)").insert_into("sink")
+        self.t_env.execute("test")
+        actual = source_sink_utils.results()
+        expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+        self.assert_equals(actual, expected)
+
+    def validate_and_return_unloaded_jar_url(self, func_class_name, jar_filename_pattern):

Review comment:
Split it into two methods to make it more clear?
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409515490

## File path: flink-python/pyflink/table/tests/test_table_environment_api.py

@@ -329,6 +334,54 @@ def test_table_environment_with_blink_planner(self):
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
+
+    def test_set_jars(self):
+        jar_urls = []
+        func1_class_name = "org.apache.flink.table.planner.plan.stream.sql.Func1"
+        func2_class_name = "org.apache.flink.python.util.TestScalarFunction"
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func1_class_name,
+            "flink-table/flink-table-planner-blink/target/flink-table-planner-blink*-tests.jar"))
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func2_class_name,
+            "flink-python/target/flink-python*-tests.jar"))
+
+        # test set the "pipeline.jars" multiple times
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", jar_urls[0])
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        third_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.assertEqual(first_class_loader, third_class_loader)
+
+        source = self.t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        self.t_env.register_java_function("func1", func1_class_name)
+        self.t_env.register_java_function("func2", func2_class_name)
+        table_sink = source_sink_utils.TestAppendSink(
+            ["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
+        self.t_env.register_table_sink("sink", table_sink)
+        source.select("func1(a.cast(int), b), func2(a.cast(int), b)").insert_into("sink")
+        self.t_env.execute("test")
+        actual = source_sink_utils.results()
+        expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+        self.assert_equals(actual, expected)
+
+    def validate_and_return_unloaded_jar_url(self, func_class_name, jar_filename_pattern):
+        test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
+        if not test_jars:
+            self.fail("'%s' is not available. Please compile the test jars first."
+                      % jar_filename_pattern)
+        try:
+            self.t_env.register_java_function("func", func_class_name)
+        except Py4JJavaError:
+            pass
+        else:
+            self.fail("The scalar function '%s' should not been loaded before this test case. "

Review comment:
Change the error message to `The scalar function '%s' should not be able to be loaded.`?
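The split the reviewer asks for could look like this: one helper that only locates the built test jar on disk, and one that only asserts the function cannot be loaded yet. A sketch under those assumptions (both names, `find_test_jar_urls` and `assert_function_not_loaded`, are made up; the second half stays in the test class because it needs `self.t_env`):

```python
import glob
import os


def find_test_jar_urls(source_root, jar_filename_pattern):
    """Locate the built test jars and return them as file:// URLs,
    failing fast when they have not been compiled yet."""
    matches = glob.glob(os.path.join(source_root, jar_filename_pattern))
    if not matches:
        raise AssertionError(
            "'%s' is not available. Please compile the test jars first."
            % jar_filename_pattern)
    return ["file://" + os.path.abspath(path) for path in matches]


# Second half, kept as a method of the test case:
# def assert_function_not_loaded(self, func_class_name):
#     try:
#         self.t_env.register_java_function("func", func_class_name)
#     except Py4JJavaError:
#         pass
#     else:
#         self.fail("The scalar function '%s' should not be able to be "
#                   "loaded." % func_class_name)
```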