This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 23d63ed  [FLINK-24245][python] Fix the problem caused by multiple jobs sharing the loopback mode address stored in the environment variable in PyFlink
23d63ed is described below

commit 23d63edb58b91d282c6355793851bce3406d946e
Author: huangxingbo <hxbks...@gmail.com>
AuthorDate: Fri Sep 10 19:08:31 2021 +0800

    [FLINK-24245][python] Fix the problem caused by multiple jobs sharing the loopback mode address stored in the environment variable in PyFlink
    
    This closes #17239.
---
 .../datastream/stream_execution_environment.py     | 41 ++++++++--------
 .../tests/test_stream_execution_environment.py     | 22 ++++-----
 flink-python/pyflink/table/table_environment.py    | 56 +++++++++++-----------
 .../pyflink/table/tests/test_dependency.py         |  9 +++-
 flink-python/pyflink/testing/test_case_utils.py    |  6 +--
 5 files changed, 68 insertions(+), 66 deletions(-)
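
Before this change, every job started from the same Python process registered its
loopback server address in the one process-wide environment-variable map of the
JVM (obtained via System.getenv()), so the last job to start silently overwrote
the address that all earlier jobs' workers would connect to. The commit instead
stores the address in each execution environment's own Flink Configuration. A
minimal, self-contained sketch of the difference (illustrative only: FakeJob and
PROCESS_WIDE_ENV are hypothetical stand-ins, not PyFlink API):

PROCESS_WIDE_ENV = {}  # stands in for the JVM-side System.getenv() map

class FakeJob:
    def __init__(self, name):
        self.name = name
        self.config = {}  # stands in for this job's own Flink Configuration

    def start_loopback_old(self, address):
        # old behaviour: one shared, process-wide slot for every job
        PROCESS_WIDE_ENV['PYFLINK_LOOPBACK_SERVER_ADDRESS'] = address

    def start_loopback_new(self, address):
        # new behaviour: the address lives in the job's own configuration
        self.config['PYFLINK_LOOPBACK_SERVER_ADDRESS'] = address

job_a, job_b = FakeJob('a'), FakeJob('b')
job_a.start_loopback_old('localhost:50001')
job_b.start_loopback_old('localhost:50002')
# job_a's workers would now connect to job_b's loopback server:
assert PROCESS_WIDE_ENV['PYFLINK_LOOPBACK_SERVER_ADDRESS'] == 'localhost:50002'

job_a.start_loopback_new('localhost:50001')
job_b.start_loopback_new('localhost:50002')
assert job_a.config['PYFLINK_LOOPBACK_SERVER_ADDRESS'] == 'localhost:50001'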

diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 1d299da..dda51d4 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -59,6 +59,7 @@ class StreamExecutionEnvironment(object):
     def __init__(self, j_stream_execution_environment, serializer=PickleSerializer()):
         self._j_stream_execution_environment = j_stream_execution_environment
         self.serializer = serializer
+        self._open()
 
     def get_config(self) -> ExecutionConfig:
         """
@@ -869,20 +870,33 @@ class StreamExecutionEnvironment(object):
             -> JavaObject:
         gateway = get_gateway()
         JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
+
+        JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
+
+        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
+            self._j_stream_execution_environment)
+
+        JPythonConfigUtil.setPartitionCustomOperatorNumPartitions(
+            get_field_value(self._j_stream_execution_environment, "transformations"))
+
+        j_stream_graph = self._j_stream_execution_environment.getStreamGraph(clear_transformations)
+        if job_name is not None:
+            j_stream_graph.setJobName(job_name)
+        return j_stream_graph
+
+    def _open(self):
         # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
         j_configuration = get_j_env_configuration(self._j_stream_execution_environment)
 
         def startup_loopback_server():
+            from pyflink.common import Configuration
             from pyflink.fn_execution.beam.beam_worker_pool_service import \
                 BeamFnLoopbackWorkerPoolServicer
-            jvm = gateway.jvm
-            j_env = jvm.System.getenv()
-            get_field_value(j_env, "m").put(
-                'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
+            config = Configuration(j_configuration=j_configuration)
+            config.set_string(
+                "PYFLINK_LOOPBACK_SERVER_ADDRESS", 
BeamFnLoopbackWorkerPoolServicer().start())
 
-        python_worker_execution_mode = None
-        if hasattr(self, "_python_worker_execution_mode"):
-            python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
+        python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')
 
         if python_worker_execution_mode is None:
             if is_local_deployment(j_configuration):
@@ -898,19 +912,6 @@ class StreamExecutionEnvironment(object):
                 "It only supports to execute the Python worker in 'loopback' 
mode and 'process' "
                 "mode, unknown mode '%s' is configured" % 
python_worker_execution_mode)
 
-        JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
-
-        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
-            self._j_stream_execution_environment)
-
-        JPythonConfigUtil.setPartitionCustomOperatorNumPartitions(
-            get_field_value(self._j_stream_execution_environment, "transformations"))
-
-        j_stream_graph = self._j_stream_execution_environment.getStreamGraph(clear_transformations)
-        if job_name is not None:
-            j_stream_graph.setJobName(job_name)
-        return j_stream_graph
-
     def is_unaligned_checkpoints_enabled(self):
         """
         Returns whether Unaligned Checkpoints are enabled.
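
In stream_execution_environment.py the loopback bootstrap moves out of
_generate_stream_graph() into a new _open() method invoked from __init__, so it
runs once per environment instance, and the worker execution mode is now read
from the _python_worker_execution_mode environment variable rather than from an
ad-hoc instance attribute. A rough, runnable sketch of the resulting mode
selection (paraphrased from the hunk above; the branches elided between the
shown lines are assumed, and is_local stands in for
is_local_deployment(j_configuration)):

import os

def choose_worker_mode(is_local: bool) -> str:
    # an unset variable falls back to a deployment-based default
    mode = os.environ.get('_python_worker_execution_mode')
    if mode is None:
        # loopback workers only make sense for local (MiniCluster) runs
        return 'loopback' if is_local else 'process'
    if mode in ('loopback', 'process'):
        return mode
    raise ValueError(
        "It only supports to execute the Python worker in 'loopback' mode "
        "and 'process' mode, unknown mode '%s' is configured" % mode)

print(choose_worker_mode(is_local=True))   # -> loopback
print(choose_worker_mode(is_local=False))  # -> process
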
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 2643d2a..c1938df 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -49,16 +49,12 @@ from pyflink.util.java_utils import get_j_env_configuration
 class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def setUp(self):
-        self.env = self.create_new_env()
+        os.environ['_python_worker_execution_mode'] = "loopback"
+        self.env = StreamExecutionEnvironment.get_execution_environment()
+        os.environ['_python_worker_execution_mode'] = "process"
+        self.env.set_parallelism(2)
         self.test_sink = DataStreamTestSinkFunction()
 
-    @staticmethod
-    def create_new_env(execution_mode='process'):
-        env = StreamExecutionEnvironment.get_execution_environment()
-        env.set_parallelism(2)
-        env._execution_mode = execution_mode
-        return env
-
     def test_get_config(self):
         execution_config = self.env.get_config()
 
@@ -343,7 +339,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def test_add_python_file(self):
         import uuid
-        env = self.create_new_env("loopback")
+        env = self.env
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -394,7 +390,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def test_add_python_file_2(self):
         import uuid
-        env = self.create_new_env("loopback")
+        env = self.env
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -462,7 +458,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
     def test_set_requirements_with_cached_directory(self):
         import uuid
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
         with open(requirements_txt_path, 'w') as f:
             f.write("python-package1==0.0.0")
@@ -508,7 +504,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         import uuid
         import shutil
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
         os.mkdir(archive_dir_path)
         with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
@@ -535,7 +531,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         import sys
         python_exec = sys.executable
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         python_exec_link_path = os.path.join(tmp_dir, "py_exec")
         os.symlink(python_exec, python_exec_link_path)
         env.set_python_executable(python_exec_link_path)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 1e10a0b..9562fcc 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -48,7 +48,7 @@ from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, uda
 from pyflink.table.utils import to_expression_jarray
 from pyflink.util import java_utils
 from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \
-    to_j_explain_detail_arr, to_jarray, get_field, get_field_value
+    to_j_explain_detail_arr, to_jarray, get_field
 
 __all__ = [
     'StreamTableEnvironment',
@@ -96,6 +96,7 @@ class TableEnvironment(object):
         # python.executable.
         self._set_python_executable_for_local_executor()
         self._config_chaining_optimization()
+        self._open()
 
     @staticmethod
     def create(environment_settings: EnvironmentSettings) -> 'TableEnvironment':
@@ -1747,18 +1748,39 @@ class TableEnvironment(object):
         self._add_jars_to_j_env_config(jars_key)
         self._add_jars_to_j_env_config(classpaths_key)
 
+    def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
+        if isinstance(function, AggregateFunction):
+            function = udaf(function,
+                            result_type=function.get_result_type(),
+                            accumulator_type=function.get_accumulator_type(),
+                            name=str(function.__class__.__name__))
+        elif isinstance(function, TableAggregateFunction):
+            function = udtaf(function,
+                             result_type=function.get_result_type(),
+                             accumulator_type=function.get_accumulator_type(),
+                             name=str(function.__class__.__name__))
+        return function
+
+    def _config_chaining_optimization(self):
+        JChainingOptimizingExecutor = get_gateway().jvm.org.apache.flink.table.executor.python.\
+            ChainingOptimizingExecutor
+        exec_env_field = get_field(self._j_tenv.getClass(), "execEnv")
+        exec_env_field.set(self._j_tenv,
+                           JChainingOptimizingExecutor(exec_env_field.get(self._j_tenv)))
+
+    def _open(self):
         # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
         def startup_loopback_server():
+            from pyflink.common import Configuration
             from pyflink.fn_execution.beam.beam_worker_pool_service import \
                 BeamFnLoopbackWorkerPoolServicer
 
-            j_env = jvm.System.getenv()
-            get_field_value(j_env, "m").put(
-                'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
+            j_configuration = get_j_env_configuration(self._get_j_env())
+            config = Configuration(j_configuration=j_configuration)
+            config.set_string(
+                "PYFLINK_LOOPBACK_SERVER_ADDRESS", 
BeamFnLoopbackWorkerPoolServicer().start())
 
-        python_worker_execution_mode = None
-        if hasattr(self, "_python_worker_execution_mode"):
-            python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
+        python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')
 
         if python_worker_execution_mode is None:
             if is_local_deployment(get_j_env_configuration(self._get_j_env())):
@@ -1774,26 +1796,6 @@ class TableEnvironment(object):
                 "It only supports to execute the Python worker in 'loopback' 
mode and 'process' "
                 "mode, unknown mode '%s' is configured" % 
python_worker_execution_mode)
 
-    def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
-        if isinstance(function, AggregateFunction):
-            function = udaf(function,
-                            result_type=function.get_result_type(),
-                            accumulator_type=function.get_accumulator_type(),
-                            name=str(function.__class__.__name__))
-        elif isinstance(function, TableAggregateFunction):
-            function = udtaf(function,
-                             result_type=function.get_result_type(),
-                             accumulator_type=function.get_accumulator_type(),
-                             name=str(function.__class__.__name__))
-        return function
-
-    def _config_chaining_optimization(self):
-        JChainingOptimizingExecutor = get_gateway().jvm.org.apache.flink.table.executor.python.\
-            ChainingOptimizingExecutor
-        exec_env_field = get_field(self._j_tenv.getClass(), "execEnv")
-        exec_env_field.set(self._j_tenv,
-                           JChainingOptimizingExecutor(exec_env_field.get(self._j_tenv)))
-
 
 class StreamTableEnvironment(TableEnvironment):
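
table_environment.py gets the same treatment: a new _open() called from
__init__ writes the loopback server address into the environment's own
Configuration (via get_j_env_configuration(self._get_j_env())) and reads the
mode override from the environment variable. A hedged usage sketch (assumes
PyFlink 1.14 is installed; the variable must be set before create(), because
_open() runs during construction and reads it exactly once):

import os
from pyflink.table import EnvironmentSettings, TableEnvironment

os.environ['_python_worker_execution_mode'] = 'process'
try:
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
finally:
    # t_env read the variable during create(); removing it afterwards
    # does not change the mode of the already-constructed environment
    del os.environ['_python_worker_execution_mode']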
 
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 5a8e7ab..8eeaee4 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -74,8 +74,13 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
 
     def setUp(self):
         super(StreamDependencyTests, self).setUp()
-        self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
-        self.st_env._execution_mode = "loopback"
+        origin_execution_mode = os.environ['_python_worker_execution_mode']
+        os.environ['_python_worker_execution_mode'] = "loopback"
+        try:
+            self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+        finally:
+            if origin_execution_mode is not None:
+                os.environ['_python_worker_execution_mode'] = origin_execution_mode
 
     def test_set_requirements_without_cached_directory(self):
         requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
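
The test above implements a save-and-restore of the environment variable with
try/finally; the direct os.environ[...] lookup relies on
PyFlinkTestCase.setUpClass (see the test_case_utils.py hunk below) having
pre-set the variable to "process". The same pattern, packaged as a generic
context manager that also tolerates the variable being unset (a sketch, not
part of the commit):

import os
from contextlib import contextmanager

@contextmanager
def env_var(name, value):
    original = os.environ.get(name)      # None if the variable was unset
    os.environ[name] = value
    try:
        yield
    finally:
        if original is None:
            os.environ.pop(name, None)   # restore the "unset" state
        else:
            os.environ[name] = original

with env_var('_python_worker_execution_mode', 'loopback'):
    pass  # create the environment here; the override is scoped to this block
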
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 0b36750..3849d0c 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -83,6 +83,7 @@ class PyFlinkTestCase(unittest.TestCase):
         cls.tempdir = tempfile.mkdtemp()
 
         os.environ["FLINK_TESTING"] = "1"
+        os.environ['_python_worker_execution_mode'] = "process"
         _find_flink_home()
 
         logging.info("Using %s as FLINK_HOME...", os.environ["FLINK_HOME"])
@@ -90,6 +91,7 @@ class PyFlinkTestCase(unittest.TestCase):
     @classmethod
     def tearDownClass(cls):
         shutil.rmtree(cls.tempdir, ignore_errors=True)
+        del os.environ['_python_worker_execution_mode']
 
     @classmethod
     def assert_equals(cls, actual, expected):
@@ -136,7 +138,6 @@ class PyFlinkStreamTableTestCase(PyFlinkTestCase):
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
-        self.t_env._execution_mode = "process"
 
 
 class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -150,7 +151,6 @@ class PyFlinkBatchTableTestCase(PyFlinkTestCase):
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
-        self.t_env._execution_mode = "process"
 
 
 class PyFlinkStreamingTestCase(PyFlinkTestCase):
@@ -163,7 +163,6 @@ class PyFlinkStreamingTestCase(PyFlinkTestCase):
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
-        self.env._execution_mode = "process"
 
 
 class PyFlinkBatchTestCase(PyFlinkTestCase):
@@ -176,7 +175,6 @@ class PyFlinkBatchTestCase(PyFlinkTestCase):
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
-        self.env._execution_mode = "process"
 
 
 class PythonAPICompletenessTestCase(object):
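
The test_case_utils.py changes pin the default mode to "process" for every test
class and clear it again when the class finishes. A generic, self-contained
sketch of that class-level fixture pattern (WorkerModeTestCase is hypothetical;
the real base class is PyFlinkTestCase, and it uses del where this sketch uses
pop to tolerate a test removing the variable):

import os
import unittest

class WorkerModeTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # default every test in this class to 'process'-mode workers
        os.environ['_python_worker_execution_mode'] = 'process'

    @classmethod
    def tearDownClass(cls):
        # pop(..., None) avoids a KeyError if a test already removed it
        os.environ.pop('_python_worker_execution_mode', None)

    def test_mode_is_set(self):
        self.assertEqual(
            os.environ['_python_worker_execution_mode'], 'process')

if __name__ == '__main__':
    unittest.main()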
