[ https://issues.apache.org/jira/browse/FLINK-31099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huang Xingbo resolved FLINK-31099.
----------------------------------
    Resolution: Fixed

Merged into master via ca770b3d905936d8a93071210bd6542b6733221d
Merged into release-1.17 via c7c035a2413c04cd75948d8364e0770b97499901
Merged into release-1.16 via e3c0060e7fca53e0e01cb91e00607c8146b85604

> Chained WindowOperator throws NPE in PyFlink ThreadMode
> -------------------------------------------------------
>
>                 Key: FLINK-31099
>                 URL: https://issues.apache.org/jira/browse/FLINK-31099
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.17.0, 1.16.1
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.16.2
>
>
> Test case
> {code:python}
> from abc import ABC
>
> from pyflink.common import Configuration, Time, Types, WatermarkStrategy
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.window import TumblingEventTimeWindows
>
> config = Configuration()
> # The issue title refers to thread mode, so the reproduction runs in it.
> config.set_string("python.execution-mode", "thread")
> env = StreamExecutionEnvironment.get_execution_environment(config)
>
>
> class MyTimestampAssigner(TimestampAssigner, ABC):
>     def extract_timestamp(self, value: tuple, record_timestamp: int) -> int:
>         # The first tuple field is the event time in epoch milliseconds.
>         return value[0]
>
>
> ds = env.from_collection(
>     [(1676461680000, "a1", "b1", 1), (1676461680000, "a1", "b1", 1),
>      (1676461680000, "a2", "b2", 1), (1676461680000, "a1", "b2", 1),
>      (1676461740000, "a1", "b1", 1), (1676461740000, "a2", "b2", 1)]
> ).assign_timestamps_and_watermarks(
>     WatermarkStrategy.for_monotonous_timestamps()
>     .with_timestamp_assigner(MyTimestampAssigner())
> )
>
> # The window reduce is followed directly by a map, so the two operators are chained.
> ds.key_by(
>     lambda x: (x[0], x[1], x[2])
> ).window(
>     TumblingEventTimeWindows.of(Time.minutes(1))
> ).reduce(
>     lambda x, y: (x[0], x[1], x[2], x[3] + y[3]),
>     output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING(),
>                              Types.INT()])
> # ).filter(
> #     lambda x: x[1] == "a1"
> ).map(
>     lambda x: (x[0], x[1], x[3]),
>     output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.INT()])
> ).print()
>
> env.execute()
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)