If you want to use event time processing with in-order data, then you can use an AscendingTimestampExtractor [1].
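
To illustrate the idea, here is a minimal, dependency-free sketch. It does not use Flink's classes; WatermarkSketch and AscendingTracker are hypothetical names, and the 1 ms offset is an assumption chosen so that events sharing the latest timestamp are not treated as late. With in-order data, the watermark can simply trail the largest timestamp seen so far:

```java
// Dependency-free model of watermarking for in-order (ascending) timestamps.
// AscendingTracker is a hypothetical name for illustration; it is not a Flink class.
public class WatermarkSketch {

    static final class AscendingTracker {
        private long maxTimestamp = Long.MIN_VALUE;

        /** Records an event timestamp; returns false if it violates ascending order. */
        boolean observe(long timestampMillis) {
            if (timestampMillis < maxTimestamp) {
                return false; // out of order: the in-order assumption does not hold
            }
            maxTimestamp = timestampMillis;
            return true;
        }

        /** Watermark trails the largest timestamp by 1 ms (assumption, see lead-in). */
        long currentWatermark() {
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
        }
    }

    public static void main(String[] args) {
        AscendingTracker tracker = new AscendingTracker();
        long[] eventTimes = {1_000L, 2_000L, 2_000L, 3_500L};
        for (long t : eventTimes) {
            tracker.observe(t);
            System.out.println("event=" + t + " watermark=" + tracker.currentWatermark());
        }
    }
}
```

Note that the watermark here depends only on timestamps carried by the events, never on the wall clock, so the same logic works on historic and live data.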

David

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamp_extractors.html#assigners-with-ascending-timestamps

On Thu, Aug 22, 2019 at 4:03 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
> Thanks for the detailed explanation! I removed my watermark
> implementation, which is not necessary in my case. I will only use
> watermarks if I am dealing with out-of-order events.
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Wed, Aug 21, 2019 at 9:09 PM David Anderson <da...@ververica.com> wrote:
>>
>> What Watermarks do is advance the event time clock. You can
>> consider a Watermark(t) as an assertion about the completeness of the
>> stream -- it marks a point in the stream and says that at that point,
>> the stream is (probably) now complete up to time t.
>>
>> The autoWatermarkInterval determines how often new Watermarks are
>> created -- or in other words, how often the event-time clock will be
>> able to move forward. From what you've presented, it seems like you
>> can leave this at its default, which is 200 msec. This means that five
>> times a second, as your application runs, each parallel instance will
>> create a new watermark (assuming there's been new data and that the
>> event time clock can be advanced).
>>
>> getCurrentWatermark() should NOT be implemented in terms of
>> System.currentTimeMillis -- you do not want your watermarking to
>> depend on the current processing time if you can possibly avoid it.
>> Part of the beauty of event time processing is being able to run your
>> application on historic data as well as live, real-time data, and this
>> is only possible if your watermarks depend on timestamps recorded in
>> the events, rather than System.currentTimeMillis.
>>
>> You should also try to decouple your watermarking strategy from the
>> specific processing you intend to do later, downstream.
>> The primary concern you need to have when implementing the
>> watermarking is to consider how much out-of-orderness your data may
>> have. A typical timestamp assigner and watermark generator will look
>> something like this, assuming that your event stream will have its
>> timestamps at most 10 seconds out of order, and that your events have
>> a timestamp field:
>>
>> DataStream<MyEvent> withTimestampsAndWatermarks =
>>     stream.assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>>
>>             @Override
>>             public long extractTimestamp(MyEvent element) {
>>                 return element.timestamp;
>>             }
>>         });
>>
>> As for your specific application requirements, you might find it
>> simpler to rely on State Time-to-Live [1] rather than clearing state
>> yourself.
>>
>> There's no need to retain the state until the windowed join is
>> completed, since the operator executing the join can't access the
>> state in the CoProcessFunction. The CoProcessFunction should clear the
>> state whenever it is done with it; no other part of your job will
>> access it.
>>
>> If there is a risk that the CoProcessFunction will create state that
>> isn't freed, and you don't for some reason find State TTL a good
>> solution for this, then you can use either a processing time or event
>> time timer to trigger a call to onTimer in which you can free the
>> state. For example,
>>
>>     timerService.registerEventTimeTimer(event.getEventTime() + 60 * 1000);
>>
>> registers an event time timer for 60 seconds after the timestamp in an
>> event -- meaning, take the event's timestamp, add 60 seconds, and wait
>> until the current Watermark has surpassed that point in time.
>>
>> The Flink training website has tutorials [2] and exercises [3] on these
>> topics.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
>> [2] https://training.ververica.com/lessons/event-time-watermarks.html
>> [3] https://training.ververica.com/exercises/rideEnrichment-processfunction.html
>>
>>
>> On Wed, Aug 21, 2019 at 10:59 AM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > I am a little confused about watermarks in Flink.
>> >
>> > My application is using EventTime. My sources are calling
>> > ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a
>> > CoProcessFunction which merges the two streams. I keep state in this
>> > function and I want to clear it every time the window of my next
>> > operator is triggered. The next operator is a join which uses a
>> > window of 1 minute [1].
>> >
>> > stream01 = source01.connect(sideoutput02).keyBy().process(new MyCoProcessFunction);
>> > stream02 = source02.connect(sideoutput01).keyBy().process(new MyCoProcessFunction);
>> > stream01.join(stream02).window(60 sec).apply(new MyJoinFunction).print();
>> >
>> > I am unsure whether I have to use
>> > env.getConfig().setAutoWatermarkInterval(60 seconds), or whether I have
>> > to add .assignTimestampsAndWatermarks(new
>> > MyAssignerWithPeriodicWatermarks()) and write the logic in the method
>> > getCurrentWatermark(). In my case, where I want a watermark every 60
>> > seconds, I guess this method (getCurrentWatermark()) should contain
>> > "return new Watermark(System.currentTimeMillis() + 60000);", but
>> > should it be - or +?
>> >
>> > Then, in the CoProcessFunction, what time should I pass to
>> > context.timerService().registerEventTimeTimer(), and what logic should
>> > I use in the onTimer() method?
>> >
>> > [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/valencia/ValenciaDataSkewedBloomFilterJoinExample.java#L47
>> >
>> > Thanks,
>> > Felipe
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> > -- https://felipeogutierrez.blogspot.com