[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2022-06-23 Thread ATZMON CHEN TOV (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556070#comment-17556070 ]

ATZMON CHEN TOV edited comment on KAFKA-8769 at 6/24/22 5:55 AM:
-

Hi [~mjsax],

For us it would be very useful to have per-key suppression *in addition* to 
suppression by grace period on stream time.  What I mean is that when a record 
for a key arrives in a window, it evicts that key's aggregation for the 
previous window.  If no record arrives in the 'next' window for a given key, 
the existing grace-on-stream-time mechanism evicts it.

The business benefit is that this avoids unnecessarily delaying the entire 
traffic.  I believe this would benefit the common case, where order per key 
*is* effectively guaranteed.

In our use case we do hourly aggregation with a grace period of 10 minutes 
(due to endpoint clock differences and multiple providers).  With per-key 
eviction we hope to have a large percentage of our aggregations available to 
users less than 2 minutes after the hour ends (and *only* a small fraction 
10+ minutes after).

In the API this could be added as a boolean 'eager' parameter to suppress.  
Alternatively, it could be set declaratively in the streams properties as 
'producer-guaranteed-order-per-key'.
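
To illustrate the proposed eviction rule, here is a minimal plain-Java sketch (no Kafka dependency; the class and method names are hypothetical, not Kafka's actual API): when a record for a key arrives in a newer window, that key's pending aggregation for the previous window is emitted immediately instead of waiting for the grace period.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-key eager eviction: a record in window w
// immediately evicts the same key's aggregation for the previous window,
// so only keys that stop sending must wait for the grace period.
public class PerKeyEagerSuppress {
    // latest window index seen per key
    private final Map<String, Long> latestWindow = new HashMap<>();
    // pending aggregation (here: a running sum) per key, for its latest window
    private final Map<String, Long> pending = new HashMap<>();

    /** Returns the evicted aggregation for the key's previous window, or null. */
    public Long add(String key, long window, long value) {
        Long prev = latestWindow.get(key);
        if (prev != null && window < prev) {
            return null; // late record for an already-evicted window; dropped in this sketch
        }
        Long evicted = null;
        if (prev != null && window > prev) {
            evicted = pending.remove(key); // eager eviction of the previous window's aggregation
        }
        latestWindow.put(key, window);
        pending.merge(key, value, Long::sum);
        return evicted;
    }
}
```

A record for key A in window 1 evicts A's window-0 sum right away, while key B's pending aggregation is untouched until B itself advances (or the grace period elapses).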

 


> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful.
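
The stream-time behavior in the quoted example can be reproduced in a few lines of plain Java (a hypothetical helper, not Kafka's implementation): per-partition stream time is simply the running maximum of observed record timestamps, so the second key's out-of-order records never advance it.

```java
import java.util.ArrayList;
import java.util.List;

// Stream time as Streams tracks it today: the running maximum of record
// timestamps observed on a partition, regardless of key.
public class PartitionStreamTime {
    /** Returns the stream time after each observed record timestamp. */
    public static List<Long> observe(long[] timestamps) {
        List<Long> streamTimes = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts); // stream time never decreases
            streamTimes.add(streamTime);
        }
        return streamTimes;
    }
}
```

Feeding the eight records above (A at t0..t3, then B at t0..t3) yields stream times [0, 1, 2, 3, 3, 3, 3, 3], matching the table: stream time stays paused at 3 for all of B's records, so with a short grace period they would all be dropped as late.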

> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.
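The quoted A/B example can be reproduced with a small simulation (a sketch, not Kafka Streams code) contrasting today's per-partition stream time with the proposed per-key tracking:

```python
# Eight records on one partition: key A's four timestamps, then key B's.
records = [("A", t) for t in range(4)] + [("B", t) for t in range(4)]

partition_time = -1   # one high-watermark for the whole partition (current behavior)
per_key_time = {}     # an independent high-watermark per key (the proposal)
trace = []

for key, ts in records:
    partition_time = max(partition_time, ts)
    per_key_time[key] = max(per_key_time.get(key, -1), ts)
    trace.append((key, ts, partition_time, per_key_time[key]))

for key, ts, pt, kt in trace:
    print(f"{key}@t{ts}  partition stream time: {pt}  per-key stream time: {kt}")
```

The partition-level column stalls at 3 for all of B's records (so B's data is "late" unless the grace period covers the skew), while the per-key column advances for B exactly as it did for A.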





--
This message was sent by Atlassian Jira
(v8.20.7#820007)