This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a68698264ce1ebc37921ad6517eccd5c3700aeeb
Author: huangxingbo <hxbks...@gmail.com>
AuthorDate: Fri Aug 6 11:32:26 2021 +0800

    [FLINK-21222][python] Support loopback mode to allow the Python UDF worker and the client to reuse the same Python VM
    
    This closes #16732.
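
    Loopback mode lets the client start an in-process gRPC worker pool and publish its address through the job configuration, so Python UDFs run in the client's Python VM instead of a separately spawned worker process. A minimal sketch of that wiring, assuming a PyFlink build that contains this patch (the standalone Configuration object is illustrative; as the diff below shows, the environments perform this step automatically when running against a MiniCluster):

```python
from pyflink.common import Configuration
from pyflink.fn_execution.beam.beam_worker_pool_service import \
    BeamFnLoopbackWorkerPoolServicer

# start() binds an in-process gRPC server to an ephemeral local port and
# returns its address, e.g. "localhost:54321"; beam_boot.py later reads
# this key from the environment and dispatches work to the pool.
config = Configuration()
config.set_string("loopback.server.address",
                  BeamFnLoopbackWorkerPoolServicer().start())
```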
---
 docs/content.zh/docs/dev/python/debugging.md       |   9 ++
 docs/content/docs/dev/python/debugging.md          |   9 ++
 .../datastream/stream_execution_environment.py     |  24 +++-
 .../tests/test_stream_execution_environment.py     |   6 +
 .../pyflink/fn_execution/beam/beam_boot.py         |  65 +++++----
 .../fn_execution/beam/beam_worker_pool_service.py  | 150 +++++++++++++++++++++
 flink-python/pyflink/table/table_environment.py    |  21 +++
 .../pyflink/table/tests/test_dependency.py         |  12 +-
 flink-python/pyflink/testing/test_case_utils.py    |   4 +
 .../apache/flink/python/util/PythonConfigUtil.java |  11 +-
 .../AbstractOneInputPythonFunctionOperator.java    |  10 --
 .../python/AbstractPythonFunctionOperator.java     |  44 +++---
 .../AbstractTwoInputPythonFunctionOperator.java    |  10 --
 .../operators/python/PythonCoProcessOperator.java  |   4 +-
 .../python/PythonKeyedCoProcessOperator.java       |   4 +-
 .../python/PythonKeyedProcessOperator.java         |   4 +-
 .../operators/python/PythonProcessOperator.java    |   4 +-
 .../python/AbstractStatelessFunctionOperator.java  |  14 --
 .../AbstractPythonStreamAggregateOperator.java     |  18 +--
 ...stractArrowPythonAggregateFunctionOperator.java |   4 +-
 ...wPythonOverWindowAggregateFunctionOperator.java |   4 +-
 .../AbstractPythonScalarFunctionOperator.java      |   4 +-
 .../arrow/ArrowPythonScalarFunctionOperator.java   |   2 +-
 .../python/table/PythonTableFunctionOperator.java  |   4 +-
 24 files changed, 321 insertions(+), 120 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/debugging.md b/docs/content.zh/docs/dev/python/debugging.md
index 2692a25..2f15eb7 100644
--- a/docs/content.zh/docs/dev/python/debugging.md
+++ b/docs/content.zh/docs/dev/python/debugging.md
@@ -51,6 +51,15 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl
 ```
 
 ## 调试Python UDFs
+
+### 本地调试
+
+你可以直接在 PyCharm 等 IDE 中调试你的 Python 函数。
+
+**注意:** 当前，如果你使用了配置 `python-archives`，并且作业的并行度大于 `1`，则只能使用[远程调试](#远程调试)的方式。
+
+### 远程调试
+
 你可以利用PyCharm提供的[`pydevd_pycharm`](https://pypi.org/project/pydevd-pycharm/)工具进行Python UDF的调试
 
 1. 在PyCharm里创建一个Python Remote Debug
diff --git a/docs/content/docs/dev/python/debugging.md b/docs/content/docs/dev/python/debugging.md
index 2bbc263..3ea602a 100644
--- a/docs/content/docs/dev/python/debugging.md
+++ b/docs/content/docs/dev/python/debugging.md
@@ -52,6 +52,15 @@ $ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyfl
 ```
 
 ## Debugging Python UDFs
+
+### Local Debug
+
+You can debug your Python functions directly in IDEs such as PyCharm.
+
+**Note:** Currently, if you use `python-archives` in the job and the parallelism of the job is greater than `1`, you can only use [remote debug](#remote-debug) mode.
+
+### Remote Debug
+
 You can make use of the [`pydevd_pycharm`](https://pypi.org/project/pydevd-pycharm/) tool of PyCharm to debug Python UDFs.
 
 1. Create a Python Remote Debug in PyCharm
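
For reference, a minimal sketch of the "Local Debug" workflow added above, assuming loopback mode is active on a local MiniCluster run; the job itself is illustrative, not part of this patch:

```python
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

@udf(result_type=DataTypes.BIGINT())
def add_one(i):
    return i + 1  # a breakpoint set here in the IDE is hit in loopback mode

t_env.from_elements([(1,), (2,)], ['a']) \
    .select(add_one(col('a'))) \
    .execute().print()
```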
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index a7ba4c1..3da36ff 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -40,7 +40,7 @@ from pyflink.datastream.time_characteristic import TimeCharacteristic
 from pyflink.java_gateway import get_gateway
 from pyflink.serializers import PickleSerializer
 from pyflink.util.java_utils import load_java_class, add_jars_to_context_class_loader, \
-    invoke_method, get_field_value
+    invoke_method, get_field_value, is_local_deployment, get_j_env_configuration
 
 __all__ = ['StreamExecutionEnvironment']
 
@@ -58,6 +58,7 @@ class StreamExecutionEnvironment(object):
 
     def __init__(self, j_stream_execution_environment, serializer=PickleSerializer()):
         self._j_stream_execution_environment = j_stream_execution_environment
+        self._remote_mode = False
         self.serializer = serializer
 
     def get_config(self) -> ExecutionConfig:
@@ -869,6 +870,27 @@ class StreamExecutionEnvironment(object):
             -> JavaObject:
         gateway = get_gateway()
         JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
+        # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
+        j_configuration = get_j_env_configuration(self._j_stream_execution_environment)
+        if not self._remote_mode and is_local_deployment(j_configuration):
+            from pyflink.common import Configuration
+            from pyflink.fn_execution.beam.beam_worker_pool_service import \
+                BeamFnLoopbackWorkerPoolServicer
+
+            jvm = gateway.jvm
+            env_config = JPythonConfigUtil.getEnvironmentConfig(
+                self._j_stream_execution_environment)
+            parallelism = self.get_parallelism()
+            if parallelism > 1 and env_config.containsKey(jvm.PythonOptions.PYTHON_ARCHIVES.key()):
+                import logging
+                logging.warning("Loopback mode is disabled as python archives are used and the "
+                                "parallelism of the job is greater than 1. The Python user-defined "
+                                "functions will be executed in an independent Python process.")
+            else:
+                config = Configuration(j_configuration=j_configuration)
+                config.set_string(
+                    "loopback.server.address", BeamFnLoopbackWorkerPoolServicer().start())
+
         JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
 
         gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index b026526..e3544f8 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -51,6 +51,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
     def setUp(self):
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
+        self.env._remote_mode = True
         self.test_sink = DataStreamTestSinkFunction()
 
     def test_get_config(self):
@@ -350,6 +351,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
         get_j_env_configuration(self.env._j_stream_execution_environment).\
             setString("taskmanager.numberOfTaskSlots", "10")
         self.env.add_python_file(python_file_path)
+        self.env._remote_mode = False
         ds = self.env.from_collection([1, 2, 3, 4, 5])
         ds = ds.map(plus_two_map, Types.LONG()) \
                .slot_sharing_group("data_stream") \
@@ -399,6 +401,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
         get_j_env_configuration(self.env._j_stream_execution_environment).\
             setString("taskmanager.numberOfTaskSlots", "10")
+        self.env._remote_mode = False
         self.env.add_python_file(python_file_path)
         ds = self.env.from_collection([1, 2, 3, 4, 5])
         ds = ds.map(plus_two_map, Types.LONG()) \
@@ -488,6 +491,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
             from python_package1 import plus
             return plus(i, 1)
 
+        self.env._remote_mode = False
         ds = self.env.from_collection([1, 2, 3, 4, 5])
         ds.map(add_one).add_sink(self.test_sink)
         self.env.execute("test set requirements with cachd dir")
@@ -513,6 +517,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
             with open("data/data.txt", 'r') as f:
                 return i + int(f.read())
 
+        self.env._remote_mode = False
         ds = self.env.from_collection([1, 2, 3, 4, 5])
         ds.map(add_from_file).add_sink(self.test_sink)
         self.env.execute("test set python archive")
@@ -536,6 +541,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
             assert os.environ["python"] == python_exec_link_path
             return i
 
+        self.env._remote_mode = False
         ds = self.env.from_collection([1, 2, 3, 4, 5])
         ds.map(check_python_exec).add_sink(self.test_sink)
         self.env.execute("test set python executable")
diff --git a/flink-python/pyflink/fn_execution/beam/beam_boot.py b/flink-python/pyflink/fn_execution/beam/beam_boot.py
index b0c42dc..5887434 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_boot.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_boot.py
@@ -35,6 +35,8 @@ import grpc
 import logging
 import sys
 
+from apache_beam.portability.api.beam_fn_api_pb2_grpc import BeamFnExternalWorkerPoolStub
+from apache_beam.portability.api.beam_fn_api_pb2 import StartWorkerRequest
 from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub
 from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest
 from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor
@@ -74,28 +76,41 @@ if __name__ == "__main__":
 
     logging.info("Initializing python harness: %s" % " ".join(sys.argv))
 
-    metadata = [("worker_id", worker_id)]
-
-    # read job information from provision stub
-    with grpc.insecure_channel(provision_endpoint) as channel:
-        client = ProvisionServiceStub(channel=channel)
-        info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info
-        options = json_format.MessageToJson(info.pipeline_options)
-        logging_endpoint = info.logging_endpoint.url
-        control_endpoint = info.control_endpoint.url
-
-    os.environ["WORKER_ID"] = worker_id
-    os.environ["PIPELINE_OPTIONS"] = options
-    os.environ["SEMI_PERSISTENT_DIRECTORY"] = semi_persist_dir
-    os.environ["LOGGING_API_SERVICE_DESCRIPTOR"] = text_format.MessageToString(
-        ApiServiceDescriptor(url=logging_endpoint))
-    os.environ["CONTROL_API_SERVICE_DESCRIPTOR"] = text_format.MessageToString(
-        ApiServiceDescriptor(url=control_endpoint))
-
-    env = dict(os.environ)
-
-    if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] 
== "1":
-        exit(0)
-
-    call([python_exec, "-m", "pyflink.fn_execution.beam.beam_sdk_worker_main"],
-         stdout=sys.stdout, stderr=sys.stderr, env=env)
+    if 'loopback.server.address' in os.environ:
+        params = dict(os.environ)
+        params.update({'SEMI_PERSISTENT_DIRECTORY': semi_persist_dir})
+        with grpc.insecure_channel(os.environ['loopback.server.address']) as channel:
+            client = BeamFnExternalWorkerPoolStub(channel=channel)
+            request = StartWorkerRequest(
+                worker_id=worker_id,
+                provision_endpoint=ApiServiceDescriptor(url=provision_endpoint),
+                params=params)
+            client.StartWorker(request)
+    else:
+        metadata = [("worker_id", worker_id)]
+
+        # read job information from provision stub
+        with grpc.insecure_channel(provision_endpoint) as channel:
+            client = ProvisionServiceStub(channel=channel)
+            info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info
+            options = json_format.MessageToJson(info.pipeline_options)
+            logging_endpoint = info.logging_endpoint.url
+            control_endpoint = info.control_endpoint.url
+
+        os.environ["WORKER_ID"] = worker_id
+        os.environ["PIPELINE_OPTIONS"] = options
+        os.environ["SEMI_PERSISTENT_DIRECTORY"] = semi_persist_dir
+        os.environ["LOGGING_API_SERVICE_DESCRIPTOR"] = 
text_format.MessageToString(
+            ApiServiceDescriptor(url=logging_endpoint))
+        os.environ["CONTROL_API_SERVICE_DESCRIPTOR"] = 
text_format.MessageToString(
+            ApiServiceDescriptor(url=control_endpoint))
+
+        env = dict(os.environ)
+        with open('/tmp/test.txt', 'a') as fd:
+            fd.write(str(env) + '\n')
+
+        if "FLINK_BOOT_TESTING" in os.environ and 
os.environ["FLINK_BOOT_TESTING"] == "1":
+            exit(0)
+
+        call([python_exec, "-m", 
"pyflink.fn_execution.beam.beam_sdk_worker_main"],
+             stdout=sys.stdout, stderr=sys.stderr, env=env)
diff --git a/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
new file mode 100644
index 0000000..a844ff1
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/beam/beam_worker_pool_service.py
@@ -0,0 +1,150 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import atexit
+import functools
+import logging
+import os
+import sys
+import threading
+import traceback
+
+import grpc
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import ProfilingOptions
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub
+from apache_beam.runners.worker import sdk_worker_main
+from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
+from apache_beam.runners.worker.sdk_worker import SdkHarness
+from apache_beam.utils import thread_pool_executor, profiler
+from google.protobuf import json_format
+
+from pyflink.fn_execution.beam import beam_sdk_worker_main  # noqa # pylint: disable=unused-import
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class BeamFnLoopbackWorkerPoolServicer(beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+    """
+    Worker pool entry point.
+
+    The worker pool exposes an RPC service that is used in MiniCluster to start and stop the Python
+    SDK workers.
+
+    The worker pool uses child threads for parallelism.
+    """
+
+    def __init__(self):
+        self._worker_server = None
+        self._parse_param_lock = threading.Lock()
+
+    def start(self):
+        worker_server = grpc.server(
+            thread_pool_executor.shared_unbounded_instance())
+        worker_address = 'localhost:%s' % worker_server.add_insecure_port('[::]:0')
+        beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(self, worker_server)
+        worker_server.start()
+        _LOGGER.info('Listening for workers at %s', worker_address)
+
+        self._worker_server = worker_server
+        atexit.register(functools.partial(worker_server.stop, 1))
+        return worker_address
+
+    def StartWorker(self,
+                    start_worker_request: beam_fn_api_pb2.StartWorkerRequest,
+                    unused_context):
+        try:
+            worker_thread = threading.Thread(
+                name='run_worker_%s' % start_worker_request.worker_id,
+                target=functools.partial(self._start_sdk_worker_main, start_worker_request))
+            worker_thread.daemon = True
+            worker_thread.start()
+            return beam_fn_api_pb2.StartWorkerResponse()
+        except Exception:
+            return beam_fn_api_pb2.StartWorkerResponse(error=traceback.format_exc())
+
+    def StopWorker(self,
+                   stop_worker_request: beam_fn_api_pb2.StopWorkerRequest,
+                   unused_context):
+        pass
+
+    def _start_sdk_worker_main(self, start_worker_request: beam_fn_api_pb2.StartWorkerRequest):
+        params = start_worker_request.params
+        self._parse_param_lock.acquire()
+        if 'PYTHONPATH' in params:
+            python_path_list = params['PYTHONPATH'].split(':')
+            python_path_list.reverse()
+            for path in python_path_list:
+                sys.path.insert(0, path)
+        if '_PYTHON_WORKING_DIR' in params:
+            os.chdir(params['_PYTHON_WORKING_DIR'])
+        os.environ.update(params)
+        self._parse_param_lock.release()
+
+        # read job information from provision stub
+        metadata = [("worker_id", start_worker_request.worker_id)]
+        provision_endpoint = start_worker_request.provision_endpoint.url
+        with grpc.insecure_channel(provision_endpoint) as channel:
+            client = ProvisionServiceStub(channel=channel)
+            info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info
+            options = json_format.MessageToJson(info.pipeline_options)
+            logging_endpoint = info.logging_endpoint.url
+            control_endpoint = info.control_endpoint.url
+
+        try:
+            logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor(url=logging_endpoint)
+
+            # Send all logs to the runner.
+            fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor)
+            logging.getLogger().setLevel(logging.ERROR)
+            logging.getLogger().addHandler(fn_log_handler)
+        except Exception:
+            _LOGGER.error(
+                "Failed to set up logging handler, continuing without.",
+                exc_info=True)
+            fn_log_handler = None
+
+        sdk_pipeline_options = sdk_worker_main._parse_pipeline_options(options)
+
+        _worker_id = start_worker_request.worker_id
+
+        try:
+            control_service_descriptor = endpoints_pb2.ApiServiceDescriptor(url=control_endpoint)
+            status_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
+
+            experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or []
+            enable_heap_dump = 'enable_heap_dump' in experiments
+            SdkHarness(
+                control_address=control_service_descriptor.url,
+                status_address=status_service_descriptor.url,
+                worker_id=_worker_id,
+                state_cache_size=sdk_worker_main._get_state_cache_size(experiments),
+                data_buffer_time_limit_ms=sdk_worker_main._get_data_buffer_time_limit_ms(
+                    experiments),
+                profiler_factory=profiler.Profile.factory_from_options(
+                    sdk_pipeline_options.view_as(ProfilingOptions)),
+                enable_heap_dump=enable_heap_dump).run()
+        except:  # pylint: disable=broad-except
+            _LOGGER.exception('Python sdk harness failed: ')
+            raise
+        finally:
+            if fn_log_handler:
+                fn_log_handler.close()
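
For orientation, the RPC surface of this new service can be exercised the same way beam_boot.py does. A minimal sketch under stated assumptions: it requires a PyFlink build containing this module, the worker id is hypothetical, and the provision endpoint below is a placeholder rather than a live service, so the spawned worker thread will fail once it tries to fetch provision info:

```python
import grpc
from apache_beam.portability.api.beam_fn_api_pb2 import StartWorkerRequest
from apache_beam.portability.api.beam_fn_api_pb2_grpc import BeamFnExternalWorkerPoolStub
from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor

from pyflink.fn_execution.beam.beam_worker_pool_service import \
    BeamFnLoopbackWorkerPoolServicer

# start() registers the servicer on an in-process gRPC server and returns
# the address that beam_boot.py later reads from os.environ.
address = BeamFnLoopbackWorkerPoolServicer().start()

with grpc.insecure_channel(address) as channel:
    stub = BeamFnExternalWorkerPoolStub(channel)
    response = stub.StartWorker(StartWorkerRequest(
        worker_id='worker-0',  # hypothetical id
        provision_endpoint=ApiServiceDescriptor(url='localhost:12345'),  # placeholder
        params={'SEMI_PERSISTENT_DIRECTORY': '/tmp'}))
    # An empty error field means the SDK worker thread was started.
    print(response.error)
```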
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index bd6c7b4..a8d0864 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -91,6 +91,7 @@ class TableEnvironment(object):
     def __init__(self, j_tenv, serializer=PickleSerializer()):
         self._j_tenv = j_tenv
         self._serializer = serializer
+        self._remote_mode = False
         # When running in MiniCluster, launch the Python UDF worker using the Python executable
         # specified by sys.executable if users have not specified it explicitly via configuration
         # python.executable.
@@ -1744,6 +1745,26 @@ class TableEnvironment(object):
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         self._add_jars_to_j_env_config(jars_key)
         self._add_jars_to_j_env_config(classpaths_key)
+        # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
+        if not self._remote_mode and \
+                is_local_deployment(get_j_env_configuration(self._get_j_env())):
+            from pyflink.common import Configuration
+            _j_config = jvm.org.apache.flink.python.util.PythonConfigUtil.getMergedConfig(
+                self._get_j_env(), self.get_config()._j_table_config)
+            config = Configuration(j_configuration=_j_config)
+            parallelism = int(config.get_string("parallelism.default", "1"))
+
+            if parallelism > 1 and config.contains_key(jvm.PythonOptions.PYTHON_ARCHIVES.key()):
+                import logging
+                logging.warning("Loopback mode is disabled as python archives are used and the "
+                                "parallelism of the job is greater than 1. The Python user-defined "
+                                "functions will be executed in an independent Python process.")
+            else:
+                from pyflink.fn_execution.beam.beam_worker_pool_service import \
+                    BeamFnLoopbackWorkerPoolServicer
+
+                self.get_config().get_configuration().set_string(
+                    "loopback.server.address", BeamFnLoopbackWorkerPoolServicer().start())
 
     def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
         if isinstance(function, AggregateFunction):
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 2b9fa66..1e47fb5 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -72,6 +72,10 @@ class BatchDependencyTests(DependencyTests, PyFlinkBatchTableTestCase):
 
 class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
 
+    def setUp(self):
+        super(StreamDependencyTests, self).setUp()
+        self.t_env._remote_mode = False
+
     def test_set_requirements_without_cached_directory(self):
         requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
         with open(requirements_txt_path, 'w') as f:
@@ -79,9 +83,8 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
         self.t_env.set_python_requirements(requirements_txt_path)
 
         def check_requirements(i):
-            import cloudpickle
-            assert os.path.abspath(cloudpickle.__file__).startswith(
-                os.environ['_PYTHON_REQUIREMENTS_INSTALL_DIR'])
+            import cloudpickle  # noqa # pylint: disable=unused-import
+            assert '_PYTHON_REQUIREMENTS_INSTALL_DIR' in os.environ
             return i
 
         self.t_env.create_temporary_system_function("check_requirements",
@@ -156,9 +159,11 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
             with open("data/data.txt", 'r') as f:
                 return i + int(f.read())
 
+        self.t_env.get_config().get_configuration().set_string("parallelism.default", "1")
         self.t_env.create_temporary_system_function("add_from_file",
                                                     udf(add_from_file, DataTypes.BIGINT(),
                                                         DataTypes.BIGINT()))
+        self.t_env._remote_mode = True
         table_sink = source_sink_utils.TestAppendSink(
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
@@ -196,6 +201,7 @@ class StreamDependencyTests(DependencyTests, PyFlinkStreamTableTestCase):
                 raise Exception("The gateway server is not disabled!")
             return i
 
+        self.t_env._remote_mode = True
         self.t_env.create_temporary_system_function(
             "check_pyflink_gateway_disabled",
             udf(check_pyflink_gateway_disabled, DataTypes.BIGINT(),
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index ddb5a6b..9f8ff80 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -136,6 +136,7 @@ class PyFlinkStreamTableTestCase(PyFlinkTestCase):
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
+        self.t_env._remote_mode = True
 
 
 class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -149,6 +150,7 @@ class PyFlinkBatchTableTestCase(PyFlinkTestCase):
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
+        self.t_env._remote_mode = True
 
 
 class PyFlinkStreamingTestCase(PyFlinkTestCase):
@@ -161,6 +163,7 @@ class PyFlinkStreamingTestCase(PyFlinkTestCase):
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+        self.env._remote_mode = True
 
 
 class PyFlinkBatchTestCase(PyFlinkTestCase):
@@ -173,6 +176,7 @@ class PyFlinkBatchTestCase(PyFlinkTestCase):
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
+        self.env._remote_mode = True
 
 
 class PythonAPICompletenessTestCase(object):
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index b0922be..18ddb58 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
-import org.apache.flink.python.PythonConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -114,9 +113,9 @@ public class PythonConfigUtil {
                 AbstractPythonFunctionOperator<?> pythonFunctionOperator =
                         getPythonOperator(transformation);
                 if (pythonFunctionOperator != null) {
-                    Configuration oldConfig = pythonFunctionOperator.getPythonConfig().getConfig();
+                    Configuration oldConfig = pythonFunctionOperator.getConfiguration();
                     // update dependency related configurations for Python operators
-                    pythonFunctionOperator.setPythonConfig(
+                    pythonFunctionOperator.setConfiguration(
                             generateNewPythonConfig(oldConfig, mergedConfig));
                 }
             }
@@ -268,14 +267,14 @@ public class PythonConfigUtil {
     }
 
     /**
-     * Generator a new {@link PythonConfig} with the combined config which is derived from
+     * Generates a new {@link Configuration} with the combined config which is derived from
      * oldConfig.
      */
-    private static PythonConfig generateNewPythonConfig(
+    private static Configuration generateNewPythonConfig(
             Configuration oldConfig, Configuration newConfig) {
         Configuration mergedConfig = newConfig.clone();
         mergedConfig.addAll(oldConfig);
-        return new PythonConfig(mergedConfig);
+        return mergedConfig;
     }
 
     public static void setPartitionCustomOperatorNumPartitions(
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractOneInputPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractOneInputPythonFunctionOperator.java
index eaf07d4..76202fb 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractOneInputPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractOneInputPythonFunctionOperator.java
@@ -38,8 +38,6 @@ import org.apache.flink.streaming.api.utils.PythonTypeUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Map;
-
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createRawTypeCoderInfoDescriptorProto;
 
 /**
@@ -56,9 +54,6 @@ public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT>
 
     private static final long serialVersionUID = 1L;
 
-    /** The options used to configure the Python worker process. */
-    private final Map<String, String> jobOptions;
-
     /** The TypeInformation of input data. */
     private final TypeInformation<IN> inputTypeInfo;
 
@@ -96,7 +91,6 @@ public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT>
             TypeInformation<IN> inputTypeInfo,
             TypeInformation<OUT> outputTypeInfo) {
         super(config, pythonFunctionInfo, outputTypeInfo);
-        this.jobOptions = config.toMap();
         this.inputTypeInfo = Preconditions.checkNotNull(inputTypeInfo);
     }
 
@@ -160,10 +154,6 @@ public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT>
     // Getters
     // ----------------------------------------------------------------------
 
-    protected Map<String, String> getJobOptions() {
-        return jobOptions;
-    }
-
     public TypeInformation<IN> getInputTypeInfo() {
         return inputTypeInfo;
     }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 895d511..7b5ab9a 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -39,10 +39,10 @@ import org.apache.flink.table.functions.python.PythonEnv;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.WrappingRuntimeException;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -59,6 +59,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
 
     private static final long serialVersionUID = 1L;
 
+    protected Configuration config;
+
     /**
      * The {@link PythonFunctionRunner} which is responsible for Python user-defined function
      * execution.
@@ -71,6 +73,12 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
     /** Number of processed elements in the current bundle. */
     protected transient int elementCount;
 
+    /** The python config. */
+    protected transient PythonConfig pythonConfig;
+
+    /** The options used to configure the Python worker process. */
+    protected transient Map<String, String> jobOptions;
+
     /** Max duration of a bundle. */
     private transient long maxBundleTimeMills;
 
@@ -85,22 +93,17 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
 
     private transient ExecutorService flushThreadPool;
 
-    /** The python config. */
-    private PythonConfig config;
-
     public AbstractPythonFunctionOperator(Configuration config) {
-        this.config = new PythonConfig(Preconditions.checkNotNull(config));
+        this.config = Preconditions.checkNotNull(config);
         this.chainingStrategy = ChainingStrategy.ALWAYS;
     }
 
-    public PythonConfig getPythonConfig() {
-        return config;
-    }
-
     @Override
     public void open() throws Exception {
         try {
-            this.maxBundleSize = config.getMaxBundleSize();
+            this.pythonConfig = new PythonConfig(config);
+            this.jobOptions = config.toMap();
+            this.maxBundleSize = pythonConfig.getMaxBundleSize();
             if (this.maxBundleSize <= 0) {
                 this.maxBundleSize = PythonOptions.MAX_BUNDLE_SIZE.defaultValue();
                 LOG.error(
@@ -111,7 +114,7 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
                 LOG.info("The maximum bundle size is configured to {}.", this.maxBundleSize);
             }
 
-            this.maxBundleTimeMills = config.getMaxBundleTimeMills();
+            this.maxBundleTimeMills = pythonConfig.getMaxBundleTimeMills();
             if (this.maxBundleTimeMills <= 0L) {
                 this.maxBundleTimeMills = PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue();
                 LOG.error(
@@ -125,7 +128,7 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
             }
 
             this.pythonFunctionRunner = createPythonFunctionRunner();
-            this.pythonFunctionRunner.open(config);
+            this.pythonFunctionRunner.open(pythonConfig);
 
             this.elementCount = 0;
             this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
@@ -283,13 +286,13 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
         return elementCount == 0;
     }
 
-    /** Reset the {@link PythonConfig} if needed. */
-    public void setPythonConfig(PythonConfig pythonConfig) {
-        this.config = pythonConfig;
+    /** Reset the {@link Configuration} if needed. */
+    public void setConfiguration(Configuration config) {
+        this.config = config;
     }
 
-    /** Returns the {@link PythonConfig}. */
-    public PythonConfig getConfig() {
+    /** Returns the {@link Configuration}. */
+    public Configuration getConfiguration() {
         return config;
     }
 
@@ -369,9 +372,10 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
         }
     }
 
-    protected PythonEnvironmentManager createPythonEnvironmentManager() throws IOException {
+    protected PythonEnvironmentManager createPythonEnvironmentManager() {
         PythonDependencyInfo dependencyInfo =
-                PythonDependencyInfo.create(config, getRuntimeContext().getDistributedCache());
+                PythonDependencyInfo.create(
+                        pythonConfig, getRuntimeContext().getDistributedCache());
         PythonEnv pythonEnv = getPythonEnv();
         if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
             return new ProcessPythonEnvironmentManager(
@@ -386,7 +390,7 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
     }
 
     protected FlinkMetricContainer getFlinkMetricContainer() {
-        return this.config.isMetricEnabled()
+        return this.pythonConfig.isMetricEnabled()
                 ? new FlinkMetricContainer(getRuntimeContext().getMetricGroup())
                 : null;
     }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractTwoInputPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractTwoInputPythonFunctionOperator.java
index 5bf078d..733fe21 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractTwoInputPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractTwoInputPythonFunctionOperator.java
@@ -38,8 +38,6 @@ import org.apache.flink.streaming.api.operators.python.collector.RunnerOutputCol
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Map;
-
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createRawTypeCoderInfoDescriptorProto;
 import static org.apache.flink.streaming.api.utils.PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter;
 
@@ -54,9 +52,6 @@ public abstract class AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT>
 
     private static final long serialVersionUID = 1L;
 
-    /** The options used to configure the Python worker process. */
-    private final Map<String, String> jobOptions;
-
     /** The left input type. */
     private final TypeInformation<IN1> inputTypeInfo1;
 
@@ -89,7 +84,6 @@ public abstract class AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT>
             TypeInformation<IN2> inputTypeInfo2,
             TypeInformation<OUT> outputTypeInfo) {
         super(config, pythonFunctionInfo, outputTypeInfo);
-        this.jobOptions = config.toMap();
         this.inputTypeInfo1 = Preconditions.checkNotNull(inputTypeInfo1);
         this.inputTypeInfo2 = Preconditions.checkNotNull(inputTypeInfo2);
     }
@@ -152,10 +146,6 @@ public abstract class AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT>
     // Getters
     // ----------------------------------------------------------------------
 
-    protected Map<String, String> getJobOptions() {
-        return jobOptions;
-    }
-
     protected TypeInformation<IN1> getLeftInputType() {
         return inputTypeInfo1;
     }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
index 5508f6a..8208600 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.java
@@ -74,7 +74,7 @@ public class PythonCoProcessOperator<IN1, IN2, OUT>
                         getRuntimeContext(),
                         getInternalParameters(),
                         inBatchExecutionMode(getKeyedStateBackend())),
-                getJobOptions(),
+                jobOptions,
                 getFlinkMetricContainer(),
                 null,
                 null,
@@ -117,7 +117,7 @@ public class PythonCoProcessOperator<IN1, IN2, OUT>
     public <T> AbstractDataStreamPythonFunctionOperator<T> copy(
             DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) {
         return new PythonCoProcessOperator<>(
-                getConfig().getConfig(),
+                config,
                 pythonFunctionInfo,
                 getLeftInputType(),
                 getRightInputType(),
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
index 9038a28..ba1c94c 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
@@ -113,7 +113,7 @@ public class PythonKeyedCoProcessOperator<OUT>
                         getInternalParameters(),
                         keyTypeInfo,
                         inBatchExecutionMode(getKeyedStateBackend())),
-                getJobOptions(),
+                jobOptions,
                 getFlinkMetricContainer(),
                 getKeyedStateBackend(),
                 keyTypeSerializer,
@@ -219,7 +219,7 @@ public class PythonKeyedCoProcessOperator<OUT>
     public <T> AbstractDataStreamPythonFunctionOperator<T> copy(
             DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) {
         return new PythonKeyedCoProcessOperator<>(
-                getConfig().getConfig(),
+                config,
                 pythonFunctionInfo,
                 getLeftInputType(),
                 getRightInputType(),
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
index e3df754..3d81c10 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
@@ -143,7 +143,7 @@ public class PythonKeyedProcessOperator<OUT>
                         getInternalParameters(),
                         keyTypeInfo,
                         inBatchExecutionMode(getKeyedStateBackend())),
-                getJobOptions(),
+                jobOptions,
                 getFlinkMetricContainer(),
                 getKeyedStateBackend(),
                 keyTypeSerializer,
@@ -237,7 +237,7 @@ public class PythonKeyedProcessOperator<OUT>
     public <T> AbstractDataStreamPythonFunctionOperator<T> copy(
             DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) {
         return new PythonKeyedProcessOperator<>(
-                getConfig().getConfig(),
+                config,
                 pythonFunctionInfo,
                 (RowTypeInfo) getInputTypeInfo(),
                 outputTypeInfo,
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java
index 451418c..69d4e74 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessOperator.java
@@ -71,7 +71,7 @@ public class PythonProcessOperator<IN, OUT>
                         getRuntimeContext(),
                         getInternalParameters(),
                         inBatchExecutionMode(getKeyedStateBackend())),
-                getJobOptions(),
+                jobOptions,
                 getFlinkMetricContainer(),
                 null,
                 null,
@@ -109,6 +109,6 @@ public class PythonProcessOperator<IN, OUT>
     public <T> AbstractDataStreamPythonFunctionOperator<T> copy(
             DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) {
         return new PythonProcessOperator<>(
-                getConfig().getConfig(), pythonFunctionInfo, getInputTypeInfo(), outputTypeInfo);
+                config, pythonFunctionInfo, getInputTypeInfo(), outputTypeInfo);
     }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
index f83f560..32e1339 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
@@ -34,9 +34,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -61,9 +59,6 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
     /** The offsets of user-defined function inputs. */
     protected final int[] userDefinedFunctionInputOffsets;
 
-    /** The options used to configure the Python worker process. */
-    private final Map<String, String> jobOptions;
-
     /** The user-defined function input logical type. */
     protected transient RowType userDefinedFunctionInputType;
 
@@ -97,7 +92,6 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
         this.outputType = Preconditions.checkNotNull(outputType);
         this.userDefinedFunctionInputOffsets =
                 Preconditions.checkNotNull(userDefinedFunctionInputOffsets);
-        this.jobOptions = buildJobOptions(config);
     }
 
     @Override
@@ -173,12 +167,4 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
             RowType runnerOutType);
 
     public abstract void processElementInternal(IN value) throws Exception;
-
-    private Map<String, String> buildJobOptions(Configuration config) {
-        Map<String, String> jobOptions = new HashMap<>();
-        if (config.containsKey("table.exec.timezone")) {
-            jobOptions.put("table.exec.timezone", 
config.getString("table.exec.timezone", null));
-        }
-        return jobOptions;
-    }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
index 19deb7d..c4b3638 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.java
@@ -47,8 +47,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.streaming.api.utils.ProtoUtils.createRowTypeCoderInfoDescriptorProto;
@@ -78,9 +76,6 @@ public abstract class AbstractPythonStreamAggregateOperator
     /** The output logical type. */
     protected final RowType outputType;
 
-    /** The options used to configure the Python worker process. */
-    private final Map<String, String> jobOptions;
-
     /** The array of the key indexes. */
     private final int[] grouping;
 
@@ -141,7 +136,6 @@ public abstract class AbstractPythonStreamAggregateOperator
         this.outputType = Preconditions.checkNotNull(outputType);
         this.aggregateFunctions = aggregateFunctions;
         this.dataViewSpecs = dataViewSpecs;
-        this.jobOptions = buildJobOptions(config);
         this.grouping = grouping;
         this.indexOfCountStar = indexOfCountStar;
         this.generateUpdateBefore = generateUpdateBefore;
@@ -164,6 +158,7 @@ public abstract class AbstractPythonStreamAggregateOperator
                 PythonTypeUtils.toInternalSerializer(userDefinedFunctionOutputType);
         rowDataWrapper = new StreamRecordRowDataWrappingCollector(output);
         super.open();
+        configJobOptions();
     }
 
     @Override
@@ -244,8 +239,8 @@ public abstract class AbstractPythonStreamAggregateOperator
     protected FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto() {
         FlinkFnApi.UserDefinedAggregateFunctions.Builder builder =
                 FlinkFnApi.UserDefinedAggregateFunctions.newBuilder();
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
+        builder.setMetricEnabled(pythonConfig.isMetricEnabled());
+        builder.setProfileEnabled(pythonConfig.isProfileEnabled());
         builder.addAllGrouping(Arrays.stream(grouping).boxed().collect(Collectors.toList()));
         builder.setGenerateUpdateBefore(generateUpdateBefore);
         builder.setIndexOfCountStar(indexOfCountStar);
@@ -272,18 +267,13 @@ public abstract class AbstractPythonStreamAggregateOperator
 
     public abstract RowType createUserDefinedFunctionOutputType();
 
-    private Map<String, String> buildJobOptions(Configuration config) {
-        Map<String, String> jobOptions = new HashMap<>();
-        if (config.containsKey("table.exec.timezone")) {
-            jobOptions.put("table.exec.timezone", 
config.getString("table.exec.timezone", null));
-        }
+    private void configJobOptions() {
         jobOptions.put(
                 PythonOptions.STATE_CACHE_SIZE.key(),
                 String.valueOf(config.get(PythonOptions.STATE_CACHE_SIZE)));
         jobOptions.put(
                 PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key(),
                 String.valueOf(config.get(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE)));
-        return jobOptions;
     }
 
     public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(RowType runnerInputType) {
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
index 2ef2038..f2431e5 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.java
@@ -153,8 +153,8 @@ public abstract class AbstractArrowPythonAggregateFunctionOperator
         for (PythonFunctionInfo pythonFunctionInfo : pandasAggFunctions) {
             builder.addUdfs(getUserDefinedFunctionProto(pythonFunctionInfo));
         }
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
+        builder.setMetricEnabled(pythonConfig.isMetricEnabled());
+        builder.setProfileEnabled(pythonConfig.isProfileEnabled());
         return builder.build();
     }
 
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
index a004baa..3d7f7e0 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.java
@@ -256,8 +256,8 @@ public class BatchArrowPythonOverWindowAggregateFunctionOperator
             functionBuilder.setWindowIndex(aggWindowIndex[i]);
             builder.addUdfs(functionBuilder);
         }
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
+        builder.setMetricEnabled(pythonConfig.isMetricEnabled());
+        builder.setProfileEnabled(pythonConfig.isProfileEnabled());
         // add windows
         for (int i = 0; i < lowerBoundary.length; i++) {
             FlinkFnApi.OverWindow.Builder windowBuilder = FlinkFnApi.OverWindow.newBuilder();
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
index b286164..606766e 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/AbstractPythonScalarFunctionOperator.java
@@ -119,8 +119,8 @@ public abstract class AbstractPythonScalarFunctionOperator
         for (PythonFunctionInfo pythonFunctionInfo : scalarFunctions) {
             builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(pythonFunctionInfo));
         }
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
+        builder.setMetricEnabled(pythonConfig.isMetricEnabled());
+        builder.setProfileEnabled(pythonConfig.isProfileEnabled());
         return builder.build();
     }
 
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
index ebf6800..c42f05d 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
@@ -58,7 +58,7 @@ public class ArrowPythonScalarFunctionOperator extends AbstractPythonScalarFunct
     @Override
     public void open() throws Exception {
         super.open();
-        maxArrowBatchSize = Math.min(getPythonConfig().getMaxArrowBatchSize(), maxBundleSize);
+        maxArrowBatchSize = Math.min(pythonConfig.getMaxArrowBatchSize(), maxBundleSize);
         arrowSerializer =
                 new ArrowSerializer(userDefinedFunctionInputType, userDefinedFunctionOutputType);
         arrowSerializer.open(bais, baos);
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index 0a51717..38fbcb8 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -156,8 +156,8 @@ public class PythonTableFunctionOperator
         FlinkFnApi.UserDefinedFunctions.Builder builder =
                 FlinkFnApi.UserDefinedFunctions.newBuilder();
         builder.addUdfs(ProtoUtils.getUserDefinedFunctionProto(tableFunction));
-        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
-        builder.setProfileEnabled(getPythonConfig().isProfileEnabled());
+        builder.setMetricEnabled(pythonConfig.isMetricEnabled());
+        builder.setProfileEnabled(pythonConfig.isProfileEnabled());
         return builder.build();
     }
 
