This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e8d528e293 [FLINK-28786][following][python] Fix PyFlink execution failure on Mac Os with M1 chip
1e8d528e293 is described below

commit 1e8d528e293da7ab990f1406fcd209ff4fb177b3
Author: huangxingbo <h...@apache.org>
AuthorDate: Tue Feb 14 16:14:35 2023 +0800

    [FLINK-28786][following][python] Fix PyFlink execution failure on Mac Os with M1 chip
    
    This closes #21926.
---
 flink-python/pyflink/fn_execution/__init__.py      |  7 ++++
 .../pyflink/fn_execution/beam/beam_coders.py       | 17 +++++---
 .../pyflink/fn_execution/beam/beam_operations.py   | 17 +++++---
 flink-python/pyflink/fn_execution/coders.py        | 17 +++++---
 .../pyflink/fn_execution/table/operations.py       | 45 +++++++++++++---------
 5 files changed, 69 insertions(+), 34 deletions(-)

diff --git a/flink-python/pyflink/fn_execution/__init__.py b/flink-python/pyflink/fn_execution/__init__.py
index 65b48d4d79b..c909021c996 100644
--- a/flink-python/pyflink/fn_execution/__init__.py
+++ b/flink-python/pyflink/fn_execution/__init__.py
@@ -15,3 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+
+import os
+
+if 'PYFLINK_CYTHON_ENABLED' in os.environ:
+    PYFLINK_CYTHON_ENABLED = bool(os.environ['PYFLINK_CYTHON_ENABLED'])
+else:
+    PYFLINK_CYTHON_ENABLED = True
diff --git a/flink-python/pyflink/fn_execution/beam/beam_coders.py b/flink-python/pyflink/fn_execution/beam/beam_coders.py
index 982ef26129a..7daf0151d27 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_coders.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_coders.py
@@ -20,14 +20,21 @@ from apache_beam.coders.coders import FastCoder, LengthPrefixCoder
 from apache_beam.portability import common_urns
 from apache_beam.typehints import typehints
 
+from pyflink import fn_execution
 from pyflink.fn_execution.coders import LengthPrefixBaseCoder
 from pyflink.fn_execution.flink_fn_execution_pb2 import CoderInfoDescriptor
 
-try:
-    from pyflink.fn_execution.beam import beam_coder_impl_fast as beam_coder_impl
-    from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkFieldCoderBeamWrapper
-    from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper
-except ImportError:
+if fn_execution.PYFLINK_CYTHON_ENABLED:
+    try:
+        from pyflink.fn_execution.beam import beam_coder_impl_fast as beam_coder_impl
+        from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkFieldCoderBeamWrapper
+        from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper
+    except:
+        from pyflink.fn_execution.beam import beam_coder_impl_slow as beam_coder_impl
+        from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkFieldCoderBeamWrapper
+        from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkLengthPrefixCoderBeamWrapper
+        fn_execution.PYFLINK_CYTHON_ENABLED = False
+else:
     from pyflink.fn_execution.beam import beam_coder_impl_slow as beam_coder_impl
     from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkFieldCoderBeamWrapper
     from pyflink.fn_execution.beam.beam_coder_impl_slow import FlinkLengthPrefixCoderBeamWrapper
diff --git a/flink-python/pyflink/fn_execution/beam/beam_operations.py b/flink-python/pyflink/fn_execution/beam/beam_operations.py
index e149f994593..4681c4d3b4a 100644
--- a/flink-python/pyflink/fn_execution/beam/beam_operations.py
+++ b/flink-python/pyflink/fn_execution/beam/beam_operations.py
@@ -21,6 +21,17 @@ from apache_beam.runners import common
 from apache_beam.runners.worker import bundle_processor, operation_specs
 from apache_beam.utils import proto_utils
 
+from pyflink import fn_execution
+
+if fn_execution.PYFLINK_CYTHON_ENABLED:
+    try:
+        import pyflink.fn_execution.beam.beam_operations_fast as beam_operations
+    except:
+        import pyflink.fn_execution.beam.beam_operations_slow as beam_operations
+        fn_execution.PYFLINK_CYTHON_ENABLED = False
+else:
+    import pyflink.fn_execution.beam.beam_operations_slow as beam_operations
+
 from pyflink.fn_execution import flink_fn_execution_pb2
 from pyflink.fn_execution.coders import from_proto, from_type_info_proto, TimeWindowCoder, \
     CountWindowCoder, FlattenRowCoder
@@ -30,12 +41,6 @@ import pyflink.fn_execution.datastream.operations as datastream_operations
 from pyflink.fn_execution.datastream.process import operations
 import pyflink.fn_execution.table.operations as table_operations
 
-try:
-    import pyflink.fn_execution.beam.beam_operations_fast as beam_operations
-except ImportError:
-    import pyflink.fn_execution.beam.beam_operations_slow as beam_operations
-
-
 # ----------------- UDF --------------------
 
 
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index fa8ccdd5aac..0f4a822ca6c 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -21,8 +21,19 @@ from abc import ABC, abstractmethod
 from typing import Union
 
 import pytz
-from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, AvroSchema
 
