[ https://issues.apache.org/jira/browse/KAFKA-18069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17906243#comment-17906243 ]
Khoa Nguyen commented on KAFKA-18069:
-------------------------------------
Hi [~rhishikeshj], can I take this one?
> Out of order output from windowed aggregations
> ----------------------------------------------
>
> Key: KAFKA-18069
> URL: https://issues.apache.org/jira/browse/KAFKA-18069
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.9.0
> Environment: 1 Kafka broker running in Docker.
> Streams app written in Clojure using the Java library, version 3.9.0
> Reporter: Rhishikesh Joshi
> Priority: Major
>
> We have an input topic that receives many messages (say 10k) every 250ms.
> The key is a unique ID from a set of IDs, so every batch of messages at the
> 250ms mark contains some subset of the overall keys (product IDs, etc.).
> We have a Kafka Streams app running windowed aggregations over it.
> The aggregations group by the same key as the input topic's key and
> aggregate over 10s windows, resulting in 1 value for each key for
> every 10-second window.
> We use `untilWindowCloses` suppression so that we get only 1 value per
> grouping key for every aggregation window.
> We are not using any grace period, since our input topic is guaranteed to
> receive messages in timestamp order (the timestamp is a property of the event itself).
> All topics have 3 partitions each.
> Say we have 2 application instances running these stream topologies. The number
> of stream threads is set to 3.
> Now our problem seems to be that, in the event of a rebalance (say one of
> the apps terminates or restarts for some other reason), we see out-of-order
> messages on individual partitions of the output topic.
> For example, messages for the 10:00:00 aggregation window seem to
> land on the partition after the messages for the 10:00:20 window, etc.
> We are using `processing.guarantee = exactly_once_v2`.
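For reference, here is a simplified, single-partition model of the ordering the report expects, written as plain Java rather than Kafka Streams code (the app is in Clojure and the actual topology is not shown). It assumes a 10s tumbling window with zero grace, non-negative event timestamps, and the close rule that a window is final once stream time reaches window end plus grace. In the real API this presumably corresponds to something like `TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10))` followed by `suppress(Suppressed.untilWindowCloses(...))`, but that is an assumption. The class `FinalWindowModel` and its `key@windowStart=count` output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of ONE partition of the described topology:
// a 10 s tumbling-window count per key, with final results emitted only when
// a window closes (grace = 0). Not Kafka Streams code; illustration only.
// Assumes non-negative event timestamps in milliseconds.
final class FinalWindowModel {
    static final long WINDOW_SIZE_MS = 10_000L;
    static final long GRACE_MS = 0L;

    // Open windows: window start -> (key -> count), both sorted ascending.
    private final TreeMap<Long, TreeMap<String, Long>> open = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();

    void process(String key, long timestampMs) {
        // Stream time only moves forward: it is the max timestamp seen so far.
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        long windowEnd = windowStart + WINDOW_SIZE_MS; // exclusive end

        // A record whose window has already closed is dropped as late.
        if (windowEnd + GRACE_MS > streamTime) {
            open.computeIfAbsent(windowStart, w -> new TreeMap<>())
                .merge(key, 1L, Long::sum);
        }

        // Emit every window that is now closed, oldest window first.
        while (!open.isEmpty()
                && open.firstKey() + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
            Map.Entry<Long, TreeMap<String, Long>> closed = open.pollFirstEntry();
            for (Map.Entry<String, Long> e : closed.getValue().entrySet()) {
                emitted.add(e.getKey() + "@" + closed.getKey() + "=" + e.getValue());
            }
        }
    }
}
```

Feeding it key `a` at 0 ms, `b` at 2500 ms, `a` at 5000 ms, `a` at 10000 ms, `b` at 20000 ms and then a late `a` at 9000 ms emits `a@0=2`, `b@0=1`, `a@10000=1` and drops the late record. Under this model, final results on one partition never go backwards in window time, which is the guarantee the report is asking about.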
>
> Further, it looks like there's an even simpler way to reproduce this issue.
> If we run just 1 streams app instance with 3 stream threads (3 partitions for
> the input topic), stop the streams app by killing it, and then restart the
> same app, we start seeing out-of-order messages on the output topic: window
> aggregations for an earlier window end up on the partition after those for a
> later window.
> In our streams app, we do have an uncaught exception handler (a Java one, not
> the Streams uncaught exception handler), and we make sure to close the streams
> cleanly in this case. I can confirm that the state goes from PENDING_SHUTDOWN
> to NOT_RUNNING, so I suppose there is no unclean or uncommitted state anywhere.
> Yet when I start the streams app back up, there are out-of-order messages on
> the output.
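To confirm the symptom independently of the topology, one option is a consumer-side check over the output topic that flags, per partition, any record whose window start is earlier than one already seen on that partition. This is a hypothetical helper (`WindowOrderChecker` is not part of Kafka). It assumes records are fed in offset order per partition and that the window start has already been extracted from the windowed key, for example via `Windowed#window().start()` after deserialization.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer-side check for the output topic. Call observe() for
// each record in the order it was read from its partition (offset order).
final class WindowOrderChecker {
    // Highest window start seen so far, per output partition.
    private final Map<Integer, Long> maxWindowStart = new HashMap<>();
    final List<String> violations = new ArrayList<>();

    void observe(int partition, long offset, String key, long windowStartMs) {
        Long max = maxWindowStart.get(partition);
        if (max != null && windowStartMs < max) {
            // An earlier window arrived after a later one on this partition.
            violations.add("p" + partition + "@" + offset + ": " + key
                    + " window " + windowStartMs + " after window " + max);
        } else {
            maxWindowStart.put(partition, windowStartMs);
        }
    }
}
```

Any entry in `violations` matches the pattern described here, for example the 10:00:00 window showing up after the 10:00:20 window on the same partition. Ordering is only checked within a partition, since that is the scope of the reported problem.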
>
> Suggestion from Matthias Sax on Slack:
> {quote}The only other thing you could try is, to move off suppress() and use
> windowedBy(...).emitStrategy(...).aggregate(...) instead
> {quote}
>
> Tried this, but that does not help either, so maybe it's not related just to
> suppression.
>
> This seems like a pretty routine use case that is failing. Or am I wrong in
> assuming that Kafka Streams can guarantee output ordering of windowed aggregations?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)