John Roesler created KAFKA-8769:
-----------------------------------

             Summary: Consider computing stream time independently per key
                 Key: KAFKA-8769
                 URL: https://issues.apache.org/jira/browse/KAFKA-8769
             Project: Kafka
          Issue Type: Improvement
            Reporter: John Roesler


Currently, Streams uses a concept of "stream time", which is computed as the 
highest timestamp observed by stateful operators, per partition. This concept 
of time backs grace period, retention time, and suppression.

For use cases in which data is produced to topics in roughly chronological 
order (as in database change capture), this reckoning is fine.

Some use cases have a different pattern, though. For example, in IoT 
applications, it's common for sensors to save up quite a bit of data and then 
dump it all at once into the topic. For a concrete example of the use case, see 
https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware

I have heard of cases where each sensor dumps 24 hours' worth of data at a time 
into the topic. This results in a pattern in which, when reading a single 
partition, the operators observe a lot of consecutive records for one key that 
increase in timestamp for 24 hours, then a bunch of consecutive records for 
another key that are also increasing in timestamp over the same 24-hour period. 
With our current stream-time definition, this means that the partition's stream 
time increases while reading the first key's data, but then stays paused while 
reading the second key's data, since the second batch of records all have 
timestamps in the "past".

E.g.:
{noformat}
A@t0 (stream time: 0)
A@t1 (stream time: 1)
A@t2 (stream time: 2)
A@t3 (stream time: 3)
B@t0 (stream time: 3)
B@t1 (stream time: 3)
B@t2 (stream time: 3)
B@t3 (stream time: 3)
{noformat}
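
The trace above can be reproduced with a small simulation. This is only a 
minimal sketch of the "highest timestamp observed, per partition" rule, not 
Streams code: the class PartitionStreamTime and its methods are hypothetical 
names, and timestamps are plain longs standing in for t0..t3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not part of the Kafka Streams API.
final class PartitionStreamTime {
    // Highest timestamp observed so far on this partition (-1 = nothing seen yet).
    private long streamTime = -1L;

    // Observes one record and returns the stream time after processing it.
    // The key is ignored: the partition shares a single clock.
    long observe(String key, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        return streamTime;
    }

    // Replays the A and B batches from the trace and collects the stream
    // time seen after each record.
    static List<Long> replayExample() {
        PartitionStreamTime tracker = new PartitionStreamTime();
        List<Long> observed = new ArrayList<>();
        for (String key : new String[] {"A", "B"}) {
            for (long t = 0; t <= 3; t++) {
                observed.add(tracker.observe(key, t));
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(replayExample()); // [0, 1, 2, 3, 3, 3, 3, 3]
    }
}
```

Stream time never moves while B's batch is read, because every B timestamp is 
at or below the maximum already set by A.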

This pattern forces an unfortunate compromise: users must either set the grace 
period to the maximum expected time skew (for example, 24 hours), or accept 
that Streams will drop the second key's data (since it is late). But with a 
24-hour grace period, anyone who wants to use suppression for "final results" 
has to wait 24 hours for each result.
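
To make the compromise concrete, here is a rough sketch of which records get 
dropped for a given grace period. It assumes a deliberately simplified 
lateness rule (a record is late once it trails the partition's stream time by 
more than the grace period); the real windowed operators reason about window 
boundaries, which this ignores. GraceCheck and dropped are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; not the Kafka Streams implementation.
final class GraceCheck {
    // Returns the records ("key@t<timestamp>") that would be dropped as late
    // when stream time is tracked once for the whole partition.
    static List<String> dropped(String[] keys, long[] timestamps, long grace) {
        long streamTime = -1L;
        List<String> late = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            streamTime = Math.max(streamTime, timestamps[i]);
            // Simplifying assumption: late means "older than stream time minus grace".
            if (timestamps[i] < streamTime - grace) {
                late.add(keys[i] + "@t" + timestamps[i]);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        String[] keys = {"A", "A", "A", "A", "B", "B", "B", "B"};
        long[] ts = {0, 1, 2, 3, 0, 1, 2, 3};
        System.out.println(dropped(keys, ts, 0)); // [B@t0, B@t1, B@t2]
        System.out.println(dropped(keys, ts, 3)); // []
    }
}
```

Under this model, nothing is dropped only once the grace period covers the 
full skew between the two batches, which is exactly the delay that then 
applies to suppressed "final results".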

This tradeoff is not strictly necessary, though, because each key represents a 
logically independent sequence of events. Tracking by partition is simply 
convenient, but typically not logically meaningful. That is, the partitions are 
just physically independent sequences of events, so it's convenient to track 
stream time at this granularity. It would be just as correct, and more useful 
for IoT-like use cases, to track time independently for each key.
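
As a rough sketch of what per-key tracking could look like, the snippet below 
keeps one "highest timestamp observed" value per key instead of one per 
partition. PerKeyStreamTime is a hypothetical name, an in-memory map stands in 
for whatever state Streams would actually use, and nothing here reflects an 
existing or planned API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not part of the Kafka Streams API.
final class PerKeyStreamTime {
    // One clock per key: the highest timestamp observed for that key.
    private final Map<String, Long> streamTimeByKey = new HashMap<>();

    // Observes one record and returns that key's stream time afterwards.
    long observe(String key, long timestamp) {
        return streamTimeByKey.merge(key, timestamp, Long::max);
    }

    // Replays a batch of A records followed by a batch of B records, each
    // with timestamps 0..3, and collects the per-key stream time after each.
    static List<Long> replayExample() {
        PerKeyStreamTime tracker = new PerKeyStreamTime();
        List<Long> observed = new ArrayList<>();
        for (String key : new String[] {"A", "B"}) {
            for (long t = 0; t <= 3; t++) {
                observed.add(tracker.observe(key, t));
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(replayExample()); // [0, 1, 2, 3, 0, 1, 2, 3]
    }
}
```

With a separate clock per key, B's records advance B's own stream time from 0 
to 3 and none of them counts as late relative to A. The cost in this sketch is 
one map entry per key, compared with a single value per partition.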

However, before considering this change, we need to solve the 
testing/low-traffic problem. This is the opposite issue, where a partition 
doesn't get enough traffic to advance stream time and results remain "stuck" in 
the suppression buffers. We can provide some mechanism to force the advancement 
of time across all partitions, for use in testing when you want to flush out 
all results, or in production when some topic is low volume. We shouldn't 
consider tracking time _more_ granularly until this problem is solved, since it 
would just make the low-traffic problem worse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
