This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 23d63ed  [FLINK-24245][python] Fix the problem caused by multiple jobs sharing the loopback mode address stored in the environment variable in PyFlink
23d63ed is described below

commit 23d63edb58b91d282c6355793851bce3406d946e
Author: huangxingbo <hxbks...@gmail.com>
AuthorDate: Fri Sep 10 19:08:31 2021 +0800

    [FLINK-24245][python] Fix the problem caused by multiple jobs sharing the loopback mode address stored in the environment variable in PyFlink

    This closes #17239.
---
 .../datastream/stream_execution_environment.py     | 41 ++++++++--------
 .../tests/test_stream_execution_environment.py     | 22 ++++-----
 flink-python/pyflink/table/table_environment.py    | 56 +++++++++++-----------
 .../pyflink/table/tests/test_dependency.py         |  9 +++-
 flink-python/pyflink/testing/test_case_utils.py    |  6 +--
 5 files changed, 68 insertions(+), 66 deletions(-)
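Background on the fix, with a toy sketch (plain Python with hypothetical names; this is not code from the commit): previously every environment in one Python process published its loopback server address through the JVM-wide System environment, so when several jobs ran in the same process each newly started loopback server overwrote the address the earlier jobs had registered. The patch below instead writes the address into each job's own Flink Configuration, so jobs no longer share it:

    # Toy illustration of the bug (hypothetical names, not Flink code):
    # a single process-wide map behaves like the JVM's System environment,
    # so the last job to start a loopback server overwrites all others.
    shared_env = {}  # stands in for java.lang.System.getenv()

    class ToyLoopbackServer:
        # stands in for BeamFnLoopbackWorkerPoolServicer
        _port = 50000

        def start(self):
            ToyLoopbackServer._port += 1
            return "localhost:%d" % ToyLoopbackServer._port

    # Before the fix: both jobs write to the same process-wide key.
    shared_env['PYFLINK_LOOPBACK_SERVER_ADDRESS'] = ToyLoopbackServer().start()  # job 1
    job1_address = shared_env['PYFLINK_LOOPBACK_SERVER_ADDRESS']
    shared_env['PYFLINK_LOOPBACK_SERVER_ADDRESS'] = ToyLoopbackServer().start()  # job 2
    assert shared_env['PYFLINK_LOOPBACK_SERVER_ADDRESS'] != job1_address  # job 1 now dials job 2's server

    # After the fix: each job keeps the address in its own configuration,
    # mirroring Configuration(j_configuration=...).set_string(...) in the diff.
    job1_config, job2_config = {}, {}
    job1_config['PYFLINK_LOOPBACK_SERVER_ADDRESS'] = ToyLoopbackServer().start()
    job2_config['PYFLINK_LOOPBACK_SERVER_ADDRESS'] = ToyLoopbackServer().start()
    assert job1_config['PYFLINK_LOOPBACK_SERVER_ADDRESS'] != job2_config['PYFLINK_LOOPBACK_SERVER_ADDRESS']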
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 1d299da..dda51d4 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -59,6 +59,7 @@ class StreamExecutionEnvironment(object):
     def __init__(self, j_stream_execution_environment, serializer=PickleSerializer()):
         self._j_stream_execution_environment = j_stream_execution_environment
         self.serializer = serializer
+        self._open()
 
     def get_config(self) -> ExecutionConfig:
         """
@@ -869,20 +870,33 @@ class StreamExecutionEnvironment(object):
             -> JavaObject:
         gateway = get_gateway()
         JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
+
+        JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
+
+        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
+            self._j_stream_execution_environment)
+
+        JPythonConfigUtil.setPartitionCustomOperatorNumPartitions(
+            get_field_value(self._j_stream_execution_environment, "transformations"))
+
+        j_stream_graph = self._j_stream_execution_environment.getStreamGraph(clear_transformations)
+        if job_name is not None:
+            j_stream_graph.setJobName(job_name)
+        return j_stream_graph
+
+    def _open(self):
         # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
         j_configuration = get_j_env_configuration(self._j_stream_execution_environment)
 
         def startup_loopback_server():
+            from pyflink.common import Configuration
             from pyflink.fn_execution.beam.beam_worker_pool_service import \
                 BeamFnLoopbackWorkerPoolServicer
-            jvm = gateway.jvm
-            j_env = jvm.System.getenv()
-            get_field_value(j_env, "m").put(
-                'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
+            config = Configuration(j_configuration=j_configuration)
+            config.set_string(
+                "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start())
 
-        python_worker_execution_mode = None
-        if hasattr(self, "_python_worker_execution_mode"):
-            python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
+        python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')
 
         if python_worker_execution_mode is None:
             if is_local_deployment(j_configuration):
@@ -898,19 +912,6 @@ class StreamExecutionEnvironment(object):
                 "It only supports to execute the Python worker in 'loopback' mode and 'process' "
                 "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
 
-        JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
-
-        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
-            self._j_stream_execution_environment)
-
-        JPythonConfigUtil.setPartitionCustomOperatorNumPartitions(
-            get_field_value(self._j_stream_execution_environment, "transformations"))
-
-        j_stream_graph = self._j_stream_execution_environment.getStreamGraph(clear_transformations)
-        if job_name is not None:
-            j_stream_graph.setJobName(job_name)
-        return j_stream_graph
-
     def is_unaligned_checkpoints_enabled(self):
         """
         Returns whether Unaligned Checkpoints are enabled.
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 2643d2a..c1938df 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -49,16 +49,12 @@ from pyflink.util.java_utils import get_j_env_configuration
 class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def setUp(self):
-        self.env = self.create_new_env()
+        os.environ['_python_worker_execution_mode'] = "loopback"
+        self.env = StreamExecutionEnvironment.get_execution_environment()
+        os.environ['_python_worker_execution_mode'] = "process"
+        self.env.set_parallelism(2)
         self.test_sink = DataStreamTestSinkFunction()
 
-    @staticmethod
-    def create_new_env(execution_mode='process'):
-        env = StreamExecutionEnvironment.get_execution_environment()
-        env.set_parallelism(2)
-        env._execution_mode = execution_mode
-        return env
-
     def test_get_config(self):
         execution_config = self.env.get_config()
 
@@ -343,7 +339,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def test_add_python_file(self):
         import uuid
-        env = self.create_new_env("loopback")
+        env = self.env
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -394,7 +390,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def test_add_python_file_2(self):
         import uuid
-        env = self.create_new_env("loopback")
+        env = self.env
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -462,7 +458,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
     def test_set_requirements_with_cached_directory(self):
         import uuid
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
         with open(requirements_txt_path, 'w') as f:
             f.write("python-package1==0.0.0")
@@ -508,7 +504,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         import uuid
         import shutil
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
         os.mkdir(archive_dir_path)
         with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
@@ -535,7 +531,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         import sys
         python_exec = sys.executable
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         python_exec_link_path = os.path.join(tmp_dir, "py_exec")
         os.symlink(python_exec, python_exec_link_path)
         env.set_python_executable(python_exec_link_path)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 1e10a0b..9562fcc 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -48,7 +48,7 @@ from pyflink.table.udf import UserDefinedFunctionWrapper, AggregateFunction, uda
 from pyflink.table.utils import to_expression_jarray
 from pyflink.util import java_utils
 from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \
-    to_j_explain_detail_arr, to_jarray, get_field, get_field_value
+    to_j_explain_detail_arr, to_jarray, get_field
 
 __all__ = [
     'StreamTableEnvironment',
@@ -96,6 +96,7 @@ class TableEnvironment(object):
         # python.executable.
         self._set_python_executable_for_local_executor()
         self._config_chaining_optimization()
+        self._open()
 
     @staticmethod
     def create(environment_settings: EnvironmentSettings) -> 'TableEnvironment':
@@ -1747,18 +1748,39 @@ class TableEnvironment(object):
         self._add_jars_to_j_env_config(jars_key)
         self._add_jars_to_j_env_config(classpaths_key)
 
+    def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
+        if isinstance(function, AggregateFunction):
+            function = udaf(function,
+                            result_type=function.get_result_type(),
+                            accumulator_type=function.get_accumulator_type(),
+                            name=str(function.__class__.__name__))
+        elif isinstance(function, TableAggregateFunction):
+            function = udtaf(function,
+                             result_type=function.get_result_type(),
+                             accumulator_type=function.get_accumulator_type(),
+                             name=str(function.__class__.__name__))
+        return function
+
+    def _config_chaining_optimization(self):
+        JChainingOptimizingExecutor = get_gateway().jvm.org.apache.flink.table.executor.python.\
+            ChainingOptimizingExecutor
+        exec_env_field = get_field(self._j_tenv.getClass(), "execEnv")
+        exec_env_field.set(self._j_tenv,
+                           JChainingOptimizingExecutor(exec_env_field.get(self._j_tenv)))
+
+    def _open(self):
         # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
         def startup_loopback_server():
+            from pyflink.common import Configuration
             from pyflink.fn_execution.beam.beam_worker_pool_service import \
                 BeamFnLoopbackWorkerPoolServicer
-            j_env = jvm.System.getenv()
-            get_field_value(j_env, "m").put(
-                'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
+            j_configuration = get_j_env_configuration(self._get_j_env())
+            config = Configuration(j_configuration=j_configuration)
+            config.set_string(
+                "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start())
 
-        python_worker_execution_mode = None
-        if hasattr(self, "_python_worker_execution_mode"):
-            python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
+        python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')
 
         if python_worker_execution_mode is None:
             if is_local_deployment(get_j_env_configuration(self._get_j_env())):
@@ -1774,26 +1796,6 @@ class TableEnvironment(object):
                 "It only supports to execute the Python worker in 'loopback' mode and 'process' "
                 "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
 
-    def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
-        if isinstance(function, AggregateFunction):
-            function = udaf(function,
-                            result_type=function.get_result_type(),
-                            accumulator_type=function.get_accumulator_type(),
-                            name=str(function.__class__.__name__))
-        elif isinstance(function, TableAggregateFunction):
-            function = udtaf(function,
-                             result_type=function.get_result_type(),
-                             accumulator_type=function.get_accumulator_type(),
-                             name=str(function.__class__.__name__))
-        return function
-
-    def _config_chaining_optimization(self):
-        JChainingOptimizingExecutor = get_gateway().jvm.org.apache.flink.table.executor.python.\
-            ChainingOptimizingExecutor
-        exec_env_field = get_field(self._j_tenv.getClass(), "execEnv")
-        exec_env_field.set(self._j_tenv,
-                           JChainingOptimizingExecutor(exec_env_field.get(self._j_tenv)))
-
 
 class StreamTableEnvironment(TableEnvironment):
 
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 5a8e7ab..8eeaee4 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -74,8 +74,13 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
 
     def setUp(self):
         super(StreamDependencyTests, self).setUp()
-        self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
-        self.st_env._execution_mode = "loopback"
+        origin_execution_mode = os.environ['_python_worker_execution_mode']
+        os.environ['_python_worker_execution_mode'] = "loopback"
+        try:
+            self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+        finally:
+            if origin_execution_mode is not None:
+                os.environ['_python_worker_execution_mode'] = origin_execution_mode
 
     def test_set_requirements_without_cached_directory(self):
         requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 0b36750..3849d0c 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -83,6 +83,7 @@ class PyFlinkTestCase(unittest.TestCase):
         cls.tempdir = tempfile.mkdtemp()
 
         os.environ["FLINK_TESTING"] = "1"
+        os.environ['_python_worker_execution_mode'] = "process"
         _find_flink_home()
 
         logging.info("Using %s as FLINK_HOME...", os.environ["FLINK_HOME"])
@@ -90,6 +91,7 @@ class PyFlinkTestCase(unittest.TestCase):
     @classmethod
     def tearDownClass(cls):
         shutil.rmtree(cls.tempdir, ignore_errors=True)
+        del os.environ['_python_worker_execution_mode']
 
     @classmethod
     def assert_equals(cls, actual, expected):
@@ -136,7 +138,6 @@ class PyFlinkStreamTableTestCase(PyFlinkTestCase):
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
-        self.t_env._execution_mode = "process"
 
 
 class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -150,7 +151,6 @@ class PyFlinkBatchTableTestCase(PyFlinkTestCase):
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
-        self.t_env._execution_mode = "process"
 
 
 class PyFlinkStreamingTestCase(PyFlinkTestCase):
@@ -163,7 +163,6 @@ class PyFlinkStreamingTestCase(PyFlinkTestCase):
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
-        self.env._execution_mode = "process"
 
 
 class PyFlinkBatchTestCase(PyFlinkTestCase):
@@ -176,7 +175,6 @@ class PyFlinkBatchTestCase(PyFlinkTestCase):
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
-        self.env._execution_mode = "process"
 
 
 class PythonAPICompletenessTestCase(object):
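Note on the test changes above: the worker execution mode is now read from the process environment when the environment object is constructed, so the tests set '_python_worker_execution_mode' around the create call and restore it afterwards. A minimal standalone sketch of that pattern (assumes a PyFlink installation; adds a .get() guard for the case where the variable is unset, which the PyFlinkTestCase base class otherwise guarantees):

    import os

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # Request 'loopback' mode only while this environment is constructed;
    # the mode is captured inside TableEnvironment.create(), so the variable
    # can be restored immediately afterwards without affecting the job.
    origin_mode = os.environ.get('_python_worker_execution_mode')
    os.environ['_python_worker_execution_mode'] = 'loopback'
    try:
        t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    finally:
        if origin_mode is not None:
            os.environ['_python_worker_execution_mode'] = origin_mode
        else:
            del os.environ['_python_worker_execution_mode']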