HuangXingBo commented on a change in pull request #15083:
URL: https://github.com/apache/flink/pull/15083#discussion_r588212168



##########
File path: 
flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
##########
@@ -657,6 +659,12 @@ public static IterateType fromOrd(byte ord) {
                 TypeSerializer keySerializer,
                 Map<String, String> config) {
             this.keyedStateBackend = keyedStateBackend;
+            TypeSerializer frameworkKeySerializer = 
keyedStateBackend.getKeySerializer();
+            if (!(frameworkKeySerializer instanceof AbstractRowDataSerializer
+                    || frameworkKeySerializer instanceof RowSerializer)) {

Review comment:
       I think we only need to check `AbstractRowDataSerializer`

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -811,19 +849,55 @@ def reduce(self, func: Union[Callable, ReduceFunction]) 
-> 'DataStream':
                 func = ReduceFunctionWrapper(func)  # type: ignore
             else:
                 raise TypeError("The input must be a ReduceFunction or a 
callable function!")
+        output_type = 
_from_java_type(self._original_data_type_info.get_java_type_info())
 
-        from pyflink.fn_execution.flink_fn_execution_pb2 import 
UserDefinedDataStreamFunction
-        j_operator, j_output_type_info = \
-            _get_one_input_stream_operator(
-                self, func, UserDefinedDataStreamFunction.REDUCE)  # type: 
ignore
-        return DataStream(self._j_data_stream.transform(
-            "Keyed Reduce",
-            j_output_type_info,
-            j_operator
-        ))
+        class KeyedReduceFunctionWrapper(KeyedProcessFunction):
+
+            def __init__(self, underlying: ReduceFunction):
+                self._underlying = underlying
+                self._reduce_state_name = "_reduce_state" + str(uuid.uuid4())
+                self._reduce_value_state = None  # type: ValueState
+
+            def open(self, runtime_context: RuntimeContext):
+                self._reduce_value_state = runtime_context.get_state(
+                    ValueStateDescriptor(self._reduce_state_name, output_type))
+                self._underlying.open(runtime_context)
+
+            def process_element(self, value, ctx: 
'KeyedProcessFunction.Context'):
+                reduce_value = self._reduce_value_state.value()
+                if reduce_value is not None:
+                    reduce_value = self._underlying.reduce(reduce_value, value)
+                else:
+                    reduce_value = value
+                self._reduce_value_state.update(reduce_value)
+                return [reduce_value]
+
+        return self.process(KeyedReduceFunctionWrapper(func), output_type)  # 
type: ignore
 
     def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
-        return self._values().filter(func)
+        if callable(func):
+            func = FilterFunctionWrapper(func)  # type: ignore
+        elif not isinstance(func, FilterFunction):
+            raise TypeError("func must be a Callable or instance of 
FilterFunction.")

Review comment:
       ```suggestion
            raise TypeError("The input func must be a FilterFunction or a callable function!")
       ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -785,11 +785,49 @@ def __init__(self, j_keyed_stream, 
original_data_type_info, origin_stream: DataS
 
     def map(self, func: Union[Callable, MapFunction], output_type: 
TypeInformation = None) \
             -> 'DataStream':
-        return self._values().map(func, output_type)
+        if not isinstance(func, MapFunction):
+            if callable(func):
+                func = MapFunctionWrapper(func)  # type: ignore
+            else:
+                raise TypeError("The input must be a MapFunction or a callable 
function")
+
+        class KeyedMapFunctionWrapper(KeyedProcessFunction):
+
+            def __init__(self, underlying: MapFunction):
+                self._underlying = underlying
+
+            def open(self, runtime_context: RuntimeContext):
+                self._underlying.open(runtime_context)
+
+            def close(self):
+                self._underlying.close()
+
+            def process_element(self, value, ctx: 
'KeyedProcessFunction.Context'):
+                return [self._underlying.map(value)]
+        return self.process(KeyedMapFunctionWrapper(func), output_type)  # 
type: ignore
 
     def flat_map(self, func: Union[Callable, FlatMapFunction], result_type: 
TypeInformation = None)\
             -> 'DataStream':
-        return self._values().flat_map(func, result_type)
+        if not isinstance(func, FlatMapFunction):
+            if callable(func):
+                func = FlatMapFunctionWrapper(func)  # type: ignore
+            else:
+                raise TypeError("The input must be a FlatMapFunction or a 
callable function")

Review comment:
       ```suggestion
                raise TypeError("The input func must be a FlatMapFunction or a callable function")
       ```

##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -785,11 +785,49 @@ def __init__(self, j_keyed_stream, 
original_data_type_info, origin_stream: DataS
 
     def map(self, func: Union[Callable, MapFunction], output_type: 
TypeInformation = None) \
             -> 'DataStream':
-        return self._values().map(func, output_type)
+        if not isinstance(func, MapFunction):
+            if callable(func):
+                func = MapFunctionWrapper(func)  # type: ignore
+            else:
+                raise TypeError("The input must be a MapFunction or a callable 
function")

Review comment:
       ```suggestion
                raise TypeError("The input func must be a MapFunction or a callable function")
       ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to