I forgot to mention: the watermark extractor is the one included in Flink.
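
For reference, here is a minimal plain-Java sketch (no Flink dependency) of the behaviour I would expect from the built-in ascending extractor combined with session windows on a single key. It rests on assumptions: the watermark trails the latest timestamp by 1 ms and is emitted for every element (in Flink it is emitted periodically), and a window fires once the watermark reaches its end minus 1 ms. The class name SessionWatermarkSketch and the method run are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code) of one key's session windows
// driven by ascending-timestamp watermarks.
class SessionWatermarkSketch {

    /** Returns one line per fired session, noting which input timestamp triggered it. */
    static List<String> run(long[] timestamps, long gapMillis) {
        List<String> fired = new ArrayList<>();
        long start = -1, end = -1; // current session [start, end)
        int count = 0;             // elements in the current session; 0 means none open
        for (long ts : timestamps) {
            // Assumption: the watermark trails the latest timestamp by 1 ms.
            long watermark = ts - 1;
            // Assumption: a window fires once the watermark reaches end - 1.
            if (count > 0 && watermark >= end - 1) {
                fired.add("[" + start + "," + end + ") count=" + count + " firedAt=" + ts);
                count = 0;
            }
            if (count == 0) {
                start = ts; // this element opens a new session
            }
            end = ts + gapMillis; // each element extends the session by the gap
            count++;
        }
        if (count > 0) {
            // End of input: the last open session is only flushed here.
            fired.add("[" + start + "," + end + ") count=" + count + " firedAt=END");
        }
        return fired;
    }

    public static void main(String[] args) {
        long gap = 5 * 60 * 1000L; // 5-minute session gap, as in the job
        long[] ts = {0L, 60_000L, 120_000L, 600_000L, 630_000L, 1_200_000L};
        for (String line : run(ts, gap)) {
            System.out.println(line);
        }
    }
}
```

In this sketch only the last session waits for the end of the input; every earlier session fires as soon as an element arrives past the gap, which is what I observe with 1.1-SNAPSHOT.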

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi Robert,
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the latest compiled master (at
> the time of the question). Here is the watermark assignment:
> .assignTimestampsAndWatermarks(
>     new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
>         @Override
>         public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
>             // The first tuple field is used as the event timestamp.
>             return tuple3.f0;
>         }
>     })
> Best,
> Yassine
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>> Hi Yassine,
>> are you sure your watermark extractor is the same between the two
>> versions? It sounds a bit like the watermarks for the 1.2 code are not
>> generated correctly.
>> Regards,
>> Robert
>> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
>> y.marzou...@mindlytix.com> wrote:
>>> Hi all,
>>> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
>>> boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
>>> in memory and the window results are not emitted until the whole stream is
>>> processed. Is this temporary behaviour due to the developments in
>>> 1.2-SNAPSHOT, or a bug?
>>> I am using code similar to the following:
>>> env.setParallelism(1);
>>> DataStream<T> sessions = env
>>>     .readTextFile()
>>>     .flatMap()
>>>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>>>     .keyBy(1)
>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>>>     .apply().setParallelism(32);
>>> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
>>> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>>> Best,
>>> Yassine
