[ 
https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-28920:
------------------------------------

    Assignee: Luning Wang

> Release Testing: Verify Python DataStream Window
> ------------------------------------------------
>
>                 Key: FLINK-28920
>                 URL: https://issues.apache.org/jira/browse/FLINK-28920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Assignee: Luning Wang
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
>
>
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setpy.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream window job  in thread mode. For details of 
> Window, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, 
> TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
> class SecondColumnTimestampAssigner(TimestampAssigner):
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[1])
> def main():
>     config = Configuration()
>     # thread mode
>     config.set_string("python.execution-mode", "thread")
>     # process mode
>     # config.set_string("python.execution-mode", "process")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     data_stream = env.from_collection([
>         ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 
> 15)],
>         type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: 
> DataStream
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(SecondColumnTimestampAssigner())
>     class MyAggregateFunction(AggregateFunction):
>         def create_accumulator(self) -> Tuple[int, str]:
>             return 0, ''
>         def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) 
> -> Tuple[int, str]:
>             return value[1] + accumulator[0], value[0]
>         def get_result(self, accumulator: Tuple[str, int]):
>             return accumulator[1], accumulator[0]
>         def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
>             return acc_a[0] + acc_b[0], acc_a[1]
>     ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: x[0], key_type=Types.STRING()) \
>         .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
>         .aggregate(MyAggregateFunction(),
>                    accumulator_type=Types.TUPLE([Types.INT(), 
> Types.STRING()]),
>                    output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
>     ds.print()
>     env.execute('test_window_aggregate_accumulator_type')
> if __name__ == '__main__':
>     main()
> {code}
> * run the python datastream window job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to