I used the custom watermark debugger (with 1.1, I changed the call
to super.output.emitWatermark(mark)). Surprisingly, with 1.2 only one
watermark is printed, at the end of the stream, with the value WM: Watermark
@ 9223372036854775807 (Long.MAX_VALUE), whereas with 1.1 watermarks are
printed periodically. I am using the following revision of 1.2-SNAPSHOT:

I uploaded the dataset I'm using as input here:
(the first column corresponds to the timestamp).

You can find the code below. Thank you for your help.

import com.opencsv.CSVParser;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import java.util.*;

// Created by ymarzougui on 11/1/2016.
public class SortedSessionsAssigner {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =

        DataStream<Tuple3<Long,String,String>> waterMarked =
                .flatMap(new RichFlatMapFunction<String, Tuple3<Long,String,String>>() {
                    public CSVParser csvParser;

                    public void open(Configuration config) {
                        csvParser = new CSVParser(',', '"');

                    public void flatMap(String in, Collector<Tuple3<Long,String,String>> clctr) throws Exception {
                        String[] result = csvParser.parseLine(in);
result[1], result[2]));
AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
                    public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
                        return tuple3.f0;

        DataStream<Tuple2<TreeMap<String, Double>, Long>> sessions =
WindowFunction<Tuple3<Long,String,String>,Tuple2<TreeMap<String, Double>,
Long>, Tuple, TimeWindow>() {

                    public void apply(Tuple tuple, TimeWindow timeWindow,
                            Iterable<Tuple3<Long, String, String>> iterable,
                            Collector<Tuple2<TreeMap<String, Double>, Long>> collector) throws Exception {
                        TreeMap<String,Double> treeMap = new TreeMap<String, Double>();
                        Long session_count = 0L;
                        for (Tuple3<Long, String, String> tuple3 :
treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
                            session_count += 1;


        waterMarked.transform("WatermarkDebugger", waterMarked.getType(),
                new WatermarkDebugger<Tuple3<Long, String, String>>());


        env.execute("Sorted Sessions Assigner");


    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);

        public void processWatermark(Watermark mark) throws Exception {
            // 1.2-snapshot
            // 1.1-snapshot
            System.out.println("WM: " + mark);



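To make the difference concrete, here is a minimal plain-Java sketch (no Flink dependency) of the two outputs described above: an ascending-timestamp watermark emitted periodically, versus only the final Long.MAX_VALUE watermark that a finite input produces when it ends. The class name, the replay method and the emitEvery parameter are hypothetical; emitEvery counts elements and only stands in for the time-based watermark interval. It illustrates the symptom, not its cause.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (hypothetical names) of ascending-timestamp watermarking. */
final class AscendingWatermarkSketch {

    /**
     * Replays the timestamps and returns the watermarks that would be emitted.
     * emitEvery is an assumption standing in for the periodic watermark interval:
     * a watermark is attempted after every emitEvery elements; 0 disables it.
     */
    static List<Long> replay(long[] timestamps, int emitEvery) {
        List<Long> emitted = new ArrayList<>();
        long currentTimestamp = Long.MIN_VALUE; // highest timestamp seen so far
        long lastEmitted = Long.MIN_VALUE;      // last watermark actually emitted
        int seen = 0;
        for (long ts : timestamps) {
            currentTimestamp = Math.max(currentTimestamp, ts);
            seen++;
            if (emitEvery > 0 && seen % emitEvery == 0) {
                // An ascending extractor trails the highest timestamp by one.
                long candidate = currentTimestamp - 1;
                if (candidate > lastEmitted) {  // watermarks never go backwards
                    lastEmitted = candidate;
                    emitted.add(candidate);
                }
            }
        }
        // A finite input ends with a final watermark of Long.MAX_VALUE.
        emitted.add(Long.MAX_VALUE);
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 2000L, 3000L, 4000L};
        System.out.println("periodic:   " + replay(timestamps, 2));
        System.out.println("final only: " + replay(timestamps, 0));
    }
}
```

With the four sample timestamps, replay(timestamps, 2) yields [1999, 3999, 9223372036854775807], while replay(timestamps, 0) yields only [9223372036854775807], which is the output the debugger prints with 1.2.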
> Hi,
> could you please try adding this custom watermark debugger to see what's
> going on with the element timestamps and watermarks:
> you can use it like this:
> input.transform("WatermarkDebugger", input.getType(),
>     new WatermarkDebugger<Tuple2<String, Integer>>());
> That should give us something to work with.
> Cheers,
> Aljoscha
On Mon, 5 Dec 2016 at 18:54 Robert Metzger wrote:
> I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
> best overview of the changes to the window operator between 1.1 and 1.2.
On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI
<y.marzou...@mindlytix.com> wrote:
> I forgot to mention: the watermark extractor is the one included in the
> Flink API.
2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI
> Hi Robert,
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
> the time of the question). Here is the watermark assignment:
> .assignTimestampsAndWatermarks(
>     new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
>         @Override
>         public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
>             return tuple3.f0;
>         }
>     })
> Best,
> Yassine
2016-12-05 11:24 GMT+01:00 Robert Metzger
> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions? It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
> Regards,
> Robert
On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI
<y.marzou...@mindlytix.com> wrote:
> Hi all,
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
> boundaries are detected, but with 1.2-SNAPSHOT the state keeps growing
> in memory and the window results are not emitted until the whole stream is
> processed. Is this a temporary behaviour due to the developments in
> 1.2-SNAPSHOT, or a bug?
> I am using code similar to the following:
> env.setParallelism(1);
> DataStream<T> sessions = env
>     .readTextFile()
>     .flatMap()
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>     .keyBy(1)
>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>     .apply().setParallelism(32);
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
> Best,
> Yassine
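
The link between watermarks and the moment a session window fires can be sketched in plain Java as follows. This is a simplified single-key model, not Flink's implementation: it assumes ascending timestamps, merges elements whose session intervals overlap or touch, and treats a session as ready to fire once the watermark reaches its end minus one. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified single-key model (hypothetical names) of event-time session firing. */
final class SessionFiringSketch {

    /** Merges ascending timestamps into sessions stored as {start, end}, end = last timestamp + gap. */
    static List<long[]> sessions(long[] ascendingTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : ascendingTimestamps) {
            if (current != null && ts <= current[1]) {
                // The element's interval overlaps or touches the open session: extend it.
                current[1] = ts + gap;
            } else {
                // The gap has elapsed: start a new session.
                current = new long[] {ts, ts + gap};
                result.add(current);
            }
        }
        return result;
    }

    /** Counts sessions whose last covered timestamp (end - 1) is at or below the watermark. */
    static int fireable(List<long[]> sessions, long watermark) {
        int count = 0;
        for (long[] session : sessions) {
            if (session[1] - 1 <= watermark) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long gap = 5 * 60 * 1000L; // 5 minutes, as in withGap(Time.minutes(5))
        long[] timestamps = {0L, 60_000L, 120_000L, 1_000_000L, 1_060_000L};
        List<long[]> merged = sessions(timestamps, gap);
        // Watermark trailing the last element by one, as an ascending extractor would produce.
        System.out.println("fireable with periodic watermark: " + fireable(merged, 1_059_999L));
        // Only the final end-of-input watermark.
        System.out.println("fireable with Long.MAX_VALUE:     " + fireable(merged, Long.MAX_VALUE));
    }
}
```

If the only watermark is Long.MAX_VALUE at the end of the input, every session becomes fireable at once, which matches results being emitted only after the whole stream has been processed.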

