One explanation would be that during catch-up, data is consumed with higher
throughput because it's just read from Kafka.
Hence, you'd also see more late data per minute while the job catches up,
until it reads data at the rate at which it is produced into Kafka.

Would that explain your observations?

Best, Fabian

2018-05-30 23:56 GMT+02:00 Narayanan Arunachalam <
narayanan.arunacha...@gmail.com>:

> Thanks for the explanation. I looked at this metric closely and noticed
> there are some events arriving out of order. My hypothesis is that when
> the job is restarted, all of the small out-of-order chunks add up and
> show a significant number. The graph below shows the number of out-of-order
> events every minute. The job was started with new state at 11:53 am and then
> restarted from the previous checkpoint at 1:24 pm.
>
> That said, after the restart the number of out-of-order events is very high
> though :thinking_face:
>
> On Wed, May 30, 2018 at 1:55 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Nara and Sihua,
>>
>> That's indeed an unexpected behavior and it would be good to identify the
>> reason for the late data.
>>
>> As Sihua said, watermarks are currently not checkpointed and are reset to
>> Long.MIN_VALUE upon restart.
>> AFAIK, the main reason why WMs are not checkpointed is that the special
>> type of operator state that is required for this (union-list state) wasn't
>> available when the mechanism was implemented.
>> I think there are plans to address this shortcoming (see FLINK-5601 [1]).
>>
>> Best, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-5601
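>>
>> Just to illustrate what that would involve at the user level: one can
>> already carry the maximum watermark seen so far across restores with
>> union-list operator state, along the lines of the sketch below. This is
>> only an illustration, not something Flink ships; the class and state names
>> are made up, and TraceEvent/Trace follow the ProcessFunction further down
>> in this thread.
>>
>> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
>> import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
>> import org.apache.flink.streaming.api.functions.ProcessFunction
>> import org.apache.flink.util.Collector
>>
>> class WatermarkTrackingFunction
>>   extends ProcessFunction[TraceEvent, Trace]
>>   with CheckpointedFunction {
>>
>>   private var wmState: ListState[java.lang.Long] = _
>>   private var maxSeenWatermark: Long = Long.MinValue
>>
>>   override def initializeState(ctx: FunctionInitializationContext): Unit = {
>>     // union-list state: after a restore, every subtask sees all stored values
>>     wmState = ctx.getOperatorStateStore.getUnionListState(
>>       new ListStateDescriptor[java.lang.Long]("max-watermark", classOf[java.lang.Long]))
>>     val it = wmState.get().iterator()
>>     while (it.hasNext) {
>>       maxSeenWatermark = math.max(maxSeenWatermark, it.next())
>>     }
>>   }
>>
>>   override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
>>     wmState.clear()
>>     wmState.add(maxSeenWatermark)
>>   }
>>
>>   override def processElement(
>>     e: TraceEvent,
>>     ctx: ProcessFunction[TraceEvent, Trace]#Context,
>>     out: Collector[Trace]
>>   ): Unit = {
>>     // combine the restored value with the (possibly reset) TimerService watermark
>>     maxSeenWatermark = math.max(maxSeenWatermark, ctx.timerService().currentWatermark())
>>     if (e.getAnnotationTime() < maxSeenWatermark) {
>>       // late with respect to the carried-over watermark as well
>>     }
>>   }
>> }
>>
>> Carrying the maximum across a restore is also why union-list state is
>> needed: each subtask must see the values of all previous subtasks and take
>> the max.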
>>
>> 2018-05-30 19:00 GMT+02:00 Narayanan Arunachalam <
>> narayanan.arunacha...@gmail.com>:
>>
>>> Thanks Sihua. If it's reset to Long.MIN_VALUE, I can't explain why
>>> outOfOrderEvents are reported, because the event time on the data will
>>> always be greater than Long.MIN_VALUE.
>>>
>>> Following are the steps to reproduce this scenario:
>>> - A source that produces events with a timestamp that increases with
>>> every event produced
>>> - Use TimeCharacteristic.EventTime
>>> - Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness
>>> set to 60s
>>> - Enable checkpoints
>>> - A ProcessFunction impl that reports a counter to some metrics backend
>>> when the timestamp of the event is less than currentWatermark
>>> - No out-of-order events will be reported initially. After a few
>>> checkpoints are created, cancel and restart the job from a previous
>>> checkpoint.
>>>
>>> *Note*: The event stream really doesn't have out-of-order data. Restarting
>>> the job from a checkpoint causes these artificial out-of-order events
>>> because of the watermark value.
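>>>
>>> For reference, a rough sketch of the job wiring (TraceEventSource and
>>> TraceProcessFunction are placeholder names; TraceProcessFunction is the
>>> ProcessFunction shown further down in this thread, and the source is
>>> assumed to emit strictly increasing timestamps):
>>>
>>> import org.apache.flink.streaming.api.TimeCharacteristic
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>
>>> object OutOfOrderRepro {
>>>   def main(args: Array[String]): Unit = {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>     env.enableCheckpointing(10000) // checkpoint every 10s
>>>
>>>     env
>>>       .addSource(new TraceEventSource) // emits TraceEvents with increasing getAnnotationTime()
>>>       .assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor[TraceEvent](Time.seconds(60)) {
>>>           override def extractTimestamp(e: TraceEvent): Long = e.getAnnotationTime()
>>>         })
>>>       .process(new TraceProcessFunction) // counts events behind the current watermark
>>>       .print()
>>>
>>>     env.execute("out-of-order repro")
>>>   }
>>> }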
>>>
>>> Regards,
>>> Nara
>>>
>>>
>>>
>>>
>>> On Tue, May 29, 2018 at 7:54 PM, sihua zhou <summerle...@163.com> wrote:
>>>
>>>> Hi Nara,
>>>>
>>>> yes, the watermark in the TimerService is not covered by the checkpoint;
>>>> every time the job is restarted from a previous checkpoint, it is reset to
>>>> Long.MIN_VALUE. I can see that it would be a bit tricky to cover it in the
>>>> checkpoint, especially when we need to support rescaling (it doesn't seem
>>>> to fit as purely keyed state or purely operator state); maybe @Stefan or
>>>> @Aljoscha could give you more useful information about why it wasn't
>>>> covered by the checkpoint.
>>>>
>>>> Best, Sihua
>>>>
>>>>
>>>>
>>>> On 05/30/2018 05:44, Narayanan Arunachalam <narayanan.arunacha...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Is it possible that the watermark in the TimerService is not getting reset
>>>> when a job is restarted from a previous checkpoint? I would expect the
>>>> watermark in a TimerService to also go back in time.
>>>>
>>>> I have the following ProcessFunction implementation.
>>>>
>>>>   override def processElement(
>>>>     e: TraceEvent,
>>>>     ctx: ProcessFunction[TraceEvent, Trace]#Context,
>>>>     out: Collector[Trace]
>>>>   ): Unit = {
>>>>     // count events whose timestamp is already behind the current watermark
>>>>     if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>>>>       registry.counter("tracing.outOfOrderEvents").increment()
>>>>     } else {
>>>>       ....
>>>>     }
>>>>   }
>>>>
>>>> I am noticing that outOfOrderEvents are reported until new events are read
>>>> into the stream after the last restart.
>>>>
>>>> Regards,
>>>> Nara
>>>>
>>>>
>>>
>>
>
