Hi Aljoscha,

Sorry for the late response; I was busy all of last week.
I used the custom watermark debugger (for 1.1, I changed
super.processWatermark(mark) to super.output.emitWatermark(mark)).
Surprisingly, with 1.2 only one watermark is printed, at the end of the
stream, with the value WM: Watermark @ 9223372036854775807 (Long.MAX_VALUE),
whereas with 1.1 watermarks are printed periodically. I am using the
following revision of 1.2-SNAPSHOT:
https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9
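
To illustrate why a single final watermark would delay every session result
until the end of the stream, here is a minimal plain-Java sketch (no Flink
dependency). It is a simplified model and not Flink's window operator: it
assumes one key, ascending timestamps, a watermark of (timestamp - 1) after
every element in the "periodic" case, and that a session fires once the
watermark reaches its end minus 1 ms. The class name SessionFiringSketch and
the method sessionsFiredBeforeEnd are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionFiringSketch {

    /**
     * Counts how many sessions of a single key fire before the end of input.
     * Timestamps must be ascending. Simplified model, not Flink's implementation.
     */
    public static int sessionsFiredBeforeEnd(long[] timestamps, long gapMs,
                                             boolean periodicWatermarks) {
        List<Long> openEnds = new ArrayList<>(); // exclusive end of each pending session
        int fired = 0;
        for (long ts : timestamps) {
            int last = openEnds.size() - 1;
            if (last >= 0 && ts <= openEnds.get(last)) {
                openEnds.set(last, ts + gapMs); // element extends the latest session
            } else {
                openEnds.add(ts + gapMs);       // element starts a new session
            }
            if (periodicWatermarks) {
                // Assumption: a watermark of (ts - 1) follows every element.
                final long watermark = ts - 1;
                int before = openEnds.size();
                // A session fires once the watermark reaches its last timestamp (end - 1).
                openEnds.removeIf(end -> watermark >= end - 1);
                fired += before - openEnds.size();
            }
        }
        // Sessions still pending here fire only on the final Long.MAX_VALUE watermark.
        return fired;
    }

    public static void main(String[] args) {
        long gapMs = 5 * 60 * 1000L; // same 5-minute gap as in the job
        long[] ts = {0L, 60_000L, 1_000_000L, 1_060_000L, 2_000_000L};
        System.out.println("periodic watermarks:  " + sessionsFiredBeforeEnd(ts, gapMs, true));
        System.out.println("final watermark only: " + sessionsFiredBeforeEnd(ts, gapMs, false));
    }
}
```

With this sample input (three sessions), the periodic case fires two sessions
before the end of input, while the final-watermark-only case fires none, which
matches the buffering I observe with 1.2.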

I uploaded the dataset I'm using as input here:
https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing
The first column is the timestamp.

You can find the code below. Thank you for your help.

import com.opencsv.CSVParser;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import java.util.*;

/**
 * Created by ymarzougui on 11/1/2016.
 */
public class SortedSessionsAssigner {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple3<Long, String, String>> waterMarked = env
                .readTextFile("file:///E:\\data\\anonymized.csv")
                .flatMap(new RichFlatMapFunction<String, Tuple3<Long, String, String>>() {
                    public CSVParser csvParser;

                    @Override
                    public void open(Configuration config) {
                        csvParser = new CSVParser(',', '"');
                    }

                    @Override
                    public void flatMap(String in, Collector<Tuple3<Long, String, String>> clctr)
                            throws Exception {
                        String[] result = csvParser.parseLine(in);
                        clctr.collect(Tuple3.of(Long.parseLong(result[0]), result[1], result[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<Tuple3<Long, String, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long, String, String> tuple3) {
                        return tuple3.f0;
                    }
                });

        DataStream<Tuple2<TreeMap<String, Double>, Long>> sessions = waterMarked
                .keyBy(1)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .apply(new WindowFunction<Tuple3<Long, String, String>,
                        Tuple2<TreeMap<String, Double>, Long>, Tuple, TimeWindow>() {

                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow,
                            Iterable<Tuple3<Long, String, String>> iterable,
                            Collector<Tuple2<TreeMap<String, Double>, Long>> collector)
                            throws Exception {
                        TreeMap<String, Double> treeMap = new TreeMap<String, Double>();
                        Long session_count = 0L;
                        for (Tuple3<Long, String, String> tuple3 : iterable) {
                            treeMap.put(tuple3.f2, treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
                            session_count += 1;
                        }
                        collector.collect(Tuple2.of(treeMap, session_count));
                    }
                }).setParallelism(8);

        waterMarked.transform("WatermarkDebugger", waterMarked.getType(),
                new WatermarkDebugger<Tuple3<Long, String, String>>());

        //sessions.writeAsCsv("file:///E:\\data\\sessions.csv",
        //        FileSystem.WriteMode.OVERWRITE).setParallelism(1);

        env.execute("Sorted Sessions Assigner");

    }

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // 1.2-snapshot
            super.processWatermark(mark);
            // 1.1-snapshot
            //super.output.emitWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }

}

Best,
Yassine

2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> could you please try adding this custom watermark debugger to see what's
> going on with the element timestamps and watermarks:
>
> public static class WatermarkDebugger<T>
>         extends AbstractStreamOperator<T>
>         implements OneInputStreamOperator<T, T> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void processElement(StreamRecord<T> element) throws Exception {
>         System.out.println("ELEMENT: " + element);
>         output.collect(element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         super.processWatermark(mark);
>         System.out.println("WM: " + mark);
>     }
> }
>
> you can use it like this:
> input.transform("WatermarkDebugger", input.getType(),
>         new WatermarkDebugger<Tuple2<String, Integer>>());
>
> That should give us something to work with.
>
> Cheers,
> Aljoscha
>
> On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetz...@apache.org> wrote:
>
> I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
> best overview of the changes to the window operator between 1.1 and 1.2.
>
> On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
> I forgot to mention: the watermark extractor is the one included in the
> Flink API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi Robert,
>
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to a build of the latest master (at
> the time of the question). Here is the watermark assignment:
>
> .assignTimestampsAndWatermarks(
>         new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
>     @Override
>     public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
>         return tuple3.f0;
>     }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions? It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
> Hi all,
>
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
> boundaries are detected, but with 1.2-SNAPSHOT the state keeps growing
> in memory and the window results are not emitted until the whole stream is
> processed. Is this temporary behaviour due to ongoing development in
> 1.2-SNAPSHOT, or a bug?
>
> I am using code similar to the following:
>
> env.setParallelism(1);
>
> DataStream<T> sessions = env
>     .readTextFile()
>     .flatMap()
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>     .keyBy(1)
>     .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>     .apply().setParallelism(32);
>
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>
> Best,
> Yassine