Hi Kostas,

When I remove the window and the apply() and put print() after
assignTimestampsAndWatermarks,
the messages are printed correctly:

2> Request{ts=2015-01-01, 06:15:34:000}
2> Request{ts=2015-01-02, 16:38:10:000}
2> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-02, 19:10:00:000}
2> Request{ts=2015-01-02, 23:36:51:000}
2> Request{ts=2015-01-03, 17:38:47:000}
...

But strangely using only one task. If I set the source parallelism to 1
using env.addSource(kafka).setParallelism(1) (the window and the apply()
still removed), results are printed using all available slots (number of
CPU cores):

4> Request{ts=2015-01-01, 06:15:34:000}
4> Request{ts=2015-01-02, 16:38:10:000}
2> Request{ts=2015-01-02, 19:10:00:000}
4> Request{ts=2015-01-02, 23:36:51:000}
1> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-03, 17:38:47:000}
3> Request{ts=2015-01-03, 17:56:42:000}
...

Now if I keep the window and apply() with without specifying source
parallelism, no messages are printed (only regular kafka consumer and flink
logs), and if the source parallelism is set to 1, messages are printed
correctly:

1> Window: TimeWindow{start=1420070400000, end=1420156800000}
2> Request{ts=2015-01-01, 06:15:34:000}
1> Request{ts=2015-01-02, 16:38:10:000}
4> Request{ts=2015-01-02, 19:10:00:000}
3> Window: TimeWindow{start=1420156800000, end=1420243200000}
3> Request{ts=2015-01-02, 18:58:41:000}
2> Request{ts=2015-01-02, 23:36:51:000}
3> Window: TimeWindow{start=1420416000000, end=1420502400000}
2> Request{ts=2015-01-03, 17:38:47:000}
4> Window: TimeWindow{start=1420243200000, end=1420329600000}
1> Request{ts=2015-01-03, 17:56:42:000}
1> Request{ts=2015-01-05, 17:13:45:000}
4> Request{ts=2015-01-05, 01:25:55:000}
2> Request{ts=2015-01-05, 14:27:45:000}
...

On Wed, Jul 27, 2016 at 1:41 PM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Yassine,
>
> Could you just remove the window and the apply, and  just put a print()
> after the:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>     @Override
>     public long extractAscendingTimestamp(Request req) {
>         return req.ts;
>     }
> })
>
>
> This at least will tell us if reading from Kafka works as expected.
>
> Kostas
>
> On Jul 25, 2016, at 3:39 PM, Yassin Marzouki <yassmar...@gmail.com> wrote:
>
> Hi everyone,
>
> I am reading messages from a Kafka topic with 2 partitions and using event
> time. This is my code:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>     @Override
>     public long extractAscendingTimestamp(Request req) {
>         return req.ts;
>     }
> })
> .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
> .apply((TimeWindow window, Iterable<Request> iterable, Collector<String>
> collector) -> {
>     collector.collect("Window: " + window.toString());
>     for (Request req : iterable) {
>         collector.collect(req.toString());
>     }
> })
> .print()
>
> I could get an output only when setting the kafka source parallelism to 1. I
> guess that is because messages from multiple partitions arrive out-of-order
> to the timestamp exctractor according to this thread
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-td4782.html#a4804>,
> correct?
> So I replaced the AscendingTimestampExtractor with a
> BoundedOutOfOrdernessGenerator as in the documentation example
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#tab_java_3>
>  (with
> a higher delay) in order to handle out-of-order events, but I still can't
> get any output. Why is that?
>
> Best,
> Yassine
>
>
>

Reply via email to