[ https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903148#comment-16903148 ]
Matthias J. Sax commented on KAFKA-8769:
----------------------------------------

We also need to consider the overhead. It is not uncommon to have use cases with millions of unique keys, and key-based tracking might be cost-prohibitive.

> Consider computing stream time independently per key
> ----------------------------------------------------
>
>                 Key: KAFKA-8769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8769
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Priority: Major
>
> Currently, Streams uses a concept of "stream time", which is computed as the highest timestamp observed by stateful operators, per partition. This concept of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT applications, it's common for sensors to save up quite a bit of data and then dump it all at once into the topic. See https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a time into the topic. This results in a pattern in which, when reading a single partition, the operators observe a lot of consecutive records for one key that increase in timestamp for 24 hours, then a bunch of consecutive records for another key that are also increasing in timestamp over the same 24-hour period. With our current stream-time definition, this means that the partition's stream time increases while reading the first key's data, but then stays paused while reading the second key's data, since the second batch of records all have timestamps in the "past".
> E.g.:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required to set the grace period to the max expected time skew, for example 24 hours, or Streams will just drop the second key's data (since it is late). But this means that if they want to use Suppression for "final results", they have to wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents a logically independent sequence of events. Tracking by partition is simply convenient, but typically not logically meaningful. That is, the partitions are just physically independent sequences of events, so it's convenient to track stream time at this granularity. It would be just as correct, and more useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the testing/low-traffic problem. This is the opposite issue, where a partition doesn't get enough traffic to advance stream time and results remain "stuck" in the suppression buffers. We can provide some mechanism to force the advancement of time across all partitions, for use in testing when you want to flush out all results, or in production when some topic is low volume. We shouldn't consider tracking time _more_ granularly until this problem is solved, since it would just make the low-traffic problem worse.
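
To make the compromise described in the ticket concrete, here is a minimal DSL sketch (not from the ticket; the topic names, 5-minute window size, and serdes are illustrative assumptions): the grace period is stretched to the 24-hour worst-case skew so the second sensor's "old" records are not dropped, which in turn means suppressed "final" results can lag by up to 24 hours.

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

public class SensorWindowSketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("sensor-readings", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               // Grace must cover the worst-case per-key skew (24h here), or the
               // second key's batch is dropped as late once the first key's batch
               // has advanced partition stream time.
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofHours(24)))
               .count()
               // Suppression emits a window's result only at window end + grace,
               // so "final results" are delayed by up to the full 24-hour grace.
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream((windowedKey, count) -> windowedKey.key())
               .to("sensor-counts", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
{code}

Per-key stream time would let the grace period reflect only per-key disorder rather than cross-key skew, so the same topology could emit final results much sooner, at the cost of the per-key tracking overhead noted above.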
-- This message was sent by Atlassian JIRA (v7.6.14#76016)