This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new efff43cb274 [FLINK-28452][python] Support "reduce" and "aggregate" operations of "window_all" in Python DataStream API
efff43cb274 is described below

commit efff43cb2740db8b24d295299a2f73451a56658f
Author: zhangjingcun <cun8c...@163.com>
AuthorDate: Fri Jul 8 16:25:19 2022 +0800

    [FLINK-28452][python] Support "reduce" and "aggregate" operations of "window_all" in Python DataStream API
    
    This closes #20219.
---
 flink-python/pyflink/datastream/data_stream.py     | 127 ++++++++++++++++++-
 flink-python/pyflink/datastream/functions.py       |  56 +++++++++
 .../pyflink/datastream/tests/test_window.py        | 136 +++++++++++++++++++++
 3 files changed, 318 insertions(+), 1 deletion(-)
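
For reference, a minimal end-to-end sketch of the new reduce path, modelled on the
tests added below. The environment setup, the SecondFieldTimestampAssigner helper and
the print sink are illustrative assumptions, not part of this commit:

    from typing import Tuple

    from pyflink.common import Types
    from pyflink.common.time import Time
    from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.window import TumblingEventTimeWindows


    class SecondFieldTimestampAssigner(TimestampAssigner):

        def extract_timestamp(self, value: Tuple[str, int], record_timestamp: int) -> int:
            # Event time is carried in the second tuple field (milliseconds).
            return value[1]


    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        [('a', 1), ('a', 2), ('b', 3)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # window_all() routes every element into one non-keyed window stream; the
    # lambda incrementally sums the second field per 5-second tumbling window.
    ds.assign_timestamps_and_watermarks(
            WatermarkStrategy.for_monotonous_timestamps()
                             .with_timestamp_assigner(SecondFieldTimestampAssigner())) \
        .window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \
        .reduce(lambda a, b: (a[0], a[1] + b[1]),
                output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
        .print()

    env.execute('window_all_reduce_sketch')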

diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 98ee3060a1d..1c808d45e89 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -40,7 +40,11 @@ from pyflink.datastream.functions import (_get_python_env, FlatMapFunction, MapF
                                           InternalIterableAllWindowFunction,
                                           ProcessAllWindowFunction,
                                           InternalIterableProcessAllWindowFunction,
-                                          BroadcastProcessFunction, KeyedBroadcastProcessFunction)
+                                          BroadcastProcessFunction,
+                                          KeyedBroadcastProcessFunction,
+                                          InternalSingleValueAllWindowFunction,
+                                          PassThroughAllWindowFunction,
+                                          InternalSingleValueProcessAllWindowFunction)
 from pyflink.datastream.output_tag import OutputTag
 from pyflink.datastream.slot_sharing_group import SlotSharingGroup
 from pyflink.datastream.state import ValueStateDescriptor, ValueState, ListStateDescriptor, \
@@ -2080,6 +2084,127 @@ class AllWindowedStream(object):
         self._late_data_output_tag = output_tag
         return self
 
+    def reduce(self,
+               reduce_function: Union[Callable, ReduceFunction],
+               window_function: Union[AllWindowFunction, ProcessAllWindowFunction] = None,
+               output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window. The output of the window function is interpreted as a regular
+        non-windowed stream.
+
+        Arriving data is incrementally aggregated using the given reducer.
+
+        Example:
+        ::
+
+            >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\
+            ...   .reduce(lambda a, b: (a[0] + b[0], b[1]))
+
+        :param reduce_function: The reduce function.
+        :param window_function: The window function.
+        :param output_type: Type information for the result type of the window function.
+        :return: The data stream that is the result of applying the reduce function to the window.
+
+        .. versionadded:: 1.16.0
+        """
+        if window_function is None:
+            internal_window_function = InternalSingleValueAllWindowFunction(
+                PassThroughAllWindowFunction())  # type: InternalWindowFunction
+            if output_type is None:
+                output_type = self.get_input_type()
+        elif isinstance(window_function, AllWindowFunction):
+            internal_window_function = InternalSingleValueAllWindowFunction(window_function)
+        elif isinstance(window_function, ProcessAllWindowFunction):
+            internal_window_function = InternalSingleValueProcessAllWindowFunction(window_function)
+        else:
+            raise TypeError("window_function should be an AllWindowFunction or "
+                            "ProcessAllWindowFunction")
+
+        reducing_state_descriptor = ReducingStateDescriptor(WINDOW_STATE_NAME,
+                                                            reduce_function,
+                                                            self.get_input_type())
+        func_desc = type(reduce_function).__name__
+        if window_function is not None:
+            func_desc = "%s, %s" % (func_desc, type(window_function).__name__)
+
+        return self._get_result_data_stream(internal_window_function,
+                                            reducing_state_descriptor,
+                                            func_desc,
+                                            output_type)
+
+    def aggregate(self,
+                  aggregate_function: AggregateFunction,
+                  window_function: Union[AllWindowFunction, ProcessAllWindowFunction] = None,
+                  accumulator_type: TypeInformation = None,
+                  output_type: TypeInformation = None) -> DataStream:
+        """
+        Applies the given window function to each window. The window function is called for each
+        evaluation of the window. The output of the window function is interpreted as a regular
+        non-windowed stream.
+
+        Arriving data is incrementally aggregated using the given aggregate function. This means
+        that the window function typically has only a single value to process when called.
+
+        Example:
+        ::
+
+            >>> class AverageAggregate(AggregateFunction):
+            ...     def create_accumulator(self) -> Tuple[int, int]:
+            ...         return 0, 0
+            ...
+            ...     def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) \\
+            ...             -> Tuple[int, int]:
+            ...         return accumulator[0] + value[1], accumulator[1] + 1
+            ...
+            ...     def get_result(self, accumulator: Tuple[int, int]) -> float:
+            ...         return accumulator[0] / accumulator[1]
+            ...
+            ...     def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
+            ...         return a[0] + b[0], a[1] + b[1]
+            ...
+            >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\
+            ...   .aggregate(AverageAggregate(),
+            ...              accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),
+            ...              output_type=Types.DOUBLE())
+
+        :param aggregate_function: The aggregation function that is used for incremental
+                                   aggregation.
+        :param window_function: The window function.
+        :param accumulator_type: Type information for the internal accumulator type of the
+                                 aggregation function.
+        :param output_type: Type information for the result type of the window function.
+        :return: The data stream that is the result of applying the window function to the window.
+
+        .. versionadded:: 1.16.0
+        """
+        if window_function is None:
+            internal_window_function = InternalSingleValueAllWindowFunction(
+                PassThroughAllWindowFunction())  # type: InternalWindowFunction
+        elif isinstance(window_function, AllWindowFunction):
+            internal_window_function = InternalSingleValueAllWindowFunction(window_function)
+        elif isinstance(window_function, ProcessAllWindowFunction):
+            internal_window_function = InternalSingleValueProcessAllWindowFunction(window_function)
+        else:
+            raise TypeError("window_function should be an AllWindowFunction or "
+                            "ProcessAllWindowFunction")
+
+        if accumulator_type is None:
+            accumulator_type = Types.PICKLED_BYTE_ARRAY()
+        elif isinstance(accumulator_type, list):
+            accumulator_type = RowTypeInfo(accumulator_type)
+
+        aggregating_state_descriptor = AggregatingStateDescriptor(WINDOW_STATE_NAME,
+                                                                  aggregate_function,
+                                                                  accumulator_type)
+        func_desc = type(aggregate_function).__name__
+        if window_function is not None:
+            func_desc = "%s, %s" % (func_desc, type(window_function).__name__)
+        return self._get_result_data_stream(internal_window_function,
+                                            aggregating_state_descriptor,
+                                            func_desc,
+                                            output_type)
+
     def apply(self,
               window_function: AllWindowFunction,
               output_type: TypeInformation = None) -> DataStream:
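
The aggregate path composes the same way; below is a hedged sketch of pairing an
AggregateFunction with a ProcessAllWindowFunction, adapted from the docstring example
above (the DescribeWindow class and its message format are illustrative assumptions):

    from typing import Iterable, Tuple

    from pyflink.datastream.functions import AggregateFunction, ProcessAllWindowFunction


    class AverageAggregate(AggregateFunction):
        """Keeps a running (sum, count) accumulator and emits the mean."""

        def create_accumulator(self) -> Tuple[int, int]:
            return 0, 0

        def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) -> Tuple[int, int]:
            return accumulator[0] + value[1], accumulator[1] + 1

        def get_result(self, accumulator: Tuple[int, int]) -> float:
            return accumulator[0] / accumulator[1]

        def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]:
            return a[0] + b[0], a[1] + b[1]


    class DescribeWindow(ProcessAllWindowFunction):

        def process(self, context: ProcessAllWindowFunction.Context,
                    elements: Iterable[float]) -> Iterable[str]:
            # With incremental aggregation, `elements` holds exactly one value:
            # the pre-aggregated result for this window.
            yield "window [{}, {}) avg {}".format(
                context.window().start, context.window().end, next(iter(elements)))

Wired together as in the docstring: ds.window_all(...).aggregate(AverageAggregate(),
window_function=DescribeWindow(), output_type=Types.STRING()).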
diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py
index 196db408c00..3b1c2fd3b0b 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -1078,6 +1078,12 @@ class PassThroughWindowFunction(WindowFunction[IN, IN, KEY, W]):
         yield from inputs
 
 
+class PassThroughAllWindowFunction(AllWindowFunction[IN, IN, W]):
+
+    def apply(self, window: W, inputs: Iterable[IN]) -> Iterable[IN]:
+        yield from inputs
+
+
 class InternalWindowFunction(Function, Generic[IN, OUT, KEY, W]):
 
     class InternalWindowContext(ABC):
@@ -1133,6 +1139,28 @@ class InternalSingleValueWindowFunction(InternalWindowFunction[IN, OUT, KEY, W])
         pass
 
 
+class InternalSingleValueAllWindowFunction(InternalWindowFunction[IN, OUT, int, W]):
+
+    def __init__(self, wrapped_function: AllWindowFunction):
+        self._wrapped_function = wrapped_function
+
+    def open(self, runtime_context: RuntimeContext):
+        self._wrapped_function.open(runtime_context)
+
+    def close(self):
+        self._wrapped_function.close()
+
+    def process(self,
+                key: int,
+                window: W,
+                context: InternalWindowFunction.InternalWindowContext,
+                input_data: IN) -> Iterable[OUT]:
+        return self._wrapped_function.apply(window, [input_data])
+
+    def clear(self, window: W, context: InternalWindowFunction.InternalWindowContext):
+        pass
+
+
 class InternalIterableWindowFunction(InternalWindowFunction[Iterable[IN], OUT, KEY, W]):
 
     def __init__(self, wrapped_function: WindowFunction):
@@ -1247,6 +1275,34 @@ class InternalSingleValueProcessWindowFunction(InternalWindowFunction[IN, OUT, K
         self._wrapped_function.clear(self._internal_context)
 
 
+class InternalSingleValueProcessAllWindowFunction(InternalWindowFunction[IN, OUT, int, W]):
+
+    def __init__(self, wrapped_function: ProcessAllWindowFunction):
+        self._wrapped_function = wrapped_function
+        self._internal_context = \
+            InternalProcessAllWindowContext()  # type: InternalProcessAllWindowContext
+
+    def open(self, runtime_context: RuntimeContext):
+        self._wrapped_function.open(runtime_context)
+
+    def close(self):
+        self._wrapped_function.close()
+
+    def process(self,
+                key: int,
+                window: W,
+                context: InternalWindowFunction.InternalWindowContext,
+                input_data: IN) -> Iterable[OUT]:
+        self._internal_context._window = window
+        self._internal_context._underlying = context
+        return self._wrapped_function.process(self._internal_context, [input_data])
+
+    def clear(self, window: W, context: InternalWindowFunction.InternalWindowContext):
+        self._internal_context._window = window
+        self._internal_context._underlying = context
+        self._wrapped_function.clear(self._internal_context)
+
+
 class InternalIterableProcessWindowFunction(InternalWindowFunction[Iterable[IN], OUT, KEY, W]):
 
     def __init__(self, wrapped_function: ProcessWindowFunction):
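
These wrappers mirror their keyed counterparts: the runtime hands process() the single
pre-aggregated value, and the wrapper re-exposes it to user code as a one-element
iterable. A small sketch of that contract in plain Python; passing None for the context
is an assumption that only holds here because the pass-through path never touches it:

    from pyflink.datastream.functions import (InternalSingleValueAllWindowFunction,
                                              PassThroughAllWindowFunction)
    from pyflink.datastream.window import TimeWindow

    wrapper = InternalSingleValueAllWindowFunction(PassThroughAllWindowFunction())
    window = TimeWindow(0, 5)
    # The dummy key 0 stands in for the single "global" key of a non-keyed stream.
    print(list(wrapper.process(0, window, None, ('a', 6))))  # prints [('a', 6)]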
diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py
index 6ab159584d3..b6433f409ce 100644
--- a/flink-python/pyflink/datastream/tests/test_window.py
+++ b/flink-python/pyflink/datastream/tests/test_window.py
@@ -453,6 +453,142 @@ class WindowTests(PyFlinkStreamingTestCase):
         expected = ['(0,5,4)', '(15,20,1)', '(5,10,3)']
         self.assert_equals_sorted(expected, results)
 
+    def test_window_all_reduce(self):
+        self.env.set_parallelism(1)
+        data_stream = self.env.from_collection([
+            ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
+            type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
+        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+            .with_timestamp_assigner(SecondColumnTimestampAssigner())
+        data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+            .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
+            .reduce(lambda a, b: (a[0], a[1] + b[1]),
+                    output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+            .add_sink(self.test_sink)
+
+        self.env.execute('test_window_all_reduce')
+        results = self.test_sink.get_results()
+        expected = ['(a,15)', '(a,6)', '(a,23)']
+        self.assert_equals_sorted(expected, results)
+
+    def test_window_all_reduce_process(self):
+        self.env.set_parallelism(1)
+        data_stream = self.env.from_collection([
+            ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
+            type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
+        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+            .with_timestamp_assigner(SecondColumnTimestampAssigner())
+
+        class MyProcessFunction(ProcessAllWindowFunction):
+
+            def process(self, context: 'ProcessAllWindowFunction.Context',
+                        elements: Iterable[Tuple[str, int]]) -> Iterable[str]:
+                yield "current window start at {}, reduce result {}".format(
+                    context.window().start,
+                    next(iter(elements)),
+                )
+
+        data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+            .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
+            .reduce(lambda a, b: (a[0], a[1] + b[1]),
+                    window_function=MyProcessFunction(),
+                    output_type=Types.STRING()) \
+            .add_sink(self.test_sink)
+
+        self.env.execute('test_window_all_reduce_process')
+        results = self.test_sink.get_results()
+        expected = ["current window start at 1, reduce result ('a', 6)",
+                    "current window start at 6, reduce result ('a', 23)",
+                    "current window start at 15, reduce result ('a', 15)"]
+        self.assert_equals_sorted(expected, results)
+
+    def test_window_all_aggregate(self):
+        self.env.set_parallelism(1)
+        data_stream = self.env.from_collection([
+            ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
+            type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
+        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+            .with_timestamp_assigner(SecondColumnTimestampAssigner())
+
+        class MyAggregateFunction(AggregateFunction):
+
+            def create_accumulator(self) -> Tuple[str, Dict[int, int]]:
+                return '', {0: 0, 1: 0}
+
+            def add(self, value: Tuple[str, int], accumulator: Tuple[str, Dict[int, int]]
+                    ) -> Tuple[str, Dict[int, int]]:
+                number_map = accumulator[1]
+                number_map[value[1] % 2] += 1
+                return value[0], number_map
+
+            def get_result(self, accumulator: Tuple[str, Dict[int, int]]) -> Tuple[str, int]:
+                number_map = accumulator[1]
+                return accumulator[0], number_map[0] - number_map[1]
+
+            def merge(self, acc_a: Tuple[str, Dict[int, int]], acc_b: Tuple[str, Dict[int, int]]
+                      ) -> Tuple[str, Dict[int, int]]:
+                number_map_a = acc_a[1]
+                number_map_b = acc_b[1]
+                new_number_map = {
+                    0: number_map_a[0] + number_map_b[0],
+                    1: number_map_a[1] + number_map_b[1]
+                }
+                return acc_a[0], new_number_map
+
+        data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+            .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
+            .aggregate(MyAggregateFunction(),
+                       output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
+            .add_sink(self.test_sink)
+
+        self.env.execute('test_window_all_aggregate')
+        results = self.test_sink.get_results()
+        expected = ['(a,-1)', '(b,-1)', '(b,1)']
+        self.assert_equals_sorted(expected, results)
+
+    def test_window_all_aggregate_process(self):
+        self.env.set_parallelism(1)
+        data_stream = self.env.from_collection([
+            ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
+            type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
+        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+            .with_timestamp_assigner(SecondColumnTimestampAssigner())
+
+        class MyAggregateFunction(AggregateFunction):
+            def create_accumulator(self) -> Tuple[int, str]:
+                return 0, ''
+
+            def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
+                return value[1] + accumulator[0], value[0]
+
+            def get_result(self, accumulator: Tuple[int, str]):
+                return accumulator[1], accumulator[0]
+
+            def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
+                return acc_a[0] + acc_b[0], acc_a[1]
+
+        class MyProcessWindowFunction(ProcessAllWindowFunction):
+
+            def process(self, context: ProcessAllWindowFunction.Context,
+                        elements: Iterable[Tuple[str, int]]) -> Iterable[str]:
+                agg_result = next(iter(elements))
+                yield "key {} timestamp sum {}".format(agg_result[0], 
agg_result[1])
+
+        data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+            .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
+            .aggregate(MyAggregateFunction(),
+                       window_function=MyProcessWindowFunction(),
+                       accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
+                       output_type=Types.STRING()) \
+            .add_sink(self.test_sink)
+
+        self.env.execute('test_window_all_aggregate_process')
+        results = self.test_sink.get_results()
+        expected = ['key b timestamp sum 6',
+                    'key b timestamp sum 23',
+                    'key a timestamp sum 15']
+        self.assert_equals_sorted(expected, results)
+
 
 class SecondColumnTimestampAssigner(TimestampAssigner):
 

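A note on the expected values in the session-window tests above: with a 2 ms gap,
timestamps 1, 2, 3 merge into one session, 6, 8, 9 into a second (Flink merges session
windows whose extents touch, so a difference equal to the gap still merges), and 15
stands alone. A plain-Python sanity check of that grouping, not Flink code:

    def sessions(timestamps, gap):
        # Group sorted timestamps; a new session starts when the gap is exceeded.
        groups, current = [], [timestamps[0]]
        for t in timestamps[1:]:
            if t - current[-1] <= gap:
                current.append(t)
            else:
                groups.append(current)
                current = [t]
        groups.append(current)
        return groups

    assert sessions([1, 2, 3, 6, 8, 9, 15], 2) == [[1, 2, 3], [6, 8, 9], [15]]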