This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new db23add  [FLINK-26969][python][examples] Add a few examples of window operation in Python DataStream API
db23add is described below

commit db23add96305e163328283859fb644af72350018
Author: zhangjingcun <cun8c...@163.com>
AuthorDate: Fri Apr 1 19:51:04 2022 +0800

    [FLINK-26969][python][examples] Add a few examples of window operation in Python DataStream API
    
    This closes #19328.
---
 .../pyflink/datastream/tests/test_window.py        |   2 +-
 .../examples/datastream/windowing/__init__.py      |  17 ++++
 .../windowing/session_with_dynamic_gap_window.py   | 103 +++++++++++++++++++++
 .../windowing/session_with_gap_window.py           | 103 +++++++++++++++++++++
 .../datastream/windowing/sliding_time_window.py    |  97 +++++++++++++++++++
 .../datastream/windowing/tumbling_count_window.py  |  82 ++++++++++++++++
 .../datastream/windowing/tumbling_time_window.py   |  97 +++++++++++++++++++
 7 files changed, 500 insertions(+), 1 deletion(-)

diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py
index 72d7c0d..b8cdf7e 100644
--- a/flink-python/pyflink/datastream/tests/test_window.py
+++ b/flink-python/pyflink/datastream/tests/test_window.py
@@ -372,7 +372,7 @@ class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, CountW
 
     def process(self,
                 key: str,
-                content: ProcessWindowFunction.Context,
+                context: ProcessWindowFunction.Context,
                 elements: Iterable[tuple]) -> Iterable[tuple]:
         return [(key, len([e for e in elements]))]
 
diff --git a/flink-python/pyflink/examples/datastream/windowing/__init__.py b/flink-python/pyflink/examples/datastream/windowing/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
new file mode 100644
index 0000000..c2f495b
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
@@ -0,0 +1,103 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, OutputFileConfig, 
RollingPolicy
+
+from pyflink.common import Types, WatermarkStrategy, Encoder
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment, 
ProcessWindowFunction
+from pyflink.datastream.window import EventTimeSessionWindows, \
+    SessionWindowTimeGapExtractor, TimeWindow
+
+
+class MyTimestampAssigner(TimestampAssigner):
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[1])
+
+
+class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
+    def extract(self, element: tuple) -> int:
+        return element[1]
+
+
+class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, 
TimeWindow]):
+    def process(self,
+                key: str,
+                context: ProcessWindowFunction.Context[TimeWindow],
+                elements: Iterable[tuple]) -> Iterable[tuple]:
+        return [(key, context.window().start, context.window().end, len([e for 
e in elements]))]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), 
('hi', 15)],
+        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the watermark strategy
+    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+        .with_timestamp_assigner(MyTimestampAssigner())
+
+    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+        .key_by(lambda x: x[0], key_type=Types.STRING()) \
+        
.window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor()))
 \
+        .process(CountWindowProcessFunction(),
+                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), 
Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output 
path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
new file mode 100644
index 0000000..524a889
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
@@ -0,0 +1,103 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, RollingPolicy, 
OutputFileConfig
+
+from pyflink.common import Types, WatermarkStrategy, Time, Encoder
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment, 
ProcessWindowFunction
+from pyflink.datastream.window import EventTimeSessionWindows, \
+    SessionWindowTimeGapExtractor, TimeWindow
+
+
+class MyTimestampAssigner(TimestampAssigner):
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[1])
+
+
+class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
+    def extract(self, element: tuple) -> int:
+        return element[1]
+
+
+class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, 
TimeWindow]):
+    def process(self,
+                key: str,
+                context: ProcessWindowFunction.Context[TimeWindow],
+                elements: Iterable[tuple]) -> Iterable[tuple]:
+        return [(key, context.window().start, context.window().end, len([e for 
e in elements]))]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), 
('hi', 15)],
+        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the watermark strategy
+    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+        .with_timestamp_assigner(MyTimestampAssigner())
+
+    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+        .key_by(lambda x: x[0], key_type=Types.STRING()) \
+        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
+        .process(CountWindowProcessFunction(),
+                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), 
Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output 
path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
diff --git a/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
new file mode 100644
index 0000000..de3343c
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
@@ -0,0 +1,97 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, OutputFileConfig, 
RollingPolicy
+
+from pyflink.common import Types, WatermarkStrategy, Time, Encoder
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment, 
ProcessWindowFunction
+from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow
+
+
+class MyTimestampAssigner(TimestampAssigner):
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[1])
+
+
+class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, 
TimeWindow]):
+    def process(self,
+                key: str,
+                context: ProcessWindowFunction.Context[TimeWindow],
+                elements: Iterable[tuple]) -> Iterable[tuple]:
+        return [(key, context.window().start, context.window().end, len([e for 
e in elements]))]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), 
('hi', 9), ('hi', 15)],
+        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the watermark strategy
+    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+        .with_timestamp_assigner(MyTimestampAssigner())
+
+    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+        .key_by(lambda x: x[0], key_type=Types.STRING()) \
+        .window(SlidingEventTimeWindows.of(Time.milliseconds(5), 
Time.milliseconds(2))) \
+        .process(CountWindowProcessFunction(),
+                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), 
Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output 
path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
new file mode 100644
index 0000000..0847576
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
@@ -0,0 +1,82 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, OutputFileConfig, 
RollingPolicy
+
+from pyflink.common import Types, Encoder
+from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
+from pyflink.datastream.window import CountWindow
+
+
+class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
+    def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
+        result = 0
+        for i in inputs:
+            result += i[0]
+        return [(key, result)]
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 
'hello'), (6, 'hello')],
+        type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
+
+    ds = data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+        .count_window(2) \
+        .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output 
path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
new file mode 100644
index 0000000..50cdd01
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
@@ -0,0 +1,97 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, OutputFileConfig, 
RollingPolicy
+
+from pyflink.common import Types, WatermarkStrategy, Time, Encoder
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment, 
ProcessWindowFunction
+from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow
+
+
+class MyTimestampAssigner(TimestampAssigner):
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[1])
+
+
+class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, 
TimeWindow]):
+    def process(self,
+                key: str,
+                context: ProcessWindowFunction.Context[TimeWindow],
+                elements: Iterable[tuple]) -> Iterable[tuple]:
+        return [(key, context.window().start, context.window().end, len([e for 
e in elements]))]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), 
('hi', 9), ('hi', 15)],
+        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the watermark strategy
+    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+        .with_timestamp_assigner(MyTimestampAssigner())
+
+    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+        .key_by(lambda x: x[0], key_type=Types.STRING()) \
+        .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
+        .process(CountWindowProcessFunction(),
+                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), 
Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output 
path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()

Reply via email to