Hi everyone,

I am reading messages from a Kafka topic with 2 partitions and using event
time. This is my code:

.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
    @Override
    public long extractAscendingTimestamp(Request req) {
        return req.ts;
    }
})
.windowAll(TumblingEventTimeWindows.of(Time.days(1)))
.apply((TimeWindow window, Iterable<Request> iterable, Collector<String>
collector) -> {
    collector.collect("Window: " + window.toString());
    for (Request req : iterable) {
        collector.collect(req.toString());
    }
})
.print()

I could get an output only when setting the kafka source parallelism to 1. I
guess that is because messages from multiple partitions arrive out-of-order
to the timestamp exctractor according to this thread
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-td4782.html#a4804>,
correct?
So I replaced the AscendingTimestampExtractor with a
BoundedOutOfOrdernessGenerator as in the documentation example
<https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#tab_java_3>
(with
a higher delay) in order to handle out-of-order events, but I still can't
get any output. Why is that?

Best,
Yassine

Reply via email to