[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2022-06-23 Thread ATZMON CHEN TOV (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556070#comment-17556070
 ] 

ATZMON CHEN TOV edited comment on KAFKA-8769 at 6/24/22 5:55 AM:
-

Hi [~mjsax],

For us it would be very useful to have per-key suppression *in addition* to 
suppression by grace on stream time.  What I mean is: when a record arrives 
for a new window, it would evict the aggregation for the previous window of 
the same key.  If no record ever arrives for the 'next' window of a certain 
key, the (existing) grace-on-stream-time eviction still applies.

The business benefit is that this avoids unnecessarily delaying the entire 
traffic.  I believe it would help the common case, where order per key *is* 
effectively guaranteed.

In our use case we do hourly aggregation with a grace period of 10 minutes 
(due to endpoint clock differences and multiple providers).  With per-key 
eviction we hope to have a large percentage of our aggregations available to 
users less than 2 minutes after the hour ends (and *only* a small fraction 
10+ minutes after the hour ends).

In the API this could be added as a boolean 'eager' parameter to suppress.  
Alternatively, it could be declared in the Streams properties as 
'producer-guaranteed-order-per-key'.
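
To make the idea concrete, here is a minimal illustrative sketch in plain Java 
(not Kafka Streams internals, and not an existing API; the class and method 
names are hypothetical). It models a buffer that emits a key's previous window 
as soon as a record for a later window of the same key arrives, while the usual 
stream-time-plus-grace sweep still closes windows for keys that never receive a 
'next' record:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative model of the proposed per-key "eager" eviction; it assumes that
// records for a given key arrive in timestamp order, which is exactly the
// premise of the 'producer-guaranteed-order-per-key' idea above.
final class EagerPerKeyBuffer<K, V> {

    record WindowAgg<A>(long windowStart, A aggregate) {}

    private final long windowSizeMs;
    private final long graceMs;
    private final BiConsumer<K, WindowAgg<V>> emit;           // downstream "final result" callback
    private final Map<K, WindowAgg<V>> open = new HashMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    EagerPerKeyBuffer(long windowSizeMs, long graceMs, BiConsumer<K, WindowAgg<V>> emit) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
        this.emit = emit;
    }

    /** Called for every input record; 'aggregate' is the key's updated window aggregate. */
    void update(K key, long recordTimestampMs, V aggregate) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        long windowStart = recordTimestampMs - (recordTimestampMs % windowSizeMs);

        if (windowStart + windowSizeMs + graceMs <= streamTimeMs) {
            return; // too late even for grace: the record would be dropped, as today
        }

        WindowAgg<V> previous = open.get(key);
        if (previous != null && previous.windowStart() < windowStart) {
            // Eager path: a record for a later window of the same key arrived,
            // so the previous window of that key can be emitted immediately.
            emit.accept(key, previous);
        }
        open.put(key, new WindowAgg<>(windowStart, aggregate));

        // Existing behaviour: stream time plus grace still closes windows for keys
        // that never see a record in the next window.
        open.entrySet().removeIf(e -> {
            boolean closed = e.getValue().windowStart() + windowSizeMs + graceMs <= streamTimeMs;
            if (closed) {
                emit.accept(e.getKey(), e.getValue());
            }
            return closed;
        });
    }
}
{code}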

 



> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Priority: Major
> Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g.:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.
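
As a purely illustrative aside (not Kafka Streams code), the trace quoted above 
can be reproduced with a few lines of plain Java; the window size and grace 
values are arbitrary toy numbers, chosen so that key B's older-window records 
fall outside the grace period once key A has advanced the partition's stream 
time:

{code:java}
// Toy model of per-partition stream time: it only ever advances, so key B's
// records for already-closed windows are dropped unless grace covers the skew.
public class StreamTimeToy {
    public static void main(String[] args) {
        long streamTime = Long.MIN_VALUE;
        final long windowSize = 2;   // toy units matching t0..t3 above
        final long grace = 0;        // raise this to the max expected skew to keep B's data

        long[][] records = {         // {key (0=A, 1=B), timestamp}
            {0, 0}, {0, 1}, {0, 2}, {0, 3},
            {1, 0}, {1, 1}, {1, 2}, {1, 3},
        };
        for (long[] r : records) {
            long ts = r[1];
            streamTime = Math.max(streamTime, ts);                  // per-partition, not per-key
            long windowStart = ts - (ts % windowSize);
            boolean expired = windowStart + windowSize + grace < streamTime;
            System.out.printf("%s@t%d  (stream time: %d)  %s%n",
                    r[0] == 0 ? "A" : "B", ts, streamTime,
                    expired ? "DROPPED: window closed" : "accepted");
        }
    }
}
{code}

With the 24-hour dumps described above, keeping key B's data means setting the 
grace period to roughly 24 hours, which is exactly the final-results delay the 
description wants to avoid.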

[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2019-11-25 Thread Dmitry Sergeev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16981396#comment-16981396
 ] 

Dmitry Sergeev edited comment on KAFKA-8769 at 11/25/19 1:29 PM:
-

Hi, [~mjsax]! I'm facing the same problem on my current project. We collect 
health telemetry data from wearables and calculate a windowed average using 
Kafka Streams; the message key in our case is a user id. Sometimes a user is 
offline, but the wearable still collects data and uploads it once the user is 
back online. We're seeing the "Skipping record for expired window." log message 
because KStreamWindowAggregate has already seen a newer timestamp from other keys...
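
For context, a minimal sketch of the kind of topology described here; the topic 
name, serdes, the hourly window, and the 10-minute grace are assumptions, and 
count() stands in for the windowed average. TimeWindows.ofSizeAndGrace needs a 
recent Kafka Streams release; older ones spell it TimeWindows.of(...).grace(...):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class TelemetryTopologySketch {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("telemetry", Consumed.with(Serdes.String(), Serdes.Double())) // key = user id
               .groupByKey()
               // A record older than window end + grace (judged by the *partition's*
               // stream time) is skipped with "Skipping record for expired window."
               .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10)))
               .count()  // stand-in for the windowed average
               // One final result per user per window, once the window closes.
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .foreach((windowedUserId, count) ->
                       System.out.println(windowedUserId + " -> " + count));
        return builder;
    }
}
{code}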








[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2019-10-28 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961615#comment-16961615
 ] 

Richard Yu edited comment on KAFKA-8769 at 10/29/19 2:39 AM:
-

[~vvcephei] [~mjsax] Bumping the KIP. Want to see if we can get it in.








[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2019-08-13 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16906692#comment-16906692
 ] 

Richard Yu edited comment on KAFKA-8769 at 8/13/19 10:50 PM:
-

Notably, a problem with this approach is that keys which are evicted from the 
cache no longer have their timestamps tracked. If we receive a record whose key 
has no tracked timestamp, we could insert the key into the cache with that 
record's timestamp as the key's largest observed time. Admittedly, the order in 
which we receive records does not guarantee monotonically increasing timestamps, 
but the timestamp of the latest record we received is a fair approximation.
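
A minimal sketch of that fallback, assuming a plain-Java LRU built on 
LinkedHashMap (illustrative only, not proposed Streams internals; the class and 
method names are hypothetical). A key that was evicted and later reappears is 
simply re-inserted with the timestamp of the record that brought it back:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// LRU map from key to its highest observed timestamp, bounded to 'maxKeys' entries.
final class PerKeyTimeCache<K> {

    private final int maxKeys;
    private final LinkedHashMap<K, Long> highestSeen;

    PerKeyTimeCache(int maxKeys) {
        this.maxKeys = maxKeys;
        // accessOrder = true makes iteration order least-recently-used first, and
        // removeEldestEntry drops the coldest key once the bound is exceeded.
        this.highestSeen = new LinkedHashMap<K, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
                return size() > PerKeyTimeCache.this.maxKeys;
            }
        };
    }

    /** Returns the key's tracked time after observing a record with this timestamp. */
    long observe(K key, long recordTimestampMs) {
        // An unseen (or evicted) key starts from the record's own timestamp;
        // otherwise we keep the maximum timestamp seen so far for that key.
        long updated = Math.max(highestSeen.getOrDefault(key, recordTimestampMs),
                                recordTimestampMs);
        highestSeen.put(key, updated);
        return updated;
    }
}
{code}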








[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2019-08-13 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16906674#comment-16906674
 ] 

Richard Yu edited comment on KAFKA-8769 at 8/13/19 10:29 PM:
-

[~vvcephei] On the issue regarding low traffic: would this mechanism 
necessarily be a public API, and what could it look like? Just wondering 
whether it requires a KIP.

As for the scalability of key-based partition-time tracking, just some 
thoughts at the moment: we don't have to store _all_ the keys and their 
timestamps. That would be wasteful if a significant portion of them is not 
used often at any particular time. Instead, we could use some sort of cache 
(maybe LFU or LRU) that stores the most popular keys at a given point in a 
StreamTask's lifetime.






--
This message was sent by Atlassian JIRA
(v7.6.14#76016)