This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1e8d528e293 [FLINK-28786][following][python] Fix PyFlink execution failure on Mac Os with M1 chip 1e8d528e293 is described below commit 1e8d528e293da7ab990f1406fcd209ff4fb177b3 Author: huangxingbo <h...@apache.org> AuthorDate: Tue Feb 14 16:14:35 2023 +0800 [FLINK-28786][following][python] Fix PyFlink execution failure on Mac Os with M1 chip This closes #21926. --- flink-python/pyflink/fn_execution/__init__.py | 7 ++++ .../pyflink/fn_execution/beam/beam_coders.py | 17 +++++--- .../pyflink/fn_execution/beam/beam_operations.py | 17 +++++--- flink-python/pyflink/fn_execution/coders.py | 17 +++++--- .../pyflink/fn_execution/table/operations.py | 45 +++++++++++++--------- 5 files changed, 69 insertions(+), 34 deletions(-) diff --git a/flink-python/pyflink/fn_execution/__init__.py b/flink-python/pyflink/fn_execution/__init__.py index 65b48d4d79b..c909021c996 100644 --- a/flink-python/pyflink/fn_execution/__init__.py +++ b/flink-python/pyflink/fn_execution/__init__.py @@ -15,3 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ + +import os + +if 'PYFLINK_CYTHON_ENABLED' in os.environ: + PYFLINK_CYTHON_ENABLED = bool(os.environ['PYFLINK_CYTHON_ENABLED']) +else: + PYFLINK_CYTHON_ENABLED = True diff --git a/flink-python/pyflink/fn_execution/beam/beam_coders.py b/flink-python/pyflink/fn_execution/beam/beam_coders.py index 982ef26129a..7daf0151d27 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_coders.py +++ b/flink-python/pyflink/fn_execution/beam/beam_coders.py @@ -20,14 +20,21 @@ from apache_beam.coders.coders import FastCoder, LengthPrefixCoder from apache_beam.portability import common_urns from apache_beam.typehints import typehints +from pyflink import fn_execution from pyflink.fn_execution.coders import LengthPrefixBaseCoder from pyflink.fn_execution.flink_fn_execution_pb2 import CoderInfoDescriptor -try: - from pyflink.fn_execution.beam import beam_coder_impl_fast as beam_coder_impl - from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkFieldCoderBeamWrapper - from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper -except ImportError: +if fn_execution.PYFLINK_CYTHON_ENABLED: + try: + from pyflink.fn_execution.beam import beam_coder_impl_fast as beam_coder_impl + from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkFieldCoderBeamWrapper + from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper + except: + from pyflink.fn_execution.beam import beam_coder_impl_slow as beam_coder_impl + from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkFieldCoderBeamWrapper + from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkLengthPrefixCoderBeamWrapper + fn_execution.PYFLINK_CYTHON_ENABLED = False +else: from pyflink.fn_execution.beam import beam_coder_impl_slow as beam_coder_impl from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkFieldCoderBeamWrapper from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkLengthPrefixCoderBeamWrapper diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py b/flink-python/pyflink/fn_execution/beam/beam_operations.py index e149f994593..4681c4d3b4a 100644 --- a/flink-python/pyflink/fn_execution/beam/beam_operations.py +++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py @@ -21,6 +21,17 @@ from apache_beam.runners import common from apache_beam.runners.worker import bundle_processor, operation_specs from apache_beam.utils import proto_utils +from pyflink import fn_execution + +if fn_execution.PYFLINK_CYTHON_ENABLED: + try: + import pyflink.fn_execution.beam.beam_operations_fast as beam_operations + except: + import pyflink.fn_execution.beam.beam_operations_slow as beam_operations + fn_execution.PYFLINK_CYTHON_ENABLED = False +else: + import pyflink.fn_execution.beam.beam_operations_slow as beam_operations + from pyflink.fn_execution import flink_fn_execution_pb2 from pyflink.fn_execution.coders import from_proto, from_type_info_proto, TimeWindowCoder, \ CountWindowCoder, FlattenRowCoder @@ -30,12 +41,6 @@ import pyflink.fn_execution.datastream.operations as datastream_operations from pyflink.fn_execution.datastream.process import operations import pyflink.fn_execution.table.operations as table_operations -try: - import pyflink.fn_execution.beam.beam_operations_fast as beam_operations -except ImportError: - import pyflink.fn_execution.beam.beam_operations_slow as beam_operations - - # ----------------- UDF -------------------- diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py index fa8ccdd5aac..0f4a822ca6c 100644 --- a/flink-python/pyflink/fn_execution/coders.py +++ b/flink-python/pyflink/fn_execution/coders.py @@ -21,8 +21,19 @@ from abc import ABC, abstractmethod from typing import Union import pytz -from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, AvroSchema +from pyflink import fn_execution + +if fn_execution.PYFLINK_CYTHON_ENABLED: + try: + from pyflink.fn_execution import coder_impl_fast as coder_impl + except: + from pyflink.fn_execution import coder_impl_slow as coder_impl + fn_execution.PYFLINK_CYTHON_ENABLED = False +else: + from pyflink.fn_execution import coder_impl_slow as coder_impl + +from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, AvroSchema from pyflink.common.typeinfo import TypeInformation, BasicTypeInfo, BasicType, DateTypeInfo, \ TimeTypeInfo, TimestampTypeInfo, PrimitiveArrayTypeInfo, BasicArrayTypeInfo, TupleTypeInfo, \ MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo, ObjectArrayTypeInfo, \ @@ -32,10 +43,6 @@ from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType, LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType, MapType, \ BinaryType, NullType -try: - from pyflink.fn_execution import coder_impl_fast as coder_impl -except: - from pyflink.fn_execution import coder_impl_slow as coder_impl __all__ = ['FlattenRowCoder', 'RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder', 'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder', 'BinaryCoder', 'CharCoder', diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py index 74fa35031cf..d897095ece7 100644 --- a/flink-python/pyflink/fn_execution/table/operations.py +++ b/flink-python/pyflink/fn_execution/table/operations.py @@ -20,6 +20,30 @@ from functools import reduce from itertools import chain from typing import Tuple +from pyflink import fn_execution + +if fn_execution.PYFLINK_CYTHON_ENABLED: + try: + from pyflink.fn_execution.table.aggregate_fast import RowKeySelector, \ + SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \ + SimpleTableAggsHandleFunction, GroupTableAggFunction + from pyflink.fn_execution.table.window_aggregate_fast import \ + SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction + from pyflink.fn_execution.coder_impl_fast import InternalRow + except: + from pyflink.fn_execution.table.aggregate_slow import RowKeySelector, \ + SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \ + SimpleTableAggsHandleFunction, GroupTableAggFunction + from pyflink.fn_execution.table.window_aggregate_slow import \ + SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction + fn_execution.PYFLINK_CYTHON_ENABLED = False +else: + from pyflink.fn_execution.table.aggregate_slow import RowKeySelector, \ + SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \ + SimpleTableAggsHandleFunction, GroupTableAggFunction + from pyflink.fn_execution.table.window_aggregate_slow import \ + SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction + from pyflink.fn_execution.coders import DataViewFilterCoder, PickleCoder from pyflink.fn_execution.datastream.timerservice import InternalTimer from pyflink.fn_execution.datastream.operations import Operation @@ -36,21 +60,6 @@ from pyflink.fn_execution.utils import operation_utils from pyflink.fn_execution.utils.operation_utils import extract_user_defined_aggregate_function from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup -try: - from pyflink.fn_execution.table.aggregate_fast import RowKeySelector, \ - SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \ - SimpleTableAggsHandleFunction, GroupTableAggFunction - from pyflink.fn_execution.table.window_aggregate_fast import \ - SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction - from pyflink.fn_execution.coder_impl_fast import InternalRow - has_cython = True -except ImportError: - from pyflink.fn_execution.table.aggregate_slow import RowKeySelector, \ - SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \ - SimpleTableAggsHandleFunction, GroupTableAggFunction - from pyflink.fn_execution.table.window_aggregate_slow import \ - SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction - has_cython = False from pyflink.table import FunctionContext, Row @@ -378,13 +387,13 @@ class AbstractStreamGroupAggregateOperation(BaseStatefulOperation): # [element_type, element(for process_element), timestamp(for timer), key(for timer)] # all the fields are nullable except the "element_type" if input_data[0] == NORMAL_RECORD: - if has_cython: + if fn_execution.PYFLINK_CYTHON_ENABLED: row = InternalRow.from_row(input_data[1]) else: row = input_data[1] self.group_agg_function.process_element(row) else: - if has_cython: + if fn_execution.PYFLINK_CYTHON_ENABLED: timer = InternalRow.from_row(input_data[3]) else: timer = input_data[3] @@ -521,7 +530,7 @@ class StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation) def process_element_or_timer(self, input_data: Tuple[int, Row, int, int, Row]): if input_data[0] == NORMAL_RECORD: self.group_agg_function.process_watermark(input_data[3]) - if has_cython: + if fn_execution.PYFLINK_CYTHON_ENABLED: input_row = InternalRow.from_row(input_data[1]) else: input_row = input_data[1]