[ 
https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498061#comment-17498061
 ] 

Matteo edited comment on KAFKA-13678 at 2/25/22, 11:07 AM:
-----------------------------------------------------------

Hi all, I'm Matteo and I work on the same team as Lorenzo.

I don't follow your point about determinism. If the punctuator set its next 
trigger time relative to the current event timestamp, determinism would still 
hold, even when reprocessing.

Let's examine a practical use case: I want to monitor 2 sensors. The sensors 
are designed to send an "ok" signal and, sometimes, an "error" signal when 
something goes wrong. We want to be notified when a sensor sends an "error" 
signal and no "ok" signal arrives within the next 5 minutes; in other words, 
"if you're in an error state and the situation doesn't change for 5 minutes, 
then I want to take a particular action". Conversely, if we receive an "error" 
signal and an "ok" signal arrives within the next 5 minutes, we can ignore the 
error.
Now, consider this situation: we receive an "error" signal from sensor 1 at 
event time t0. Some time later we receive an "error" signal from sensor 2 at 
event time t1. We expect to be woken up at t0+5min and at t1+5min to take the 
appropriate actions (as long as no "ok" signal has arrived in the meantime). 
Note that the time reference must be event time: if we receive an "error" event 
and ingestion then stops for 10 minutes (for example, because of a network 
problem), we don't want to trigger any action, because the sensor may have sent 
an "ok" signal in the meantime that we simply haven't been able to consume yet.
With the current punctuator semantics, this use case is impossible to implement.

On the other hand, letting a punctuator set its "wake up" trigger relative to 
the current event timestamp would do the job. Two notes here:
1) the "current" event timestamp is of course a "best effort" approach, as the 
granularity (and therefore the precision) of the time measurement depends on 
the granularity of the incoming events.
2) the semantics of such a component would remain the same whether you are 
processing in "real time" or reprocessing, so determinism is preserved 
(please give me a counter-example showing why determinism wouldn't hold).
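A minimal sketch of the semantics proposed above, in plain Java and deliberately independent of the Kafka Streams API (SensorWatchdog, Event, onEvent and replay are hypothetical names, and events are assumed to arrive in timestamp order). The first "error" of a sensor registers a deadline at its own event timestamp + 5 minutes, an "ok" cancels it, and deadlines are checked only when a new event advances event time. It illustrates both notes: an alert fires only once an event at or past the deadline arrives (note 1), and since nothing reads the wall clock, replaying the same input yields the same alerts (note 2).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical event-time watchdog; NOT part of the Kafka Streams API.
public class SensorWatchdog {
    static final long TIMEOUT_MS = 5 * 60 * 1000L; // 5 minutes of *event* time

    record Event(String sensor, String signal, long ts) {}
    record Deadline(String sensor, long errorTs, long expiresAt) {}

    // Pending deadlines, earliest expiry first.
    private final PriorityQueue<Deadline> deadlines =
            new PriorityQueue<>(Comparator.comparingLong(Deadline::expiresAt));
    // Timestamp of the error that opened the current error state, per sensor.
    private final Map<String, Long> openError = new HashMap<>();
    private final List<String> alerts = new ArrayList<>();

    // Assumes events arrive in non-decreasing timestamp order.
    void onEvent(Event e) {
        // Advance event time first: every deadline <= e.ts() fires before the event
        // is applied, so an "ok" stamped exactly at the deadline counts as too late
        // (an arbitrary choice of this sketch).
        while (!deadlines.isEmpty() && deadlines.peek().expiresAt() <= e.ts()) {
            Deadline d = deadlines.poll();
            Long open = openError.get(d.sensor());
            // Skip stale deadlines whose error state was cleared or replaced.
            if (open != null && open == d.errorTs()) {
                alerts.add(d.sensor() + "@" + d.errorTs());
                openError.remove(d.sensor());
            }
        }
        if ("ok".equals(e.signal())) {
            openError.remove(e.sensor()); // an "ok" cancels the pending alert
        } else if ("error".equals(e.signal()) && !openError.containsKey(e.sensor())) {
            // Only the first error opens the state; repeated errors keep the original deadline.
            openError.put(e.sensor(), e.ts());
            deadlines.add(new Deadline(e.sensor(), e.ts(), e.ts() + TIMEOUT_MS));
        }
    }

    // Replays events in order and returns the alerts as "sensor@errorTimestamp".
    static List<String> replay(List<Event> events) {
        SensorWatchdog w = new SensorWatchdog();
        for (Event e : events) {
            w.onEvent(e);
        }
        return w.alerts;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<Event> events = List.of(
                new Event("sensor-1", "error", 0),
                new Event("sensor-2", "error", 2 * min),
                new Event("sensor-2", "ok", 4 * min),   // clears sensor-2 in time
                new Event("sensor-1", "ok", 6 * min));  // too late for sensor-1
        System.out.println(replay(events)); // [sensor-1@0]
        // Same input, same output: nothing depends on the wall clock.
        System.out.println(replay(events).equals(replay(events))); // true
    }
}
```

With the sample input, sensor 2 is cleared by its "ok" at minute 4, while sensor 1 only sends "ok" at minute 6, so that event first advances event time past sensor 1's deadline and triggers its alert. If no event with a timestamp at or beyond a deadline ever arrives, that alert never fires, which is exactly the granularity limitation of note 1.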



> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13678
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.0.0
>            Reporter: Lorenzo Cagnatel
>            Priority: Major
>
> When a punctuator is scheduled using stream time, the first punctuation occurs
> immediately, as documented, but the second one is not necessarily triggered at
> *t_schedule + interval*: it can happen earlier.
> For example, assume that we schedule a punctuation every 10 seconds at
> timestamp 5 (t5). The system currently behaves like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd punctuation occurs only 5 seconds after the first one,
> breaking the interval duration.
> From my point of view, a reasonable behaviour could be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in {*}StreamTask.schedule{*}:
> {code:java}
> /**
> * Schedules a punctuation for the processor
> *
> * @param interval the interval in milliseconds
> * @param type the punctuation type
> * @throws IllegalStateException if the current node is not null
> */
> public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
>    switch (type) {
>       case STREAM_TIME:
>          // align punctuation to 0L, punctuate as soon as we have data
>          return schedule(0L, interval, type, punctuator);
>       case WALL_CLOCK_TIME:
>          // align punctuation to now, punctuate after interval has elapsed
>          return schedule(time.milliseconds() + interval, interval, type, punctuator);
>       default:
>          throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
>    }
> }{code}
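> where, for stream time, it calls *schedule* with {*}startTime=0{*}, which aligns
> punctuations to multiples of the interval instead of to the time at which the
> punctuator was scheduled.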
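The two timelines in the issue can be reproduced with a small simulation. It is a simplified model under stated assumptions, not Kafka's internal punctuation code: stream time is assumed non-decreasing, the punctuator fires whenever stream time reaches the next scheduled time, and the following punctuation is assumed to be the first time start + k * interval strictly after the current stream time. Anchoring at 0, as StreamTask.schedule does for STREAM_TIME, gives t5, t10, t20; anchoring at the stream time of the first punctuation gives the proposed t5, t15, t25. PunctuationSim and punctuationTimes are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

// Hypothetical simulation of stream-time punctuation; NOT Kafka's scheduling code.
public class PunctuationSim {

    // Returns the stream times at which the punctuator fires.
    // alignToZero = true anchors the schedule at 0 (current behaviour);
    // alignToZero = false anchors it at the first observed stream time (proposed behaviour).
    static List<Long> punctuationTimes(long interval, boolean alignToZero, long... streamTimes) {
        List<Long> fired = new ArrayList<>();
        if (streamTimes.length == 0) {
            return fired;
        }
        long start = alignToZero ? 0L : streamTimes[0];
        long next = start; // the first punctuation fires as soon as data arrives
        for (long now : streamTimes) {
            if (now >= next) {
                fired.add(now);
                // Next punctuation: the first start + k * interval strictly after 'now'.
                long k = (now - start) / interval + 1;
                next = start + k * interval;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ticks = LongStream.rangeClosed(5, 25).toArray(); // one record per time unit, t5..t25
        System.out.println(punctuationTimes(10, true, ticks));  // [5, 10, 20]
        System.out.println(punctuationTimes(10, false, ticks)); // [5, 15, 25]
    }
}
```

Computing k from the elapsed time means that, in this model, a jump in stream time produces a single punctuation rather than a burst of catch-up calls. Both variants depend only on record timestamps, so replaying the same input yields the same punctuation times.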



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
