I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
best overview of the changes to the window operator between 1.1 and 1.2.

On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> I forgot to mention: the watermark extractor is the one included in the
> Flink API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi Robert,
>>
>> Yes, I am using the same code, just switching the version in pom.xml to
>> 1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
>> the time of the question). Here is the watermark assignment:
>>
>> .assignTimestampsAndWatermarks(
>>     new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
>>         @Override
>>         public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
>>             return tuple3.f0;
>>         }
>>     })
>>
>> Best,
>> Yassine
>>
>> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>>
>>> Hi Yassine,
>>> Are you sure your watermark extractor is the same in both
>>> versions? It sounds a bit like the watermarks for the 1.2 code are not
>>> generated correctly.
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
>>> y.marzou...@mindlytix.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
>>>> boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
>>>> in memory and the window results are not emitted until the whole stream is
>>>> processed. Is this temporary behaviour due to the developments in
>>>> 1.2-SNAPSHOT, or a bug?
>>>>
>>>> I am using code similar to the following:
>>>>
>>>> env.setParallelism(1);
>>>>
>>>> DataStream<T> sessions = env
>>>>     .readTextFile()
>>>>     .flatMap()
>>>>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>>>>     .keyBy(1)
>>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>>>>     .apply().setParallelism(32);
>>>>
>>>> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
>>>> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>>>>
>>>> Best,
>>>> Yassine
>>>>
>>>
>>>
>>
>
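
For readers following the thread, the 1.1 behaviour described above (a session
fires once the watermark passes its end) can be pictured with a small
plain-Java model. This is only a sketch and does not use Flink: the class
SessionWindowSketch and the method simulate are hypothetical names, it assumes
a single key, strictly ascending timestamps, and a watermark of "timestamp - 1"
advancing with every element, and it simplifies boundary handling. It is not
the window operator's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, Flink-free model of event-time session windows for ONE key.
// All names here are hypothetical and exist only for illustration.
public class SessionWindowSketch {

    // Returns one entry per session, formatted as "[start,end)@watermark",
    // where "watermark" is the (assumed) watermark that closed the session.
    static List<String> simulate(long[] ascendingTimestamps, long gap) {
        List<String> fired = new ArrayList<>();
        boolean open = false;
        long start = 0L;
        long end = 0L;

        for (long ts : ascendingTimestamps) {
            // Assumption: the watermark trails the newest element by 1.
            // Once it reaches end - 1, the open session is complete.
            if (open && ts >= end) {
                long watermark = ts - 1;
                fired.add("[" + start + "," + end + ")@" + watermark);
                open = false;
            }
            if (!open) {
                // The first element of a new session opens [ts, ts + gap).
                start = ts;
                end = ts + gap;
                open = true;
            } else {
                // An element inside the gap extends the current session.
                end = Math.max(end, ts + gap);
            }
        }
        // At end of input, a final watermark flushes whatever is still open.
        if (open) {
            fired.add("[" + start + "," + end + ")@END");
        }
        return fired;
    }

    public static void main(String[] args) {
        for (String s : simulate(new long[] {0, 2, 4, 20, 22, 40}, 5)) {
            System.out.println(s);
        }
    }
}
```

In this model the first two sessions are emitted while the input is still being
read, and only the last one waits for the end of the stream. If every session
is emitted only at the end, as reported for 1.2-SNAPSHOT, that would be
consistent with watermarks not advancing at the window operator until the
final watermark arrives.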
