Hi Nara,

yes, the watermark in TimerService is not covered by the checkpoint, everytime 
the job is restarted from a previous checkpoint, it is reset to Long.MIN_VALUE. 
I can see it a bit tricky to cover it into the checkpoint, especially when we 
need to support rescaling(it seems not like a purely keyed or a operate state), 
maybe @Stefan or @Aljoscha could give you more useful information about why it 
wasn't covered by the checkpoint.


Best, Sihua






On 05/30/2018 05:44,Narayanan Arunachalam<narayanan.arunacha...@gmail.com> 
wrote:
Hi,


Is it possible the watermark in TimerService not getting reset when a job is 
restarted from a previous checkpoint? I would expect the watermark in a 
TimerService also to go back in time.


I have the following ProcessFunction implementation.


  override def processElement(
    e: TraceEvent,
    ctx: ProcessFunction[
      TraceEvent,
      Trace
    ]#Context,
    out: Collector[Trace]
  ): Unit = {


    if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
      registry.counter("tracing.outOfOrderEvents").increment()
    } else {
    ....
    }


I am noticing the outOfOrderEvents are reported until new events are read in to 
the stream since the last restart.


Regards,
Nara

Reply via email to