Hi Vijay,

Could you provide more information about your problem? For example
- Which kind of window do you use?
- What's the window size?
- A relatively complete code is better :-)

As for the problem, it is probably the event time has not reached the end
of the window. You can monitor the watermark in the web dashboard[1].
Also, changing even time to processing time is another way to verify if it
is a watermark problem.

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html


On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> Observations on Watermarks:
> Read this great article:
> https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy
>
> * Watermark means when for any event TS, when to stop waiting for arrival
> of earlier events.
> * Watermark t means all events with Timestamp < t have already arrived.
> * When to push data out - When watermark with TS >= t arrives
>
> Only *using incrementing current time for watermark seems to be working
> correctly* but not sure if it aligns up correctly with EventTime
> processing.
> *Using the incoming records intervalStart as the Watermark source  for
> EventTime causes data to not be pushed at all* in cases when i have just
> 5 records in the Source.
>
> My source generation for intervalStart has intervalStart incrementing at a
> regular interval.
> I tried using the intervalStart for my Watermark with a out of order late
> boundedness of 3 secs.
> The *AggregateFunction* I am using calls the add() fine but *never calls
> the getResult().*
> My assumption was that the AggregateFunction I am using would push the
> data to getResult
> based on the Watermark based on intervalStart incrementing beyong the
> previous watermark t.
> But it doesn't -is it because I have limited number of input records and
> once intervalStart gets to the end
> of the input records too fast, it stops incrementing the watermar and
> hence doesn't push data ?
>
> With System.currentTimeMillis, it happily keeps increasing and hence
> pushes the data.
>
> Created this class:
> public class MonitoringAssigner implements
> AssignerWithPunctuatedWatermarks<Monitoring> {
>     private long bound = 3 * 1000;//3 secs out of order bound in millisecs
>
>     public MonitoringAssigner(long bound) {
>         this.bound = bound;
>     }
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long
> extractedTimestamp) {
>         long nextWatermark = extractedTimestamp - bound;
>         //simply emit a Watermark with every event
>         return new Watermark(nextWatermark);
>     }
>
>     @Override
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         /*LocalDateTime intervalStart =
> Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12
> 02:21:06.057
>         long extractedTS = 
> Utils.getLongFromLocalDateTime(intervalStart);//*using
> this stopped pushing recs after a certain time*
>         return extractedTS;*/
>         return *System.currentTimeMillis*();//incrementing current time
>
>     }
>
>

Reply via email to