This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ccb167  [FLINK-16650][python] Support LocalZonedTimestampType for 
Python UDF in blink planner (#11439)
5ccb167 is described below

commit 5ccb16724769becab0003e0299d9c4a63cd52378
Author: Dian Fu <dia...@apache.org>
AuthorDate: Fri Mar 20 09:46:36 2020 +0800

    [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in 
blink planner (#11439)
---
 flink-python/pyflink/fn_execution/coder_impl.py    | 12 ++++++++
 flink-python/pyflink/fn_execution/coders.py        | 23 +++++++++++++++
 .../pyflink/fn_execution/sdk_worker_main.py        | 10 ++++++-
 .../fn_execution/tests/test_coders_common.py       | 14 ++++++++-
 flink-python/pyflink/table/tests/test_types.py     | 21 ++++++++++---
 flink-python/pyflink/table/tests/test_udf.py       | 34 ++++++++++++++++++++++
 flink-python/pyflink/table/types.py                |  5 +++-
 flink-python/setup.py                              |  2 +-
 .../flink/python/AbstractPythonFunctionRunner.java | 18 +++++++++++-
 .../python/PythonScalarFunctionFlatMap.java        | 19 +++++++++++-
 .../python/AbstractStatelessFunctionOperator.java  | 22 ++++++++++++--
 .../BaseRowPythonScalarFunctionOperator.java       |  7 +++--
 .../scalar/PythonScalarFunctionOperator.java       |  7 +++--
 .../arrow/ArrowPythonScalarFunctionOperator.java   |  7 +++--
 .../BaseRowArrowPythonScalarFunctionOperator.java  |  7 +++--
 .../table/BaseRowPythonTableFunctionOperator.java  |  7 +++--
 .../python/table/PythonTableFunctionOperator.java  |  7 +++--
 .../AbstractPythonStatelessFunctionRunner.java     |  6 ++--
 .../AbstractGeneralPythonScalarFunctionRunner.java |  7 +++--
 .../scalar/AbstractPythonScalarFunctionRunner.java |  7 +++--
 .../scalar/BaseRowPythonScalarFunctionRunner.java  |  7 +++--
 .../python/scalar/PythonScalarFunctionRunner.java  |  7 +++--
 .../AbstractArrowPythonScalarFunctionRunner.java   |  7 +++--
 .../arrow/ArrowPythonScalarFunctionRunner.java     |  7 +++--
 .../BaseRowArrowPythonScalarFunctionRunner.java    |  7 +++--
 .../table/AbstractPythonTableFunctionRunner.java   |  7 +++--
 .../table/BaseRowPythonTableFunctionRunner.java    |  7 +++--
 .../python/table/PythonTableFunctionRunner.java    |  7 +++--
 .../table/runtime/typeutils/PythonTypeUtils.java   | 20 +++++++++++++
 .../BaseRowPythonScalarFunctionOperatorTest.java   |  7 +++--
 .../scalar/PythonScalarFunctionOperatorTest.java   |  7 +++--
 .../ArrowPythonScalarFunctionOperatorTest.java     |  7 +++--
 ...seRowArrowPythonScalarFunctionOperatorTest.java |  7 +++--
 .../BaseRowPythonTableFunctionOperatorTest.java    |  4 ++-
 .../table/PythonTableFunctionOperatorTest.java     |  4 ++-
 .../BaseRowPythonScalarFunctionRunnerTest.java     |  4 ++-
 .../scalar/PythonScalarFunctionRunnerTest.java     |  4 ++-
 .../arrow/ArrowPythonScalarFunctionRunnerTest.java |  1 +
 .../BaseRowPythonTableFunctionRunnerTest.java      |  4 ++-
 .../table/PythonTableFunctionRunnerTest.java       |  5 ++--
 ...PassThroughArrowPythonScalarFunctionRunner.java |  9 ++++--
 .../PassThroughPythonScalarFunctionRunner.java     |  9 ++++--
 .../plan/nodes/common/CommonPythonBase.scala       |  8 ++++-
 .../nodes/physical/batch/BatchExecPythonCalc.scala |  2 +-
 .../physical/batch/BatchExecPythonCorrelate.scala  |  2 +-
 .../physical/stream/StreamExecPythonCalc.scala     |  2 +-
 .../stream/StreamExecPythonCorrelate.scala         |  2 +-
 .../planner/utils/python/PythonTableUtils.scala    |  8 ++++-
 .../flink/table/plan/nodes/CommonPythonBase.scala  |  9 +++++-
 .../plan/nodes/dataset/DataSetPythonCalc.scala     |  2 +-
 .../nodes/datastream/DataStreamPythonCalc.scala    |  2 +-
 .../datastream/DataStreamPythonCorrelate.scala     |  2 +-
 52 files changed, 352 insertions(+), 77 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/coder_impl.py 
b/flink-python/pyflink/fn_execution/coder_impl.py
index 7b6405a..f453a88 100644
--- a/flink-python/pyflink/fn_execution/coder_impl.py
+++ b/flink-python/pyflink/fn_execution/coder_impl.py
@@ -406,6 +406,18 @@ class TimestampCoderImpl(StreamCoderImpl):
         return 
datetime.datetime.utcfromtimestamp(second).replace(microsecond=microsecond)
 
 
+class LocalZonedTimestampCoderImpl(TimestampCoderImpl):
+
+    def __init__(self, precision, timezone):
+        super(LocalZonedTimestampCoderImpl, self).__init__(precision)
+        self.timezone = timezone
+
+    def internal_to_timestamp(self, milliseconds, nanoseconds):
+        return self.timezone.localize(
+            super(LocalZonedTimestampCoderImpl, self).internal_to_timestamp(
+                milliseconds, nanoseconds))
+
+
 class ArrowCoderImpl(StreamCoderImpl):
 
     def __init__(self, schema):
diff --git a/flink-python/pyflink/fn_execution/coders.py 
b/flink-python/pyflink/fn_execution/coders.py
index eb47cf7..da4f03b 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -22,13 +22,16 @@ from abc import ABC
 import datetime
 import decimal
 import pyarrow as pa
+import pytz
 from apache_beam.coders import Coder
 from apache_beam.coders.coders import FastCoder, LengthPrefixCoder
+from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.portability import common_urns
 from apache_beam.typehints import typehints
 
 from pyflink.fn_execution import coder_impl
 from pyflink.fn_execution import flink_fn_execution_pb2
+from pyflink.fn_execution.sdk_worker_main import pipeline_options
 from pyflink.table import Row
 
 FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN = 
"flink:coder:schema:scalar_function:v1"
@@ -377,6 +380,22 @@ class TimestampCoder(DeterministicCoder):
         return datetime.datetime
 
 
+class LocalZonedTimestampCoder(DeterministicCoder):
+    """
+    Coder for LocalZonedTimestamp.
+    """
+
+    def __init__(self, precision, timezone):
+        self.precision = precision
+        self.timezone = timezone
+
+    def _create_impl(self):
+        return coder_impl.LocalZonedTimestampCoderImpl(self.precision, 
self.timezone)
+
+    def to_type_hint(self):
+        return datetime.datetime
+
+
 class ArrowCoder(DeterministicCoder):
     """
     Coder for Arrow.
@@ -466,6 +485,10 @@ def from_proto(field_type):
         return RowCoder([from_proto(f.type) for f in 
field_type.row_schema.fields])
     if field_type_name == type_name.TIMESTAMP:
         return TimestampCoder(field_type.timestamp_info.precision)
+    if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP:
+        timezone = 
pytz.timezone(pipeline_options.view_as(DebugOptions).lookup_experiment(
+            "table.exec.timezone"))
+        return 
LocalZonedTimestampCoder(field_type.local_zoned_timestamp_info.precision, 
timezone)
     elif field_type_name == type_name.ARRAY:
         return ArrayCoder(from_proto(field_type.collection_element_type))
     elif field_type_name == type_name.MAP:
diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py 
b/flink-python/pyflink/fn_execution/sdk_worker_main.py
index 82d091c..7f2c423 100644
--- a/flink-python/pyflink/fn_execution/sdk_worker_main.py
+++ b/flink-python/pyflink/fn_execution/sdk_worker_main.py
@@ -15,10 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
+import os
 import sys
 
 # force to register the operations to SDK Harness
+from apache_beam.options.pipeline_options import PipelineOptions
+
 import pyflink.fn_execution.operations # noqa # pylint:  disable=unused-import
 
 # force to register the coders to SDK Harness
@@ -26,5 +28,11 @@ import pyflink.fn_execution.coders # noqa # pylint: 
disable=unused-import
 
 import apache_beam.runners.worker.sdk_worker_main
 
+if 'PIPELINE_OPTIONS' in os.environ:
+    pipeline_options = 
apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options(
+        os.environ['PIPELINE_OPTIONS'])
+else:
+    pipeline_options = PipelineOptions.from_dictionary({})
+
 if __name__ == '__main__':
     apache_beam.runners.worker.sdk_worker_main.main(sys.argv)
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders_common.py 
b/flink-python/pyflink/fn_execution/tests/test_coders_common.py
index 42d1c19..69e9961 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders_common.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders_common.py
@@ -22,7 +22,8 @@ import unittest
 
 from pyflink.fn_execution.coders import BigIntCoder, TinyIntCoder, 
BooleanCoder, \
     SmallIntCoder, IntCoder, FloatCoder, DoubleCoder, BinaryCoder, CharCoder, 
DateCoder, \
-    TimeCoder, TimestampCoder, ArrayCoder, MapCoder, DecimalCoder, 
FlattenRowCoder, RowCoder
+    TimeCoder, TimestampCoder, ArrayCoder, MapCoder, DecimalCoder, 
FlattenRowCoder, RowCoder, \
+    LocalZonedTimestampCoder
 
 
 class CodersTest(unittest.TestCase):
@@ -93,6 +94,17 @@ class CodersTest(unittest.TestCase):
         coder = TimestampCoder(6)
         self.check_coder(coder, datetime.datetime(2019, 9, 10, 18, 30, 20, 
123456))
 
+    def test_local_zoned_timestamp_coder(self):
+        import datetime
+        import pytz
+        timezone = pytz.timezone("Asia/Shanghai")
+        coder = LocalZonedTimestampCoder(3, timezone)
+        self.check_coder(coder,
+                         timezone.localize(datetime.datetime(2019, 9, 10, 18, 
30, 20, 123000)))
+        coder = LocalZonedTimestampCoder(6, timezone)
+        self.check_coder(coder,
+                         timezone.localize(datetime.datetime(2019, 9, 10, 18, 
30, 20, 123456)))
+
     def test_array_coder(self):
         element_coder = BigIntCoder()
         coder = ArrayCoder(element_coder)
diff --git a/flink-python/pyflink/table/tests/test_types.py 
b/flink-python/pyflink/table/tests/test_types.py
index 415224d..04a2e13 100644
--- a/flink-python/pyflink/table/tests/test_types.py
+++ b/flink-python/pyflink/table/tests/test_types.py
@@ -33,7 +33,8 @@ from pyflink.table.types import (_infer_schema_from_data, 
_infer_type,
                                  _array_type_mappings, _merge_type,
                                  _create_type_verifier, UserDefinedType, 
DataTypes, Row, RowField,
                                  RowType, ArrayType, BigIntType, VarCharType, 
MapType, DataType,
-                                 _to_java_type, _from_java_type, 
ZonedTimestampType)
+                                 _to_java_type, _from_java_type, 
ZonedTimestampType,
+                                 LocalZonedTimestampType)
 
 
 class ExamplePointUDT(UserDefinedType):
@@ -535,11 +536,23 @@ class TypesTests(unittest.TestCase):
 
     def test_local_zoned_timestamp_type(self):
         lztst = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
-        ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000, 
tzinfo=UTCOffsetTimezone(1))
-        self.assertEqual(-3600000000, lztst.to_sql_type(ts))
+        ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000)
+        self.assertEqual(0, lztst.to_sql_type(ts))
+
+        import pytz
+        # suppose the timezone of the data is +9:00
+        timezone = pytz.timezone("Asia/Tokyo")
+        orig_epoch = LocalZonedTimestampType.EPOCH_ORDINAL
+        try:
+            # suppose the local timezone is +8:00
+            LocalZonedTimestampType.EPOCH_ORDINAL = 28800000000
+            ts_tokyo = timezone.localize(ts)
+            self.assertEqual(-3600000000, lztst.to_sql_type(ts_tokyo))
+        finally:
+            LocalZonedTimestampType.EPOCH_ORDINAL = orig_epoch
 
         if sys.version_info >= (3, 6):
-            ts2 = lztst.from_sql_type(-3600000000)
+            ts2 = lztst.from_sql_type(0)
             self.assertEqual(ts.astimezone(), ts2.astimezone())
 
     def test_zoned_timestamp_type(self):
diff --git a/flink-python/pyflink/table/tests/test_udf.py 
b/flink-python/pyflink/table/tests/test_udf.py
index e12b71a..a599aac 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -15,6 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+import datetime
+
+import pytz
+
 from pyflink.table import DataTypes
 from pyflink.table.udf import ScalarFunction, udf
 from pyflink.testing import source_sink_utils
@@ -562,6 +566,36 @@ class 
PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
             self.t_env.register_function(
                 "non-callable-udf", udf(Plus(), DataTypes.BIGINT(), 
DataTypes.BIGINT()))
 
+    def test_data_types_only_supported_in_blink_planner(self):
+        timezone = self.t_env.get_config().get_local_timezone()
+        local_datetime = pytz.timezone(timezone).localize(
+            datetime.datetime(1970, 1, 1, 0, 0, 0, 123000))
+
+        def local_zoned_timestamp_func(local_zoned_timestamp_param):
+            assert local_zoned_timestamp_param == local_datetime, \
+                'local_zoned_timestamp_param is wrong value %s !' % 
local_zoned_timestamp_param
+            return local_zoned_timestamp_param
+
+        self.t_env.register_function(
+            "local_zoned_timestamp_func",
+            udf(local_zoned_timestamp_func,
+                [DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)],
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)))
+
+        table_sink = source_sink_utils.TestAppendSink(
+            ['a'], [DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
+        self.t_env.register_table_sink("Results", table_sink)
+
+        t = self.t_env.from_elements(
+            [(local_datetime,)],
+            DataTypes.ROW([DataTypes.FIELD("a", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))]))
+
+        t.select("local_zoned_timestamp_func(local_zoned_timestamp_func(a))") \
+            .insert_into("Results")
+        self.t_env.execute("test")
+        actual = source_sink_utils.results()
+        self.assert_equals(actual, ["1970-01-01T00:00:00.123Z"])
+
 
 class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
                                                 
PyFlinkBlinkBatchTableTestCase):
diff --git a/flink-python/pyflink/table/types.py 
b/flink-python/pyflink/table/types.py
index d3ab0f2..c193bc3 100644
--- a/flink-python/pyflink/table/types.py
+++ b/flink-python/pyflink/table/types.py
@@ -477,6 +477,8 @@ class LocalZonedTimestampType(AtomicType):
     :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6
+
     def __init__(self, precision=6, nullable=True):
         super(LocalZonedTimestampType, self).__init__(nullable)
         assert 0 <= precision <= 9
@@ -492,10 +494,11 @@ class LocalZonedTimestampType(AtomicType):
         if dt is not None:
             seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                        else time.mktime(dt.timetuple()))
-            return int(seconds) * 10 ** 6 + dt.microsecond
+            return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL
 
     def from_sql_type(self, ts):
         if ts is not None:
+            ts = ts - self.EPOCH_ORDINAL
             return datetime.datetime.fromtimestamp(ts // 10 ** 
6).replace(microsecond=ts % 10 ** 6)
 
 
diff --git a/flink-python/setup.py b/flink-python/setup.py
index ca4b598..5f50376 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -225,7 +225,7 @@ run sdist.
         python_requires='>=3.5',
         install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 
'apache-beam==2.19.0',
                           'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 
'jsonpickle==1.2',
-                          'pandas>=0.23.4,<=0.25.3', 
'pyarrow>=0.15.1,<=0.16.0'],
+                          'pandas>=0.23.4,<=0.25.3', 
'pyarrow>=0.15.1,<=0.16.0', 'pytz>=2018.3'],
         tests_require=['pytest==4.4.1'],
         description='Apache Flink Python API',
         long_description=long_description,
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java
index 597057e..84e7838 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java
@@ -37,11 +37,14 @@ import 
org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
 
+import java.util.Map;
+
 /**
  * An base class for {@link PythonFunctionRunner}.
  *
@@ -65,6 +68,11 @@ public abstract class AbstractPythonFunctionRunner<IN> 
implements PythonFunction
        private final PythonEnvironmentManager environmentManager;
 
        /**
+        * The options used to configure the Python worker process.
+        */
+       private final Map<String, String> jobOptions;
+
+       /**
         * The bundle factory which has all job-scoped information and can be 
used to create a {@link StageBundleFactory}.
         */
        private transient JobBundleFactory jobBundleFactory;
@@ -112,11 +120,13 @@ public abstract class AbstractPythonFunctionRunner<IN> 
implements PythonFunction
                String taskName,
                FnDataReceiver<byte[]> resultReceiver,
                PythonEnvironmentManager environmentManager,
-               StateRequestHandler stateRequestHandler) {
+               StateRequestHandler stateRequestHandler,
+               Map<String, String> jobOptions) {
                this.taskName = Preconditions.checkNotNull(taskName);
                this.resultReceiver = 
Preconditions.checkNotNull(resultReceiver);
                this.environmentManager = 
Preconditions.checkNotNull(environmentManager);
                this.stateRequestHandler = 
Preconditions.checkNotNull(stateRequestHandler);
+               this.jobOptions = Preconditions.checkNotNull(jobOptions);
        }
 
        @Override
@@ -131,6 +141,12 @@ public abstract class AbstractPythonFunctionRunner<IN> 
implements PythonFunction
                        
PipelineOptionsFactory.as(PortablePipelineOptions.class);
                // one operator has one Python SDK harness
                portableOptions.setSdkWorkerParallelism(1);
+               ExperimentalOptions experimentalOptions = 
portableOptions.as(ExperimentalOptions.class);
+               for (Map.Entry<String, String> entry : jobOptions.entrySet()) {
+                       ExperimentalOptions.addExperiment(experimentalOptions,
+                               String.join("=", entry.getKey(), 
entry.getValue()));
+               }
+
                Struct pipelineOptions = 
PipelineOptionsTranslation.toProto(portableOptions);
 
                jobBundleFactory = createJobBundleFactory(pipelineOptions);
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
index 9785509..10cee8d 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -141,6 +143,11 @@ public final class PythonScalarFunctionFlatMap
        private final PythonConfig config;
 
        /**
+        * The options used to configure the Python worker process.
+        */
+       private final Map<String, String> jobOptions;
+
+       /**
         * Use an AtomicBoolean because we start/stop bundles by a timer thread.
         */
        private transient AtomicBoolean bundleStarted;
@@ -178,6 +185,7 @@ public final class PythonScalarFunctionFlatMap
                this.udfInputOffsets = 
Preconditions.checkNotNull(udfInputOffsets);
                this.forwardedFields = 
Preconditions.checkNotNull(forwardedFields);
                this.config = new 
PythonConfig(Preconditions.checkNotNull(config));
+               this.jobOptions = buildJobOptions(config);
        }
 
        @Override
@@ -285,7 +293,8 @@ public final class PythonScalarFunctionFlatMap
                        scalarFunctions,
                        createPythonEnvironmentManager(),
                        udfInputType,
-                       udfOutputType);
+                       udfOutputType,
+                       jobOptions);
        }
 
        private PythonEnvironmentManager createPythonEnvironmentManager() 
throws IOException {
@@ -321,6 +330,14 @@ public final class PythonScalarFunctionFlatMap
                }
        }
 
+       private Map<String, String> buildJobOptions(Configuration config) {
+               Map<String, String> jobOptions = new HashMap<>();
+               if (config.containsKey("table.exec.timezone")) {
+                       jobOptions.put("table.exec.timezone", 
config.getString("table.exec.timezone", null));
+               }
+               return jobOptions;
+       }
+
        @Override
        public TypeInformation<Row> getProducedType() {
                return (TypeInformation<Row>) LegacyTypeInfoDataTypeConverter
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
index 3670ac7..248683c 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java
@@ -37,6 +37,8 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
@@ -69,6 +71,11 @@ public abstract class AbstractStatelessFunctionOperator<IN, 
OUT, UDFIN>
        protected final int[] userDefinedFunctionInputOffsets;
 
        /**
+        * The options used to configure the Python worker process.
+        */
+       private final Map<String, String> jobOptions;
+
+       /**
         * The user-defined function input logical type.
         */
        protected transient RowType userDefinedFunctionInputType;
@@ -108,6 +115,7 @@ public abstract class AbstractStatelessFunctionOperator<IN, 
OUT, UDFIN>
                this.inputType = Preconditions.checkNotNull(inputType);
                this.outputType = Preconditions.checkNotNull(outputType);
                this.userDefinedFunctionInputOffsets = 
Preconditions.checkNotNull(userDefinedFunctionInputOffsets);
+               this.jobOptions = buildJobOptions(config);
        }
 
        @Override
@@ -140,7 +148,8 @@ public abstract class AbstractStatelessFunctionOperator<IN, 
OUT, UDFIN>
                return new ProjectUdfInputPythonScalarFunctionRunner(
                        createPythonFunctionRunner(
                                userDefinedFunctionResultReceiver,
-                               createPythonEnvironmentManager()));
+                               createPythonEnvironmentManager(),
+                               jobOptions));
        }
 
        /**
@@ -153,7 +162,16 @@ public abstract class 
AbstractStatelessFunctionOperator<IN, OUT, UDFIN>
 
        public abstract PythonFunctionRunner<UDFIN> createPythonFunctionRunner(
                FnDataReceiver<byte[]> resultReceiver,
-               PythonEnvironmentManager pythonEnvironmentManager);
+               PythonEnvironmentManager pythonEnvironmentManager,
+               Map<String, String> jobOptions);
+
+       private Map<String, String> buildJobOptions(Configuration config) {
+               Map<String, String> jobOptions = new HashMap<>();
+               if (config.containsKey("table.exec.timezone")) {
+                       jobOptions.put("table.exec.timezone", 
config.getString("table.exec.timezone", null));
+               }
+               return jobOptions;
+       }
 
        private class ProjectUdfInputPythonScalarFunctionRunner implements 
PythonFunctionRunner<IN> {
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperator.java
index 6f9e922..c845090 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperator.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * The Python {@link ScalarFunction} operator for the blink planner.
@@ -80,13 +81,15 @@ public class BaseRowPythonScalarFunctionOperator extends 
AbstractBaseRowPythonSc
        @Override
        public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
                        FnDataReceiver<byte[]> resultReceiver,
-                       PythonEnvironmentManager pythonEnvironmentManager) {
+                       PythonEnvironmentManager pythonEnvironmentManager,
+                       Map<String, String> jobOptions) {
                return new BaseRowPythonScalarFunctionRunner(
                        getRuntimeContext().getTaskName(),
                        resultReceiver,
                        scalarFunctions,
                        pythonEnvironmentManager,
                        userDefinedFunctionInputType,
-                       userDefinedFunctionOutputType);
+                       userDefinedFunctionOutputType,
+                       jobOptions);
        }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
index 672882e..8fa27bf 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
@@ -34,6 +34,7 @@ import org.apache.flink.types.Row;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * The Python {@link ScalarFunction} operator for the legacy planner.
@@ -81,13 +82,15 @@ public class PythonScalarFunctionOperator extends 
AbstractRowPythonScalarFunctio
        @Override
        public PythonFunctionRunner<Row> createPythonFunctionRunner(
                        FnDataReceiver<byte[]> resultReceiver,
-                       PythonEnvironmentManager pythonEnvironmentManager) {
+                       PythonEnvironmentManager pythonEnvironmentManager,
+                       Map<String, String> jobOptions) {
                return new PythonScalarFunctionRunner(
                        getRuntimeContext().getTaskName(),
                        resultReceiver,
                        scalarFunctions,
                        pythonEnvironmentManager,
                        userDefinedFunctionInputType,
-                       userDefinedFunctionOutputType);
+                       userDefinedFunctionOutputType,
+                       jobOptions);
        }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
index 3554106..4a686e8 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
@@ -38,6 +38,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Arrow Python {@link ScalarFunction} operator for the old planner.
@@ -93,7 +94,8 @@ public class ArrowPythonScalarFunctionOperator extends 
AbstractRowPythonScalarFu
        @Override
        public PythonFunctionRunner<Row> createPythonFunctionRunner(
                FnDataReceiver<byte[]> resultReceiver,
-               PythonEnvironmentManager pythonEnvironmentManager) {
+               PythonEnvironmentManager pythonEnvironmentManager,
+               Map<String, String> jobOptions) {
                return new ArrowPythonScalarFunctionRunner(
                        getRuntimeContext().getTaskName(),
                        resultReceiver,
@@ -101,7 +103,8 @@ public class ArrowPythonScalarFunctionOperator extends 
AbstractRowPythonScalarFu
                        pythonEnvironmentManager,
                        userDefinedFunctionInputType,
                        userDefinedFunctionOutputType,
-                       getPythonConfig().getMaxArrowBatchSize());
+                       getPythonConfig().getMaxArrowBatchSize(),
+                       jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperator.java
index aa46991ba..6a20fe6 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperator.java
@@ -37,6 +37,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Arrow Python {@link ScalarFunction} operator for the blink planner.
@@ -92,7 +93,8 @@ public class BaseRowArrowPythonScalarFunctionOperator extends 
AbstractBaseRowPyt
        @Override
        public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
                FnDataReceiver<byte[]> resultReceiver,
-               PythonEnvironmentManager pythonEnvironmentManager) {
+               PythonEnvironmentManager pythonEnvironmentManager,
+               Map<String, String> jobOptions) {
                return new BaseRowArrowPythonScalarFunctionRunner(
                        getRuntimeContext().getTaskName(),
                        resultReceiver,
@@ -100,7 +102,8 @@ public class BaseRowArrowPythonScalarFunctionOperator 
extends AbstractBaseRowPyt
                        pythonEnvironmentManager,
                        userDefinedFunctionInputType,
                        userDefinedFunctionOutputType,
-                       getPythonConfig().getMaxArrowBatchSize());
+                       getPythonConfig().getMaxArrowBatchSize(),
+                       jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperator.java
index d138df3..f7dd371 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperator.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.calcite.rel.core.JoinRelType;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * The Python {@link TableFunction} operator for the blink planner.
@@ -117,14 +118,16 @@ public class BaseRowPythonTableFunctionOperator
        @Override
        public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
                FnDataReceiver<byte[]> resultReceiver,
-               PythonEnvironmentManager pythonEnvironmentManager) {
+               PythonEnvironmentManager pythonEnvironmentManager,
+               Map<String, String> jobOptions) {
                return new BaseRowPythonTableFunctionRunner(
                        getRuntimeContext().getTaskName(),
                        resultReceiver,
                        tableFunction,
                        pythonEnvironmentManager,
                        userDefinedFunctionInputType,
-                       userDefinedFunctionOutputType);
+                       userDefinedFunctionOutputType,
+                       jobOptions);
        }
 
        private Projection<BaseRow, BinaryRow> createUdtfInputProjection() {
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
index a4c5259..418d271 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.calcite.rel.core.JoinRelType;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * The Python {@link TableFunction} operator for the legacy planner.
@@ -131,13 +132,15 @@ public class PythonTableFunctionOperator extends 
AbstractPythonTableFunctionOper
        @Override
        public PythonFunctionRunner<Row> createPythonFunctionRunner(
                FnDataReceiver<byte[]> resultReceiver,
-               PythonEnvironmentManager pythonEnvironmentManager) {
+               PythonEnvironmentManager pythonEnvironmentManager,
+               Map<String, String> jobOptions) {
                return new PythonTableFunctionRunner(
                        getRuntimeContext().getTaskName(),
                        resultReceiver,
                        tableFunction,
                        pythonEnvironmentManager,
                        userDefinedFunctionInputType,
-                       userDefinedFunctionOutputType);
+                       userDefinedFunctionOutputType,
+                       jobOptions);
        }
 }
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.java
index c13d3af..5155bf6 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.java
@@ -51,6 +51,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
 
@@ -86,8 +87,9 @@ public abstract class 
AbstractPythonStatelessFunctionRunner<IN> extends Abstract
                PythonEnvironmentManager environmentManager,
                RowType inputType,
                RowType outputType,
-               String functionUrn) {
-               super(taskName, resultReceiver, environmentManager, 
StateRequestHandler.unsupported());
+               String functionUrn,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, environmentManager, 
StateRequestHandler.unsupported(), jobOptions);
                this.functionUrn = functionUrn;
                this.inputType = Preconditions.checkNotNull(inputType);
                this.outputType = Preconditions.checkNotNull(outputType);
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractGeneralPythonScalarFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractGeneralPythonScalarFunctionRunner.java
index 67b16ae..eededf3 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractGeneralPythonScalarFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractGeneralPythonScalarFunctionRunner.java
@@ -30,6 +30,8 @@ import 
org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import java.util.Map;
+
 /**
  * Abstract {@link PythonFunctionRunner} used to execute Python {@link 
ScalarFunction}s.
  *
@@ -51,8 +53,9 @@ public abstract class 
AbstractGeneralPythonScalarFunctionRunner<IN> extends Abst
                PythonFunctionInfo[] scalarFunctions,
                PythonEnvironmentManager environmentManager,
                RowType inputType,
-               RowType outputType) {
-               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType);
+               RowType outputType,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType, jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractPythonScalarFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractPythonScalarFunctionRunner.java
index c5991cd..9e0cdf8 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractPythonScalarFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractPythonScalarFunctionRunner.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
+import java.util.Map;
+
 /**
  * Abstract {@link PythonFunctionRunner} used to execute Python {@link 
ScalarFunction}s.
  *
@@ -49,8 +51,9 @@ public abstract class AbstractPythonScalarFunctionRunner<IN> 
extends AbstractPyt
                PythonFunctionInfo[] scalarFunctions,
                PythonEnvironmentManager environmentManager,
                RowType inputType,
-               RowType outputType) {
-               super(taskName, resultReceiver, environmentManager, inputType, 
outputType, SCALAR_FUNCTION_URN);
+               RowType outputType,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, environmentManager, inputType, 
outputType, SCALAR_FUNCTION_URN, jobOptions);
                this.scalarFunctions = 
Preconditions.checkNotNull(scalarFunctions);
        }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunner.java
index 758ac64..4a2ee55 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunner.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
+import java.util.Map;
+
 /**
  * A {@link PythonFunctionRunner} used to execute Python {@link 
ScalarFunction}s.
  * It takes {@link BaseRow} as the input and outputs a byte array.
@@ -43,8 +45,9 @@ public class BaseRowPythonScalarFunctionRunner extends 
AbstractGeneralPythonScal
                PythonFunctionInfo[] scalarFunctions,
                PythonEnvironmentManager environmentManager,
                RowType inputType,
-               RowType outputType) {
-               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType);
+               RowType outputType,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType, jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunner.java
index ca7d80a..8e48b46 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunner.java
@@ -30,6 +30,8 @@ import org.apache.flink.types.Row;
 
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
+import java.util.Map;
+
 /**
  * A {@link PythonFunctionRunner} used to execute Python {@link 
ScalarFunction}s.
  * It takes {@link Row} as the input and outputs a byte array.
@@ -43,8 +45,9 @@ public class PythonScalarFunctionRunner extends 
AbstractGeneralPythonScalarFunct
                PythonFunctionInfo[] scalarFunctions,
                PythonEnvironmentManager environmentManager,
                RowType inputType,
-               RowType outputType) {
-               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType);
+               RowType outputType,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType, jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/AbstractArrowPythonScalarFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/AbstractArrowPythonScalarFunctionRunner.java
index d218c8f..dda5395 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/AbstractArrowPythonScalarFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/AbstractArrowPythonScalarFunctionRunner.java
@@ -36,6 +36,8 @@ import 
org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import java.util.Map;
+
 /**
  * Abstract {@link PythonFunctionRunner} used to execute Arrow Python {@link 
ScalarFunction}s.
  *
@@ -97,8 +99,9 @@ public abstract class 
AbstractArrowPythonScalarFunctionRunner<IN> extends Abstra
                PythonEnvironmentManager environmentManager,
                RowType inputType,
                RowType outputType,
-               int maxArrowBatchSize) {
-               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType);
+               int maxArrowBatchSize,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType, jobOptions);
                this.maxArrowBatchSize = maxArrowBatchSize;
        }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunner.java
index ee86327..df3d291 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunner.java
@@ -30,6 +30,8 @@ import org.apache.flink.types.Row;
 
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
+import java.util.Map;
+
 /**
  * A {@link PythonFunctionRunner} used to execute Arrow Python {@link 
ScalarFunction}s.
  * It takes {@link Row} as the input type.
@@ -44,8 +46,9 @@ public class ArrowPythonScalarFunctionRunner extends 
AbstractArrowPythonScalarFu
                PythonEnvironmentManager environmentManager,
                RowType inputType,
                RowType outputType,
-               int maxArrowBatchSize) {
-               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType, maxArrowBatchSize);
+               int maxArrowBatchSize,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType, maxArrowBatchSize, jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/BaseRowArrowPythonScalarFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/BaseRowArrowPythonScalarFunctionRunner.java
index f15dec0..60e9609 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/BaseRowArrowPythonScalarFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/BaseRowArrowPythonScalarFunctionRunner.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
+import java.util.Map;
+
 /**
  * A {@link PythonFunctionRunner} used to execute Arrow Python {@link 
ScalarFunction}s.
  * It takes {@link BaseRow} as the input type.
@@ -44,8 +46,9 @@ public class BaseRowArrowPythonScalarFunctionRunner extends 
AbstractArrowPythonS
                PythonEnvironmentManager environmentManager,
                RowType inputType,
                RowType outputType,
-               int maxBatchSize) {
-               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType, maxBatchSize);
+               int maxBatchSize,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, scalarFunctions, 
environmentManager, inputType, outputType, maxBatchSize, jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.java
index 0aa3265..962f556 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.java
@@ -34,6 +34,8 @@ import 
org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import java.util.Map;
+
 /**
  * Abstract {@link PythonFunctionRunner} used to execute Python {@link 
TableFunction}.
  *
@@ -58,8 +60,9 @@ public abstract class AbstractPythonTableFunctionRunner<IN> 
extends AbstractPyth
                PythonFunctionInfo tableFunction,
                PythonEnvironmentManager environmentManager,
                RowType inputType,
-               RowType outputType) {
-               super(taskName, resultReceiver, environmentManager, inputType, 
outputType, TABLE_FUNCTION_URN);
+               RowType outputType,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, environmentManager, inputType, 
outputType, TABLE_FUNCTION_URN, jobOptions);
                this.tableFunction = Preconditions.checkNotNull(tableFunction);
        }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunner.java
index 866d182..3ff6ca4 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunner.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
+import java.util.Map;
+
 /**
  * A {@link PythonFunctionRunner} used to execute Python {@link TableFunction}.
  * It takes {@link BaseRow} as the input and outputs a byte array.
@@ -42,8 +44,9 @@ public class BaseRowPythonTableFunctionRunner extends 
AbstractPythonTableFunctio
                PythonFunctionInfo tableFunction,
                PythonEnvironmentManager environmentManager,
                RowType inputType,
-               RowType outputType) {
-               super(taskName, resultReceiver, tableFunction, 
environmentManager, inputType, outputType);
+               RowType outputType,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, tableFunction, 
environmentManager, inputType, outputType, jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunner.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunner.java
index c691ca3..48fa400 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunner.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunner.java
@@ -31,6 +31,8 @@ import org.apache.flink.types.Row;
 
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
+import java.util.Map;
+
 /**
  * A {@link PythonFunctionRunner} used to execute Python {@link TableFunction}.
  * It takes {@link Row} as the input and outputs a byte array.
@@ -44,8 +46,9 @@ public class PythonTableFunctionRunner extends 
AbstractPythonTableFunctionRunner
                PythonFunctionInfo tableFunction,
                PythonEnvironmentManager environmentManager,
                RowType inputType,
-               RowType outputType) {
-               super(taskName, resultReceiver, tableFunction, 
environmentManager, inputType, outputType);
+               RowType outputType,
+               Map<String, String> jobOptions) {
+               super(taskName, resultReceiver, tableFunction, 
environmentManager, inputType, outputType, jobOptions);
        }
 
        @Override
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
index 97d6cf6..60f79a0 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
@@ -52,6 +52,7 @@ import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
@@ -268,6 +269,11 @@ public final class PythonTypeUtils {
                        return new 
SqlTimestampSerializer(timestampType.getPrecision());
                }
 
+               @Override
+               public TypeSerializer visit(LocalZonedTimestampType 
localZonedTimestampType) {
+                       return new 
SqlTimestampSerializer(localZonedTimestampType.getPrecision());
+               }
+
                public TypeSerializer visit(ArrayType arrayType) {
                        LogicalType elementType = arrayType.getElementType();
                        TypeSerializer elementTypeSerializer = 
elementType.accept(this);
@@ -417,6 +423,20 @@ public final class PythonTypeUtils {
                }
 
                @Override
+               public FlinkFnApi.Schema.FieldType 
visit(LocalZonedTimestampType localZonedTimestampType) {
+                       FlinkFnApi.Schema.FieldType.Builder builder =
+                               FlinkFnApi.Schema.FieldType.newBuilder()
+                                       
.setTypeName(FlinkFnApi.Schema.TypeName.LOCAL_ZONED_TIMESTAMP)
+                                       
.setNullable(localZonedTimestampType.isNullable());
+
+                       FlinkFnApi.Schema.LocalZonedTimestampInfo.Builder 
dateTimeBuilder =
+                               
FlinkFnApi.Schema.LocalZonedTimestampInfo.newBuilder()
+                                       
.setPrecision(localZonedTimestampType.getPrecision());
+                       
builder.setLocalZonedTimestampInfo(dateTimeBuilder.build());
+                       return builder.build();
+               }
+
+               @Override
                public FlinkFnApi.Schema.FieldType visit(DecimalType 
decimalType) {
                        FlinkFnApi.Schema.FieldType.Builder builder =
                                FlinkFnApi.Schema.FieldType.newBuilder()
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperatorTest.java
index 0c5e10e..ef4e618 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperatorTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.util.Collection;
+import java.util.Map;
 
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.baserow;
 
@@ -115,14 +116,16 @@ public class BaseRowPythonScalarFunctionOperatorTest
                @Override
                public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
                                FnDataReceiver<byte[]> resultReceiver,
-                               PythonEnvironmentManager 
pythonEnvironmentManager) {
+                               PythonEnvironmentManager 
pythonEnvironmentManager,
+                               Map<String, String> jobOptions) {
                        return new 
PassThroughPythonScalarFunctionRunner<BaseRow>(
                                getRuntimeContext().getTaskName(),
                                resultReceiver,
                                scalarFunctions,
                                pythonEnvironmentManager,
                                userDefinedFunctionInputType,
-                               userDefinedFunctionOutputType) {
+                               userDefinedFunctionOutputType,
+                               jobOptions) {
                                @Override
                                public TypeSerializer<BaseRow> 
getInputTypeSerializer() {
                                        return (BaseRowSerializer) 
PythonTypeUtils.toBlinkTypeSerializer(getInputType());
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
index c910e45..459feb2 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.types.Row;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Queue;
 
 /**
@@ -91,14 +92,16 @@ public class PythonScalarFunctionOperatorTest extends 
PythonScalarFunctionOperat
                @Override
                public PythonFunctionRunner<Row> createPythonFunctionRunner(
                                FnDataReceiver<byte[]> resultReceiver,
-                               PythonEnvironmentManager 
pythonEnvironmentManager) {
+                               PythonEnvironmentManager 
pythonEnvironmentManager,
+                               Map<String, String> jobOptions) {
                        return new PassThroughPythonScalarFunctionRunner<Row>(
                                getRuntimeContext().getTaskName(),
                                resultReceiver,
                                scalarFunctions,
                                pythonEnvironmentManager,
                                userDefinedFunctionInputType,
-                               userDefinedFunctionOutputType) {
+                               userDefinedFunctionOutputType,
+                               jobOptions) {
                                @Override
                                public TypeSerializer<Row> 
getInputTypeSerializer() {
                                        return (RowSerializer) 
PythonTypeUtils.toFlinkTypeSerializer(getInputType());
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
index 93d0f2c..affca78 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.types.Row;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Queue;
 
 /**
@@ -89,7 +90,8 @@ public class ArrowPythonScalarFunctionOperatorTest extends 
PythonScalarFunctionO
                @Override
                public PythonFunctionRunner<Row> createPythonFunctionRunner(
                                FnDataReceiver<byte[]> resultReceiver,
-                               PythonEnvironmentManager 
pythonEnvironmentManager) {
+                               PythonEnvironmentManager 
pythonEnvironmentManager,
+                               Map<String, String> jobOptions) {
                        return new 
PassThroughArrowPythonScalarFunctionRunner<Row>(
                                getRuntimeContext().getTaskName(),
                                resultReceiver,
@@ -97,7 +99,8 @@ public class ArrowPythonScalarFunctionOperatorTest extends 
PythonScalarFunctionO
                                pythonEnvironmentManager,
                                userDefinedFunctionInputType,
                                userDefinedFunctionOutputType,
-                               getPythonConfig().getMaxArrowBatchSize()) {
+                               getPythonConfig().getMaxArrowBatchSize(),
+                               jobOptions) {
                                @Override
                                public ArrowWriter<Row> createArrowWriter() {
                                        return 
ArrowUtils.createRowArrowWriter(root);
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperatorTest.java
index 43138f3..f32bf32 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperatorTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 
 import java.util.Collection;
+import java.util.Map;
 
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.baserow;
 
@@ -111,7 +112,8 @@ public class BaseRowArrowPythonScalarFunctionOperatorTest
                @Override
                public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
                        FnDataReceiver<byte[]> resultReceiver,
-                       PythonEnvironmentManager pythonEnvironmentManager) {
+                       PythonEnvironmentManager pythonEnvironmentManager,
+                       Map<String, String> jobOptions) {
                        return new 
PassThroughArrowPythonScalarFunctionRunner<BaseRow>(
                                getRuntimeContext().getTaskName(),
                                resultReceiver,
@@ -119,7 +121,8 @@ public class BaseRowArrowPythonScalarFunctionOperatorTest
                                pythonEnvironmentManager,
                                userDefinedFunctionInputType,
                                userDefinedFunctionOutputType,
-                               getPythonConfig().getMaxArrowBatchSize()) {
+                               getPythonConfig().getMaxArrowBatchSize(),
+                               jobOptions) {
                                @Override
                                public ArrowWriter<BaseRow> createArrowWriter() 
{
                                        return 
ArrowUtils.createBaseRowArrowWriter(root);
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperatorTest.java
index f72bd9e..f94bb0c 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperatorTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.calcite.rel.core.JoinRelType;
 
 import java.util.Collection;
+import java.util.Map;
 
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.baserow;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryrow;
@@ -94,7 +95,8 @@ public class BaseRowPythonTableFunctionOperatorTest
                @Override
                public PythonFunctionRunner<BaseRow> createPythonFunctionRunner(
                        FnDataReceiver<byte[]> resultReceiver,
-                       PythonEnvironmentManager pythonEnvironmentManager) {
+                       PythonEnvironmentManager pythonEnvironmentManager,
+                       Map<String, String> jobOptions) {
                        return new PassThroughPythonTableFunctionRunner<BaseRow>(resultReceiver) {
                                @Override
                                public BaseRow copy(BaseRow element) {
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
index 4d4ddfe..409675b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.calcite.rel.core.JoinRelType;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Queue;
 
 /**
@@ -77,7 +78,8 @@ public class PythonTableFunctionOperatorTest extends PythonTableFunctionOperator
                @Override
                public PythonFunctionRunner<Row> createPythonFunctionRunner(
                        FnDataReceiver<byte[]> resultReceiver,
-                       PythonEnvironmentManager pythonEnvironmentManager) {
+                       PythonEnvironmentManager pythonEnvironmentManager,
+                       Map<String, String> jobOptions) {
                        return new PassThroughPythonTableFunctionRunner<Row>(resultReceiver) {
                                @Override
                                public Row copy(Row element) {
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunnerTest.java
index 85a680b..7f42b52 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunnerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunnerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
@@ -88,6 +89,7 @@ public class BaseRowPythonScalarFunctionRunnerTest extends AbstractPythonScalarF
                        pythonFunctionInfos,
                        environmentManager,
                        inputType,
-                       outputType);
+                       outputType,
+                       Collections.emptyMap());
        }
 }
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunnerTest.java
index 7757b3f..66f1eed 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunnerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunnerTest.java
@@ -210,7 +210,8 @@ public class PythonScalarFunctionRunnerTest extends AbstractPythonScalarFunction
                        pythonFunctionInfos,
                        environmentManager,
                        inputType,
-                       outputType);
+                       outputType,
+                       Collections.emptyMap());
        }
 
        private AbstractGeneralPythonScalarFunctionRunner<Row> createUDFRunner(
@@ -236,6 +237,7 @@ public class PythonScalarFunctionRunnerTest extends AbstractPythonScalarFunction
                        environmentManager,
                        rowType,
                        rowType,
+                       Collections.emptyMap(),
                        jobBundleFactory) {
                        @Override
                        public TypeSerializer<Row> getInputTypeSerializer() {
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunnerTest.java
index 8843d82..208b5bf 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunnerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunnerTest.java
@@ -173,6 +173,7 @@ public class ArrowPythonScalarFunctionRunnerTest extends AbstractPythonScalarFun
                        inputType,
                        outputType,
                        maxArrowBatchSize,
+                       Collections.emptyMap(),
                        jobBundleFactory) {
                        @Override
                        public ArrowWriter<Row> createArrowWriter() {
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunnerTest.java
index a93511f..23c6533 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunnerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunnerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
@@ -73,6 +74,7 @@ public class BaseRowPythonTableFunctionRunnerTest extends AbstractPythonTableFun
                        pythonFunctionInfo,
                        environmentManager,
                        inputType,
-                       outputType);
+                       outputType,
+                       Collections.emptyMap());
        }
 }
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunnerTest.java
index 2c8383c..5c396af 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunnerTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunnerTest.java
@@ -96,7 +96,8 @@ public class PythonTableFunctionRunnerTest extends AbstractPythonTableFunctionRu
                        pythonFunctionInfo,
                        environmentManager,
                        inputType,
-                       outputType);
+                       outputType,
+                       Collections.emptyMap());
        }
 
        private AbstractPythonTableFunctionRunner<Row> createUDTFRunner(
@@ -135,7 +136,7 @@ public class PythonTableFunctionRunnerTest extends AbstractPythonTableFunctionRu
                        RowType inputType,
                        RowType outputType,
                        JobBundleFactory jobBundleFactory) {
-                       super(taskName, resultReceiver, tableFunction, environmentManager, inputType, outputType);
+                       super(taskName, resultReceiver, tableFunction, environmentManager, inputType, outputType, Collections.emptyMap());
                        this.jobBundleFactory = jobBundleFactory;
                }
 
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughArrowPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughArrowPythonScalarFunctionRunner.java
index 09e4809..ed66123 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughArrowPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughArrowPythonScalarFunctionRunner.java
@@ -29,6 +29,7 @@ import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.table.runtime.utils.PythonTestUtils.createMockJobBundleFactory;
 
@@ -49,8 +50,9 @@ public abstract class PassThroughArrowPythonScalarFunctionRunner<IN> extends Abs
                PythonEnvironmentManager environmentManager,
                RowType inputType,
                RowType outputType,
-               int maxArrowBatchSize) {
-               this(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize, createMockJobBundleFactory());
+               int maxArrowBatchSize,
+               Map<String, String> jobOptions) {
+               this(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize, jobOptions, createMockJobBundleFactory());
        }
 
        public PassThroughArrowPythonScalarFunctionRunner(
@@ -61,8 +63,9 @@ public abstract class PassThroughArrowPythonScalarFunctionRunner<IN> extends Abs
                RowType inputType,
                RowType outputType,
                int maxArrowBatchSize,
+               Map<String, String> jobOptions,
                JobBundleFactory jobBundleFactory) {
-               super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize);
+               super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize, jobOptions);
                this.jobBundleFactory = jobBundleFactory;
                this.bufferedInputs = new ArrayList<>();
        }
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
index 08dd276..492eef6 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java
@@ -29,6 +29,7 @@ import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.table.runtime.utils.PythonTestUtils.createMockJobBundleFactory;
 
@@ -48,8 +49,9 @@ public abstract class PassThroughPythonScalarFunctionRunner<IN> extends Abstract
                PythonFunctionInfo[] scalarFunctions,
                PythonEnvironmentManager environmentManager,
                RowType inputType,
-               RowType outputType) {
-               this(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, createMockJobBundleFactory());
+               RowType outputType,
+               Map<String, String> jobOptions) {
+               this(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, jobOptions, createMockJobBundleFactory());
        }
 
        public PassThroughPythonScalarFunctionRunner(
@@ -59,8 +61,9 @@ public abstract class PassThroughPythonScalarFunctionRunner<IN> extends Abstract
                PythonEnvironmentManager environmentManager,
                RowType inputType,
                RowType outputType,
+               Map<String, String> jobOptions,
                JobBundleFactory jobBundleFactory) {
-               super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType);
+               super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, jobOptions);
                this.jobBundleFactory = jobBundleFactory;
                this.bufferedInputs = new ArrayList<>();
        }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
index 128cf0c..5a6efa2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala
@@ -20,7 +20,8 @@ package org.apache.flink.table.planner.plan.nodes.common
 
 import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.table.api.TableException
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo}
 import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction}
@@ -84,4 +85,9 @@ trait CommonPythonBase {
     }
   }
 
+  protected def getConfig(tableConfig: TableConfig): Configuration = {
+    val config = new Configuration(tableConfig.getConfiguration)
+    config.setString("table.exec.timezone", tableConfig.getLocalTimeZone.getId)
+    config
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala
index b9eb6a8..f850663 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala
@@ -58,7 +58,7 @@ class BatchExecPythonCalc(
       inputTransform,
       calcProgram,
       "BatchExecPythonCalc",
-      planner.getTableConfig.getConfiguration)
+      getConfig(planner.getTableConfig))
 
     ExecNode.setManagedMemoryWeight(
       ret, getPythonWorkerMemory(planner.getTableConfig.getConfiguration))
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
index b341e35..cd52714 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
@@ -76,7 +76,7 @@ class BatchExecPythonCorrelate(
       scan,
       "BatchExecPythonCorrelate",
       outputRowType,
-      planner.getTableConfig.getConfiguration,
+      getConfig(planner.getTableConfig),
       joinType)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala
index 45bf644..5b8d8f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala
@@ -57,7 +57,7 @@ class StreamExecPythonCalc(
       inputTransform,
       calcProgram,
       "StreamExecPythonCalc",
-      planner.getTableConfig.getConfiguration)
+      getConfig(planner.getTableConfig))
 
     if (inputsContainSingleton()) {
       ret.setParallelism(1)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
index 4ece477..214268d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
@@ -81,7 +81,7 @@ class StreamExecPythonCorrelate(
       scan,
       "StreamExecPythonCorrelate",
       outputRowType,
-      planner.getTableConfig.getConfiguration,
+      getConfig(planner.getTableConfig),
       joinType)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
index b1c16d9..56e4e21 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.utils.python
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime, LocalTime}
+import java.time.{Instant, LocalDate, LocalDateTime, LocalTime}
 import java.util.TimeZone
 import java.util.function.BiConsumer
 
@@ -124,6 +124,12 @@ object PythonTableUtils {
       case c: Int => new Timestamp(c.toLong / 1000)
     }
 
+    case _ if dataType == org.apache.flink.api.common.typeinfo.Types.INSTANT =>
+      (obj: Any) => nullSafeConvert(obj) {
+        case c: Long => Instant.ofEpochMilli(c / 1000)
+        case c: Int => Instant.ofEpochMilli(c.toLong / 1000)
+    }
+
     case _ if dataType == Types.INTERVAL_MILLIS() => (obj: Any) => nullSafeConvert(obj) {
       case c: Long => c / 1000
       case c: Int => c.toLong / 1000
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala
index 058b074..3ca4e41 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala
@@ -19,7 +19,8 @@ package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.table.api.TableException
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo}
 import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction}
@@ -81,4 +82,10 @@ trait CommonPythonBase {
         createPythonFunctionInfo(pythonRexCall, inputNodes, tfc.getTableFunction)
     }
   }
+
+  protected def getConfig(tableConfig: TableConfig): Configuration = {
+    val config = new Configuration(tableConfig.getConfiguration)
+    config.setString("table.exec.timezone", tableConfig.getLocalTimeZone.getId)
+    config
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetPythonCalc.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetPythonCalc.scala
index e75ed80..420faac 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetPythonCalc.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetPythonCalc.scala
@@ -81,7 +81,7 @@ class DataSetPythonCalc(
     val flatMapFunctionOutputRowType = TypeConversions.fromLegacyInfoToDataType(
      flatMapFunctionResultTypeInfo).getLogicalType.asInstanceOf[RowType]
     val flatMapFunction = getPythonScalarFunctionFlatMap(
-      tableEnv.getConfig.getConfiguration,
+      getConfig(tableEnv.getConfig),
       flatMapFunctionInputRowType,
       flatMapFunctionOutputRowType,
       calcProgram)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCalc.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCalc.scala
index 37c0eef..a405169 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCalc.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCalc.scala
@@ -84,7 +84,7 @@ class DataStreamPythonCalc(
     val pythonOperatorOutputRowType = TypeConversions.fromLegacyInfoToDataType(
       pythonOperatorResultTypeInfo).getLogicalType.asInstanceOf[RowType]
     val pythonOperator = getPythonScalarFunctionOperator(
-      planner.getConfig.getConfiguration,
+      getConfig(planner.getConfig),
       pythonOperatorInputRowType,
       pythonOperatorOutputRowType,
       calcProgram)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCorrelate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCorrelate.scala
index 1e46c35..7703d85 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCorrelate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCorrelate.scala
@@ -95,7 +95,7 @@ class DataStreamPythonCorrelate(
     val sqlFunction = 
pythonTableFuncRexCall.getOperator.asInstanceOf[TableSqlFunction]
 
     val pythonOperator = getPythonTableFunctionOperator(
-      planner.getConfig.getConfiguration,
+      getConfig(planner.getConfig),
       pythonOperatorInputRowType,
       pythonOperatorOutputRowType,
       pythonFunctionInfo,

Reply via email to