[GitHub] [flink] dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-20 Thread GitBox


dianfu commented on a change in pull request #11768:
URL: https://github.com/apache/flink/pull/11768#discussion_r411828146



##
File path: docs/dev/table/python/dependency_management.md
##
@@ -22,7 +22,24 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management

Review comment:
   What about `Java Dependency`?

##
File path: docs/dev/table/python/dependency_management.md
##
@@ -22,7 +22,24 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management
+
+If third-party Java dependencies are used, you can use the following code to add jars for your Python job.
+
+{% highlight python %}
+# Set jar urls in "pipeline.jars". The jars will be uploaded to the cluster.
+# NOTE: Only local file urls (start with "file://") are supported.
+table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+
+# Set jar urls in "pipeline.classpaths". The jars will be added to the classpath of the cluster.
+# Users should ensure the urls are accessible on both the local client and the cluster.
+# NOTE: The supported schemes include: file, ftp, http, https, jar. "hdfs" is not supported by default.
+table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+{% endhighlight %}
+
+# Python Dependency Management

Review comment:
   What about `Python Dependency`?
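For readers following along: below is a minimal end-to-end sketch of the API the quoted section documents, pieced together from the snippets in this thread. The environment setup and the jar path are illustrative placeholders, not taken from the PR.

{% highlight python %}
from pyflink.table import BatchTableEnvironment, EnvironmentSettings

# standard PyFlink setup (not specific to this PR)
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

# ship a local connector jar with the job via "pipeline.jars";
# only "file://" URLs are supported here, per the quoted docs
table_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///my/jar/path/connector.jar")
{% endhighlight %}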

##
File path: docs/dev/table/python/dependency_management.md
##
@@ -22,7 +22,24 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management
+
+If third-party Java dependencies are used, you can use the following code to add jars for your Python job.

Review comment:
   Users could also specify the Java dependencies via command line arguments; could we add a link for that?

##
File path: docs/dev/table/python/dependency_management.md
##
@@ -22,7 +22,24 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management
+
+If third-party Java dependencies are used, you can use the following code to add jars for your Python job.
+
+{% highlight python %}
+# Set jar urls in "pipeline.jars". The jars will be uploaded to the cluster.
+# NOTE: Only local file urls (start with "file://") are supported.
+table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+
+# Set jar urls in "pipeline.classpaths". The jars will be added to the classpath of the cluster.

Review comment:
   `Set jar urls in "pipeline.classpaths"` -> `Specify a list of jar URLs via "pipeline.classpaths"`

##
File path: docs/dev/table/python/dependency_management.md
##
@@ -22,7 +22,24 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-If third-party dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.
+# Java Dependency Management
+
+If third-party Java dependencies are used, you can use the following code to add jars for your Python job.
+
+{% highlight python %}
+# Set jar urls in "pipeline.jars". The jars will be uploaded to the cluster.
+# NOTE: Only local file urls (start with "file://") are supported.

Review comment:
   `Set jar urls in "pipeline.jars".` -> `Specify a list of jar URLs via "pipeline.jars"`

[GitHub] [flink] dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409497954
 
 

 ##
 File path: flink-python/pyflink/util/utils.py
 ##
 @@ -98,3 +98,30 @@ def is_local_deployment(j_configuration):
     JDeploymentOptions = jvm.org.apache.flink.configuration.DeploymentOptions
     return j_configuration.containsKey(JDeploymentOptions.TARGET.key()) \
         and j_configuration.getString(JDeploymentOptions.TARGET.key(), None) == "local"
+
+
+def add_jars_to_context_class_loader(jar_urls):
+    """
+    Add jars to the Python gateway server for local compilation and local execution
+    (i.e. minicluster). Many components in Flink won't be added to the classpath by
+    default, e.g. the Kafka connector, the JDBC connector, the CSV format, etc. This
+    utility function can be used to hot-load the jars.
+
+    :param jar_urls: The list of jar urls.
+    """
+    gateway = get_gateway()
+    # validate and normalize the urls
+    jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]
+    context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
+    existed_urls = []
 
 Review comment:
   existing_urls
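For context, here is a sketch of one way the truncated function body could continue (with the suggested rename applied): check which URLs the current context class loader already knows, then chain a new java.net.URLClassLoader in front of it for the missing jars. This is a hedged illustration of the general hot-loading technique, not the PR's actual implementation; `gateway` and `jar_urls` are as in the quoted snippet.

{% highlight python %}
# sketch only -- the real implementation in the PR may differ
existing_urls = []
if context_classloader.getClass().getName() == "java.net.URLClassLoader":
    existing_urls = [url.toString() for url in context_classloader.getURLs()]
missing_urls = [url for url in jar_urls if url not in existing_urls]
if not missing_urls:
    return  # all jars are already visible to the class loader

# build a java.net.URL[] via py4j and wrap the current context class
# loader so the new jars take part in class resolution
j_urls = gateway.new_array(gateway.jvm.java.net.URL, len(missing_urls))
for i, url in enumerate(missing_urls):
    j_urls[i] = gateway.jvm.java.net.URL(url)
new_classloader = gateway.jvm.java.net.URLClassLoader(j_urls, context_classloader)
gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
{% endhighlight %}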




[GitHub] [flink] dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409517022
 
 

 ##
 File path: tools/travis_controller.sh
 ##
 @@ -148,6 +148,8 @@ if [ $STAGE == "$STAGE_COMPILE" ]; then
     ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-python*.jar" \
     ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-elasticsearch-base/target/flink-*.jar" \
     ! -path "$CACHE_FLINK_DIR/flink-connectors/flink-connector-kafka-base/target/flink-*.jar" \
+    ! -path "$CACHE_FLINK_DIR/flink-python/target/flink-python*-tests.jar" \
 
 Review comment:
   What about generating two jars for this test purpose instead of reusing the existing test jars? They would be small and could also avoid potential problems.




[GitHub] [flink] dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409515839
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table_environment_api.py
 ##
 @@ -329,6 +334,54 @@ def test_table_environment_with_blink_planner(self):
 
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
 
+    def test_set_jars(self):
+        jar_urls = []
+        func1_class_name = "org.apache.flink.table.planner.plan.stream.sql.Func1"
+        func2_class_name = "org.apache.flink.python.util.TestScalarFunction"
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func1_class_name,
+            "flink-table/flink-table-planner-blink/target/flink-table-planner-blink*-tests.jar"))
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func2_class_name,
+            "flink-python/target/flink-python*-tests.jar"))
+
+        # test setting "pipeline.jars" multiple times
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", jar_urls[0])
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        third_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.assertEqual(first_class_loader, third_class_loader)
+
+        source = self.t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        self.t_env.register_java_function("func1", func1_class_name)
+        self.t_env.register_java_function("func2", func2_class_name)
+        table_sink = source_sink_utils.TestAppendSink(
+            ["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
+        self.t_env.register_table_sink("sink", table_sink)
+        source.select("func1(a.cast(int), b), func2(a.cast(int), b)").insert_into("sink")
+        self.t_env.execute("test")
+        actual = source_sink_utils.results()
+        expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+        self.assert_equals(actual, expected)
+
+    def validate_and_return_unloaded_jar_url(self, func_class_name, jar_filename_pattern):
 
 Review comment:
   Split it into two methods to make it clearer?
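For illustration, a hedged sketch of such a split, based only on the helper body quoted later in this thread; the method names are made up for the example:

{% highlight python %}
def get_jar_urls(self, jar_filename_pattern):
    # locate the test jars under the Flink source root and fail fast
    # if they have not been built yet
    test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
    if not test_jars:
        self.fail("'%s' is not available. Please compile the test jars first."
                  % jar_filename_pattern)
    return ["file://" + jar_path for jar_path in test_jars]

def ensure_class_not_loaded(self, func_class_name):
    # registering the function must fail before the jars are added,
    # otherwise the test would not prove that "pipeline.jars" loaded them
    try:
        self.t_env.register_java_function("func", func_class_name)
    except Py4JJavaError:
        pass
    else:
        self.fail("The scalar function '%s' should not be able to be loaded."
                  % func_class_name)
{% endhighlight %}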




[GitHub] [flink] dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409496472
 
 

 ##
 File path: flink-python/pyflink/table/table_environment.py
 ##
 @@ -1095,6 +1096,19 @@ def _set_python_executable_for_local_executor(self):
                 and is_local_deployment(j_config):
             j_config.setString(jvm.PythonOptions.PYTHON_EXECUTABLE.key(), sys.executable)
 
+    def _write_pipeline_jars_to_j_env(self):
 
 Review comment:
   Rename to _add_pipeline_jars_to_j_env_config
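For illustration, a minimal sketch of what the renamed helper might do, reusing the `add_jars_to_context_class_loader` utility quoted earlier in this thread. The exact wiring in the PR may differ, and `get_string` with a default value is an assumed accessor on the configuration object:

{% highlight python %}
def _add_pipeline_jars_to_j_env_config(self):
    # read the semicolon-separated jar URLs set via "pipeline.jars" and
    # hot-load them on the gateway side so local compilation and local
    # execution (e.g. the minicluster) can resolve the classes
    jar_urls = self.get_config().get_configuration().get_string("pipeline.jars", None)
    if jar_urls:
        add_jars_to_context_class_loader(jar_urls.split(";"))
{% endhighlight %}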




[GitHub] [flink] dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409503068
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table_environment_api.py
 ##
 @@ -329,6 +334,54 @@ def test_table_environment_with_blink_planner(self):
 
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
 
+    def test_set_jars(self):
+        jar_urls = []
+        func1_class_name = "org.apache.flink.table.planner.plan.stream.sql.Func1"
+        func2_class_name = "org.apache.flink.python.util.TestScalarFunction"
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func1_class_name,
+            "flink-table/flink-table-planner-blink/target/flink-table-planner-blink*-tests.jar"))
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func2_class_name,
+            "flink-python/target/flink-python*-tests.jar"))
+
+        # test setting "pipeline.jars" multiple times
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", jar_urls[0])
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        third_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
 
 Review comment:
   third_class_loader -> second_class_loader




[GitHub] [flink] dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.

2020-04-16 Thread GitBox
dianfu commented on a change in pull request #11768: [FLINK-16943][python] Support set the configuration option "pipeline.jars" in PyFlink.
URL: https://github.com/apache/flink/pull/11768#discussion_r409515490
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table_environment_api.py
 ##
 @@ -329,6 +334,54 @@ def test_table_environment_with_blink_planner(self):
 
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
 
+    def test_set_jars(self):
+        jar_urls = []
+        func1_class_name = "org.apache.flink.table.planner.plan.stream.sql.Func1"
+        func2_class_name = "org.apache.flink.python.util.TestScalarFunction"
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func1_class_name,
+            "flink-table/flink-table-planner-blink/target/flink-table-planner-blink*-tests.jar"))
+        jar_urls.extend(self.validate_and_return_unloaded_jar_url(
+            func2_class_name,
+            "flink-python/target/flink-python*-tests.jar"))
+
+        # test setting "pipeline.jars" multiple times
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        first_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", jar_urls[0])
+        self.t_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
+        third_class_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+
+        self.assertEqual(first_class_loader, third_class_loader)
+
+        source = self.t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        self.t_env.register_java_function("func1", func1_class_name)
+        self.t_env.register_java_function("func2", func2_class_name)
+        table_sink = source_sink_utils.TestAppendSink(
+            ["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
+        self.t_env.register_table_sink("sink", table_sink)
+        source.select("func1(a.cast(int), b), func2(a.cast(int), b)").insert_into("sink")
+        self.t_env.execute("test")
+        actual = source_sink_utils.results()
+        expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+        self.assert_equals(actual, expected)
+
+    def validate_and_return_unloaded_jar_url(self, func_class_name, jar_filename_pattern):
+        test_jars = glob.glob(os.path.join(_find_flink_source_root(), jar_filename_pattern))
+        if not test_jars:
+            self.fail("'%s' is not available. Please compile the test jars first."
+                      % jar_filename_pattern)
+        try:
+            self.t_env.register_java_function("func", func_class_name)
+        except Py4JJavaError:
+            pass
+        else:
+            self.fail("The scalar function '%s' should not been loaded before this test case. "
 
 Review comment:
   Change the error message to `The scalar function '%s' should not be able to be loaded.`?

