Hi Nara,
Yes, the watermark in TimerService is not covered by the checkpoint. Every time the job is restarted from a previous checkpoint, it is reset to Long.MIN_VALUE.

I can see it being a bit tricky to include it in the checkpoint, especially since we need to support rescaling (it does not look like purely keyed state or operator state). Maybe @Stefan or @Aljoscha could give you more useful information about why it wasn't covered by the checkpoint.

Best,
Sihua

On 05/30/2018 05:44, Narayanan Arunachalam <narayanan.arunacha...@gmail.com> wrote:

Hi,

Is it possible that the watermark in TimerService is not getting reset when a job is restarted from a previous checkpoint? I would expect the watermark in a TimerService to also go back in time.

I have the following ProcessFunction implementation:

    override def processElement(
        e: TraceEvent,
        ctx: ProcessFunction[TraceEvent, Trace]#Context,
        out: Collector[Trace]
    ): Unit = {
      // Count events whose annotation time is behind the current watermark.
      if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
        registry.counter("tracing.outOfOrderEvents").increment()
      } else {
        ....
      }
    }

I am noticing that outOfOrderEvents are reported after a restart until new events are read into the stream.

Regards,
Nara
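
For illustration, the reset described in the reply above can be modeled in a few lines of plain Scala. This is only a sketch and not Flink code: `ModelTimerService`, `advanceTo`, and `isLate` are hypothetical names invented here, and a restore is modeled simply as creating a fresh instance, on the assumption (taken from the reply) that the watermark is not part of the checkpoint and starts again at Long.MIN_VALUE.

```scala
// Hypothetical stand-in for an operator's timer service. NOT Flink's class:
// it only models a watermark that is absent from the checkpoint.
final class ModelTimerService {
  // Assumption from the reply: the watermark starts at Long.MinValue.
  private var watermark: Long = Long.MinValue

  def currentWatermark: Long = watermark

  // Watermarks only move forward; smaller values are ignored.
  def advanceTo(timestamp: Long): Unit =
    if (timestamp > watermark) watermark = timestamp
}

object WatermarkResetSketch {
  // Same comparison as in the quoted processElement.
  def isLate(eventTime: Long, timers: ModelTimerService): Boolean =
    eventTime < timers.currentWatermark

  def main(args: Array[String]): Unit = {
    val beforeFailure = new ModelTimerService
    beforeFailure.advanceTo(1000L)
    println(isLate(900L, beforeFailure)) // true: 900 < 1000

    // Restore modeled as a fresh instance: the watermark is not restored.
    val afterRestore = new ModelTimerService
    println(isLate(900L, afterRestore)) // false: nothing is below Long.MinValue

    // Once a watermark reaches the operator again, the check behaves as before.
    afterRestore.advanceTo(1000L)
    println(isLate(900L, afterRestore)) // true
  }
}
```

In this model, a replayed event cannot be counted as out of order until a watermark has reached the operator again after the restore.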