fqaiser94 opened a new pull request #10747:
URL: https://github.com/apache/kafka/pull/10747


   # Problem
   For context, this issue was initially raised in the following 
[thread](https://lists.apache.org/thread.html/rc3c1d07375e25341ab0467e4f8526a88f9fcc825a766d15922a9ed7d%40%3Cusers.kafka.apache.org%3E) 
on the Kafka users mailing list. During `KTable.groupBy`, we write into a 
repartition topic. Since the grouping key can change, we need to send separate 
events for the oldValue and the newValue to downstream nodes, where the oldValue 
is “subtracted” from the aggregate for the old key and the newValue is “added” 
to the aggregate for the new key. 
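To make the current behaviour concrete, here is a minimal plain-Java sketch (no Kafka dependency) of how one table update is turned into two repartition events. `Change`, `RepartitionEvent`, and `split` are hypothetical simplifications for illustration, not the actual Kafka Streams internals, and the grouping function is reduced to a `Function` of the value only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class GroupBySplitSketch {
    // Hypothetical stand-in for a KTable update: previous and current value of one input record.
    record Change<V>(V oldValue, V newValue) {}

    // Hypothetical repartition record: grouping key, value, and whether it must be subtracted or added.
    record RepartitionEvent<K, V>(K key, V value, boolean subtract) {}

    // Splits one update into a "subtract oldValue" event keyed by the old grouping key
    // and an "add newValue" event keyed by the new grouping key (old first, then new).
    static <K, V> List<RepartitionEvent<K, V>> split(Change<V> change, Function<V, K> groupingKey) {
        List<RepartitionEvent<K, V>> events = new ArrayList<>();
        if (change.oldValue() != null) {
            events.add(new RepartitionEvent<>(groupingKey.apply(change.oldValue()), change.oldValue(), true));
        }
        if (change.newValue() != null) {
            events.add(new RepartitionEvent<>(groupingKey.apply(change.newValue()), change.newValue(), false));
        }
        return events;
    }

    public static void main(String[] args) {
        // Values are "name:city" and the grouping key is the city; alice moves from Paris to Berlin.
        Function<String, String> city = v -> v.split(":")[1];
        for (RepartitionEvent<String, String> e : split(new Change<>("alice:Paris", "alice:Berlin"), city)) {
            System.out.println((e.subtract() ? "subtract " : "add ") + e.value() + " @ key " + e.key());
        }
    }
}
```

Note that the sketch emits two events even when both values map to the same city; that is the case this PR targets.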
   
   However, sending the oldValue and the newValue as separate events is not 
strictly necessary when the grouping key does not change, and doing so poses two 
challenges for users: 
   1. Firstly, the resulting KTable (i.e. the result of 
`KTable.groupBy(...).aggregate(...)`) can briefly be in an “inconsistent” state 
where the oldValue has been “subtracted” from the aggregate for the key but the 
newValue has not yet been “added” to it, because each event (oldValue, newValue) 
is processed separately.  
   2. Secondly, if users fail to configure their producers correctly 
to avoid re-ordering during `send()`, the newValue may be sent 
(and therefore processed by the aggregator) before the oldValue. If the 
user’s `adder` and `subtractor` functions are non-commutative, this would leave 
the aggregate in a permanently “inconsistent” state. 
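Both challenges can be reproduced without Kafka at all. This sketch assumes a hypothetical `(value, aggregate) -> aggregate` shape for the adder and subtractor (the real Kafka Streams `Aggregator` also receives the key). A sum aggregate exposes an intermediate value between the two events, and a naive “latest value” aggregate (adder overwrites, subtractor clears) ends up `null` if the add event overtakes the subtract event.

```java
import java.util.function.BinaryOperator;

public class AggregateOrderingSketch {
    // Hypothetical adder/subtractor shape: (value, aggregate) -> new aggregate.
    static final BinaryOperator<Integer> SUM_ADDER = (v, agg) -> agg + v;
    static final BinaryOperator<Integer> SUM_SUBTRACTOR = (v, agg) -> agg - v;
    // Naive "latest value" aggregate: adder overwrites, subtractor clears. These do not commute.
    static final BinaryOperator<Integer> LATEST_ADDER = (v, agg) -> v;
    static final BinaryOperator<Integer> LATEST_SUBTRACTOR = (v, agg) -> null;

    // Applies the subtract and add events for one update in the given order.
    static Integer process(Integer agg, Integer oldValue, Integer newValue,
                           BinaryOperator<Integer> adder, BinaryOperator<Integer> subtractor,
                           boolean subtractFirst) {
        if (subtractFirst) {
            return adder.apply(newValue, subtractor.apply(oldValue, agg));
        }
        return subtractor.apply(oldValue, adder.apply(newValue, agg));
    }

    public static void main(String[] args) {
        // Challenge 1: key "Paris" holds alice=100 + bob=200 = 300; alice's value becomes 120.
        Integer intermediate = SUM_SUBTRACTOR.apply(100, 300);
        System.out.println("visible between events: " + intermediate);     // 200 (alice is missing)
        System.out.println("final: " + SUM_ADDER.apply(120, intermediate)); // 320

        // Challenge 2: the same kind of update, but the add event overtakes the subtract event.
        System.out.println("in order: " + process(100, 100, 120, LATEST_ADDER, LATEST_SUBTRACTOR, true));   // 120
        System.out.println("reordered: " + process(100, 100, 120, LATEST_ADDER, LATEST_SUBTRACTOR, false)); // null
    }
}
```

The sum aggregate is commutative, so it only suffers from the transient state; the “latest value” aggregate is left permanently wrong once the events are reordered.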
   
   Whilst there are ways to get around this issue by dropping down to the 
Processor API level, it would be nicer if Kafka Streams handled this more 
seamlessly. 
   
   # Proposed solution
   If the grouping key has not changed, the oldValue and newValue events are 
guaranteed to be processed by the same processor. As such, we should be able 
to send them as a single `Change<T>` event. The subtractor and adder functions 
can then be executed (in that order) and the KTable can be updated in a single 
“atomic” operation. In this way, we remove any possibility of 
ending up in an “inconsistent” state. Also, note that sending the oldValue and 
newValue in the same event ensures that they can’t be re-ordered relative to 
each other (irrespective of how a user has configured their producer). 
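A minimal sketch of the proposed handling, again in plain Java with hypothetical types (`Change`, `Employee`, and a `Map` standing in for the state store), assuming both oldValue and newValue are non-null. When the grouping key is unchanged, the subtractor and adder run back to back and the aggregate is written once; otherwise the two keys are updated separately, as today. This illustrates the idea only and is not the code in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BinaryOperator;

public class CombinedChangeSketch {
    // Hypothetical stand-ins: an update carrying both values, and an input value type.
    record Change<V>(V oldValue, V newValue) {}
    record Employee(String city, int salary) {}

    // Sum aggregate: (value, aggregate) -> new aggregate.
    static final BinaryOperator<Integer> ADDER = (v, agg) -> agg + v;
    static final BinaryOperator<Integer> SUBTRACTOR = (v, agg) -> agg - v;

    // Hypothetical aggregate store: grouping key (city) -> sum of salaries.
    final Map<String, Integer> store = new HashMap<>();
    int storeWrites = 0;

    private void write(String key, int aggregate) {
        store.put(key, aggregate);
        storeWrites++;
    }

    // Assumes both oldValue and newValue are non-null (an update, not an insert or delete).
    void process(Change<Employee> change) {
        Employee oldV = change.oldValue();
        Employee newV = change.newValue();
        if (Objects.equals(oldV.city(), newV.city())) {
            // Grouping key unchanged: run the subtractor, then the adder, and write once,
            // so the "subtracted but not yet added" aggregate is never stored.
            int agg = store.getOrDefault(newV.city(), 0);
            write(newV.city(), ADDER.apply(newV.salary(), SUBTRACTOR.apply(oldV.salary(), agg)));
        } else {
            // Grouping key changed: two separate updates for two different keys,
            // which in Kafka Streams may be handled by different tasks.
            write(oldV.city(), SUBTRACTOR.apply(oldV.salary(), store.getOrDefault(oldV.city(), 0)));
            write(newV.city(), ADDER.apply(newV.salary(), store.getOrDefault(newV.city(), 0)));
        }
    }

    public static void main(String[] args) {
        CombinedChangeSketch sketch = new CombinedChangeSketch();
        sketch.store.put("Paris", 300); // alice=100 + bob=200
        // alice's salary changes but she stays in Paris: one combined event, one write.
        sketch.process(new Change<>(new Employee("Paris", 100), new Employee("Paris", 120)));
        System.out.println(sketch.store.get("Paris") + " after " + sketch.storeWrites + " write(s)"); // 320 after 1 write(s)
        // alice moves to Berlin: the old and new keys differ, so two writes.
        sketch.process(new Change<>(new Employee("Paris", 120), new Employee("Berlin", 120)));
        System.out.println(sketch.store.get("Paris") + " / " + sketch.store.get("Berlin")); // 200 / 120
    }
}
```

The key comparison in `process` is where the `.equals()` assumption from the concerns comes in: the combined path is only taken when the two grouping keys compare equal.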
   
   This PR is an implementation of this idea, but I have some concerns which I’m 
not sure how to handle: 
   1. Matthias previously mentioned that this fix depends on a correct 
implementation of the `.equals()` method for the key type. Not sure what we can do 
about this other than perhaps adding a note somewhere in the docs to state this 
assumption for users? 
   2. With the change to the `ChangeSerializer` and `ChangeDeserializer` 
classes, I don’t think you will be able to just upgrade from a previous version 
of Kafka Streams easily. Not sure how these types of “breaking” changes are 
typically handled. Is it simply a matter of noting this in the relevant upgrade 
doc? Or do we want to write more code to handle this? 
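To illustrate concern 1: if the key type does not override `equals()`, Java falls back to reference equality, so keys derived separately from the oldValue and the newValue compare unequal even when they are logically identical. In this sketch (`CityKey`, `CityRecordKey`, and `sameGroupingKey` are hypothetical), that would presumably just mean falling back to the existing two-event behaviour. The more worrying case would be an `equals()` that reports keys as equal when they serialize differently.

```java
import java.util.Objects;

public class KeyEqualsSketch {
    // Hypothetical key type WITHOUT equals/hashCode: Object.equals compares references.
    static final class CityKey {
        final String name;
        CityKey(String name) { this.name = name; }
    }

    // Hypothetical key type with value-based equality (records generate equals/hashCode).
    record CityRecordKey(String name) {}

    // The check a "send one combined event?" decision would rely on.
    static boolean sameGroupingKey(Object oldKey, Object newKey) {
        return Objects.equals(oldKey, newKey);
    }

    public static void main(String[] args) {
        // The grouping key is derived separately for each value, so the two keys are distinct objects.
        System.out.println(sameGroupingKey(new CityKey("Paris"), new CityKey("Paris")));             // false
        System.out.println(sameGroupingKey(new CityRecordKey("Paris"), new CityRecordKey("Paris"))); // true
    }
}
```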
   
   # Why is the linked ticket KAFKA-12446?
   I’ve chosen KAFKA-12446 as the ticket number because it’s highly related, but 
to be clear, this PR does much more than what the ticket actually 
proposes. I can create a separate ticket for this but wanted to first see if 
there is any appetite for the stronger guarantees I'm proposing. If not, I’m 
happy to cut this PR down to just what the ticket asks for (which 
is basically to publicly document the existing behaviour). Please feel 
free to let me know if I’m going about this the wrong way. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

