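Yes, your understanding is correct. To handle this, you can define a watermark
strategy that detects idleness and marks an input as idle, for example by adding
withIdleness(Duration) to your WatermarkStrategy. Idle inputs are then ignored
when the downstream watermark is computed, so the window can close.
Please refer to these two documents [1][2] for more details.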

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#idleness
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
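
To illustrate why a silent partition blocks the window and why marking it idle helps, here is a minimal plain-Java sketch. It is not Flink code and does not reproduce Flink's internals; the class and method names (IdleWatermarkSketch, combinedWatermark) are hypothetical, and returning Long.MIN_VALUE when every input is idle is a simplification made for this sketch. It only models the rule that a downstream operator takes the minimum watermark over its non-idle inputs.

```java
// Plain-Java sketch, not Flink code: all names here are hypothetical.
class IdleWatermarkSketch {

    /**
     * Returns the minimum watermark over all inputs that are not marked idle.
     * Assumption for this sketch: if every input is idle, return Long.MIN_VALUE
     * (nothing can advance).
     */
    static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Partition 0 has seen events; partition 1 has seen none yet.
        long[] watermarks = {1_000L, Long.MIN_VALUE};

        // Without idleness detection the silent partition pins the minimum.
        long stuck = combinedWatermark(watermarks, new boolean[] {false, false});
        System.out.println("no idleness:   " + stuck);

        // Once partition 1 is marked idle, only partition 0 counts.
        long advancing = combinedWatermark(watermarks, new boolean[] {false, true});
        System.out.println("with idleness: " + advancing);
    }
}
```

In the actual job, the equivalent change would be to chain withIdleness(...) onto the existing forBoundedOutOfOrderness(...) strategy; the timeout to pass is a tuning choice that depends on how long a partition can realistically stay silent.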
 

Best regards, 
Yuxia 


From: "deepthi s" <deepthi.sridha...@gmail.com>
To: "User" <user@flink.apache.org>
Sent: Thursday, December 22, 2022, 9:46:00 AM
Subject: Using TumblingEventTimeWindows on low traffic kafka topic

(Adding subject) 

On Wed, Dec 21, 2022 at 5:41 PM deepthi s <deepthi.sridha...@gmail.com> wrote:





Hello, I am new to event-time processing and need some help.



We have a Kafka source with very low QPS, and multiple topic partitions have no
data for long periods of time. Additionally, data from the source can arrive out
of order (within bounds), and the application needs to process the events in
order per key. So we wanted to try sorting the events in the application.




I am using BoundedOutOfOrdernessWatermarks to generate the watermarks and
TumblingEventTimeWindows to collect the keyed events and sort them in the
ProcessWindowFunction. I am seeing that the window doesn’t close at all, and
based on my understanding this is because there are no events for some source
partitions. All operators have the same parallelism as the source Kafka
partition count.



Pseudo code for my processing: 



SingleOutputStreamOperator<MyEvent> myStream =
    env.fromSource(
            setupSource(),
            WatermarkStrategy.noWatermarks(),
            "Kafka Source",
            TypeInformation.of(RowData.class))
        .map(rowData -> convertToMyEvent(rowData))
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(
                    Duration.ofMinutes(misAlignmentThresholdMinutes))
                .withTimestampAssigner((event, timestamp) -> event.timestamp))
        // Key the events by urn which is the key used for the output kafka topic
        .keyBy((event) -> event.urn.toString())
        // Set up a tumbling window of misAlignmentThresholdMinutes
        .window(TumblingEventTimeWindows.of(
            Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
        .process(new EventTimerOrderProcessFunction())
        .sinkTo(setupSink());



Is my understanding correct that, based on the WatermarkStrategy I have,
multiple operators will keep emitting Long.MIN_VALUE - threshold if no events
are read for those partitions, causing the downstream keyBy operator to also
emit a Long.MIN_VALUE - threshold watermark (as the min of all watermarks it
sees from its input map operators), so that the window doesn’t close at all? If
yes, what is the right strategy to handle this? Is there a way to combine
EventTimeTrigger with ProcessingTimeoutTrigger?




-- 
Regards, 
Deepthi 




