[ https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huang Xingbo updated FLINK-28920:
---------------------------------
    Summary: Release Testing: Verify Python DataStream Window  (was: Release Testing: Running Python DataStream Window Job)

> Release Testing: Verify Python DataStream Window
> ------------------------------------------------
>
>                 Key: FLINK-28920
>                 URL: https://issues.apache.org/jira/browse/FLINK-28920
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
>
>
> * Build and compile the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python virtual environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source. For more details, you can refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a Python DataStream window job in thread mode. For details of windows, you can refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
>
> from pyflink.common import Configuration, Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
>
>
> class SecondColumnTimestampAssigner(TimestampAssigner):
>     """Uses the second tuple field as the event timestamp."""
>
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[1])
>
>
> def main():
>     config = Configuration()
>     # thread mode
>     config.set_string("python.execution-mode", "thread")
>     # process mode
>     # config.set_string("python.execution-mode", "process")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     data_stream = env.from_collection([
>         ('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
>         type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(SecondColumnTimestampAssigner())
>
>     class MyAggregateFunction(AggregateFunction):
>
>         def create_accumulator(self) -> Tuple[int, str]:
>             return 0, ''
>
>         def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
>             return value[1] + accumulator[0], value[0]
>
>         def get_result(self, accumulator: Tuple[int, str]) -> Tuple[str, int]:
>             return accumulator[1], accumulator[0]
>
>         def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]) -> Tuple[int, str]:
>             return acc_a[0] + acc_b[0], acc_a[1]
>
>     ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: x[0], key_type=Types.STRING()) \
>         .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
>         .aggregate(MyAggregateFunction(),
>                    accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
>                    output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
>     ds.print()
>     env.execute('test_window_aggregate_accumulator_type')
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream window job and watch the result
> {code:bash}
> $
python demo.py
> {code}

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
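For reference, the output of the job above can be reasoned out by hand. The following is a plain-Python sketch (it does not use PyFlink) that mimics event-time session windows with a 2 ms gap, assuming Flink's standard merge semantics: each element opens a window [t, t+gap), and overlapping windows merge into one session whose values are summed per key.

```python
# Sketch of what the session-window job computes, without PyFlink:
# group elements per key into event-time sessions (gap = 2 ms) and
# sum the values. In this job the second tuple field is both the
# event timestamp and the value being summed.
from typing import Dict, List, Tuple

GAP = 2  # session gap in milliseconds, matching Time.milliseconds(2)


def session_sums(events: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    # Collect timestamps per key.
    per_key: Dict[str, List[int]] = {}
    for key, ts in events:
        per_key.setdefault(key, []).append(ts)

    results = []
    for key, timestamps in per_key.items():
        timestamps.sort()
        session = [timestamps[0]]
        for ts in timestamps[1:]:
            if ts - session[-1] < GAP:
                # windows [t, t+GAP) overlap -> same session
                session.append(ts)
            else:
                # gap reached -> close the current session
                results.append((key, sum(session)))
                session = [ts]
        results.append((key, sum(session)))
    return results


events = [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)]
print(sorted(session_sums(events)))
# -> [('a', 3), ('a', 6), ('a', 15), ('b', 3), ('b', 17)]
```

So for key 'a' the elements at timestamps 1 and 2 merge into one session (sum 3), while 6 and 15 each form their own session; for key 'b', 8 and 9 merge (sum 17). These are the five records the PyFlink job is expected to print.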