This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new efff43cb274 [FLINK-28452][python] Support "Reduce" and "aggregate" operations of "window_all" in Python datastream API efff43cb274 is described below commit efff43cb2740db8b24d295299a2f73451a56658f Author: zhangjingcun <cun8c...@163.com> AuthorDate: Fri Jul 8 16:25:19 2022 +0800 [FLINK-28452][python] Support "Reduce" and "aggregate" operations of "window_all" in Python datastream API This closes #20219. --- flink-python/pyflink/datastream/data_stream.py | 127 ++++++++++++++++++- flink-python/pyflink/datastream/functions.py | 56 +++++++++ .../pyflink/datastream/tests/test_window.py | 136 +++++++++++++++++++++ 3 files changed, 318 insertions(+), 1 deletion(-) diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py index 98ee3060a1d..1c808d45e89 100644 --- a/flink-python/pyflink/datastream/data_stream.py +++ b/flink-python/pyflink/datastream/data_stream.py @@ -40,7 +40,11 @@ from pyflink.datastream.functions import (_get_python_env, FlatMapFunction, MapF InternalIterableAllWindowFunction, ProcessAllWindowFunction, InternalIterableProcessAllWindowFunction, - BroadcastProcessFunction, KeyedBroadcastProcessFunction) + BroadcastProcessFunction, + KeyedBroadcastProcessFunction, + InternalSingleValueAllWindowFunction, + PassThroughAllWindowFunction, + InternalSingleValueProcessAllWindowFunction) from pyflink.datastream.output_tag import OutputTag from pyflink.datastream.slot_sharing_group import SlotSharingGroup from pyflink.datastream.state import ValueStateDescriptor, ValueState, ListStateDescriptor, \ @@ -2080,6 +2084,127 @@ class AllWindowedStream(object): self._late_data_output_tag = output_tag return self + def reduce(self, + reduce_function: Union[Callable, ReduceFunction], + window_function: Union[AllWindowFunction, ProcessAllWindowFunction] = None, + output_type: TypeInformation = None) -> DataStream: + """ + Applies the given window
function to each window. The window function is called for each + evaluation of the window over the whole stream. The output of the window function is + interpreted as a regular non-windowed stream. + + Arriving data is incrementally aggregated using the given reducer. + + Example: + :: + + >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\ + ... .reduce(lambda a, b: (a[0], a[1] + b[1])) + + :param reduce_function: The reduce function. + :param window_function: The window function. + :param output_type: Type information for the result type of the window function. + :return: The data stream that is the result of applying the reduce function to the window. + + .. versionadded:: 1.16.0 + """ + if window_function is None: + internal_window_function = InternalSingleValueAllWindowFunction( + PassThroughAllWindowFunction()) # type: InternalWindowFunction + if output_type is None: + output_type = self.get_input_type() + elif isinstance(window_function, AllWindowFunction): + internal_window_function = InternalSingleValueAllWindowFunction(window_function) + elif isinstance(window_function, ProcessAllWindowFunction): + internal_window_function = InternalSingleValueProcessAllWindowFunction(window_function) + else: + raise TypeError("window_function should be a AllWindowFunction or " + "ProcessAllWindowFunction") + + reducing_state_descriptor = ReducingStateDescriptor(WINDOW_STATE_NAME, + reduce_function, + self.get_input_type()) + func_desc = type(reduce_function).__name__ + if window_function is not None: + func_desc = "%s, %s" % (func_desc, type(window_function).__name__) + + return self._get_result_data_stream(internal_window_function, + reducing_state_descriptor, + func_desc, + output_type) + + def aggregate(self, + aggregate_function: AggregateFunction, + window_function: Union[AllWindowFunction, ProcessAllWindowFunction] = None, + accumulator_type: TypeInformation = None, + output_type: TypeInformation = None) -> DataStream: + """ + Applies the given
window function to each window. The window function is called for each + evaluation of the window over the whole stream. The output of the window function is + interpreted as a regular non-windowed stream. + + Arriving data is incrementally aggregated using the given aggregate function. This means + that the window function typically has only a single value to process when called. + + Example: + :: + + >>> class AverageAggregate(AggregateFunction): + ... def create_accumulator(self) -> Tuple[int, int]: + ... return 0, 0 + ... + ... def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) \\ + ... -> Tuple[int, int]: + ... return accumulator[0] + value[1], accumulator[1] + 1 + ... + ... def get_result(self, accumulator: Tuple[int, int]) -> float: + ... return accumulator[0] / accumulator[1] + ... + ... def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]: + ... return a[0] + b[0], a[1] + b[1] + ... + >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\ + ... .aggregate(AverageAggregate(), + ... accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]), + ... output_type=Types.DOUBLE()) + + :param aggregate_function: The aggregation function that is used for incremental + aggregation. + :param window_function: The window function. + :param accumulator_type: Type information for the internal accumulator type of the + aggregation function. + :param output_type: Type information for the result type of the window function. + :return: The data stream that is the result of applying the window function to the window. + + ..
versionadded:: 1.16.0 + """ + if window_function is None: + internal_window_function = InternalSingleValueAllWindowFunction( + PassThroughAllWindowFunction()) # type: InternalWindowFunction + elif isinstance(window_function, AllWindowFunction): + internal_window_function = InternalSingleValueAllWindowFunction(window_function) + elif isinstance(window_function, ProcessAllWindowFunction): + internal_window_function = InternalSingleValueProcessAllWindowFunction(window_function) + else: + raise TypeError("window_function should be a AllWindowFunction or " + "ProcessAllWindowFunction") + + if accumulator_type is None: + accumulator_type = Types.PICKLED_BYTE_ARRAY() + elif isinstance(accumulator_type, list): + accumulator_type = RowTypeInfo(accumulator_type) + + aggregating_state_descriptor = AggregatingStateDescriptor(WINDOW_STATE_NAME, + aggregate_function, + accumulator_type) + func_desc = type(aggregate_function).__name__ + if window_function is not None: + func_desc = "%s, %s" % (func_desc, type(window_function).__name__) + return self._get_result_data_stream(internal_window_function, + aggregating_state_descriptor, + func_desc, + output_type) + def apply(self, window_function: AllWindowFunction, output_type: TypeInformation = None) -> DataStream: diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py index 196db408c00..3b1c2fd3b0b 100644 --- a/flink-python/pyflink/datastream/functions.py +++ b/flink-python/pyflink/datastream/functions.py @@ -1078,6 +1078,12 @@ class PassThroughWindowFunction(WindowFunction[IN, IN, KEY, W]): yield from inputs +class PassThroughAllWindowFunction(AllWindowFunction[IN, IN, W]): + + def apply(self, window: W, inputs: Iterable[IN]) -> Iterable[IN]: + yield from inputs + + class InternalWindowFunction(Function, Generic[IN, OUT, KEY, W]): class InternalWindowContext(ABC): @@ -1133,6 +1139,28 @@ class InternalSingleValueWindowFunction(InternalWindowFunction[IN, OUT, KEY, W]) pass +class
InternalSingleValueAllWindowFunction(InternalWindowFunction[IN, OUT, int, W]): + + def __init__(self, wrapped_function: AllWindowFunction): + self._wrapped_function = wrapped_function + + def open(self, runtime_context: RuntimeContext): + self._wrapped_function.open(runtime_context) + + def close(self): + self._wrapped_function.close() + + def process(self, + key: int, + window: W, + context: InternalWindowFunction.InternalWindowContext, + input_data: IN) -> Iterable[OUT]: + return self._wrapped_function.apply(window, [input_data]) + + def clear(self, window: W, context: InternalWindowFunction.InternalWindowContext): + pass + + class InternalIterableWindowFunction(InternalWindowFunction[Iterable[IN], OUT, KEY, W]): def __init__(self, wrapped_function: WindowFunction): @@ -1247,6 +1275,34 @@ class InternalSingleValueProcessWindowFunction(InternalWindowFunction[IN, OUT, K self._wrapped_function.clear(self._internal_context) +class InternalSingleValueProcessAllWindowFunction(InternalWindowFunction[IN, OUT, int, W]): + + def __init__(self, wrapped_function: ProcessAllWindowFunction): + self._wrapped_function = wrapped_function + self._internal_context = \ + InternalProcessAllWindowContext() # type: InternalProcessAllWindowContext + + def open(self, runtime_context: RuntimeContext): + self._wrapped_function.open(runtime_context) + + def close(self): + self._wrapped_function.close() + + def process(self, + key: int, + window: W, + context: InternalWindowFunction.InternalWindowContext, + input_data: IN) -> Iterable[OUT]: + self._internal_context._window = window + self._internal_context._underlying = context + return self._wrapped_function.process(self._internal_context, [input_data]) + + def clear(self, window: W, context: InternalWindowFunction.InternalWindowContext): + self._internal_context._window = window + self._internal_context._underlying = context + self._wrapped_function.clear(self._internal_context) + + class
InternalIterableProcessWindowFunction(InternalWindowFunction[Iterable[IN], OUT, KEY, W]): def __init__(self, wrapped_function: ProcessWindowFunction): diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py index 6ab159584d3..b6433f409ce 100644 --- a/flink-python/pyflink/datastream/tests/test_window.py +++ b/flink-python/pyflink/datastream/tests/test_window.py @@ -453,6 +453,142 @@ class WindowTests(PyFlinkStreamingTestCase): expected = ['(0,5,4)', '(15,20,1)', '(5,10,3)'] self.assert_equals_sorted(expected, results) + def test_window_all_reduce(self): + self.env.set_parallelism(1) + data_stream = self.env.from_collection([ + ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)], + type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream + watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ + .with_timestamp_assigner(SecondColumnTimestampAssigner()) + data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ + .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \ + .reduce(lambda a, b: (a[0], a[1] + b[1]), + output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \ + .add_sink(self.test_sink) + + self.env.execute('test_window_all_reduce') + results = self.test_sink.get_results() + expected = ['(a,15)', '(a,6)', '(a,23)'] + self.assert_equals_sorted(expected, results) + + def test_window_all_reduce_process(self): + self.env.set_parallelism(1) + data_stream = self.env.from_collection([ + ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)], + type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream + watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ + .with_timestamp_assigner(SecondColumnTimestampAssigner()) + + class MyProcessFunction(ProcessAllWindowFunction): + + def process(self, context: 'ProcessAllWindowFunction.Context', + elements: Iterable[Tuple[str, int]]) ->
Iterable[str]: + yield "current window start at {}, reduce result {}".format( + context.window().start, + next(iter(elements)), + ) + + data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ + .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \ + .reduce(lambda a, b: (a[0], a[1] + b[1]), + window_function=MyProcessFunction(), + output_type=Types.STRING()) \ + .add_sink(self.test_sink) + + self.env.execute('test_window_all_reduce_process') + results = self.test_sink.get_results() + expected = ["current window start at 1, reduce result ('a', 6)", + "current window start at 6, reduce result ('a', 23)", + "current window start at 15, reduce result ('a', 15)"] + self.assert_equals_sorted(expected, results) + + def test_window_all_aggregate(self): + self.env.set_parallelism(1) + data_stream = self.env.from_collection([ + ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)], + type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream + watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ + .with_timestamp_assigner(SecondColumnTimestampAssigner()) + + class MyAggregateFunction(AggregateFunction): + + def create_accumulator(self) -> Tuple[str, Dict[int, int]]: + return '', {0: 0, 1: 0} + + def add(self, value: Tuple[str, int], accumulator: Tuple[str, Dict[int, int]] + ) -> Tuple[str, Dict[int, int]]: + number_map = accumulator[1] + number_map[value[1] % 2] += 1 + return value[0], number_map + + def get_result(self, accumulator: Tuple[str, Dict[int, int]]) -> Tuple[str, int]: + number_map = accumulator[1] + return accumulator[0], number_map[0] - number_map[1] + + def merge(self, acc_a: Tuple[str, Dict[int, int]], acc_b: Tuple[str, Dict[int, int]] + ) -> Tuple[str, Dict[int, int]]: + number_map_a = acc_a[1] + number_map_b = acc_b[1] + new_number_map = { + 0: number_map_a[0] + number_map_b[0], + 1: number_map_a[1] + number_map_b[1] + } + return acc_a[0], new_number_map + +
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ + .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \ + .aggregate(MyAggregateFunction(), + output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \ + .add_sink(self.test_sink) + + self.env.execute('test_window_all_aggregate') + results = self.test_sink.get_results() + expected = ['(a,-1)', '(b,-1)', '(b,1)'] + self.assert_equals_sorted(expected, results) + + def test_window_all_aggregate_process(self): + self.env.set_parallelism(1) + data_stream = self.env.from_collection([ + ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)], + type_info=Types.TUPLE([Types.STRING(), Types.INT()])) # type: DataStream + watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ + .with_timestamp_assigner(SecondColumnTimestampAssigner()) + + class MyAggregateFunction(AggregateFunction): + def create_accumulator(self) -> Tuple[int, str]: + return 0, '' + + def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]: + return value[1] + accumulator[0], value[0] + + def get_result(self, accumulator: Tuple[int, str]) -> Tuple[str, int]: + return accumulator[1], accumulator[0] + + def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]) -> Tuple[int, str]: + return acc_a[0] + acc_b[0], acc_a[1] + + class MyProcessWindowFunction(ProcessAllWindowFunction): + + def process(self, context: ProcessAllWindowFunction.Context, + elements: Iterable[Tuple[str, int]]) -> Iterable[str]: + agg_result = next(iter(elements)) + yield "key {} timestamp sum {}".format(agg_result[0], agg_result[1]) + + data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ + .window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \ + .aggregate(MyAggregateFunction(), + window_function=MyProcessWindowFunction(), + accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]), + output_type=Types.STRING()) \ + .add_sink(self.test_sink) + + self.env.execute('test_window_all_aggregate_process') +
results = self.test_sink.get_results() + expected = ['key b timestamp sum 6', + 'key b timestamp sum 23', + 'key a timestamp sum 15'] + self.assert_equals_sorted(expected, results) + class SecondColumnTimestampAssigner(TimestampAssigner):