HuangXingBo commented on a change in pull request #15083: URL: https://github.com/apache/flink/pull/15083#discussion_r588212168
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
##########

@@ -657,6 +659,12 @@ public static IterateType fromOrd(byte ord) {
                 TypeSerializer keySerializer,
                 Map<String, String> config) {
             this.keyedStateBackend = keyedStateBackend;
+            TypeSerializer frameworkKeySerializer = keyedStateBackend.getKeySerializer();
+            if (!(frameworkKeySerializer instanceof AbstractRowDataSerializer
+                    || frameworkKeySerializer instanceof RowSerializer)) {

Review comment:

I think we only need to check `AbstractRowDataSerializer`

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########

@@ -811,19 +849,55 @@ def reduce(self, func: Union[Callable, ReduceFunction]) -> 'DataStream':
                 func = ReduceFunctionWrapper(func) # type: ignore
             else:
                 raise TypeError("The input must be a ReduceFunction or a callable function!")
+        output_type = _from_java_type(self._original_data_type_info.get_java_type_info())
-        from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
-        j_operator, j_output_type_info = \
-            _get_one_input_stream_operator(
-                self, func, UserDefinedDataStreamFunction.REDUCE) # type: ignore
-        return DataStream(self._j_data_stream.transform(
-            "Keyed Reduce",
-            j_output_type_info,
-            j_operator
-        ))
+        class KeyedReduceFunctionWrapper(KeyedProcessFunction):
+
+            def __init__(self, underlying: ReduceFunction):
+                self._underlying = underlying
+                self._reduce_state_name = "_reduce_state" + str(uuid.uuid4())
+                self._reduce_value_state = None # type: ValueState
+
+            def open(self, runtime_context: RuntimeContext):
+                self._reduce_value_state = runtime_context.get_state(
+                    ValueStateDescriptor(self._reduce_state_name, output_type))
+                self._underlying.open(runtime_context)
+
+            def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+                reduce_value = self._reduce_value_state.value()
+                if reduce_value is not None:
+                    reduce_value = 
self._underlying.reduce(reduce_value, value)
+                else:
+                    reduce_value = value
+                self._reduce_value_state.update(reduce_value)
+                return [reduce_value]
+
+        return self.process(KeyedReduceFunctionWrapper(func), output_type) # type: ignore
     def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
-        return self._values().filter(func)
+        if callable(func):
+            func = FilterFunctionWrapper(func) # type: ignore
+        elif not isinstance(func, FilterFunction):
+            raise TypeError("func must be a Callable or instance of FilterFunction.")

Review comment:

```suggestion
            raise TypeError("The input func must be a FilterFunction or a callable function!")
```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########

@@ -785,11 +785,49 @@ def __init__(self, j_keyed_stream, original_data_type_info, origin_stream: DataS
     def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation = None) \
             -> 'DataStream':
-        return self._values().map(func, output_type)
+        if not isinstance(func, MapFunction):
+            if callable(func):
+                func = MapFunctionWrapper(func) # type: ignore
+            else:
+                raise TypeError("The input must be a MapFunction or a callable function")
+
+        class KeyedMapFunctionWrapper(KeyedProcessFunction):
+
+            def __init__(self, underlying: MapFunction):
+                self._underlying = underlying
+
+            def open(self, runtime_context: RuntimeContext):
+                self._underlying.open(runtime_context)
+
+            def close(self):
+                self._underlying.close()
+
+            def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+                return [self._underlying.map(value)]
+        return self.process(KeyedMapFunctionWrapper(func), output_type) # type: ignore
     def flat_map(self, func: Union[Callable, FlatMapFunction], result_type: TypeInformation = None)\
             -> 'DataStream':
-        return self._values().flat_map(func, result_type)
+        if not isinstance(func, FlatMapFunction):
+            if callable(func):
+                func = FlatMapFunctionWrapper(func) # type: ignore
+            else:
+                raise TypeError("The input must be a 
FlatMapFunction or a callable function")

Review comment:

```suggestion
                raise TypeError("The input func must be a FlatMapFunction or a callable function")
```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########

@@ -785,11 +785,49 @@ def __init__(self, j_keyed_stream, original_data_type_info, origin_stream: DataS
     def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation = None) \
             -> 'DataStream':
-        return self._values().map(func, output_type)
+        if not isinstance(func, MapFunction):
+            if callable(func):
+                func = MapFunctionWrapper(func) # type: ignore
+            else:
+                raise TypeError("The input must be a MapFunction or a callable function")

Review comment:

```suggestion
                raise TypeError("The input func must be a MapFunction or a callable function")
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org