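Yes, your understanding is correct. To handle this, you can add idleness detection to your watermark strategy, e.g. WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(...).withIdleness(Duration.ofMinutes(1)). If an input receives no events for the configured timeout, it is marked as idle and no longer holds back the watermark of downstream operators, so your windows can close. Please refer to these two documents [1][2] for more details.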
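To make the effect concrete, here is a small stdlib-only Java simulation. It is not Flink code, and the class and method names are hypothetical. It models how the window operator after keyBy combines the watermarks of its input channels: it takes the minimum over all inputs that are not marked idle, and an event-time window [start, end) fires once that watermark reaches end - 1. The input with no events stays at Long.MIN_VALUE, which is where Flink's bounded-out-of-orderness generator starts. In a real job, the idle flag is set for you by withIdleness(...).

```java
// Hypothetical, self-contained simulation (no Flink dependency) of watermark
// combination with and without idleness.
final class IdleWatermarkDemo {

    // Combined watermark = minimum over the inputs that are NOT marked idle.
    static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        // Simplification: if every input is idle, report no progress at all.
        return anyActive ? min : Long.MIN_VALUE;
    }

    // An event-time window [start, end) fires once watermark >= end - 1,
    // i.e. once the watermark passes the window's max timestamp.
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10 * 60 * 1000L; // first 10-minute tumbling window ends at 600000 ms
        // Inputs 0 and 1 have seen events past the window end; input 2 has never
        // seen an event, so its watermark is still Long.MIN_VALUE.
        long[] watermarks = {windowEnd + 5_000, windowEnd + 2_000, Long.MIN_VALUE};
        boolean[] idle = {false, false, false};

        long stalled = combinedWatermark(watermarks, idle);
        System.out.println("without idleness: watermark=" + stalled
                + " fires=" + windowFires(windowEnd, stalled));

        // After the idle timeout elapses, input 2 is marked idle and ignored.
        idle[2] = true;
        long advanced = combinedWatermark(watermarks, idle);
        System.out.println("with idleness:    watermark=" + advanced
                + " fires=" + windowFires(windowEnd, advanced));
    }
}
```

Running it prints fires=false for the first case and fires=true (watermark=602000) for the second. Pick an idle timeout that is longer than the normal gap between events on a live partition. Otherwise, a partition may be marked idle too eagerly, and its later events may arrive behind the watermark and be treated as late.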
[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#idleness
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

Best regards,
Yuxia

From: "deepthi s" <deepthi.sridha...@gmail.com>
To: "User" <user@flink.apache.org>
Sent: Thursday, December 22, 2022 9:46:00 AM
Subject: Using TumblingEventTimeWindows on low traffic kafka topic

(Adding subject)

On Wed, Dec 21, 2022 at 5:41 PM deepthi s <deepthi.sridha...@gmail.com> wrote:

Hello,

I am new to event-time processing and need some help.

We have a Kafka source with very low QPS, and several topic partitions receive no data for long periods of time. Additionally, data from the source can arrive out of order (within bounds), and the application needs to process the events in order per key. So we want to sort the events in the application.

I am using BoundedOutOfOrdernessWatermarks to generate the watermarks and TumblingEventTimeWindows to collect the keyed events and sort them in a ProcessWindowFunction. The window never closes, and based on my understanding this is because there are no events for some source partitions. All operators have the same parallelism as the source Kafka partition count.

Pseudo code for my processing:

    env.fromSource(setupSource(), WatermarkStrategy.noWatermarks(), "Kafka Source", TypeInformation.of(RowData.class))
        .map(rowData -> convertToMyEvent(rowData))
        .assignTimestampsAndWatermarks(WatermarkStrategy
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(misAlignmentThresholdMinutes))
            .withTimestampAssigner((event, timestamp) -> event.timestamp))
        // Key the events by urn, which is the key used for the output Kafka topic
        .keyBy(event -> event.urn.toString())
        // Set up a tumbling window of misAlignmentThresholdMinutes
        .window(TumblingEventTimeWindows.of(Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
        .process(new EventTimerOrderProcessFunction())
        .sinkTo(setupSink());

Is my understanding correct that, with this WatermarkStrategy, the map operators whose partitions have no events will keep emitting a Long.MIN_VALUE - threshold watermark? That would cause the downstream keyBy operator to emit a Long.MIN_VALUE - threshold watermark too (the minimum of all the watermarks it sees from its input map operators), so the window never closes. If so, what is the right strategy to handle this? Is there a way to combine EventTimeTrigger with ProcessingTimeoutTrigger?

--
Regards,
Deepthi