fqaiser94 opened a new pull request #10747: URL: https://github.com/apache/kafka/pull/10747
# Problem

For context, this issue was initially raised in the following [thread](https://lists.apache.org/thread.html/rc3c1d07375e25341ab0467e4f8526a88f9fcc825a766d15922a9ed7d%40%3Cusers.kafka.apache.org%3E) on the Kafka users mailing list.

During `KTable.groupBy`, we write into a repartition topic. Since the grouping key can change, we need to send separate events for the oldValue and the newValue to downstream nodes (where the oldValue will be “subtracted” from the aggregate for the old key and the newValue “added” to the aggregate for the new key). However, sending the oldValue and the newValue as separate events is not strictly necessary when the grouping key does not change, and doing so poses two challenges for users:

1. The resulting KTable (i.e. the result of `KTable.groupBy(???).aggregate(???)`) can briefly be in an “inconsistent” state where the oldValue has been “subtracted” from the aggregate for the key but the newValue has not yet been “added” to it, because each event (oldValue, newValue) is processed separately.
2. If users fail to configure their producers correctly to avoid re-ordering during `send()`, it’s possible the newValue may be sent (and therefore processed by the aggregator) before the oldValue. If the user’s `adder` and `subtractor` functions are non-commutative, this would put the aggregate in a permanently “inconsistent” state (see the sketches at the end of this description).

Whilst there are ways to get around this issue by dropping down to the Processor API level, it would be nicer if this was handled by Kafka Streams more seamlessly.

# Proposed solution

If the grouping key has not changed, the oldValue and newValue events are guaranteed to be processed by the same processor. As such, we should be able to send them as a single `Change<T>` event. The subtractor and adder functions can then be executed (in that order) and the KTable can be updated in a single “atomic” operation. In this way, we remove any possibility of ending up in an “inconsistent” state. Also, note that sending the oldValue and newValue in the same event ensures that they can’t be re-ordered relative to each other (irrespective of how a user has configured the producer).

This PR is an implementation of this idea, but I have some concerns which I’m not sure how to handle:

1. Matthias previously mentioned that this fix depends on a correct implementation of the `.equals()` method for the key type. Not sure what we can do about this other than perhaps add a doc somewhere to state this assumption for users?
2. With the change to the `ChangeSerializer` and `ChangeDeserializer` classes, I don’t think you will be able to just upgrade from a previous version of Kafka Streams easily. Not sure how these types of “breaking” changes are typically handled. Is it simply a matter of noting this in the relevant upgrade doc? Or do we want to write more code to handle this?

# Why is the linked ticket KAFKA-12446?

I’ve chosen KAFKA-12446 as the ticket number because it’s highly related, but to be clear, this PR is doing much more than what the ticket is actually proposing. I can create a separate ticket for this but wanted to first see if there is any appetite for these stronger guarantees I’m proposing. If not, I’m happy to cut this PR down to just what is being asked for in the ticket (which is basically to just publicly document the existing behaviour). Please feel free to let me know if I’m going about this the wrong way.
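# Illustrative sketches

To make the ordering concern above concrete, here is a minimal, hypothetical sketch of a `KTable.groupBy(...).aggregate(...)` with a non-commutative adder/subtractor pair. The topic name, serdes, and the string-concatenation aggregate are illustrative assumptions, not code from this PR:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class NonCommutativeExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> table = builder.table("input-topic");

        // Group by the existing key, i.e. the grouping key never changes.
        KTable<String, String> aggregated = table
            .groupBy(KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> "",
                // adder: appends the new value -- order-sensitive
                (key, newValue, agg) -> agg + "|" + newValue,
                // subtractor: strips a previously appended value
                (key, oldValue, agg) -> agg.replace("|" + oldValue, ""),
                Materialized.with(Serdes.String(), Serdes.String()));

        // If the newValue event is ever processed before the matching
        // oldValue event, the subtractor runs against an aggregate that
        // does not yet contain the oldValue, leaving the aggregate
        // permanently "inconsistent".
    }
}
```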
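On the producer re-ordering point: one known way to prevent re-ordering during `send()` retries today is to configure the Streams-managed producer accordingly. A minimal sketch, where the application id and bootstrap servers are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ProducerOrderingConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Option 1: allow only one in-flight request, so retries cannot
        // overtake earlier sends.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1);
        // Option 2 (alternative): enable idempotence, which also preserves
        // per-partition ordering across retries.
        // props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
        return props;
    }
}
```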
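And to illustrate the proposed single-event handling: below is a hypothetical sketch of the “atomic” subtract-then-add step, using a stand-in for the internal `Change` class. This is not the PR’s actual processor code, only the shape of the idea:

```java
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.state.KeyValueStore;

class AtomicApplySketch<K, V, VAgg> {
    // Minimal stand-in for the internal Change<T> holder (hypothetical).
    static class Change<T> {
        final T oldValue;
        final T newValue;
        Change(T oldValue, T newValue) { this.oldValue = oldValue; this.newValue = newValue; }
    }

    void apply(K key, Change<V> change,
               Aggregator<K, V, VAgg> subtractor,
               Aggregator<K, V, VAgg> adder,
               KeyValueStore<K, VAgg> store) {
        VAgg aggregate = store.get(key);
        // Because both values arrive in one event, the subtractor and adder
        // run back-to-back and the store is written exactly once, so no
        // intermediate "inconsistent" state is ever visible.
        if (change.oldValue != null) {
            aggregate = subtractor.apply(key, change.oldValue, aggregate);
        }
        if (change.newValue != null) {
            aggregate = adder.apply(key, change.newValue, aggregate);
        }
        store.put(key, aggregate); // single "atomic" update
    }
}
```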
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)