Hi Yassine,

Could you just remove the window and the apply, and  just put a print() after 
the:
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>     @Override
>     public long extractAscendingTimestamp(Request req) {
>         return req.ts;
>     }
> })

This at least will tell us if reading from Kafka works as expected.

Kostas

> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki <yassmar...@gmail.com> wrote:
> 
> Hi everyone,
> 
> I am reading messages from a Kafka topic with 2 partitions and using event 
> time. This is my code:
> 
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>     @Override
>     public long extractAscendingTimestamp(Request req) {
>         return req.ts;
>     }
> })
> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
> .apply((TimeWindow window, Iterable<Request> iterable, Collector<String> 
> collector) -> {
>     collector.collect("Window: " + window.toString());
>     for (Request req : iterable) {
>         collector.collect(req.toString());
>     }
> })
> .print()
> 
> I could get an output only when setting the kafka source parallelism to 1. I 
> guess that is because messages from multiple partitions arrive out-of-order 
> to the timestamp exctractor according to this thread 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-td4782.html#a4804>,
>  correct?
> So I replaced the AscendingTimestampExtractor with a 
> BoundedOutOfOrdernessGenerator as in the documentation example 
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#tab_java_3>
>  (with a higher delay) in order to handle out-of-order events, but I still 
> can't get any output. Why is that?
> 
> Best,
> Yassine
> 

Reply via email to