Now I see. What your stateful MapFunction does in this example is
effectively move timestamps from one element to another. Flink internally
keeps track of the timestamp of each element. This normally cannot be
changed, except by using a TimestampAssigner, which you're doing. Now, the
output from a MapFunction has the same timestamp as the input element. By
keeping an element in state and emitting it when the next element arrives,
you emit it with the timestamp of that next element, and that's the reason
why they end up in the "wrong" windows.
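
To make this concrete, here is a minimal plain-Java sketch (no Flink
involved) that mimics the behaviour described above. The class and method
names are made up for illustration, and the window start is computed as
timestamp - (timestamp % size), which assumes tumbling windows with zero
offset. Each emitted payload is the previously stored element, while the
record timestamp used for window assignment is that of the element that
just arrived.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class DelayedEmitSimulation {

    // Start of the tumbling window that contains the timestamp (zero offset assumed).
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    // Simulates the stateful map followed by a tumbling event-time window.
    // Key: window start. Value: payload timestamps (field f1) that land in that window.
    static Map<Long, Set<Long>> simulate(long[] eventTimes, long sizeMs) {
        Map<Long, Set<Long>> windows = new TreeMap<>();
        Long previous = null; // plays the role of the ValueState in MapTest
        for (long current : eventTimes) {
            // Like MapTest: emit the stored element, or the current one if state is empty.
            long payload = (previous == null) ? current : previous;
            previous = current;
            // The emitted record keeps the timestamp of the input being processed.
            long recordTimestamp = current;
            windows.computeIfAbsent(windowStart(recordTimestamp, sizeMs), k -> new TreeSet<>())
                   .add(payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] eventTimes = {
            1485446260994L, 1485446266012L, 1485446271031L,
            1485446276040L, 1485446281045L, 1485446286049L,
            1485446291062L, 1485446296066L, 1485446302019L
        };
        long size = 20_000L;
        simulate(eventTimes, size).forEach((start, payloads) ->
            System.out.println("Window [" + start + " " + (start + size) + "] Set " + payloads));
    }
}
```

Run on the nine input elements from the example below, this groups the
payloads into the same three windows as the reported output (the sets are
printed sorted here, so the order inside each set differs).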

Does that help?

-
Aljoscha

On Thu, 26 Jan 2017 at 19:17 Nico <nico.franz...@gmail.com> wrote:

> Hi,
>
> can anyone help me with this problem? I don't get it. Forget the examples
> below, I've created a copy / paste example to reproduce the problem of
> incorrect results when using key-value state and windowOperator.
>
>
> public class StreamingJob {
>
> public static void main(String[] args) throws Exception {
> // set up the streaming execution environment
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> DataStream<Tuple2<String,Long>> stream = env.fromElements(
> new Tuple2<>("1",1485446260994L),
> new Tuple2<>("1",1485446266012L),
> new Tuple2<>("1",1485446271031L),
> new Tuple2<>("1",1485446276040L),
> new Tuple2<>("1",1485446281045L),
> new Tuple2<>("1",1485446286049L),
> new Tuple2<>("1",1485446291062L),
> new Tuple2<>("1",1485446296066L),
> new Tuple2<>("1",1485446302019L)
> );
>
> stream
> .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor<Tuple2<String,
> Long>>(Time.seconds(0)) {
> @Override
> public long extractTimestamp(Tuple2<String, Long> stringLongTuple2) {
> return stringLongTuple2.f1;
> }
> })
> .keyBy("f0")
> .map(new MapTest())
> .keyBy("f0")
> .window(TumblingEventTimeWindows.of(Time.seconds(20)))
> .apply(new WindowFunction<Tuple2<String,Long>, Object, Tuple,
> TimeWindow>() {
> @Override
> public void apply(Tuple tuple, TimeWindow timeWindow,
> Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector)
> throws Exception {
>
> Set<Long> set = new HashSet<>();
> for(Tuple2<String,Long> t : iterable){
> set.add(t.f1);
> }
>
> StringBuilder sb = new StringBuilder();
>
> sb.append("Window [" +timeWindow.getStart() +" " +timeWindow.getEnd() +"] ");
> sb.append("Set " +set.toString());
> System.out.println(sb.toString());
> }
> })
> .print();
>
>
> // execute program
> env.execute("Flink Streaming Java API Skeleton");
> }
>
> private static class MapTest extends
> RichMapFunction<Tuple2<String,Long>,Tuple2<String,Long>> {
>
> private transient ValueState<Tuple2<String, Long>> state;
>
> @Override
> public Tuple2<String, Long> map(Tuple2<String, Long> stringLongTuple2)
> throws Exception {
>
> Tuple2<String,Long> t = state.value();
>
> state.update(stringLongTuple2);
>
> if(t == null) return stringLongTuple2;
>
> return t;
> }
>
> @Override
> public void open(Configuration parameters) throws Exception {
>
> ValueStateDescriptor<Tuple2<String,Long>> vsd = new ValueStateDescriptor<>(
> "lastEvent",
> TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
> null
> );
>
> state = getRuntimeContext().getState(vsd);
> }
> }
> }
>
>
> Output:
>
> Window [1485446260000 1485446280000] Set [1485446271031, 1485446260994,
> 1485446266012]
> Window [1485446280000 1485446300000] Set [1485446291062, 1485446281045,
> 1485446286049, 1485446276040]
> Window [1485446300000 1485446320000] Set [1485446296066]
>
> Best,
> Nico
>
> BTW ... I am using Flink 1.1.3.
>
>
> 2017-01-16 12:18 GMT+01:00 Nico <nico.franz...@gmail.com>:
>
> Hi Aljoscha,
>
> I was able to identify the root cause of the problem. It is my first map
> function using the ValueState. But first, the
> assignTimestampsAndWatermarks() is called after the connector to Kafka is
> created:
>
> FlinkKafkaConsumer09<Car> carFlinkKafkaConsumer09  =
>       new FlinkKafkaConsumer09<>("Traffic", new Car(), properties);
>
> // Extract the timestamps with a max. delay of 2s
> carFlinkKafkaConsumer09.assignTimestampsAndWatermarks(new 
> TimestampGenerator(Time.seconds(0)));
>
> In the map function I try to calculate the direction between two GPS data 
> points. For this, I store the last event in ValueState. The function looks 
> like this:
>
> private static class BearingMap extends RichMapFunction<Car, Car> {
>
>    private transient ValueState<Car> state;
>    private final double maxdiff = 12; // in seconds
>
>    @Override
>    public Car map(Car destination) throws Exception {
>
>       Car origin = state.value();
>       double olat, olon, dlat, dlon;
>
>       /**
>        *  If the state is empty, do not compute a direction; only store the event in state
>        */
>       if (origin == null){
>          state.update(destination);
>          // return the Car unchanged
>          return destination;
>       }
>
>       double diff = origin.getTimestamp()-destination.getTimestamp();
>
>            System.out.println("Differenz: " +diff);
>
>            if(Math.abs(diff) <= maxdiff*1000){
>
>          /*
>           * For late events that still fall within the allowed delay
>           */
>          if(diff > 0){
>             Car tmp = destination;
>             destination = origin;
>             origin = tmp;
>          }
>
>          /*
>           * Car tmp is always the origin
>           */
>
>          double bearing = Helper.calculateBearing(
>                origin.getLat(), origin.getLon(), destination.getLat(), destination.getLon());
>
>          // Update the state
>          state.update(destination);
>
>          origin.setDirection(bearing);
>          return origin;
>
>       }
>
>       // For events that are too late, keep the current state and return it without a direction
>          return origin;
>
>    }
>
>
>    @Override
>    public void open(Configuration parameters) throws Exception {
>
>       ValueStateDescriptor<Car> vsd = new ValueStateDescriptor<>(
>             "lastEvent",
>             Car.class,
>             null
>       );
>
>       state = getRuntimeContext().getState(vsd);
>    }
>
> }
>
> Together with the window function:
>
>
> private static class TimeWindowTest implements WindowFunction<Car, 
> Tuple9<Double, Double, Double, Double, Double, Double, Double, Integer, 
> List<String>>, Tuple, TimeWindow> {
>     @Override
>     public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Car> 
> iterable, Collector<Tuple9<Double, Double, Double, Double, Double, Double, 
> Double, Integer, List<String>>> collector) throws Exception {
>         String s = "Zeitfenster: " +timeWindow.getStart() +" - " + 
> timeWindow.getEnd() +"\n";
>         Set<Long> timestamps = new HashSet<Long>();
>
>         for( Car c : iterable){
>             timestamps.add(c.getTimestamp());
>         }
>
>         System.out.println( s +timestamps +"\n\n");
>     }
> }
>
> I get for :
>
> stream
>    .filter(new FilterFunction<Car>() {
>       @Override
>       public boolean filter(Car car) throws Exception {
>          return car.getId().equals("car.330");
>       }
>    })
>              .keyBy("id")
>              .map(new BearingMap())
>              .keyBy("id")
>              .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>              .apply(new TimeWindowTest());
>
> So when an event e1 arrives at the map operator, it is stored in
> ValueState and is forwarded only after the next element e2 arrives,
> which is about 5 seconds later. This produces the following output:
> one element is always around 5 seconds before the start of the window.
>
> Differenz: -5013.0
> Differenz: -5014.0
> Zeitfenster: 1484564690000 - 1484564700000 (Window times start and end)
> [1484564686236, 1484564691260]
>
>
> Differenz: -5009.0
> Differenz: -5007.0
> Zeitfenster: 1484564700000 - 1484564710000
> [1484564696273, 1484564701287]
>
>
> Differenz: -5005.0
> Differenz: -5014.0
> Zeitfenster: 1484564710000 - 1484564720000
> [1484564706296, 1484564711303]
>
>
> Best,
>
> Nico
>
>
>
> 2017-01-09 16:10 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> I'm assuming you also have the call to assignTimestampsAndWatermarks()
> somewhere in there as well, as in:
>
> stream
>       .assignTimestampsAndWatermarks(new TimestampGenerator()) // or
> somewhere else in the pipeline
>       .keyBy("id")
>       .map(...)
>       .filter(...)
>       .map(...)
>       .keyBy("areaID")
>       .map(new KeyExtractor())
>       .keyBy("f1.areaID","f0.sinterval")
>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>       .apply(new TrafficInformation());
>
> Just checking, to make sure. If you have this we might have to dig a
> little deeper. Could you also please try to print the whole output of
> your apply() method in one go, i.e. collect all the output in a String and
> then make one call to System.out.println()? It could be that the output in
> the terminal is not completely in order.
>
> Cheers,
> Aljoscha
>
> On Mon, 2 Jan 2017 at 15:04 Nico <nico.franz...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> thank you for having a look. Actually there is not too much code based on
> timestamps:
>
> stream
>       .keyBy("id")
>       .map(...)
>       .filter(...)
>       .map(...)
>       .keyBy("areaID")
>       .map(new KeyExtractor())
>       .keyBy("f1.areaID","f0.sinterval")
>       .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>       .apply(new TrafficInformation());
>
> The map functions only enrich the data and don't change anything related
> to the timestamp.
>
> the apply function is:
>
> @Override
> public void apply(
> Tuple key,
> TimeWindow timeWindow,
> Iterable<Tuple2<DirectionInterval, Car>> cars,
> Collector<Tuple3<String, Double, Double>> out) throws Exception {
>
> System.out.println("Start: " +timeWindow.getStart());
> System.out.println("End: " +timeWindow.getEnd());
>
> for(Tuple2<DirectionInterval, Car> t : cars){
> System.out.println(t.f1);
> }
>
> System.out.println(t.f1) prints all information about a car, in which the
> timestamp is embedded. The system gets the timestamp from this class:
>
> public class TimestampGenerator extends
> BoundedOutOfOrdernessTimestampExtractor <Car> {
>
>
>     public TimestampGenerator(Time maxOutOfOrderness){
>         super(maxOutOfOrderness);
>     }
>
>     @Override
>     public long extractTimestamp(Car car) {
>         return car.getTimestamp();
>     }
> }
>
>
> Example output is presented in the previous post... it looks like the
> timestamp is rounded... I am confused :-/
>
> Best,
> Nico
>
> 2016-12-23 19:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
> Hi,
> could you please share code (and example data) for producing this output.
> I'd like to have a look.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Dec 2016 at 16:29 Nico <nico.franz...@gmail.com> wrote:
>
> Hi @all,
>
> I am using a TumblingEventTimeWindows.of(Time.seconds(20)) for testing.
> During this I found a strange behavior (at least for me) in the assignment
> of events.
>
> The first element shown in each new window actually belongs, by its
> timestamp, to the previous window. I thought the events were late, but
> then they would be dropped instead of assigned to the new window. Even
> with an allowedLateness of 10s the behavior remains the same.
>
> I used timeWindow.getStart() and getEnd() to get the boundaries
> of the window.
>
> Can someone explain this?
>
> Best,
> Nico
>
>
> TimeWindows with Elements:
>
> Start: 1482332940000 - End: 1482332960000
> timestamp=1482332952907
>
> Start: 1482332960000 - End: 1482332980000
> timestamp=1482332958929
> timestamp=1482332963995
> timestamp=1482332969027
> timestamp=1482332974039
>
> Start: 1482332980000 - End: 1482333000000
> timestamp=1482332979059
> timestamp=1482332984072
> timestamp=1482332989081
> timestamp=1482332994089
>
> Start: 1482333000000 - End: 1482333020000
> timestamp=1482332999113
> timestamp=1482333004123
> timestamp=1482333009132
> timestamp=1482333014144
>
>
>
>
>

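A small diagnostic along the lines used in this thread can be sketched in
plain Java. It checks which payload timestamps printed for a window fall
outside that window's half-open interval [start, end). The class and
method names are hypothetical and nothing here touches Flink. The sample
values are taken from the second window in the first message of the
thread.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMembershipCheck {

    // True if the timestamp lies in the half-open interval [start, end).
    static boolean belongsTo(long timestamp, long start, long end) {
        return timestamp >= start && timestamp < end;
    }

    // Returns the payload timestamps that do not lie inside the window.
    static List<Long> outsiders(long start, long end, long[] payloadTimestamps) {
        List<Long> result = new ArrayList<>();
        for (long ts : payloadTimestamps) {
            if (!belongsTo(ts, start, end)) {
                result.add(ts);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long start = 1482332960000L;
        long end = 1482332980000L;
        long[] payloads = {
            1482332958929L, 1482332963995L, 1482332969027L, 1482332974039L
        };
        // Lists the embedded timestamps that disagree with the window they were printed in.
        System.out.println("Outside window: " + outsiders(start, end, payloads));
    }
}
```

A non-empty result means the timestamp embedded in the element differs
from the timestamp that was used to assign the element to a window.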