Hello, I am new to event-time processing and need some help.

We have a Kafka source with very low QPS, and multiple topic partitions have
no data for long periods of time. Additionally, data from the source can
arrive out of order (within bounds), and the application needs to process the
events in order per key. So we wanted to try and sort the events in the

I am using BoundedOutOfOrdernessWatermarks to generate the watermarks
and TumblingEventTimeWindows to collect the keyed events, which I then sort
in the ProcessWindowFunction. I am seeing that the window doesn’t
close at all and, based on my understanding, that is because there are no
events for some source partitions. All operators have the same parallelism
as the source Kafka partition count.

Pseudo code for my processing:

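SingleOutputStreamOperator<MyEvent> myStream =
    env.fromSource(
        ...,
        ...,
        "Kafka Source",
        ...)
    .map(rowData -> convertToMyEvent(rowData))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(...)
            .withTimestampAssigner((event, timestamp) -> event.timestamp))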
    // Key the events by urn which is the key used for the output kafka
    .keyBy((event) -> event.urn.toString())
    // Set up a tumbling window of misAlignmentThresholdMinutes
    .window(TumblingEventTimeWindows.of(
        Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
    .process(new EventTimerOrderProcessFunction());
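
To show what I mean by sorting inside the ProcessWindowFunction, here is a minimal plain-Java sketch of the idea. It is not the actual function and does not use Flink: the Event class is a hypothetical stand-in for MyEvent with only the two fields that appear in the pseudo code, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class WindowSortSketch {
    // Hypothetical stand-in for MyEvent (assumption: a urn and a long timestamp).
    public static final class Event {
        public final String urn;
        public final long timestamp;

        public Event(String urn, long timestamp) {
            this.urn = urn;
            this.timestamp = timestamp;
        }
    }

    // Copies the elements of one window (all for the same key) into a list
    // and orders them by event timestamp, oldest first.
    public static List<Event> sortByTimestamp(Iterable<Event> windowElements) {
        List<Event> sorted = new ArrayList<>();
        windowElements.forEach(sorted::add);
        sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return sorted;
    }

    public static void main(String[] args) {
        List<Event> window = new ArrayList<>();
        window.add(new Event("urn-1", 30L));
        window.add(new Event("urn-1", 10L));
        window.add(new Event("urn-1", 20L));
        for (Event e : sortByTimestamp(window)) {
            System.out.println(e.timestamp); // prints 10, 20, 30 on separate lines
        }
    }
}
```

In the real job the same ordering would only be emitted once the window fires, which is the part that is not happening for me.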


Is my understanding correct that, based on the WatermarkStrategy I
have, multiple operators will keep emitting *Long.MIN_VALUE - threshold* if
no events are read for those partitions, causing the downstream keyBy
operator to also emit a *Long.MIN_VALUE - threshold* watermark (as the min of
all watermarks it sees from its input map operators), so that the window
doesn’t close at all? If yes, what is the right strategy to handle this? Is
there a way to combine EventTimeTrigger with ProcessingTimeoutTrigger?
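
To make my mental model concrete, here is a small plain-Java sketch (again not Flink code) of how I understand the watermark combination. It assumes that a downstream operator takes the minimum of its input watermarks, that an input which has never seen an event stays at Long.MIN_VALUE, and that an event-time window fires once the watermark reaches the window end minus one millisecond. The class and method names are made up for illustration.

```java
import java.util.Arrays;

public class WatermarkMinSketch {
    // Assumed watermark of an input channel that has not seen any event yet.
    public static final long NO_EVENTS = Long.MIN_VALUE;

    // Assumption: a downstream operator's watermark is the minimum over its inputs.
    public static long combinedWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_EVENTS);
    }

    // Assumption: a window [start, end) fires once the watermark reaches end - 1.
    public static boolean windowFires(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;
        // Two active partitions and one that never received an event.
        long[] oneIdle = {75_000L, 80_000L, NO_EVENTS};
        // The same setup, but the third partition has also advanced.
        long[] allActive = {75_000L, 80_000L, 61_000L};

        System.out.println(windowFires(windowEnd, combinedWatermark(oneIdle)));   // false
        System.out.println(windowFires(windowEnd, combinedWatermark(allActive))); // true
    }
}
```

If this model is right, a single partition without events is enough to hold back every window downstream, no matter how far the other partitions have advanced.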


