If you want to use event time processing with in-order data, then you
can use an AscendingTimestampExtractor [1].

David

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamp_extractors.html#assigners-with-ascending-timestamps
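
For illustration, here is a minimal plain-Java sketch (no Flink dependency)
of the idea behind watermarking in-order data. It is not Flink's
implementation; the class and method names are hypothetical. It assumes
timestamps are in milliseconds and that the watermark should trail the
latest timestamp by 1 ms, since another event with the same timestamp may
still arrive.

```java
/** Plain-Java sketch of ascending-timestamp watermarking (hypothetical names, not Flink code). */
final class AscendingWatermarkSketch {
    // Highest event timestamp seen so far; Long.MIN_VALUE means "no events yet".
    private long currentTimestamp = Long.MIN_VALUE;

    /** Records an event timestamp; returns false if it violates the ascending order. */
    boolean onEvent(long timestampMillis) {
        if (timestampMillis >= currentTimestamp) {
            currentTimestamp = timestampMillis;
            return true;
        }
        // Out of order: the in-order assumption does not hold for this event.
        return false;
    }

    /** The watermark trails the latest timestamp by 1 ms (assumption of this sketch). */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }
}
```

If onEvent ever returns false for your data, the stream is not strictly
in order and a bounded-out-of-orderness strategy is probably the safer choice.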

On Thu, Aug 22, 2019 at 4:03 PM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> Thanks for the detailed explanation! I removed my watermark implementation,
> which is not necessary in my case. I will only use watermarks if I am
> dealing with out-of-order events.
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Wed, Aug 21, 2019 at 9:09 PM David Anderson <da...@ververica.com> wrote:
>>
>> Watermarks advance the event time clock. You can
>> consider a Watermark(t) as an assertion about the completeness of the
>> stream -- it marks a point in the stream and says that at that point,
>> the stream is (probably) now complete up to time t.
>>
>> The autoWatermarkInterval determines how often new Watermarks are
>> created -- or in other words, how often the event-time clock will be
>> able to move forward. From what you've presented, it seems like you
>> can leave this at its default, which is 200 msec. This means that five
>> times a second, as your application runs, each parallel instance will
>> create a new watermark (assuming there's been new data and that the
>> event time clock can be advanced).
>>
>> getCurrentWatermark() should NOT be implemented in terms of
>> System.currentTimeMillis -- you do not want your watermarking to
>> depend on the current processing time if you can possibly avoid it.
>> Part of the beauty of event time processing is being able to run your
>> application on historic data as well as live, real-time data, and this
>> is only possible if your watermarks depend on timestamps recorded in
>> the events, rather than System.currentTimeMillis.
>>
>> You should also try to decouple your watermarking strategy from the
>> specific processing you intend to do later, downstream. The primary
>> concern you need to have when implementing the watermarking is to
>> consider how much out-of-orderness your data may have. A typical
>> timestamp assigner and watermark generator will look something like
>> this, assuming that your event stream will have its timestamps at most
>> 10 seconds out of order, and that your events have a timestamp field:
>>
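>> // Imports assume the Flink 1.8 DataStream API:
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> DataStream<MyEvent> withTimestampsAndWatermarks =
>>     stream.assignTimestampsAndWatermarks(
>>         // Watermarks trail the highest timestamp seen so far by 10 seconds.
>>         new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>>             @Override
>>             public long extractTimestamp(MyEvent element) {
>>                 // Event time in epoch milliseconds, read from the event itself.
>>                 return element.timestamp;
>>             }
>>         });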
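
To make the arithmetic behind a bounded-out-of-orderness watermark concrete,
here is a plain-Java sketch with no Flink dependency. It is not Flink's
implementation; the class and method names are hypothetical. It assumes
the watermark is simply the highest timestamp seen so far minus the allowed
out-of-orderness, and that an event counts as late when its timestamp is at
or below the current watermark. Note that it never reads the wall clock, so
replaying the same events yields the same watermarks.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark (hypothetical names, not Flink code). */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Offset the start value so the subtraction in currentWatermark() cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Tracks the highest event timestamp; older timestamps never move it backwards. */
    void onEvent(long timestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
    }

    /** Watermark = highest timestamp seen so far minus the out-of-orderness bound. */
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    /** An event at or below the watermark arrived after the stream was declared complete. */
    boolean isLate(long timestampMillis) {
        return timestampMillis <= currentWatermark();
    }
}
```

With a 10-second bound, an event at t = 100000 ms moves the watermark to
90000 ms; a subsequent event at t = 95000 ms is out of order but still on
time, while one at t = 85000 ms would be late.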
>>
>> As for your specific application requirements, you might find it
>> simpler to rely on State Time-to-Live [1] rather than clearing state
>> yourself.
>>
>> There's no need to retain the state until the windowed join is
>> completed, since the operator executing the join can't access the
>> state in the CoProcessFunction. The CoProcessFunction should clear the
>> state whenever it is done with it; no other part of your job will
>> access it.
>>
>> If there is a risk that the CoProcessFunction will create state that
>> isn't freed, and for some reason you don't find State TTL a good
>> solution for this, then you can use either a processing time or event
>> time timer to trigger a call to onTimer in which you can free the
>> state. For example,
>>
>> timerService.registerEventTimeTimer(event.getEventTime() + 60 * 1000);
>>
>> registers an event time timer for 60 seconds after the timestamp in an
>> event -- meaning, take the event's timestamp, add 60 seconds, and wait
>> until the current Watermark has reached or passed that point in time.
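
The timer behaviour described above can be sketched in plain Java, with no
Flink dependency. This is not Flink's timer service; the class and its
methods are hypothetical stand-ins. It assumes a timer fires once the
watermark is greater than or equal to the timer's timestamp, and that
registering the same timestamp twice results in a single timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java sketch of event-time timers driven by watermarks (hypothetical names, not Flink code). */
final class EventTimeTimerSketch {
    // Pending timer timestamps, sorted ascending; a set, so duplicates collapse.
    private final TreeSet<Long> timers = new TreeSet<>();

    /** Registers a timer for the given event-time timestamp in milliseconds. */
    void registerEventTimeTimer(long timestampMillis) {
        timers.add(timestampMillis);
    }

    /** Fires (removes and returns) every timer whose timestamp is <= the new watermark. */
    List<Long> advanceWatermark(long watermarkMillis) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermarkMillis) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

For an event with timestamp 1000 ms, registering 1000 + 60 * 1000 = 61000
means nothing fires while the watermark is at 60999, and the timer fires
when the watermark reaches 61000. In a real job, that firing is where
onTimer would clear the state.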
>>
>> The Flink training website has tutorials [2] and exercises [3] on these 
>> topics.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
>> [2] https://training.ververica.com/lessons/event-time-watermarks.html
>> [3] https://training.ververica.com/exercises/rideEnrichment-processfunction.html
>>
>>
>> On Wed, Aug 21, 2019 at 10:59 AM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > I am a little confused about watermarks in Flink.
>> >
>> > My application is using EventTime. My sources are calling 
>> > ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a 
>> > CoProcessFunction which merges the two streams. I have state in this
>> > function and I want to clean this state every time that I trigger the 
>> > window of my next operator. The next operator is a join which is using a 
>> > window of 1 minute [1].
>> >
>> > stream01 = source01.connect(sideoutput02).keyBy().process(new MyCoProcessFunction);
>> > stream02 = source02.connect(sideoutput01).keyBy().process(new MyCoProcessFunction);
>> > stream01.join(stream02).window(60 sec).apply(new MyJoinFunction).print();
>> >
>> > I am not sure whether I have to use
>> > env.getConfig().setAutoWatermarkInterval(60 seconds), or whether I have to
>> > add .assignTimestampsAndWatermarks(new MyAssignerWithPeriodicWatermarks())
>> > and write the logic in the getCurrentWatermark() method. Since I want a
>> > watermark every 60 seconds, I guess this method should contain "return
>> > new Watermark(System.currentTimeMillis() + 60000);", but I am not sure
>> > whether it should be - or +.
>> >
>> > Then, in the CoProcessFunction, what time should I pass to
>> > context.timerService().registerEventTimeTimer(), and what logic
>> > should I use in the onTimer() method?
>> >
>> > [1] 
>> > https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/valencia/ValenciaDataSkewedBloomFilterJoinExample.java#L47
>> >
>> > Thanks,
>> > Felipe
>> > --
>> > -- Felipe Gutierrez
>> > -- skype: felipe.o.gutierrez
>> > -- https://felipeogutierrez.blogspot.com
