This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 5ccb167 [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in blink planner (#11439) 5ccb167 is described below commit 5ccb16724769becab0003e0299d9c4a63cd52378 Author: Dian Fu <dia...@apache.org> AuthorDate: Fri Mar 20 09:46:36 2020 +0800 [FLINK-16650][python] Support LocalZonedTimestampType for Python UDF in blink planner (#11439) --- flink-python/pyflink/fn_execution/coder_impl.py | 12 ++++++++ flink-python/pyflink/fn_execution/coders.py | 23 +++++++++++++++ .../pyflink/fn_execution/sdk_worker_main.py | 10 ++++++- .../fn_execution/tests/test_coders_common.py | 14 ++++++++- flink-python/pyflink/table/tests/test_types.py | 21 ++++++++++--- flink-python/pyflink/table/tests/test_udf.py | 34 ++++++++++++++++++++++ flink-python/pyflink/table/types.py | 5 +++- flink-python/setup.py | 2 +- .../flink/python/AbstractPythonFunctionRunner.java | 18 +++++++++++- .../python/PythonScalarFunctionFlatMap.java | 19 +++++++++++- .../python/AbstractStatelessFunctionOperator.java | 22 ++++++++++++-- .../BaseRowPythonScalarFunctionOperator.java | 7 +++-- .../scalar/PythonScalarFunctionOperator.java | 7 +++-- .../arrow/ArrowPythonScalarFunctionOperator.java | 7 +++-- .../BaseRowArrowPythonScalarFunctionOperator.java | 7 +++-- .../table/BaseRowPythonTableFunctionOperator.java | 7 +++-- .../python/table/PythonTableFunctionOperator.java | 7 +++-- .../AbstractPythonStatelessFunctionRunner.java | 6 ++-- .../AbstractGeneralPythonScalarFunctionRunner.java | 7 +++-- .../scalar/AbstractPythonScalarFunctionRunner.java | 7 +++-- .../scalar/BaseRowPythonScalarFunctionRunner.java | 7 +++-- .../python/scalar/PythonScalarFunctionRunner.java | 7 +++-- .../AbstractArrowPythonScalarFunctionRunner.java | 7 +++-- .../arrow/ArrowPythonScalarFunctionRunner.java | 7 +++-- .../BaseRowArrowPythonScalarFunctionRunner.java | 7 +++-- .../table/AbstractPythonTableFunctionRunner.java | 7 +++-- .../table/BaseRowPythonTableFunctionRunner.java | 7 +++-- .../python/table/PythonTableFunctionRunner.java | 7 +++-- .../table/runtime/typeutils/PythonTypeUtils.java | 20 +++++++++++++ .../BaseRowPythonScalarFunctionOperatorTest.java | 7 +++-- .../scalar/PythonScalarFunctionOperatorTest.java | 7 +++-- .../ArrowPythonScalarFunctionOperatorTest.java | 7 +++-- ...seRowArrowPythonScalarFunctionOperatorTest.java | 7 +++-- .../BaseRowPythonTableFunctionOperatorTest.java | 4 ++- .../table/PythonTableFunctionOperatorTest.java | 4 ++- .../BaseRowPythonScalarFunctionRunnerTest.java | 4 ++- .../scalar/PythonScalarFunctionRunnerTest.java | 4 ++- .../arrow/ArrowPythonScalarFunctionRunnerTest.java | 1 + .../BaseRowPythonTableFunctionRunnerTest.java | 4 ++- .../table/PythonTableFunctionRunnerTest.java | 5 ++-- ...PassThroughArrowPythonScalarFunctionRunner.java | 9 ++++-- .../PassThroughPythonScalarFunctionRunner.java | 9 ++++-- .../plan/nodes/common/CommonPythonBase.scala | 8 ++++- .../nodes/physical/batch/BatchExecPythonCalc.scala | 2 +- .../physical/batch/BatchExecPythonCorrelate.scala | 2 +- .../physical/stream/StreamExecPythonCalc.scala | 2 +- .../stream/StreamExecPythonCorrelate.scala | 2 +- .../planner/utils/python/PythonTableUtils.scala | 8 ++++- .../flink/table/plan/nodes/CommonPythonBase.scala | 9 +++++- .../plan/nodes/dataset/DataSetPythonCalc.scala | 2 +- .../nodes/datastream/DataStreamPythonCalc.scala | 2 +- .../datastream/DataStreamPythonCorrelate.scala | 2 +- 52 files changed, 352 insertions(+), 77 deletions(-) diff --git a/flink-python/pyflink/fn_execution/coder_impl.py b/flink-python/pyflink/fn_execution/coder_impl.py index 7b6405a..f453a88 100644 --- a/flink-python/pyflink/fn_execution/coder_impl.py +++ b/flink-python/pyflink/fn_execution/coder_impl.py @@ -406,6 +406,18 @@ class TimestampCoderImpl(StreamCoderImpl): return datetime.datetime.utcfromtimestamp(second).replace(microsecond=microsecond) +class LocalZonedTimestampCoderImpl(TimestampCoderImpl): + + def __init__(self, precision, timezone): + super(LocalZonedTimestampCoderImpl, self).__init__(precision) + self.timezone = timezone + + def internal_to_timestamp(self, milliseconds, nanoseconds): + return self.timezone.localize( + super(LocalZonedTimestampCoderImpl, self).internal_to_timestamp( + milliseconds, nanoseconds)) + + class ArrowCoderImpl(StreamCoderImpl): def __init__(self, schema): diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py index eb47cf7..da4f03b 100644 --- a/flink-python/pyflink/fn_execution/coders.py +++ b/flink-python/pyflink/fn_execution/coders.py @@ -22,13 +22,16 @@ from abc import ABC import datetime import decimal import pyarrow as pa +import pytz from apache_beam.coders import Coder from apache_beam.coders.coders import FastCoder, LengthPrefixCoder +from apache_beam.options.pipeline_options import DebugOptions from apache_beam.portability import common_urns from apache_beam.typehints import typehints from pyflink.fn_execution import coder_impl from pyflink.fn_execution import flink_fn_execution_pb2 +from pyflink.fn_execution.sdk_worker_main import pipeline_options from pyflink.table import Row FLINK_SCALAR_FUNCTION_SCHEMA_CODER_URN = "flink:coder:schema:scalar_function:v1" @@ -377,6 +380,22 @@ class TimestampCoder(DeterministicCoder): return datetime.datetime +class LocalZonedTimestampCoder(DeterministicCoder): + """ + Coder for LocalZonedTimestamp. + """ + + def __init__(self, precision, timezone): + self.precision = precision + self.timezone = timezone + + def _create_impl(self): + return coder_impl.LocalZonedTimestampCoderImpl(self.precision, self.timezone) + + def to_type_hint(self): + return datetime.datetime + + class ArrowCoder(DeterministicCoder): """ Coder for Arrow. @@ -466,6 +485,10 @@ def from_proto(field_type): return RowCoder([from_proto(f.type) for f in field_type.row_schema.fields]) if field_type_name == type_name.TIMESTAMP: return TimestampCoder(field_type.timestamp_info.precision) + if field_type_name == type_name.LOCAL_ZONED_TIMESTAMP: + timezone = pytz.timezone(pipeline_options.view_as(DebugOptions).lookup_experiment( + "table.exec.timezone")) + return LocalZonedTimestampCoder(field_type.local_zoned_timestamp_info.precision, timezone) elif field_type_name == type_name.ARRAY: return ArrayCoder(from_proto(field_type.collection_element_type)) elif field_type_name == type_name.MAP: diff --git a/flink-python/pyflink/fn_execution/sdk_worker_main.py b/flink-python/pyflink/fn_execution/sdk_worker_main.py index 82d091c..7f2c423 100644 --- a/flink-python/pyflink/fn_execution/sdk_worker_main.py +++ b/flink-python/pyflink/fn_execution/sdk_worker_main.py @@ -15,10 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ - +import os import sys # force to register the operations to SDK Harness +from apache_beam.options.pipeline_options import PipelineOptions + import pyflink.fn_execution.operations # noqa # pylint: disable=unused-import # force to register the coders to SDK Harness @@ -26,5 +28,11 @@ import pyflink.fn_execution.coders # noqa # pylint: disable=unused-import import apache_beam.runners.worker.sdk_worker_main +if 'PIPELINE_OPTIONS' in os.environ: + pipeline_options = apache_beam.runners.worker.sdk_worker_main._parse_pipeline_options( + os.environ['PIPELINE_OPTIONS']) +else: + pipeline_options = PipelineOptions.from_dictionary({}) + if __name__ == '__main__': apache_beam.runners.worker.sdk_worker_main.main(sys.argv) diff --git a/flink-python/pyflink/fn_execution/tests/test_coders_common.py b/flink-python/pyflink/fn_execution/tests/test_coders_common.py index 42d1c19..69e9961 100644 --- a/flink-python/pyflink/fn_execution/tests/test_coders_common.py +++ b/flink-python/pyflink/fn_execution/tests/test_coders_common.py @@ -22,7 +22,8 @@ import unittest from pyflink.fn_execution.coders import BigIntCoder, TinyIntCoder, BooleanCoder, \ SmallIntCoder, IntCoder, FloatCoder, DoubleCoder, BinaryCoder, CharCoder, DateCoder, \ - TimeCoder, TimestampCoder, ArrayCoder, MapCoder, DecimalCoder, FlattenRowCoder, RowCoder + TimeCoder, TimestampCoder, ArrayCoder, MapCoder, DecimalCoder, FlattenRowCoder, RowCoder, \ + LocalZonedTimestampCoder class CodersTest(unittest.TestCase): @@ -93,6 +94,17 @@ class CodersTest(unittest.TestCase): coder = TimestampCoder(6) self.check_coder(coder, datetime.datetime(2019, 9, 10, 18, 30, 20, 123456)) + def test_local_zoned_timestamp_coder(self): + import datetime + import pytz + timezone = pytz.timezone("Asia/Shanghai") + coder = LocalZonedTimestampCoder(3, timezone) + self.check_coder(coder, + timezone.localize(datetime.datetime(2019, 9, 10, 18, 30, 20, 123000))) + coder = LocalZonedTimestampCoder(6, timezone) + self.check_coder(coder, + timezone.localize(datetime.datetime(2019, 9, 10, 18, 30, 20, 123456))) + def test_array_coder(self): element_coder = BigIntCoder() coder = ArrayCoder(element_coder) diff --git a/flink-python/pyflink/table/tests/test_types.py b/flink-python/pyflink/table/tests/test_types.py index 415224d..04a2e13 100644 --- a/flink-python/pyflink/table/tests/test_types.py +++ b/flink-python/pyflink/table/tests/test_types.py @@ -33,7 +33,8 @@ from pyflink.table.types import (_infer_schema_from_data, _infer_type, _array_type_mappings, _merge_type, _create_type_verifier, UserDefinedType, DataTypes, Row, RowField, RowType, ArrayType, BigIntType, VarCharType, MapType, DataType, - _to_java_type, _from_java_type, ZonedTimestampType) + _to_java_type, _from_java_type, ZonedTimestampType, + LocalZonedTimestampType) class ExamplePointUDT(UserDefinedType): @@ -535,11 +536,23 @@ class TypesTests(unittest.TestCase): def test_local_zoned_timestamp_type(self): lztst = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE() - ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000, tzinfo=UTCOffsetTimezone(1)) - self.assertEqual(-3600000000, lztst.to_sql_type(ts)) + ts = datetime.datetime(1970, 1, 1, 0, 0, 0, 0000) + self.assertEqual(0, lztst.to_sql_type(ts)) + + import pytz + # suppose the timezone of the data is +9:00 + timezone = pytz.timezone("Asia/Tokyo") + orig_epoch = LocalZonedTimestampType.EPOCH_ORDINAL + try: + # suppose the local timezone is +8:00 + LocalZonedTimestampType.EPOCH_ORDINAL = 28800000000 + ts_tokyo = timezone.localize(ts) + self.assertEqual(-3600000000, lztst.to_sql_type(ts_tokyo)) + finally: + LocalZonedTimestampType.EPOCH_ORDINAL = orig_epoch if sys.version_info >= (3, 6): - ts2 = lztst.from_sql_type(-3600000000) + ts2 = lztst.from_sql_type(0) self.assertEqual(ts.astimezone(), ts2.astimezone()) def test_zoned_timestamp_type(self): diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py index e12b71a..a599aac 100644 --- a/flink-python/pyflink/table/tests/test_udf.py +++ b/flink-python/pyflink/table/tests/test_udf.py @@ -15,6 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import datetime + +import pytz + from pyflink.table import DataTypes from pyflink.table.udf import ScalarFunction, udf from pyflink.testing import source_sink_utils @@ -562,6 +566,36 @@ class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests, self.t_env.register_function( "non-callable-udf", udf(Plus(), DataTypes.BIGINT(), DataTypes.BIGINT())) + def test_data_types_only_supported_in_blink_planner(self): + timezone = self.t_env.get_config().get_local_timezone() + local_datetime = pytz.timezone(timezone).localize( + datetime.datetime(1970, 1, 1, 0, 0, 0, 123000)) + + def local_zoned_timestamp_func(local_zoned_timestamp_param): + assert local_zoned_timestamp_param == local_datetime, \ + 'local_zoned_timestamp_param is wrong value %s !' % local_zoned_timestamp_param + return local_zoned_timestamp_param + + self.t_env.register_function( + "local_zoned_timestamp_func", + udf(local_zoned_timestamp_func, + [DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)], + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))) + + table_sink = source_sink_utils.TestAppendSink( + ['a'], [DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)]) + self.t_env.register_table_sink("Results", table_sink) + + t = self.t_env.from_elements( + [(local_datetime,)], + DataTypes.ROW([DataTypes.FIELD("a", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))])) + + t.select("local_zoned_timestamp_func(local_zoned_timestamp_func(a))") \ + .insert_into("Results") + self.t_env.execute("test") + actual = source_sink_utils.results() + self.assert_equals(actual, ["1970-01-01T00:00:00.123Z"]) + class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests, PyFlinkBlinkBatchTableTestCase): diff --git a/flink-python/pyflink/table/types.py b/flink-python/pyflink/table/types.py index d3ab0f2..c193bc3 100644 --- a/flink-python/pyflink/table/types.py +++ b/flink-python/pyflink/table/types.py @@ -477,6 +477,8 @@ class LocalZonedTimestampType(AtomicType): :param nullable: boolean, whether the field can be null (None) or not. """ + EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10 ** 6 + def __init__(self, precision=6, nullable=True): super(LocalZonedTimestampType, self).__init__(nullable) assert 0 <= precision <= 9 @@ -492,10 +494,11 @@ class LocalZonedTimestampType(AtomicType): if dt is not None: seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo else time.mktime(dt.timetuple())) - return int(seconds) * 10 ** 6 + dt.microsecond + return int(seconds) * 10 ** 6 + dt.microsecond + self.EPOCH_ORDINAL def from_sql_type(self, ts): if ts is not None: + ts = ts - self.EPOCH_ORDINAL return datetime.datetime.fromtimestamp(ts // 10 ** 6).replace(microsecond=ts % 10 ** 6) diff --git a/flink-python/setup.py b/flink-python/setup.py index ca4b598..5f50376 100644 --- a/flink-python/setup.py +++ b/flink-python/setup.py @@ -225,7 +225,7 @@ run sdist. python_requires='>=3.5', install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.19.0', 'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 'jsonpickle==1.2', - 'pandas>=0.23.4,<=0.25.3', 'pyarrow>=0.15.1,<=0.16.0'], + 'pandas>=0.23.4,<=0.25.3', 'pyarrow>=0.15.1,<=0.16.0', 'pytz>=2018.3'], tests_require=['pytest==4.4.1'], description='Apache Flink Python API', long_description=long_description, diff --git a/flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java index 597057e..84e7838 100644 --- a/flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java @@ -37,11 +37,14 @@ import org.apache.beam.runners.fnexecution.control.StageBundleFactory; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct; +import java.util.Map; + /** * An base class for {@link PythonFunctionRunner}. * @@ -65,6 +68,11 @@ public abstract class AbstractPythonFunctionRunner<IN> implements PythonFunction private final PythonEnvironmentManager environmentManager; /** + * The options used to configure the Python worker process. + */ + private final Map<String, String> jobOptions; + + /** * The bundle factory which has all job-scoped information and can be used to create a {@link StageBundleFactory}. */ private transient JobBundleFactory jobBundleFactory; @@ -112,11 +120,13 @@ public abstract class AbstractPythonFunctionRunner<IN> implements PythonFunction String taskName, FnDataReceiver<byte[]> resultReceiver, PythonEnvironmentManager environmentManager, - StateRequestHandler stateRequestHandler) { + StateRequestHandler stateRequestHandler, + Map<String, String> jobOptions) { this.taskName = Preconditions.checkNotNull(taskName); this.resultReceiver = Preconditions.checkNotNull(resultReceiver); this.environmentManager = Preconditions.checkNotNull(environmentManager); this.stateRequestHandler = Preconditions.checkNotNull(stateRequestHandler); + this.jobOptions = Preconditions.checkNotNull(jobOptions); } @Override @@ -131,6 +141,12 @@ public abstract class AbstractPythonFunctionRunner<IN> implements PythonFunction PipelineOptionsFactory.as(PortablePipelineOptions.class); // one operator has one Python SDK harness portableOptions.setSdkWorkerParallelism(1); + ExperimentalOptions experimentalOptions = portableOptions.as(ExperimentalOptions.class); + for (Map.Entry<String, String> entry : jobOptions.entrySet()) { + ExperimentalOptions.addExperiment(experimentalOptions, + String.join("=", entry.getKey(), entry.getValue())); + } + Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions); jobBundleFactory = createJobBundleFactory(pipelineOptions); diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java index 9785509..10cee8d 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/functions/python/PythonScalarFunctionFlatMap.java @@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -141,6 +143,11 @@ public final class PythonScalarFunctionFlatMap private final PythonConfig config; /** + * The options used to configure the Python worker process. + */ + private final Map<String, String> jobOptions; + + /** * Use an AtomicBoolean because we start/stop bundles by a timer thread. */ private transient AtomicBoolean bundleStarted; @@ -178,6 +185,7 @@ public final class PythonScalarFunctionFlatMap this.udfInputOffsets = Preconditions.checkNotNull(udfInputOffsets); this.forwardedFields = Preconditions.checkNotNull(forwardedFields); this.config = new PythonConfig(Preconditions.checkNotNull(config)); + this.jobOptions = buildJobOptions(config); } @Override @@ -285,7 +293,8 @@ public final class PythonScalarFunctionFlatMap scalarFunctions, createPythonEnvironmentManager(), udfInputType, - udfOutputType); + udfOutputType, + jobOptions); } private PythonEnvironmentManager createPythonEnvironmentManager() throws IOException { @@ -321,6 +330,14 @@ public final class PythonScalarFunctionFlatMap } } + private Map<String, String> buildJobOptions(Configuration config) { + Map<String, String> jobOptions = new HashMap<>(); + if (config.containsKey("table.exec.timezone")) { + jobOptions.put("table.exec.timezone", config.getString("table.exec.timezone", null)); + } + return jobOptions; + } + @Override public TypeInformation<Row> getProducedType() { return (TypeInformation<Row>) LegacyTypeInfoDataTypeConverter diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java index 3670ac7..248683c 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/AbstractStatelessFunctionOperator.java @@ -37,6 +37,8 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @@ -69,6 +71,11 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN> protected final int[] userDefinedFunctionInputOffsets; /** + * The options used to configure the Python worker process. + */ + private final Map<String, String> jobOptions; + + /** * The user-defined function input logical type. */ protected transient RowType userDefinedFunctionInputType; @@ -108,6 +115,7 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN> this.inputType = Preconditions.checkNotNull(inputType); this.outputType = Preconditions.checkNotNull(outputType); this.userDefinedFunctionInputOffsets = Preconditions.checkNotNull(userDefinedFunctionInputOffsets); + this.jobOptions = buildJobOptions(config); } @Override @@ -140,7 +148,8 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN> return new ProjectUdfInputPythonScalarFunctionRunner( createPythonFunctionRunner( userDefinedFunctionResultReceiver, - createPythonEnvironmentManager())); + createPythonEnvironmentManager(), + jobOptions)); } /** @@ -153,7 +162,16 @@ public abstract class AbstractStatelessFunctionOperator<IN, OUT, UDFIN> public abstract PythonFunctionRunner<UDFIN> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager); + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions); + + private Map<String, String> buildJobOptions(Configuration config) { + Map<String, String> jobOptions = new HashMap<>(); + if (config.containsKey("table.exec.timezone")) { + jobOptions.put("table.exec.timezone", config.getString("table.exec.timezone", null)); + } + return jobOptions; + } private class ProjectUdfInputPythonScalarFunctionRunner implements PythonFunctionRunner<IN> { diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperator.java index 6f9e922..c845090 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperator.java @@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.io.IOException; +import java.util.Map; /** * The Python {@link ScalarFunction} operator for the blink planner. @@ -80,13 +81,15 @@ public class BaseRowPythonScalarFunctionOperator extends AbstractBaseRowPythonSc @Override public PythonFunctionRunner<BaseRow> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new BaseRowPythonScalarFunctionRunner( getRuntimeContext().getTaskName(), resultReceiver, scalarFunctions, pythonEnvironmentManager, userDefinedFunctionInputType, - userDefinedFunctionOutputType); + userDefinedFunctionOutputType, + jobOptions); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java index 672882e..8fa27bf 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java @@ -34,6 +34,7 @@ import org.apache.flink.types.Row; import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.io.IOException; +import java.util.Map; /** * The Python {@link ScalarFunction} operator for the legacy planner. @@ -81,13 +82,15 @@ public class PythonScalarFunctionOperator extends AbstractRowPythonScalarFunctio @Override public PythonFunctionRunner<Row> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new PythonScalarFunctionRunner( getRuntimeContext().getTaskName(), resultReceiver, scalarFunctions, pythonEnvironmentManager, userDefinedFunctionInputType, - userDefinedFunctionOutputType); + userDefinedFunctionOutputType, + jobOptions); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java index 3554106..4a686e8 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java @@ -38,6 +38,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.io.IOException; +import java.util.Map; /** * Arrow Python {@link ScalarFunction} operator for the old planner. @@ -93,7 +94,8 @@ public class ArrowPythonScalarFunctionOperator extends AbstractRowPythonScalarFu @Override public PythonFunctionRunner<Row> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new ArrowPythonScalarFunctionRunner( getRuntimeContext().getTaskName(), resultReceiver, @@ -101,7 +103,8 @@ public class ArrowPythonScalarFunctionOperator extends AbstractRowPythonScalarFu pythonEnvironmentManager, userDefinedFunctionInputType, userDefinedFunctionOutputType, - getPythonConfig().getMaxArrowBatchSize()); + getPythonConfig().getMaxArrowBatchSize(), + jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperator.java index aa46991ba..6a20fe6 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperator.java @@ -37,6 +37,7 @@ import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.io.IOException; +import java.util.Map; /** * Arrow Python {@link ScalarFunction} operator for the blink planner. @@ -92,7 +93,8 @@ public class BaseRowArrowPythonScalarFunctionOperator extends AbstractBaseRowPyt @Override public PythonFunctionRunner<BaseRow> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new BaseRowArrowPythonScalarFunctionRunner( getRuntimeContext().getTaskName(), resultReceiver, @@ -100,7 +102,8 @@ public class BaseRowArrowPythonScalarFunctionOperator extends AbstractBaseRowPyt pythonEnvironmentManager, userDefinedFunctionInputType, userDefinedFunctionOutputType, - getPythonConfig().getMaxArrowBatchSize()); + getPythonConfig().getMaxArrowBatchSize(), + jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperator.java index d138df3..f7dd371 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperator.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.calcite.rel.core.JoinRelType; import java.io.IOException; +import java.util.Map; /** * The Python {@link TableFunction} operator for the blink planner. @@ -117,14 +118,16 @@ public class BaseRowPythonTableFunctionOperator @Override public PythonFunctionRunner<BaseRow> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new BaseRowPythonTableFunctionRunner( getRuntimeContext().getTaskName(), resultReceiver, tableFunction, pythonEnvironmentManager, userDefinedFunctionInputType, - userDefinedFunctionOutputType); + userDefinedFunctionOutputType, + jobOptions); } private Projection<BaseRow, BinaryRow> createUdtfInputProjection() { diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java index a4c5259..418d271 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperator.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.calcite.rel.core.JoinRelType; import java.io.IOException; +import java.util.Map; /** * The Python {@link TableFunction} operator for the legacy planner. @@ -131,13 +132,15 @@ public class PythonTableFunctionOperator extends AbstractPythonTableFunctionOper @Override public PythonFunctionRunner<Row> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new PythonTableFunctionRunner( getRuntimeContext().getTaskName(), resultReceiver, tableFunction, pythonEnvironmentManager, userDefinedFunctionInputType, - userDefinedFunctionOutputType); + userDefinedFunctionOutputType, + jobOptions); } } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.java index c13d3af..5155bf6 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/AbstractPythonStatelessFunctionRunner.java @@ -51,6 +51,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.beam.runners.core.construction.BeamUrns.getUrn; @@ -86,8 +87,9 @@ public abstract class AbstractPythonStatelessFunctionRunner<IN> extends Abstract PythonEnvironmentManager environmentManager, RowType inputType, RowType outputType, - String functionUrn) { - super(taskName, resultReceiver, environmentManager, StateRequestHandler.unsupported()); + String functionUrn, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, environmentManager, StateRequestHandler.unsupported(), jobOptions); this.functionUrn = functionUrn; this.inputType = Preconditions.checkNotNull(inputType); this.outputType = Preconditions.checkNotNull(outputType); diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractGeneralPythonScalarFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractGeneralPythonScalarFunctionRunner.java index 67b16ae..eededf3 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractGeneralPythonScalarFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractGeneralPythonScalarFunctionRunner.java @@ -30,6 +30,8 @@ import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.util.WindowedValue; +import java.util.Map; + /** * Abstract {@link PythonFunctionRunner} used to execute Python {@link ScalarFunction}s. * @@ -51,8 +53,9 @@ public abstract class AbstractGeneralPythonScalarFunctionRunner<IN> extends Abst PythonFunctionInfo[] scalarFunctions, PythonEnvironmentManager environmentManager, RowType inputType, - RowType outputType) { - super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType); + RowType outputType, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractPythonScalarFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractPythonScalarFunctionRunner.java index c5991cd..9e0cdf8 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractPythonScalarFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/AbstractPythonScalarFunctionRunner.java @@ -31,6 +31,8 @@ import org.apache.flink.util.Preconditions; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import java.util.Map; + /** * Abstract {@link PythonFunctionRunner} used to execute Python {@link ScalarFunction}s. * @@ -49,8 +51,9 @@ public abstract class AbstractPythonScalarFunctionRunner<IN> extends AbstractPyt PythonFunctionInfo[] scalarFunctions, PythonEnvironmentManager environmentManager, RowType inputType, - RowType outputType) { - super(taskName, resultReceiver, environmentManager, inputType, outputType, SCALAR_FUNCTION_URN); + RowType outputType, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, environmentManager, inputType, outputType, SCALAR_FUNCTION_URN, jobOptions); this.scalarFunctions = Preconditions.checkNotNull(scalarFunctions); } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunner.java index 758ac64..4a2ee55 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunner.java @@ -30,6 +30,8 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import java.util.Map; + /** * A {@link PythonFunctionRunner} used to execute Python {@link ScalarFunction}s. * It takes {@link BaseRow} as the input and outputs a byte array. @@ -43,8 +45,9 @@ public class BaseRowPythonScalarFunctionRunner extends AbstractGeneralPythonScal PythonFunctionInfo[] scalarFunctions, PythonEnvironmentManager environmentManager, RowType inputType, - RowType outputType) { - super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType); + RowType outputType, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunner.java index ca7d80a..8e48b46 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunner.java @@ -30,6 +30,8 @@ import org.apache.flink.types.Row; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import java.util.Map; + /** * A {@link PythonFunctionRunner} used to execute Python {@link ScalarFunction}s. * It takes {@link Row} as the input and outputs a byte array. @@ -43,8 +45,9 @@ public class PythonScalarFunctionRunner extends AbstractGeneralPythonScalarFunct PythonFunctionInfo[] scalarFunctions, PythonEnvironmentManager environmentManager, RowType inputType, - RowType outputType) { - super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType); + RowType outputType, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/AbstractArrowPythonScalarFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/AbstractArrowPythonScalarFunctionRunner.java index d218c8f..dda5395 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/AbstractArrowPythonScalarFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/AbstractArrowPythonScalarFunctionRunner.java @@ -36,6 +36,8 @@ import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.util.WindowedValue; +import java.util.Map; + /** * Abstract {@link PythonFunctionRunner} used to execute Arrow Python {@link ScalarFunction}s. * @@ -97,8 +99,9 @@ public abstract class AbstractArrowPythonScalarFunctionRunner<IN> extends Abstra PythonEnvironmentManager environmentManager, RowType inputType, RowType outputType, - int maxArrowBatchSize) { - super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType); + int maxArrowBatchSize, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, jobOptions); this.maxArrowBatchSize = maxArrowBatchSize; } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunner.java index ee86327..df3d291 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunner.java @@ -30,6 +30,8 @@ import org.apache.flink.types.Row; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import java.util.Map; + /** * A {@link PythonFunctionRunner} used to execute Arrow Python {@link ScalarFunction}s. * It takes {@link Row} as the input type. @@ -44,8 +46,9 @@ public class ArrowPythonScalarFunctionRunner extends AbstractArrowPythonScalarFu PythonEnvironmentManager environmentManager, RowType inputType, RowType outputType, - int maxArrowBatchSize) { - super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize); + int maxArrowBatchSize, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize, jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/BaseRowArrowPythonScalarFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/BaseRowArrowPythonScalarFunctionRunner.java index f15dec0..60e9609 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/BaseRowArrowPythonScalarFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/BaseRowArrowPythonScalarFunctionRunner.java @@ -30,6 +30,8 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import java.util.Map; + /** * A {@link PythonFunctionRunner} used to execute Arrow Python {@link ScalarFunction}s. * It takes {@link BaseRow} as the input type. @@ -44,8 +46,9 @@ public class BaseRowArrowPythonScalarFunctionRunner extends AbstractArrowPythonS PythonEnvironmentManager environmentManager, RowType inputType, RowType outputType, - int maxBatchSize) { - super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxBatchSize); + int maxBatchSize, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxBatchSize, jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.java index 0aa3265..962f556 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/AbstractPythonTableFunctionRunner.java @@ -34,6 +34,8 @@ import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.util.WindowedValue; +import java.util.Map; + /** * Abstract {@link PythonFunctionRunner} used to execute Python {@link TableFunction}. * @@ -58,8 +60,9 @@ public abstract class AbstractPythonTableFunctionRunner<IN> extends AbstractPyth PythonFunctionInfo tableFunction, PythonEnvironmentManager environmentManager, RowType inputType, - RowType outputType) { - super(taskName, resultReceiver, environmentManager, inputType, outputType, TABLE_FUNCTION_URN); + RowType outputType, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, environmentManager, inputType, outputType, TABLE_FUNCTION_URN, jobOptions); this.tableFunction = Preconditions.checkNotNull(tableFunction); } diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunner.java index 866d182..3ff6ca4 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunner.java @@ -30,6 +30,8 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import java.util.Map; + /** * A {@link PythonFunctionRunner} used to execute Python {@link TableFunction}. * It takes {@link BaseRow} as the input and outputs a byte array. @@ -42,8 +44,9 @@ public class BaseRowPythonTableFunctionRunner extends AbstractPythonTableFunctio PythonFunctionInfo tableFunction, PythonEnvironmentManager environmentManager, RowType inputType, - RowType outputType) { - super(taskName, resultReceiver, tableFunction, environmentManager, inputType, outputType); + RowType outputType, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, tableFunction, environmentManager, inputType, outputType, jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunner.java index c691ca3..48fa400 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunner.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunner.java @@ -31,6 +31,8 @@ import org.apache.flink.types.Row; import org.apache.beam.sdk.fn.data.FnDataReceiver; +import java.util.Map; + /** * A {@link PythonFunctionRunner} used to execute Python {@link TableFunction}. * It takes {@link Row} as the input and outputs a byte array. @@ -44,8 +46,9 @@ public class PythonTableFunctionRunner extends AbstractPythonTableFunctionRunner PythonFunctionInfo tableFunction, PythonEnvironmentManager environmentManager, RowType inputType, - RowType outputType) { - super(taskName, resultReceiver, tableFunction, environmentManager, inputType, outputType); + RowType outputType, + Map<String, String> jobOptions) { + super(taskName, resultReceiver, tableFunction, environmentManager, inputType, outputType, jobOptions); } @Override diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java index 97d6cf6..60f79a0 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java @@ -52,6 +52,7 @@ import org.apache.flink.table.types.logical.DoubleType; import org.apache.flink.table.types.logical.FloatType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LegacyTypeInformationType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; @@ -268,6 +269,11 @@ public final class PythonTypeUtils { return new SqlTimestampSerializer(timestampType.getPrecision()); } + @Override + public TypeSerializer visit(LocalZonedTimestampType localZonedTimestampType) { + return new SqlTimestampSerializer(localZonedTimestampType.getPrecision()); + } + public TypeSerializer visit(ArrayType arrayType) { LogicalType elementType = arrayType.getElementType(); TypeSerializer elementTypeSerializer = elementType.accept(this); @@ -417,6 +423,20 @@ public final class PythonTypeUtils { } @Override + public FlinkFnApi.Schema.FieldType visit(LocalZonedTimestampType localZonedTimestampType) { + FlinkFnApi.Schema.FieldType.Builder builder = + FlinkFnApi.Schema.FieldType.newBuilder() + .setTypeName(FlinkFnApi.Schema.TypeName.LOCAL_ZONED_TIMESTAMP) + .setNullable(localZonedTimestampType.isNullable()); + + FlinkFnApi.Schema.LocalZonedTimestampInfo.Builder dateTimeBuilder = + FlinkFnApi.Schema.LocalZonedTimestampInfo.newBuilder() + .setPrecision(localZonedTimestampType.getPrecision()); + builder.setLocalZonedTimestampInfo(dateTimeBuilder.build()); + return builder.build(); + } + + @Override public FlinkFnApi.Schema.FieldType visit(DecimalType decimalType) { FlinkFnApi.Schema.FieldType.Builder builder = FlinkFnApi.Schema.FieldType.newBuilder() diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperatorTest.java index 0c5e10e..ef4e618 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/BaseRowPythonScalarFunctionOperatorTest.java @@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.util.Collection; +import java.util.Map; import static org.apache.flink.table.runtime.util.StreamRecordUtils.baserow; @@ -115,14 +116,16 @@ public class BaseRowPythonScalarFunctionOperatorTest @Override public PythonFunctionRunner<BaseRow> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new PassThroughPythonScalarFunctionRunner<BaseRow>( getRuntimeContext().getTaskName(), resultReceiver, scalarFunctions, pythonEnvironmentManager, userDefinedFunctionInputType, - userDefinedFunctionOutputType) { + userDefinedFunctionOutputType, + jobOptions) { @Override public TypeSerializer<BaseRow> getInputTypeSerializer() { return (BaseRowSerializer) PythonTypeUtils.toBlinkTypeSerializer(getInputType()); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java index c910e45..459feb2 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperatorTest.java @@ -36,6 +36,7 @@ import org.apache.flink.types.Row; import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.util.Collection; +import java.util.Map; import java.util.Queue; /** @@ -91,14 +92,16 @@ public class PythonScalarFunctionOperatorTest extends PythonScalarFunctionOperat @Override public PythonFunctionRunner<Row> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new PassThroughPythonScalarFunctionRunner<Row>( getRuntimeContext().getTaskName(), resultReceiver, scalarFunctions, pythonEnvironmentManager, userDefinedFunctionInputType, - userDefinedFunctionOutputType) { + userDefinedFunctionOutputType, + jobOptions) { @Override public TypeSerializer<Row> getInputTypeSerializer() { return (RowSerializer) PythonTypeUtils.toFlinkTypeSerializer(getInputType()); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java index 93d0f2c..affca78 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperatorTest.java @@ -38,6 +38,7 @@ import org.apache.flink.types.Row; import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.util.Collection; +import java.util.Map; import java.util.Queue; /** @@ -89,7 +90,8 @@ public class ArrowPythonScalarFunctionOperatorTest extends PythonScalarFunctionO @Override public PythonFunctionRunner<Row> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new PassThroughArrowPythonScalarFunctionRunner<Row>( getRuntimeContext().getTaskName(), resultReceiver, @@ -97,7 +99,8 @@ public class ArrowPythonScalarFunctionOperatorTest extends PythonScalarFunctionO pythonEnvironmentManager, userDefinedFunctionInputType, userDefinedFunctionOutputType, - getPythonConfig().getMaxArrowBatchSize()) { + getPythonConfig().getMaxArrowBatchSize(), + jobOptions) { @Override public ArrowWriter<Row> createArrowWriter() { return ArrowUtils.createRowArrowWriter(root); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperatorTest.java index 43138f3..f32bf32 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/BaseRowArrowPythonScalarFunctionOperatorTest.java @@ -43,6 +43,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.beam.sdk.fn.data.FnDataReceiver; import java.util.Collection; +import java.util.Map; import static org.apache.flink.table.runtime.util.StreamRecordUtils.baserow; @@ -111,7 +112,8 @@ public class BaseRowArrowPythonScalarFunctionOperatorTest @Override public PythonFunctionRunner<BaseRow> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new PassThroughArrowPythonScalarFunctionRunner<BaseRow>( getRuntimeContext().getTaskName(), resultReceiver, @@ -119,7 +121,8 @@ public class BaseRowArrowPythonScalarFunctionOperatorTest pythonEnvironmentManager, userDefinedFunctionInputType, userDefinedFunctionOutputType, - getPythonConfig().getMaxArrowBatchSize()) { + getPythonConfig().getMaxArrowBatchSize(), + jobOptions) { @Override public ArrowWriter<BaseRow> createArrowWriter() { return ArrowUtils.createBaseRowArrowWriter(root); diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperatorTest.java index f72bd9e..f94bb0c 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/BaseRowPythonTableFunctionOperatorTest.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.calcite.rel.core.JoinRelType; import java.util.Collection; +import java.util.Map; import static org.apache.flink.table.runtime.util.StreamRecordUtils.baserow; import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryrow; @@ -94,7 +95,8 @@ public class BaseRowPythonTableFunctionOperatorTest @Override public PythonFunctionRunner<BaseRow> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new PassThroughPythonTableFunctionRunner<BaseRow>(resultReceiver) { @Override public BaseRow copy(BaseRow element) { diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java index 4d4ddfe..409675b 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/table/PythonTableFunctionOperatorTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.calcite.rel.core.JoinRelType; import java.util.Collection; +import java.util.Map; import java.util.Queue; /** @@ -77,7 +78,8 @@ public class PythonTableFunctionOperatorTest extends PythonTableFunctionOperator @Override public PythonFunctionRunner<Row> createPythonFunctionRunner( FnDataReceiver<byte[]> resultReceiver, - PythonEnvironmentManager pythonEnvironmentManager) { + PythonEnvironmentManager pythonEnvironmentManager, + Map<String, String> jobOptions) { return new PassThroughPythonTableFunctionRunner<Row>(resultReceiver) { @Override public Row copy(Row element) { diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunnerTest.java index 85a680b..7f42b52 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunnerTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/BaseRowPythonScalarFunctionRunnerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.junit.Test; +import java.util.Collections; import java.util.HashMap; import static org.junit.Assert.assertEquals; @@ -88,6 +89,7 @@ public class BaseRowPythonScalarFunctionRunnerTest extends AbstractPythonScalarF pythonFunctionInfos, environmentManager, inputType, - outputType); + outputType, + Collections.emptyMap()); } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunnerTest.java index 7757b3f..66f1eed 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunnerTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/PythonScalarFunctionRunnerTest.java @@ -210,7 +210,8 @@ public class PythonScalarFunctionRunnerTest extends AbstractPythonScalarFunction pythonFunctionInfos, environmentManager, inputType, - outputType); + outputType, + Collections.emptyMap()); } private AbstractGeneralPythonScalarFunctionRunner<Row> createUDFRunner( @@ -236,6 +237,7 @@ public class PythonScalarFunctionRunnerTest extends AbstractPythonScalarFunction environmentManager, rowType, rowType, + Collections.emptyMap(), jobBundleFactory) { @Override public TypeSerializer<Row> getInputTypeSerializer() { diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunnerTest.java index 8843d82..208b5bf 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunnerTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/scalar/arrow/ArrowPythonScalarFunctionRunnerTest.java @@ -173,6 +173,7 @@ public class ArrowPythonScalarFunctionRunnerTest extends AbstractPythonScalarFun inputType, outputType, maxArrowBatchSize, + Collections.emptyMap(), jobBundleFactory) { @Override public ArrowWriter<Row> createArrowWriter() { diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunnerTest.java index a93511f..23c6533 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunnerTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/BaseRowPythonTableFunctionRunnerTest.java @@ -30,6 +30,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.junit.Test; +import java.util.Collections; import java.util.HashMap; import static org.junit.Assert.assertEquals; @@ -73,6 +74,7 @@ public class BaseRowPythonTableFunctionRunnerTest extends AbstractPythonTableFun pythonFunctionInfo, environmentManager, inputType, - outputType); + outputType, + Collections.emptyMap()); } } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunnerTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunnerTest.java index 2c8383c..5c396af 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunnerTest.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/runners/python/table/PythonTableFunctionRunnerTest.java @@ -96,7 +96,8 @@ public class PythonTableFunctionRunnerTest extends AbstractPythonTableFunctionRu pythonFunctionInfo, environmentManager, inputType, - outputType); + outputType, + Collections.emptyMap()); } private AbstractPythonTableFunctionRunner<Row> createUDTFRunner( @@ -135,7 +136,7 @@ public class PythonTableFunctionRunnerTest extends AbstractPythonTableFunctionRu RowType inputType, RowType outputType, JobBundleFactory jobBundleFactory) { - super(taskName, resultReceiver, tableFunction, environmentManager, inputType, outputType); + super(taskName, resultReceiver, tableFunction, environmentManager, inputType, outputType, Collections.emptyMap()); this.jobBundleFactory = jobBundleFactory; } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughArrowPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughArrowPythonScalarFunctionRunner.java index 09e4809..ed66123 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughArrowPythonScalarFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughArrowPythonScalarFunctionRunner.java @@ -29,6 +29,7 @@ import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.apache.flink.table.runtime.utils.PythonTestUtils.createMockJobBundleFactory; @@ -49,8 +50,9 @@ public abstract class PassThroughArrowPythonScalarFunctionRunner<IN> extends Abs PythonEnvironmentManager environmentManager, RowType inputType, RowType outputType, - int maxArrowBatchSize) { - this(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize, createMockJobBundleFactory()); + int maxArrowBatchSize, + Map<String, String> jobOptions) { + this(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize, jobOptions, createMockJobBundleFactory()); } public PassThroughArrowPythonScalarFunctionRunner( @@ -61,8 +63,9 @@ public abstract class PassThroughArrowPythonScalarFunctionRunner<IN> extends Abs RowType inputType, RowType outputType, int maxArrowBatchSize, + Map<String, String> jobOptions, JobBundleFactory jobBundleFactory) { - super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize); + super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, maxArrowBatchSize, jobOptions); this.jobBundleFactory = jobBundleFactory; this.bufferedInputs = new ArrayList<>(); } diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java index 08dd276..492eef6 100644 --- a/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java +++ b/flink-python/src/test/java/org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.java @@ -29,6 +29,7 @@ import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.apache.flink.table.runtime.utils.PythonTestUtils.createMockJobBundleFactory; @@ -48,8 +49,9 @@ public abstract class PassThroughPythonScalarFunctionRunner<IN> extends Abstract PythonFunctionInfo[] scalarFunctions, PythonEnvironmentManager environmentManager, RowType inputType, - RowType outputType) { - this(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, createMockJobBundleFactory()); + RowType outputType, + Map<String, String> jobOptions) { + this(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, jobOptions, createMockJobBundleFactory()); } public PassThroughPythonScalarFunctionRunner( @@ -59,8 +61,9 @@ public abstract class PassThroughPythonScalarFunctionRunner<IN> extends Abstract PythonEnvironmentManager environmentManager, RowType inputType, RowType outputType, + Map<String, String> jobOptions, JobBundleFactory jobBundleFactory) { - super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType); + super(taskName, resultReceiver, scalarFunctions, environmentManager, inputType, outputType, jobOptions); this.jobBundleFactory = jobBundleFactory; this.bufferedInputs = new ArrayList<>(); } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala index 128cf0c..5a6efa2 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonBase.scala @@ -20,7 +20,8 @@ package org.apache.flink.table.planner.plan.nodes.common import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode} import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.flink.table.api.TableException +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo} import org.apache.flink.table.planner.functions.utils.{ScalarSqlFunction, TableSqlFunction} @@ -84,4 +85,9 @@ trait CommonPythonBase { } } + protected def getConfig(tableConfig: TableConfig): Configuration = { + val config = new Configuration(tableConfig.getConfiguration) + config.setString("table.exec.timezone", tableConfig.getLocalTimeZone.getId) + config + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala index b9eb6a8..f850663 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala @@ -58,7 +58,7 @@ class BatchExecPythonCalc( inputTransform, calcProgram, "BatchExecPythonCalc", - planner.getTableConfig.getConfiguration) + getConfig(planner.getTableConfig)) ExecNode.setManagedMemoryWeight( ret, getPythonWorkerMemory(planner.getTableConfig.getConfiguration)) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala index b341e35..cd52714 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala @@ -76,7 +76,7 @@ class BatchExecPythonCorrelate( scan, "BatchExecPythonCorrelate", outputRowType, - planner.getTableConfig.getConfiguration, + getConfig(planner.getTableConfig), joinType) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala index 45bf644..5b8d8f2 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala @@ -57,7 +57,7 @@ class StreamExecPythonCalc( inputTransform, calcProgram, "StreamExecPythonCalc", - planner.getTableConfig.getConfiguration) + getConfig(planner.getTableConfig)) if (inputsContainSingleton()) { ret.setParallelism(1) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala index 4ece477..214268d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala @@ -81,7 +81,7 @@ class StreamExecPythonCorrelate( scan, "StreamExecPythonCorrelate", outputRowType, - planner.getTableConfig.getConfiguration, + getConfig(planner.getTableConfig), joinType) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala index b1c16d9..56e4e21 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.utils.python import java.nio.charset.StandardCharsets import java.sql.{Date, Time, Timestamp} -import java.time.{LocalDate, LocalDateTime, LocalTime} +import java.time.{Instant, LocalDate, LocalDateTime, LocalTime} import java.util.TimeZone import java.util.function.BiConsumer @@ -124,6 +124,12 @@ object PythonTableUtils { case c: Int => new Timestamp(c.toLong / 1000) } + case _ if dataType == org.apache.flink.api.common.typeinfo.Types.INSTANT => + (obj: Any) => nullSafeConvert(obj) { + case c: Long => Instant.ofEpochMilli(c / 1000) + case c: Int => Instant.ofEpochMilli(c.toLong / 1000) + } + case _ if dataType == Types.INTERVAL_MILLIS() => (obj: Any) => nullSafeConvert(obj) { case c: Long => c / 1000 case c: Int => c.toLong / 1000 diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala index 058b074..3ca4e41 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonBase.scala @@ -19,7 +19,8 @@ package org.apache.flink.table.plan.nodes import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode} import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.flink.table.api.TableException +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo} import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction} @@ -81,4 +82,10 @@ trait CommonPythonBase { createPythonFunctionInfo(pythonRexCall, inputNodes, tfc.getTableFunction) } } + + protected def getConfig(tableConfig: TableConfig): Configuration = { + val config = new Configuration(tableConfig.getConfiguration) + config.setString("table.exec.timezone", tableConfig.getLocalTimeZone.getId) + config + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetPythonCalc.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetPythonCalc.scala index e75ed80..420faac 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetPythonCalc.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetPythonCalc.scala @@ -81,7 +81,7 @@ class DataSetPythonCalc( val flatMapFunctionOutputRowType = TypeConversions.fromLegacyInfoToDataType( flatMapFunctionResultTypeInfo).getLogicalType.asInstanceOf[RowType] val flatMapFunction = getPythonScalarFunctionFlatMap( - tableEnv.getConfig.getConfiguration, + getConfig(tableEnv.getConfig), flatMapFunctionInputRowType, flatMapFunctionOutputRowType, calcProgram) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCalc.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCalc.scala index 37c0eef..a405169 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCalc.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCalc.scala @@ -84,7 +84,7 @@ class DataStreamPythonCalc( val pythonOperatorOutputRowType = TypeConversions.fromLegacyInfoToDataType( pythonOperatorResultTypeInfo).getLogicalType.asInstanceOf[RowType] val pythonOperator = getPythonScalarFunctionOperator( - planner.getConfig.getConfiguration, + getConfig(planner.getConfig), pythonOperatorInputRowType, pythonOperatorOutputRowType, calcProgram) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCorrelate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCorrelate.scala index 1e46c35..7703d85 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCorrelate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamPythonCorrelate.scala @@ -95,7 +95,7 @@ class DataStreamPythonCorrelate( val sqlFunction = pythonTableFuncRexCall.getOperator.asInstanceOf[TableSqlFunction] val pythonOperator = getPythonTableFunctionOperator( - planner.getConfig.getConfiguration, + getConfig(planner.getConfig), pythonOperatorInputRowType, pythonOperatorOutputRowType, pythonFunctionInfo,