Rhishikesh Joshi created KAFKA-18069:
----------------------------------------
Summary: Out of order output from windowed aggregations
Key: KAFKA-18069
URL: https://issues.apache.org/jira/browse/KAFKA-18069
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.9.0
Environment: 1 Kafka broker running in Docker.
Streams app written in Clojure using the Java library, version 3.9.0
Reporter: Rhishikesh Joshi
We have an input topic that receives a large batch of messages (say 10k) every
250ms. Each message's key is a unique ID from a set of IDs (product IDs etc.),
so every batch at the 250ms mark contains some subset of the overall keys.
We have a Kafka Streams app running windowed aggregations over it. The
aggregation groups by the same key as the input topic and aggregates over
10-second windows, producing one value per key for every 10-second window.
We use `suppress()` with `untilWindowCloses` so that we get only one value per
grouping key for every aggregation window.
We are not using any grace period, since our input topic is guaranteed to send
messages in timestamp order (the timestamp is a property of the event itself).
All topics have 3 partitions each
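For reference, a minimal sketch of a topology matching the description above, assuming the plain Java DSL; the topic names, serdes, and aggregator are placeholders, not our actual (Clojure) code:
{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class TopologySketch {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Double()))
            .groupByKey()
            // 10-second windows, no grace period (input arrives in timestamp order)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
            .aggregate(
                () -> 0.0,
                (key, value, agg) -> agg + value,   // placeholder aggregator
                Materialized.with(Serdes.String(), Serdes.Double()))
            // emit exactly one final result per key and window
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((Windowed<String> windowedKey, Double value) ->
                KeyValue.pair(windowedKey.key(), value))
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Double()));
        return builder;
    }
}
{code}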
Say we have 2 application instances running this stream topology. The number of
stream threads is set to 3.
Our problem seems to be that, in the event of a rebalance, say if one of the app
instances terminates or restarts for some other reason, we see out-of-order
messages on individual partitions of the output topic.
For example, messages for the 10:00:00 aggregation window end up on the
partition after the messages for the 10:00:20 window.
We are using processing.guarantee = exactly_once_v2
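Concretely, the relevant configuration is roughly the following sketch (the application id and bootstrap servers are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ConfigSketch {

    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-agg-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
{code}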
Further, there appears to be an even simpler way to reproduce this issue.
If we run just 1 streams app instance with 3 stream threads (the input topic has
3 partitions), stop the streams app by killing it, and then restart the same
app, we start seeing out-of-order messages on the output topic: window
aggregations for an earlier window end up on the partition after those for a
later window.
In our streams app, we do have an uncaught exception handler (a JVM-level one,
not the Streams uncaught exception handler) and we ensure that we close the
streams instance cleanly in that case; a sketch of this follows below. I can
confirm that I see the state go from PENDING_SHUTDOWN to NOT_RUNNING, so I
assume there is no unclean or uncommitted state anywhere. Yet when I start the
streams app back up, there are out-of-order messages on the output.
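The shutdown handling looks roughly like this sketch (the logging and the 30-second close timeout are placeholders):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class ShutdownSketch {

    static void install(final KafkaStreams streams) {
        // observe state transitions, e.g. PENDING_SHUTDOWN -> NOT_RUNNING
        streams.setStateListener((newState, oldState) ->
            System.out.println("streams state " + oldState + " -> " + newState));

        // JVM-level handler (not a StreamsUncaughtExceptionHandler): close cleanly on any error
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("uncaught exception on " + thread.getName() + ": " + throwable);
            // blocks until the instance reaches NOT_RUNNING or the timeout expires
            streams.close(Duration.ofSeconds(30));
        });
    }
}
{code}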
Suggestion from Matthias J. Sax on Slack:
{quote}The only other thing you could try is, to move off suppress() and use
windowedBy(...).emitStrategy(...).aggregate(...) instead
{quote}
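A minimal sketch of that alternative, assuming `EmitStrategy.onWindowClose()` (available since Kafka 3.3) and the same placeholder names as above:
{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class EmitStrategySketch {

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Double()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
            // emit only the final result when the window closes, instead of suppress()
            .emitStrategy(EmitStrategy.onWindowClose())
            .aggregate(
                () -> 0.0,
                (key, value, agg) -> agg + value,   // placeholder aggregator
                Materialized.with(Serdes.String(), Serdes.Double()))
            .toStream()
            .map((Windowed<String> windowedKey, Double value) ->
                KeyValue.pair(windowedKey.key(), value))
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Double()));
        return builder;
    }
}
{code}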
We tried this, but it also does not help, so maybe the issue is not related only
to suppression.
It seems like a pretty routine use case is failing, or am I wrong in assuming
that Kafka Streams can guarantee output ordering of windowed aggregations?