This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new db23add [FLINK-26969][python][examples] Add a few examples of window operation in Python DataStream API db23add is described below commit db23add96305e163328283859fb644af72350018 Author: zhangjingcun <cun8c...@163.com> AuthorDate: Fri Apr 1 19:51:04 2022 +0800 [FLINK-26969][python][examples] Add a few examples of window operation in Python DataStream API This closes #19328. --- .../pyflink/datastream/tests/test_window.py | 2 +- .../examples/datastream/windowing/__init__.py | 17 ++++ .../windowing/session_with_dynamic_gap_window.py | 103 +++++++++++++++++++++ .../windowing/session_with_gap_window.py | 103 +++++++++++++++++++++ .../datastream/windowing/sliding_time_window.py | 97 +++++++++++++++++++ .../datastream/windowing/tumbling_count_window.py | 82 ++++++++++++++++ .../datastream/windowing/tumbling_time_window.py | 97 +++++++++++++++++++ 7 files changed, 500 insertions(+), 1 deletion(-) diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py index 72d7c0d..b8cdf7e 100644 --- a/flink-python/pyflink/datastream/tests/test_window.py +++ b/flink-python/pyflink/datastream/tests/test_window.py @@ -372,7 +372,7 @@ class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, CountW def process(self, key: str, - content: ProcessWindowFunction.Context, + context: ProcessWindowFunction.Context, elements: Iterable[tuple]) -> Iterable[tuple]: return [(key, len([e for e in elements]))] diff --git a/flink-python/pyflink/examples/datastream/windowing/__init__.py b/flink-python/pyflink/examples/datastream/windowing/__init__.py new file mode 100644 index 0000000..65b48d4 --- /dev/null +++ b/flink-python/pyflink/examples/datastream/windowing/__init__.py @@ -0,0 +1,17 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py new file mode 100644 index 0000000..c2f495b --- /dev/null +++ b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py @@ -0,0 +1,103 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import sys + +import argparse +from typing import Iterable + +from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy + +from pyflink.common import Types, WatermarkStrategy, Encoder +from pyflink.common.watermark_strategy import TimestampAssigner +from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction +from pyflink.datastream.window import EventTimeSessionWindows, \ + SessionWindowTimeGapExtractor, TimeWindow + + +class MyTimestampAssigner(TimestampAssigner): + def extract_timestamp(self, value, record_timestamp) -> int: + return int(value[1]) + + +class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor): + def extract(self, element: tuple) -> int: + return element[1] + + +class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]): + def process(self, + key: str, + context: ProcessWindowFunction.Context[TimeWindow], + elements: Iterable[tuple]) -> Iterable[tuple]: + return [(key, context.window().start, context.window().end, len([e for e in elements]))] + + def clear(self, context: ProcessWindowFunction.Context) -> None: + pass + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '--output', + dest='output', + required=False, + help='Output file to write results to.') + + argv = sys.argv[1:] + known_args, _ = parser.parse_known_args(argv) + output_path = known_args.output + + env = StreamExecutionEnvironment.get_execution_environment() + # write all the data to one file + env.set_parallelism(1) + + # define the source + data_stream = env.from_collection([ + ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)], + type_info=Types.TUPLE([Types.STRING(), Types.INT()])) + + # define the watermark strategy + watermark_strategy = 
WatermarkStrategy.for_monotonous_timestamps() \ + .with_timestamp_assigner(MyTimestampAssigner()) + + ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ + .key_by(lambda x: x[0], key_type=Types.STRING()) \ + .window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \ + .process(CountWindowProcessFunction(), + Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()])) + + # define the sink + if output_path is not None: + ds.sink_to( + sink=FileSink.for_row_format( + base_path=output_path, + encoder=Encoder.simple_string_encoder()) + .with_output_file_config( + OutputFileConfig.builder() + .with_part_prefix("prefix") + .with_part_suffix(".ext") + .build()) + .with_rolling_policy(RollingPolicy.default_rolling_policy()) + .build() + ) + else: + print("Printing result to stdout. Use --output to specify output path.") + ds.print() + + # submit for execution + env.execute() diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py new file mode 100644 index 0000000..524a889 --- /dev/null +++ b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py @@ -0,0 +1,103 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import sys + +import argparse +from typing import Iterable + +from pyflink.datastream.connectors import FileSink, RollingPolicy, OutputFileConfig + +from pyflink.common import Types, WatermarkStrategy, Time, Encoder +from pyflink.common.watermark_strategy import TimestampAssigner +from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction +from pyflink.datastream.window import EventTimeSessionWindows, \ + SessionWindowTimeGapExtractor, TimeWindow + + +class MyTimestampAssigner(TimestampAssigner): + def extract_timestamp(self, value, record_timestamp) -> int: + return int(value[1]) + + +class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor): + def extract(self, element: tuple) -> int: + return element[1] + + +class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]): + def process(self, + key: str, + context: ProcessWindowFunction.Context[TimeWindow], + elements: Iterable[tuple]) -> Iterable[tuple]: + return [(key, context.window().start, context.window().end, len([e for e in elements]))] + + def clear(self, context: ProcessWindowFunction.Context) -> None: + pass + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '--output', + dest='output', + required=False, + help='Output file to write results to.') + + argv = sys.argv[1:] + known_args, _ = parser.parse_known_args(argv) + output_path = known_args.output + + env = StreamExecutionEnvironment.get_execution_environment() + # 
write all the data to one file + env.set_parallelism(1) + + # define the source + data_stream = env.from_collection([ + ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)], + type_info=Types.TUPLE([Types.STRING(), Types.INT()])) + + # define the watermark strategy + watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ + .with_timestamp_assigner(MyTimestampAssigner()) + + ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ + .key_by(lambda x: x[0], key_type=Types.STRING()) \ + .window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \ + .process(CountWindowProcessFunction(), + Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()])) + + # define the sink + if output_path is not None: + ds.sink_to( + sink=FileSink.for_row_format( + base_path=output_path, + encoder=Encoder.simple_string_encoder()) + .with_output_file_config( + OutputFileConfig.builder() + .with_part_prefix("prefix") + .with_part_suffix(".ext") + .build()) + .with_rolling_policy(RollingPolicy.default_rolling_policy()) + .build() + ) + else: + print("Printing result to stdout. Use --output to specify output path.") + ds.print() + + # submit for execution + env.execute() diff --git a/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py new file mode 100644 index 0000000..de3343c --- /dev/null +++ b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py @@ -0,0 +1,97 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import sys + +import argparse +from typing import Iterable + +from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy + +from pyflink.common import Types, WatermarkStrategy, Time, Encoder +from pyflink.common.watermark_strategy import TimestampAssigner +from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction +from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow + + +class MyTimestampAssigner(TimestampAssigner): + def extract_timestamp(self, value, record_timestamp) -> int: + return int(value[1]) + + +class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]): + def process(self, + key: str, + context: ProcessWindowFunction.Context[TimeWindow], + elements: Iterable[tuple]) -> Iterable[tuple]: + return [(key, context.window().start, context.window().end, len([e for e in elements]))] + + def clear(self, context: ProcessWindowFunction.Context) -> None: + pass + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '--output', + dest='output', + required=False, + help='Output file to write results to.') + + argv = sys.argv[1:] + known_args, _ = parser.parse_known_args(argv) + output_path = known_args.output + + env = StreamExecutionEnvironment.get_execution_environment() + # write all 
the data to one file + env.set_parallelism(1) + + # define the source + data_stream = env.from_collection([ + ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9), ('hi', 15)], + type_info=Types.TUPLE([Types.STRING(), Types.INT()])) + + # define the watermark strategy + watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ + .with_timestamp_assigner(MyTimestampAssigner()) + + ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ + .key_by(lambda x: x[0], key_type=Types.STRING()) \ + .window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2))) \ + .process(CountWindowProcessFunction(), + Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()])) + + # define the sink + if output_path is not None: + ds.sink_to( + sink=FileSink.for_row_format( + base_path=output_path, + encoder=Encoder.simple_string_encoder()) + .with_output_file_config( + OutputFileConfig.builder() + .with_part_prefix("prefix") + .with_part_suffix(".ext") + .build()) + .with_rolling_policy(RollingPolicy.default_rolling_policy()) + .build() + ) + else: + print("Printing result to stdout. Use --output to specify output path.") + ds.print() + + # submit for execution + env.execute() diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py new file mode 100644 index 0000000..0847576 --- /dev/null +++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py @@ -0,0 +1,82 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import sys + +import argparse +from typing import Iterable + +from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy + +from pyflink.common import Types, Encoder +from pyflink.datastream import StreamExecutionEnvironment, WindowFunction +from pyflink.datastream.window import CountWindow + + +class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]): + def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]): + result = 0 + for i in inputs: + result += i[0] + return [(key, result)] + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '--output', + dest='output', + required=False, + help='Output file to write results to.') + + argv = sys.argv[1:] + known_args, _ = parser.parse_known_args(argv) + output_path = known_args.output + + env = StreamExecutionEnvironment.get_execution_environment() + # write all the data to one file + env.set_parallelism(1) + + # define the source + data_stream = env.from_collection([ + (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')], + type_info=Types.TUPLE([Types.INT(), Types.STRING()])) + + ds = data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \ + .count_window(2) \ + .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) + + # 
define the sink + if output_path is not None: + ds.sink_to( + sink=FileSink.for_row_format( + base_path=output_path, + encoder=Encoder.simple_string_encoder()) + .with_output_file_config( + OutputFileConfig.builder() + .with_part_prefix("prefix") + .with_part_suffix(".ext") + .build()) + .with_rolling_policy(RollingPolicy.default_rolling_policy()) + .build() + ) + else: + print("Printing result to stdout. Use --output to specify output path.") + ds.print() + + # submit for execution + env.execute() diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py new file mode 100644 index 0000000..50cdd01 --- /dev/null +++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py @@ -0,0 +1,97 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import sys + +import argparse +from typing import Iterable + +from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy + +from pyflink.common import Types, WatermarkStrategy, Time, Encoder +from pyflink.common.watermark_strategy import TimestampAssigner +from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction +from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow + + +class MyTimestampAssigner(TimestampAssigner): + def extract_timestamp(self, value, record_timestamp) -> int: + return int(value[1]) + + +class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]): + def process(self, + key: str, + context: ProcessWindowFunction.Context[TimeWindow], + elements: Iterable[tuple]) -> Iterable[tuple]: + return [(key, context.window().start, context.window().end, len([e for e in elements]))] + + def clear(self, context: ProcessWindowFunction.Context) -> None: + pass + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + '--output', + dest='output', + required=False, + help='Output file to write results to.') + + argv = sys.argv[1:] + known_args, _ = parser.parse_known_args(argv) + output_path = known_args.output + + env = StreamExecutionEnvironment.get_execution_environment() + # write all the data to one file + env.set_parallelism(1) + + # define the source + data_stream = env.from_collection([ + ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9), ('hi', 15)], + type_info=Types.TUPLE([Types.STRING(), Types.INT()])) + + # define the watermark strategy + watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \ + .with_timestamp_assigner(MyTimestampAssigner()) + + ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \ + .key_by(lambda x: x[0], key_type=Types.STRING()) \ + 
.window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \ + .process(CountWindowProcessFunction(), + Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()])) + + # define the sink + if output_path is not None: + ds.sink_to( + sink=FileSink.for_row_format( + base_path=output_path, + encoder=Encoder.simple_string_encoder()) + .with_output_file_config( + OutputFileConfig.builder() + .with_part_prefix("prefix") + .with_part_suffix(".ext") + .build()) + .with_rolling_policy(RollingPolicy.default_rolling_policy()) + .build() + ) + else: + print("Printing result to stdout. Use --output to specify output path.") + ds.print() + + # submit for execution + env.execute()