This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a68698264ce1ebc37921ad6517eccd5c3700aeeb Author: huangxingbo <hxbks...@gmail.com> AuthorDate: Fri Aug 6 11:32:26 2021 +0800 [FLINK-21222][python] Support loopback mode to allow Python UDF worker and client reuse the same Python VM This closes #16732. --- docs/content.zh/docs/dev/python/debugging.md | 9 ++ docs/content/docs/dev/python/debugging.md | 9 ++ .../datastream/stream_execution_environment.py | 24 +++- .../tests/test_stream_execution_environment.py | 6 + .../pyflink/fn_execution/beam/beam_boot.py | 65 +++++---- .../fn_execution/beam/beam_worker_pool_service.py | 150 +++++++++++++++++++++ flink-python/pyflink/table/table_environment.py | 21 +++ .../pyflink/table/tests/test_dependency.py | 12 +- flink-python/pyflink/testing/test_case_utils.py | 4 + .../apache/flink/python/util/PythonConfigUtil.java | 11 +- .../AbstractOneInputPythonFunctionOperator.java | 10 -- .../python/AbstractPythonFunctionOperator.java | 44 +++--- .../AbstractTwoInputPythonFunctionOperator.java | 10 -- .../operators/python/PythonCoProcessOperator.java | 4 +- .../python/PythonKeyedCoProcessOperator.java | 4 +- .../python/PythonKeyedProcessOperator.java | 4 +- .../operators/python/PythonProcessOperator.java | 4 +- .../python/AbstractStatelessFunctionOperator.java | 14 -- .../AbstractPythonStreamAggregateOperator.java | 18 +-- ...stractArrowPythonAggregateFunctionOperator.java | 4 +- ...wPythonOverWindowAggregateFunctionOperator.java | 4 +- .../AbstractPythonScalarFunctionOperator.java | 4 +- .../arrow/ArrowPythonScalarFunctionOperator.java | 2 +- .../python/table/PythonTableFunctionOperator.java | 4 +- 24 files changed, 321 insertions(+), 120 deletions(-) diff --git a/docs/content.zh/docs/dev/python/debugging.md b/docs/content.zh/docs/dev/python/debugging.md index 2692a25..2f15eb7 100644 --- a/docs/content.zh/docs/dev/python/debugging.md +++ b/docs/content.zh/docs/dev/python/debugging.md @@ -51,6 +51,15 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl ``` ## 调试Python UDFs + +### 本地调试 + +你可以直接在 PyCharm 等 IDE 调试你的 Python 函数。 + +**注意:** 当前,如果你使用了配置 `python-archives`,并且作业的并发度是大于`1`的,只能够使用[远程调试](#远程调试)的方式。 + +### 远程调试 + 你可以利用PyCharm提供的[`pydevd_pycharm`](https://pypi.org/project/pydevd-pycharm/)工具进行Python UDF的调试 1. 在PyCharm里创建一个Python Remote Debug diff --git a/docs/content/docs/dev/python/debugging.md b/docs/content/docs/dev/python/debugging.md index 2bbc263..3ea602a 100644 --- a/docs/content/docs/dev/python/debugging.md +++ b/docs/content/docs/dev/python/debugging.md @@ -52,6 +52,15 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl ``` ## Debugging Python UDFs + +### Local Debug + +You can debug your python functions directly in IDEs such as PyCharm. + +**Note:** Currently, if you use `python-archives` in the job and the parallelism of the job is greater than `1`, you can only use [remote debug](#remote-debug) mode. + +### Remote Debug + You can make use of the [`pydevd_pycharm`](https://pypi.org/project/pydevd-pycharm/) tool of PyCharm to debug Python UDFs. 1. 
Create a Python Remote Debug in PyCharm diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index a7ba4c1..3da36ff 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -40,7 +40,7 @@ from pyflink.datastream.time_characteristic import TimeCharacteristic from pyflink.java_gateway import get_gateway from pyflink.serializers import PickleSerializer from pyflink.util.java_utils import load_java_class, add_jars_to_context_class_loader, \ - invoke_method, get_field_value + invoke_method, get_field_value, is_local_deployment, get_j_env_configuration __all__ = ['StreamExecutionEnvironment'] @@ -58,6 +58,7 @@ class StreamExecutionEnvironment(object): def __init__(self, j_stream_execution_environment, serializer=PickleSerializer()): self._j_stream_execution_environment = j_stream_execution_environment + self._remote_mode = False self.serializer = serializer def get_config(self) -> ExecutionConfig: @@ -869,6 +870,27 @@ class StreamExecutionEnvironment(object): -> JavaObject: gateway = get_gateway() JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil + # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster + j_configuration = get_j_env_configuration(self._j_stream_execution_environment) + if not self._remote_mode and is_local_deployment(j_configuration): + from pyflink.common import Configuration + from pyflink.fn_execution.beam.beam_worker_pool_service import \ + BeamFnLoopbackWorkerPoolServicer + + jvm = gateway.jvm + env_config = JPythonConfigUtil.getEnvironmentConfig( + self._j_stream_execution_environment) + parallelism = self.get_parallelism() + if parallelism > 1 and env_config.containsKey(jvm.PythonOptions.PYTHON_ARCHIVES.key()): + import logging + logging.warning("Loopback mode is disabled as python archives are used and the " "parallelism of the job is greater than 1.
The Python user-defined " + "functions will be executed in an independent Python process.") + else: + config = Configuration(j_configuration=j_configuration) + config.set_string( + "loopback.server.address", BeamFnLoopbackWorkerPoolServicer().start()) + JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment) gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply( diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index b026526..e3544f8 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -51,6 +51,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): def setUp(self): self.env = StreamExecutionEnvironment.get_execution_environment() self.env.set_parallelism(2) + self.env._remote_mode = True self.test_sink = DataStreamTestSinkFunction() def test_get_config(self): @@ -350,6 +351,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): get_j_env_configuration(self.env._j_stream_execution_environment).\ setString("taskmanager.numberOfTaskSlots", "10") self.env.add_python_file(python_file_path) + self.env._remote_mode = False ds = self.env.from_collection([1, 2, 3, 4, 5]) ds = ds.map(plus_two_map, Types.LONG()) \ .slot_sharing_group("data_stream") \ @@ -399,6 +401,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): get_j_env_configuration(self.env._j_stream_execution_environment).\ setString("taskmanager.numberOfTaskSlots", "10") + self.env._remote_mode = False self.env.add_python_file(python_file_path) ds = self.env.from_collection([1, 2, 3, 4, 5]) ds = ds.map(plus_two_map, Types.LONG()) \ @@ -488,6 +491,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): from python_package1 import plus return plus(i, 1) + self.env._remote_mode = False ds = self.env.from_collection([1, 2, 3, 4, 5]) ds.map(add_one).add_sink(self.test_sink) self.env.execute("test set requirements with cachd dir") @@ -513,6 +517,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): with open("data/data.txt", 'r') as f: return i + int(f.read()) + self.env._remote_mode = False ds = self.env.from_collection([1, 2, 3, 4, 5]) ds.map(add_from_file).add_sink(self.test_sink) self.env.execute("test set python archive") @@ -536,6 +541,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): assert os.environ["python"] == python_exec_link_path return i + self.env._remote_mode = False ds = self.env.from_collection([1, 2, 3, 4, 5]) ds.map(check_python_exec).add_sink(self.test_sink) self.env.execute("test set python executable") diff --git a/flink-python/pyflink/fn_execution/beam/beam_boot.py b/flink-python/pyflink/fn_execution/beam/beam_boot.py index b0c42dc..5887434 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_boot.py +++ b/flink-python/pyflink/fn_execution/beam/beam_boot.py @@ -35,6 +35,8 @@ import grpc import logging import sys +from apache_beam.portability.api.beam_fn_api_pb2_grpc import BeamFnExternalWorkerPoolStub +from apache_beam.portability.api.beam_fn_api_pb2 import StartWorkerRequest from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor @@ -74,28 +76,41 @@ if __name__ == "__main__": logging.info("Initializing 
python harness: %s" % " ".join(sys.argv)) - metadata = [("worker_id", worker_id)] - - # read job information from provision stub - with grpc.insecure_channel(provision_endpoint) as channel: - client = ProvisionServiceStub(channel=channel) - info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info - options = json_format.MessageToJson(info.pipeline_options) - logging_endpoint = info.logging_endpoint.url - control_endpoint = info.control_endpoint.url - - os.environ["WORKER_ID"] = worker_id - os.environ["PIPELINE_OPTIONS"] = options - os.environ["SEMI_PERSISTENT_DIRECTORY"] = semi_persist_dir - os.environ["LOGGING_API_SERVICE_DESCRIPTOR"] = text_format.MessageToString( - ApiServiceDescriptor(url=logging_endpoint)) - os.environ["CONTROL_API_SERVICE_DESCRIPTOR"] = text_format.MessageToString( - ApiServiceDescriptor(url=control_endpoint)) - - env = dict(os.environ) - - if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] == "1": - exit(0) - - call([python_exec, "-m", "pyflink.fn_execution.beam.beam_sdk_worker_main"], - stdout=sys.stdout, stderr=sys.stderr, env=env) + if 'loopback.server.address' in os.environ: + params = dict(os.environ) + params.update({'SEMI_PERSISTENT_DIRECTORY': semi_persist_dir}) + with grpc.insecure_channel(os.environ['loopback.server.address']) as channel: + client = BeamFnExternalWorkerPoolStub(channel=channel) + request = StartWorkerRequest( + worker_id=worker_id, + provision_endpoint=ApiServiceDescriptor(url=provision_endpoint), + params=params) + client.StartWorker(request) + else: + metadata = [("worker_id", worker_id)] + + # read job information from provision stub + with grpc.insecure_channel(provision_endpoint) as channel: + client = ProvisionServiceStub(channel=channel) + info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info + options = json_format.MessageToJson(info.pipeline_options) + logging_endpoint = info.logging_endpoint.url + control_endpoint = info.control_endpoint.url + + os.environ["WORKER_ID"] = worker_id + os.environ["PIPELINE_OPTIONS"] = options + os.environ["SEMI_PERSISTENT_DIRECTORY"] = semi_persist_dir + os.environ["LOGGING_API_SERVICE_DESCRIPTOR"] = text_format.MessageToString( + ApiServiceDescriptor(url=logging_endpoint)) + os.environ["CONTROL_API_SERVICE_DESCRIPTOR"] = text_format.MessageToString( + ApiServiceDescriptor(url=control_endpoint)) + + env = dict(os.environ) + with open('/tmp/test.txt', 'a') as fd: + fd.write(str(env) + '\n') + + if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] == "1": + exit(0) + + call([python_exec, "-m", "pyflink.fn_execution.beam.beam_sdk_worker_main"], + stdout=sys.stdout, stderr=sys.stderr, env=env) diff --git a/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py new file mode 100644 index 0000000..a844ff1 --- /dev/null +++ b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py @@ -0,0 +1,150 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import atexit +import functools +import logging +import os +import sys +import threading +import traceback + +import grpc +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import ProfilingOptions +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.portability.api import endpoints_pb2 +from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest +from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub +from apache_beam.runners.worker import sdk_worker_main +from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler +from apache_beam.runners.worker.sdk_worker import SdkHarness +from apache_beam.utils import thread_pool_executor, profiler +from google.protobuf import json_format + +from pyflink.fn_execution.beam import beam_sdk_worker_main # noqa # pylint: disable=unused-import + +_LOGGER = logging.getLogger(__name__) + + +class BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + """ + Worker pool entry point. + + The worker pool exposes an RPC service that is used in MiniCluster to start and stop the Python + SDK workers. 
+ + The worker pool uses child threads for parallelism. + """ + + def __init__(self): + self._worker_server = None + self._parse_param_lock = threading.Lock() + + def start(self): + worker_server = grpc.server( + thread_pool_executor.shared_unbounded_instance()) + worker_address = 'localhost:%s' % worker_server.add_insecure_port('[::]:0') + beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(self, worker_server) + worker_server.start() + _LOGGER.info('Listening for workers at %s', worker_address) + + self._worker_server = worker_server + atexit.register(functools.partial(worker_server.stop, 1)) + return worker_address + + def StartWorker(self, + start_worker_request: beam_fn_api_pb2.StartWorkerRequest, + unused_context): + try: + worker_thread = threading.Thread( + name='run_worker_%s' % start_worker_request.worker_id, + target=functools.partial(self._start_sdk_worker_main, start_worker_request)) + worker_thread.daemon = True + worker_thread.start() + return beam_fn_api_pb2.StartWorkerResponse() + except Exception: + return beam_fn_api_pb2.StartWorkerResponse(error=traceback.format_exc()) + + def StopWorker(self, + stop_worker_request: beam_fn_api_pb2.StopWorkerRequest, + unused_context): + pass + + def _start_sdk_worker_main(self, start_worker_request: beam_fn_api_pb2.StartWorkerRequest): + params = start_worker_request.params + self._parse_param_lock.acquire() + if 'PYTHONPATH' in params: + python_path_list = params['PYTHONPATH'].split(':') + python_path_list.reverse() + for path in python_path_list: + sys.path.insert(0, path) + if '_PYTHON_WORKING_DIR' in params: + os.chdir(params['_PYTHON_WORKING_DIR']) + os.environ.update(params) + self._parse_param_lock.release() + + # read job information from provision stub + metadata = [("worker_id", start_worker_request.worker_id)] + provision_endpoint = start_worker_request.provision_endpoint.url + with grpc.insecure_channel(provision_endpoint) as channel: + client = ProvisionServiceStub(channel=channel) + info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info + options = json_format.MessageToJson(info.pipeline_options) + logging_endpoint = info.logging_endpoint.url + control_endpoint = info.control_endpoint.url + + try: + logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor(url=logging_endpoint) + + # Send all logs to the runner.
fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor) + logging.getLogger().setLevel(logging.ERROR) + logging.getLogger().addHandler(fn_log_handler) + except Exception: + _LOGGER.error( + "Failed to set up logging handler, continuing without.", + exc_info=True) + fn_log_handler = None + + sdk_pipeline_options = sdk_worker_main._parse_pipeline_options(options) + + _worker_id = start_worker_request.worker_id + + try: + control_service_descriptor = endpoints_pb2.ApiServiceDescriptor(url=control_endpoint) + status_service_descriptor = endpoints_pb2.ApiServiceDescriptor() + + experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or [] + enable_heap_dump = 'enable_heap_dump' in experiments + SdkHarness( + control_address=control_service_descriptor.url, + status_address=status_service_descriptor.url, + worker_id=_worker_id, + state_cache_size=sdk_worker_main._get_state_cache_size(experiments), + data_buffer_time_limit_ms=sdk_worker_main._get_data_buffer_time_limit_ms( + experiments), + profiler_factory=profiler.Profile.factory_from_options( + sdk_pipeline_options.view_as(ProfilingOptions)), + enable_heap_dump=enable_heap_dump).run() + except: # pylint: disable=broad-except + _LOGGER.exception('Python sdk harness failed: ') + raise + finally: + if fn_log_handler: + fn_log_handler.close() diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index bd6c7b4..a8d0864 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -91,6 +91,7 @@ class TableEnvironment(object): def __init__(self, j_tenv, serializer=PickleSerializer()): self._j_tenv = j_tenv self._serializer = serializer + self._remote_mode = False # When running in MiniCluster, launch the Python UDF worker using the Python executable # specified by sys.executable if users have not specified it explicitly via configuration # python.executable. @@ -1744,6 +1745,26 @@ class TableEnvironment(object): classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key() self._add_jars_to_j_env_config(jars_key) self._add_jars_to_j_env_config(classpaths_key) + # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster + if not self._remote_mode and \ + is_local_deployment(get_j_env_configuration(self._get_j_env())): + from pyflink.common import Configuration + _j_config = jvm.org.apache.flink.python.util.PythonConfigUtil.getMergedConfig( + self._get_j_env(), self.get_config()._j_table_config) + config = Configuration(j_configuration=_j_config) + parallelism = int(config.get_string("parallelism.default", "1")) + + if parallelism > 1 and config.contains_key(jvm.PythonOptions.PYTHON_ARCHIVES.key()): + import logging + logging.warning("Loopback mode is disabled as python archives are used and the " "parallelism of the job is greater than 1.
The Python user-defined " + "functions will be executed in an independent Python process.") + else: + from pyflink.fn_execution.beam.beam_worker_pool_service import \ + BeamFnLoopbackWorkerPoolServicer + + self.get_config().get_configuration().set_string( + "loopback.server.address", BeamFnLoopbackWorkerPoolServicer().start()) def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper: if isinstance(function, AggregateFunction): diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py index 2b9fa66..1e47fb5 100644 --- a/flink-python/pyflink/table/tests/test_dependency.py +++ b/flink-python/pyflink/table/tests/test_dependency.py @@ -72,6 +72,10 @@ class BatchDependencyTests(DependencyTests, PyFlinkBatchTableTestCase): class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase): + def setUp(self): + super(StreamDependencyTests, self).setUp() + self.t_env._remote_mode = False + def test_set_requirements_without_cached_directory(self): requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4())) with open(requirements_txt_path, 'w') as f: @@ -79,9 +83,8 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase): self.t_env.set_python_requirements(requirements_txt_path) def check_requirements(i): - import cloudpickle - assert os.path.abspath(cloudpickle.__file__).startswith( - os.environ['_PYTHON_REQUIREMENTS_INSTALL_DIR']) + import cloudpickle # noqa # pylint: disable=unused-import + assert '_PYTHON_REQUIREMENTS_INSTALL_DIR' in os.environ return i self.t_env.create_temporary_system_function("check_requirements", @@ -156,9 +159,11 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase): with open("data/data.txt", 'r') as f: return i + int(f.read()) + self.t_env.get_config().get_configuration().set_string("parallelism.default", "1") self.t_env.create_temporary_system_function("add_from_file", udf(add_from_file, DataTypes.BIGINT(), DataTypes.BIGINT())) + self.t_env._remote_mode = True table_sink = source_sink_utils.TestAppendSink( ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()]) self.t_env.register_table_sink("Results", table_sink) @@ -196,6 +201,7 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase): raise Exception("The gateway server is not disabled!") return i + self.t_env._remote_mode = True self.t_env.create_temporary_system_function( "check_pyflink_gateway_disabled", udf(check_pyflink_gateway_disabled, DataTypes.BIGINT(), diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py index ddb5a6b..9f8ff80 100644 --- a/flink-python/pyflink/testing/test_case_utils.py +++ b/flink-python/pyflink/testing/test_case_utils.py @@ -136,6 +136,7 @@ class PyFlinkStreamTableTestCase(PyFlinkTestCase): self.t_env.get_config().get_configuration().set_string("parallelism.default", "2") self.t_env.get_config().get_configuration().set_string( "python.fn-execution.bundle.size", "1") + self.t_env._remote_mode = True class PyFlinkBatchTableTestCase(PyFlinkTestCase): @@ -149,6 +150,7 @@ class PyFlinkBatchTableTestCase(PyFlinkTestCase): self.t_env.get_config().get_configuration().set_string("parallelism.default", "2") self.t_env.get_config().get_configuration().set_string( "python.fn-execution.bundle.size", "1") + self.t_env._remote_mode = True class PyFlinkStreamingTestCase(PyFlinkTestCase): @@ -161,6 +163,7 @@ class PyFlinkStreamingTestCase(PyFlinkTestCase): self.env = 
StreamExecutionEnvironment.get_execution_environment() self.env.set_parallelism(2) self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING) + self.env._remote_mode = True class PyFlinkBatchTestCase(PyFlinkTestCase): @@ -173,6 +176,7 @@ class PyFlinkBatchTestCase(PyFlinkTestCase): self.env = StreamExecutionEnvironment.get_execution_environment() self.env.set_parallelism(2) self.env.set_runtime_mode(RuntimeExecutionMode.BATCH) + self.env._remote_mode = True class PythonAPICompletenessTestCase(object): diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index b0922be..18ddb58 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -23,7 +23,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.ManagedMemoryUseCase; -import org.apache.flink.python.PythonConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -114,9 +113,9 @@ public class PythonConfigUtil { AbstractPythonFunctionOperator<?> pythonFunctionOperator = getPythonOperator(transformation); if (pythonFunctionOperator != null) { - Configuration oldConfig = pythonFunctionOperator.getPythonConfig().getConfig(); + Configuration oldConfig = pythonFunctionOperator.getConfiguration(); // update dependency related configurations for Python operators - pythonFunctionOperator.setPythonConfig( + pythonFunctionOperator.setConfiguration( generateNewPythonConfig(oldConfig, mergedConfig)); } } @@ -268,14 +267,14 @@ public class PythonConfigUtil { } /** - * Generator a new {@link PythonConfig} with the combined config which is derived from + * Generates a new {@link Configuration} with the combined config which is derived from * oldConfig.
*/ - private static PythonConfig generateNewPythonConfig( + private static Configuration generateNewPythonConfig( Configuration oldConfig, Configuration newConfig) { Configuration mergedConfig = newConfig.clone(); mergedConfig.addAll(oldConfig); - return new PythonConfig(mergedConfig); + return mergedConfig; } public static void setPartitionCustomOperatorNumPartitions( diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractOneInputPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractOneInputPythonFunctionOperator.java index eaf07d4..76202fb 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractOneInputPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractOneInputPythonFunctionOperator.java @@ -38,8 +38,6 @@ import org.apache.flink.streaming.api.utils.PythonTypeUtils; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -import java.util.Map; - import static org.apache.flink.streaming.api.utils.ProtoUtils.createRawTypeCoderInfoDescriptorProto; /** @@ -56,9 +54,6 @@ public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT> private static final long serialVersionUID = 1L; - /** The options used to configure the Python worker process. */ - private final Map<String, String> jobOptions; - /** The TypeInformation of input data. */ private final TypeInformation<IN> inputTypeInfo; @@ -96,7 +91,6 @@ public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT> TypeInformation<IN> inputTypeInfo, TypeInformation<OUT> outputTypeInfo) { super(config, pythonFunctionInfo, outputTypeInfo); - this.jobOptions = config.toMap(); this.inputTypeInfo = Preconditions.checkNotNull(inputTypeInfo); } @@ -160,10 +154,6 @@ public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT> // Getters // ---------------------------------------------------------------------- - protected Map<String, String> getJobOptions() { - return jobOptions; - } - public TypeInformation<IN> getInputTypeInfo() { return inputTypeInfo; } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java index 895d511..7b5ab9a 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java @@ -39,10 +39,10 @@ import org.apache.flink.table.functions.python.PythonEnv; import org.apache.flink.util.Preconditions; import org.apache.flink.util.WrappingRuntimeException; -import java.io.IOException; import java.lang.reflect.Field; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -59,6 +59,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream private static final long serialVersionUID = 1L; + protected Configuration config; + /** * The {@link PythonFunctionRunner} which is responsible for Python user-defined function * execution. @@ -71,6 +73,12 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream /** Number of processed elements in the current bundle. 
*/ protected transient int elementCount; + /** The python config. */ + protected transient PythonConfig pythonConfig; + + /** The options used to configure the Python worker process. */ + protected transient Map<String, String> jobOptions; + /** Max duration of a bundle. */ private transient long maxBundleTimeMills; @@ -85,22 +93,17 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream private transient ExecutorService flushThreadPool; - /** The python config. */ - private PythonConfig config; - public AbstractPythonFunctionOperator(Configuration config) { - this.config = new PythonConfig(Preconditions.checkNotNull(config)); + this.config = Preconditions.checkNotNull(config); this.chainingStrategy = ChainingStrategy.ALWAYS; } - public PythonConfig getPythonConfig() { - return config; - } - @Override public void open() throws Exception { try { - this.maxBundleSize = config.getMaxBundleSize(); + this.pythonConfig = new PythonConfig(config); + this.jobOptions = config.toMap(); + this.maxBundleSize = pythonConfig.getMaxBundleSize(); if (this.maxBundleSize <= 0) { this.maxBundleSize = PythonOptions.MAX_BUNDLE_SIZE.defaultValue(); LOG.error( @@ -111,7 +114,7 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream LOG.info("The maximum bundle size is configured to {}.", this.maxBundleSize); } - this.maxBundleTimeMills = config.getMaxBundleTimeMills(); + this.maxBundleTimeMills = pythonConfig.getMaxBundleTimeMills(); if (this.maxBundleTimeMills <= 0L) { this.maxBundleTimeMills = PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue(); LOG.error( @@ -125,7 +128,7 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream } this.pythonFunctionRunner = createPythonFunctionRunner(); - this.pythonFunctionRunner.open(config); + this.pythonFunctionRunner.open(pythonConfig); this.elementCount = 0; this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime(); @@ -283,13 +286,13 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream return elementCount == 0; } - /** Reset the {@link PythonConfig} if needed. */ - public void setPythonConfig(PythonConfig pythonConfig) { - this.config = pythonConfig; + /** Reset the {@link Configuration} if needed. */ + public void setConfiguration(Configuration config) { + this.config = config; } - /** Returns the {@link PythonConfig}. */ - public PythonConfig getConfig() { + /** Returns the {@link Configuration}. */ + public Configuration getConfiguration() { return config; } @@ -369,9 +372,10 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream } } - protected PythonEnvironmentManager createPythonEnvironmentManager() throws IOException { + protected PythonEnvironmentManager createPythonEnvironmentManager() { PythonDependencyInfo dependencyInfo = - PythonDependencyInfo.create(config, getRuntimeContext().getDistributedCache()); + PythonDependencyInfo.create( + pythonConfig, getRuntimeContext().getDistributedCache()); PythonEnv pythonEnv = getPythonEnv(); if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) { return new ProcessPythonEnvironmentManager( @@ -386,7 +390,7 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream } protected FlinkMetricContainer getFlinkMetricContainer() { - return this.config.isMetricEnabled() + return this.pythonConfig.isMetricEnabled() ? 
new FlinkMetricContainer(getRuntimeContext().getMetricGroup()) : null; } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractTwoInputPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractTwoInputPythonFunctionOperator.java index 5bf078d..733fe21 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractTwoInputPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractTwoInputPythonFunctionOperator.java @@ -38,8 +38,6 @@ import org.apache.flink.streaming.api.operators.python.collector.RunnerOutputCol import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -import java.util.Map; - import static org.apache.flink.streaming.api.utils.ProtoUtils.createRawTypeCoderInfoDescriptorProto; import static org.apache.flink.streaming.api.utils.PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter; @@ -54,9 +52,6 @@ public abstract class AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT> private static final long serialVersionUID = 1L; - /** The options used to configure the Python worker process. */ - private final Map<String, String> jobOptions; - /** The left input type. */ private final TypeInformation<IN1> inputTypeInfo1; @@ -89,7 +84,6 @@ public abstract class AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT> TypeInformation<IN2> inputTypeInfo2, TypeInformation<OUT> outputTypeInfo) { super(config, pythonFunctionInfo, outputTypeInfo); - this.jobOptions = config.toMap(); this.inputTypeInfo1 = Preconditions.checkNotNull(inputTypeInfo1); this.inputTypeInfo2 = Preconditions.checkNotNull(inputTypeInfo2); } @@ -152,10 +146,6 @@ public abstract class AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT> // Getters // ---------------------------------------------------------------------- - protected Map<String, String> getJobOptions() { - return jobOptions; - } - protected TypeInformation<IN1> getLeftInputType() { return inputTypeInfo1; } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java index 5508f6a..8208600 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java @@ -74,7 +74,7 @@ public class PythonCoProcessOperator<IN1, IN2, OUT> getRuntimeContext(), getInternalParameters(), inBatchExecutionMode(getKeyedStateBackend())), - getJobOptions(), + jobOptions, getFlinkMetricContainer(), null, null, @@ -117,7 +117,7 @@ public class PythonCoProcessOperator<IN1, IN2, OUT> public <T> AbstractDataStreamPythonFunctionOperator<T> copy( DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) { return new PythonCoProcessOperator<>( - getConfig().getConfig(), + config, pythonFunctionInfo, getLeftInputType(), getRightInputType(), diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java index 9038a28..ba1c94c 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java +++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java @@ -113,7 +113,7 @@ public class PythonKeyedCoProcessOperator<OUT> getInternalParameters(), keyTypeInfo, inBatchExecutionMode(getKeyedStateBackend())), - getJobOptions(), + jobOptions, getFlinkMetricContainer(), getKeyedStateBackend(), keyTypeSerializer, @@ -219,7 +219,7 @@ public class PythonKeyedCoProcessOperator<OUT> public <T> AbstractDataStreamPythonFunctionOperator<T> copy( DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) { return new PythonKeyedCoProcessOperator<>( - getConfig().getConfig(), + config, pythonFunctionInfo, getLeftInputType(), getRightInputType(), diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java index e3df754..3d81c10 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java @@ -143,7 +143,7 @@ public class PythonKeyedProcessOperator<OUT> getInternalParameters(), keyTypeInfo, inBatchExecutionMode(getKeyedStateBackend())), - getJobOptions(), + jobOptions, getFlinkMetricContainer(), getKeyedStateBackend(), keyTypeSerializer, @@ -237,7 +237,7 @@ public class PythonKeyedProcessOperator<OUT> public <T> AbstractDataStreamPythonFunctionOperator<T> copy( DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) { return new PythonKeyedProcessOperator<>( - getConfig().getConfig(), + config, pythonFunctionInfo, (RowTypeInfo) getInputTypeInfo(), outputTypeInfo, diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java index 451418c..69d4e74 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java @@ -71,7 +71,7 @@ public class PythonProcessOperator<IN, OUT> getRuntimeContext(), getInternalParameters(), inBatchExecutionMode(getKeyedStateBackend())), - getJobOptions(), + jobOptions, getFlinkMetricContainer(), null, null, @@ -109,6 +109,6 @@ public class PythonProcessOperator<IN, OUT> public <T> AbstractDataStreamPythonFunctionOperator<T> copy( DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) { return new PythonProcessOperator<>( - getConfig().getConfig(), pythonFunctionInfo, getInputTypeInfo(), outputTypeInfo); + config, pythonFunctionInfo, getInputTypeInfo(), outputTypeInfo); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java index f83f560..32e1339 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java @@ -34,9 +34,7 @@ import org.apache.flink.util.Preconditions; import java.io.IOException; import java.util.Arrays; -import 
java.util.HashMap; import java.util.LinkedList; -import java.util.Map; import java.util.stream.Collectors; /** @@ -61,9 +59,6 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN> /** The offsets of user-defined function inputs. */ protected final int[] userDefinedFunctionInputOffsets; - /** The options used to configure the Python worker process. */ - private final Map<String, String> jobOptions; - /** The user-defined function input logical type. */ protected transient RowType userDefinedFunctionInputType; @@ -97,7 +92,6 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN> this.outputType = Preconditions.checkNotNull(outputType); this.userDefinedFunctionInputOffsets = Preconditions.checkNotNull(userDefinedFunctionInputOffsets); - this.jobOptions = buildJobOptions(config); } @Override @@ -173,12 +167,4 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN> RowType runnerOutType); public abstract void processElementInternal(IN value) throws Exception; - - private Map<String, String> buildJobOptions(Configuration config) { - Map<String, String> jobOptions = new HashMap<>(); - if (config.containsKey("table.exec.timezone")) { - jobOptions.put("table.exec.timezone", config.getString("table.exec.timezone", null)); - } - return jobOptions; - } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java index 19deb7d..c4b3638 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java @@ -47,8 +47,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import java.util.stream.Collectors; import static org.apache.flink.streaming.api.utils.ProtoUtils.createRowTypeCoderInfoDescriptorProto; @@ -78,9 +76,6 @@ public abstract class AbstractPythonStreamAggregateOperator /** The output logical type. */ protected final RowType outputType; - /** The options used to configure the Python worker process. */ - private final Map<String, String> jobOptions; - /** The array of the key indexes. 
*/ private final int[] grouping; @@ -141,7 +136,6 @@ public abstract class AbstractPythonStreamAggregateOperator this.outputType = Preconditions.checkNotNull(outputType); this.aggregateFunctions = aggregateFunctions; this.dataViewSpecs = dataViewSpecs; - this.jobOptions = buildJobOptions(config); this.grouping = grouping; this.indexOfCountStar = indexOfCountStar; this.generateUpdateBefore = generateUpdateBefore; @@ -164,6 +158,7 @@ public abstract class AbstractPythonStreamAggregateOperator PythonTypeUtils.toInternalSerializer(userDefinedFunctionOutputType); rowDataWrapper = new StreamRecordRowDataWrappingCollector(output); super.open(); + configJobOptions(); } @Override @@ -244,8 +239,8 @@ public abstract class AbstractPythonStreamAggregateOperator protected FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto() { FlinkFnApi.UserDefinedAggregateFunctions.Builder builder = FlinkFnApi.UserDefinedAggregateFunctions.newBuilder(); - builder.setMetricEnabled(getPythonConfig().isMetricEnabled()); - builder.setProfileEnabled(getPythonConfig().isProfileEnabled()); + builder.setMetricEnabled(pythonConfig.isMetricEnabled()); + builder.setProfileEnabled(pythonConfig.isProfileEnabled()); builder.addAllGrouping(Arrays.stream(grouping).boxed().collect(Collectors.toList())); builder.setGenerateUpdateBefore(generateUpdateBefore); builder.setIndexOfCountStar(indexOfCountStar); @@ -272,18 +267,13 @@ public abstract class AbstractPythonStreamAggregateOperator public abstract RowType createUserDefinedFunctionOutputType(); - private Map<String, String> buildJobOptions(Configuration config) { - Map<String, String> jobOptions = new HashMap<>(); - if (config.containsKey("table.exec.timezone")) { - jobOptions.put("table.exec.timezone", config.getString("table.exec.timezone", null)); - } + private void configJobOptions() { jobOptions.put( PythonOptions.STATE_CACHE_SIZE.key(), String.valueOf(config.get(PythonOptions.STATE_CACHE_SIZE))); jobOptions.put( PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key(), String.valueOf(config.get(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE))); - return jobOptions; } public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(RowType runnerInputType) { diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java index 2ef2038..f2431e5 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java @@ -153,8 +153,8 @@ public abstract class AbstractArrowPythonAggregateFunctionOperator for (PythonFunctionInfo pythonFunctionInfo : pandasAggFunctions) { builder.addUdfs(getUserDefinedFunctionProto(pythonFunctionInfo)); } - builder.setMetricEnabled(getPythonConfig().isMetricEnabled()); - builder.setProfileEnabled(getPythonConfig().isProfileEnabled()); + builder.setMetricEnabled(pythonConfig.isMetricEnabled()); + builder.setProfileEnabled(pythonConfig.isProfileEnabled()); return builder.build(); } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java index a004baa..3d7f7e0 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java @@ -256,8 +256,8 @@ public class BatchArrowPythonOverWindowAggregateFunctionOperator functionBuilder.setWindowIndex(aggWindowIndex[i]); builder.addUdfs(functionBuilder); } - builder.setMetricEnabled(getPythonConfig().isMetricEnabled()); - builder.setProfileEnabled(getPythonConfig().isProfileEnabled()); + builder.setMetricEnabled(pythonConfig.isMetricEnabled()); + builder.setProfileEnabled(pythonConfig.isProfileEnabled()); // add windows for (int i = 0; i < lowerBoundary.length; i++) { FlinkFnApi.OverWindow.Builder windowBuilder = FlinkFnApi.OverWindow.newBuilder(); diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java index b286164..606766e 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java @@ -119,8 +119,8 @@ public abstract class AbstractPythonScalarFunctionOperator for (PythonFunctionInfo pythonFunctionInfo : scalarFunctions) { builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(pythonFunctionInfo)); } - builder.setMetricEnabled(getPythonConfig().isMetricEnabled()); - builder.setProfileEnabled(getPythonConfig().isProfileEnabled()); + builder.setMetricEnabled(pythonConfig.isMetricEnabled()); + builder.setProfileEnabled(pythonConfig.isProfileEnabled()); return builder.build(); } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java index ebf6800..c42f05d 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java @@ -58,7 +58,7 @@ public class ArrowPythonScalarFunctionOperator extends AbstractPythonScalarFunct @Override public void open() throws Exception { super.open(); - maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize); + maxArrowBatchSize = Math.min(pythonConfig.getMaxArrowBatchSize(), maxBundleSize); arrowSerializer = new ArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType); arrowSerializer.open(bais, baos); diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java index 0a51717..38fbcb8 100644 --- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java @@ -156,8 +156,8 @@ public class PythonTableFunctionOperator FlinkFnApi.UserDefinedFunctions.Builder builder = FlinkFnApi.UserDefinedFunctions.newBuilder(); builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(tableFunction)); - builder.setMetricEnabled(getPythonConfig().isMetricEnabled()); - builder.setProfileEnabled(getPythonConfig().isProfileEnabled()); + builder.setMetricEnabled(pythonConfig.isMetricEnabled()); + builder.setProfileEnabled(pythonConfig.isProfileEnabled()); return builder.build(); }
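To summarize the boot-path change above: when a job targets a local MiniCluster, the client-side environment now starts a `BeamFnLoopbackWorkerPoolServicer` and publishes its gRPC address under `loopback.server.address`; `beam_boot.py` checks for that key and, when present, asks the pool to run the SDK worker as a thread inside the client's Python VM instead of spawning a new process. Below is a plain-Python sketch of that dispatch decision; the function name and return values are illustrative only, not part of the commit.

```python
import os


def choose_worker_launch_mode(environ=None):
    """Illustrative reduction of the branch added to beam_boot.py."""
    environ = os.environ if environ is None else environ
    if 'loopback.server.address' in environ:
        # The client started BeamFnLoopbackWorkerPoolServicer and advertised
        # its address: run the SDK worker as a thread in the client's VM.
        return 'loopback'
    # No loopback server available: fall back to spawning a separate Python
    # process running pyflink.fn_execution.beam.beam_sdk_worker_main.
    return 'process'


assert choose_worker_launch_mode({'loopback.server.address': 'localhost:55555'}) == 'loopback'
assert choose_worker_launch_mode({}) == 'process'
```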
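From a user's perspective, the effect is what the new "Local Debug" docs section describes: UDFs in a locally executed job can be debugged with ordinary IDE breakpoints, because they run in the same VM as the driver script. A minimal sketch, assuming a PyFlink build containing this commit and run directly from an IDE; the job name and data are arbitrary:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment


def plus_one(i):
    # With loopback mode active, this UDF executes in the client's Python VM,
    # so a breakpoint set on the next line is hit directly.
    return i + 1


env = StreamExecutionEnvironment.get_execution_environment()
# Note: parallelism > 1 combined with python-archives falls back to process mode.
env.set_parallelism(1)
ds = env.from_collection([1, 2, 3], type_info=Types.LONG())
ds.map(plus_one, Types.LONG()).print()
env.execute("loopback_debug_demo")
```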