+from pyflink import fn_execution
+
+if fn_execution.PYFLINK_CYTHON_ENABLED:
+    try:
+        from pyflink.fn_execution import coder_impl_fast as coder_impl
+    except:
+        from pyflink.fn_execution import coder_impl_slow as coder_impl
+        fn_execution.PYFLINK_CYTHON_ENABLED = False
+else:
+    from pyflink.fn_execution import coder_impl_slow as coder_impl
+
+from pyflink.datastream.formats.avro import GenericRecordAvroTypeInfo, AvroSchema
 from pyflink.common.typeinfo import TypeInformation, BasicTypeInfo, BasicType, DateTypeInfo, \
     TimeTypeInfo, TimestampTypeInfo, PrimitiveArrayTypeInfo, BasicArrayTypeInfo, TupleTypeInfo, \
     MapTypeInfo, ListTypeInfo, RowTypeInfo, PickledBytesTypeInfo, ObjectArrayTypeInfo, \
@@ -32,10 +43,6 @@ from pyflink.table.types import TinyIntType, SmallIntType, IntType, BigIntType,
     LocalZonedTimestampType, RowType, RowField, to_arrow_type, TimestampType, ArrayType, MapType, \
     BinaryType, NullType
 
-try:
-    from pyflink.fn_execution import coder_impl_fast as coder_impl
-except:
-    from pyflink.fn_execution import coder_impl_slow as coder_impl
 
 __all__ = ['FlattenRowCoder', 'RowCoder', 'BigIntCoder', 'TinyIntCoder', 'BooleanCoder',
            'SmallIntCoder', 'IntCoder', 'FloatCoder', 'DoubleCoder', 'BinaryCoder', 'CharCoder',
diff --git a/flink-python/pyflink/fn_execution/table/operations.py b/flink-python/pyflink/fn_execution/table/operations.py
index 74fa35031cf..d897095ece7 100644
--- a/flink-python/pyflink/fn_execution/table/operations.py
+++ b/flink-python/pyflink/fn_execution/table/operations.py
@@ -20,6 +20,30 @@ from functools import reduce
 from itertools import chain
 from typing import Tuple
 
+from pyflink import fn_execution
+
+if fn_execution.PYFLINK_CYTHON_ENABLED:
+    try:
+        from pyflink.fn_execution.table.aggregate_fast import RowKeySelector, \
+            SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \
+            SimpleTableAggsHandleFunction, GroupTableAggFunction
+        from pyflink.fn_execution.table.window_aggregate_fast import \
+            SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction
+        from pyflink.fn_execution.coder_impl_fast import InternalRow
+    except:
+        from pyflink.fn_execution.table.aggregate_slow import RowKeySelector, \
+            SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \
+            SimpleTableAggsHandleFunction, GroupTableAggFunction
+        from pyflink.fn_execution.table.window_aggregate_slow import \
+            SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction
+        fn_execution.PYFLINK_CYTHON_ENABLED = False
+else:
+    from pyflink.fn_execution.table.aggregate_slow import RowKeySelector, \
+        SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \
+        SimpleTableAggsHandleFunction, GroupTableAggFunction
+    from pyflink.fn_execution.table.window_aggregate_slow import \
+        SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction
+
 from pyflink.fn_execution.coders import DataViewFilterCoder, PickleCoder
 from pyflink.fn_execution.datastream.timerservice import InternalTimer
 from pyflink.fn_execution.datastream.operations import Operation
@@ -36,21 +60,6 @@ from pyflink.fn_execution.utils import operation_utils
 from pyflink.fn_execution.utils.operation_utils import extract_user_defined_aggregate_function
 from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup
 
-try:
-    from pyflink.fn_execution.table.aggregate_fast import RowKeySelector, \
-        SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \
-        SimpleTableAggsHandleFunction, GroupTableAggFunction
-    from pyflink.fn_execution.table.window_aggregate_fast import \
-        SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction
-    from pyflink.fn_execution.coder_impl_fast import InternalRow
-    has_cython = True
-except ImportError:
-    from pyflink.fn_execution.table.aggregate_slow import RowKeySelector, \
-        SimpleAggsHandleFunction, GroupAggFunction, DistinctViewDescriptor, \
-        SimpleTableAggsHandleFunction, GroupTableAggFunction
-    from pyflink.fn_execution.table.window_aggregate_slow import \
-        SimpleNamespaceAggsHandleFunction, GroupWindowAggFunction
-    has_cython = False
 
 from pyflink.table import FunctionContext, Row
 
@@ -378,13 +387,13 @@ class AbstractStreamGroupAggregateOperation(BaseStatefulOperation):
         # [element_type, element(for process_element), timestamp(for timer), key(for timer)]
         # all the fields are nullable except the "element_type"
         if input_data[0] == NORMAL_RECORD:
-            if has_cython:
+            if fn_execution.PYFLINK_CYTHON_ENABLED:
                 row = InternalRow.from_row(input_data[1])
             else:
                 row = input_data[1]
             self.group_agg_function.process_element(row)
         else:
-            if has_cython:
+            if fn_execution.PYFLINK_CYTHON_ENABLED:
                 timer = InternalRow.from_row(input_data[3])
             else:
                 timer = input_data[3]
@@ -521,7 +530,7 @@ class StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation)
     def process_element_or_timer(self, input_data: Tuple[int, Row, int, int, Row]):
         if input_data[0] == NORMAL_RECORD:
             self.group_agg_function.process_watermark(input_data[3])
-            if has_cython:
+            if fn_execution.PYFLINK_CYTHON_ENABLED:
                 input_row = InternalRow.from_row(input_data[1])
             else:
                 input_row = input_data[1]

Reply via email to