[ https://issues.apache.org/jira/browse/KAFKA-12446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ben Ellis updated KAFKA-12446:
------------------------------
    Summary: Define KGroupedTable#aggregate subtractor + adder order of execution  (was: Make )

> Define KGroupedTable#aggregate subtractor + adder order of execution
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12446
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12446
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ben Ellis
>            Priority: Minor
>
> Currently, when an update is processed by KGroupedTable#aggregate, the
> subtractor is called first, then the adder. However, per the docs, the order
> of execution is not defined (i.e., it could change in future releases).
> [https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating]
> {quote}When subsequent non-null values are received for a key (e.g., UPDATE),
> then (1) the subtractor is called with the old value as stored in the table
> and (2) the adder is called with the new value of the input record that was
> just received. The order of execution for the subtractor and adder is not
> defined.
> {quote}
> This ticket proposes making the current order of execution part of the public
> contract.
> That would allow Kafka Streams DSL users the freedom to use aggregates such
> as:
> {{aggregate(}}
> {{  HashMap::new,}}
> {{  (aggKey, newValue, aggValue) -> { // adder}}
> {{    aggValue.put(newValue.getKey(), newValue.getValue());}}
> {{    return aggValue;}}
> {{  },}}
> {{  (aggKey, oldValue, aggValue) -> { // subtractor}}
> {{    aggValue.remove(oldValue.getKey());}}
> {{    return aggValue;}}
> {{  }}}
> {{)}}
> and handle updates where the key remains the same but the value changes.
> The Kafka Music Example at
> [https://github.com/confluentinc/kafka-streams-examples/blob/6.0.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java#L345]
> relies on the subtractor being called first.
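To make the dependence on ordering concrete, here is a minimal plain-Java sketch of the HashMap aggregate described in the ticket. It does not call Kafka Streams at all: the class `SubtractorAdderOrderDemo`, the method `applyUpdate`, and the `subtractorFirst` flag are hypothetical names introduced only for illustration, and the two lambdas from the ticket are modeled as ordinary static methods.

```java
import java.util.HashMap;
import java.util.Map;

public class SubtractorAdderOrderDemo {

    // Models the adder from the ticket: store the new entry in the aggregate.
    static Map<String, String> add(Map.Entry<String, String> newValue, Map<String, String> agg) {
        agg.put(newValue.getKey(), newValue.getValue());
        return agg;
    }

    // Models the subtractor from the ticket: drop the old entry's key.
    static Map<String, String> subtract(Map.Entry<String, String> oldValue, Map<String, String> agg) {
        agg.remove(oldValue.getKey());
        return agg;
    }

    // Applies one UPDATE to a copy of the aggregate in the requested order.
    // The flag is an assumption of this sketch, not a Kafka Streams option.
    static Map<String, String> applyUpdate(Map<String, String> agg,
                                           Map.Entry<String, String> oldValue,
                                           Map.Entry<String, String> newValue,
                                           boolean subtractorFirst) {
        Map<String, String> copy = new HashMap<>(agg);
        if (subtractorFirst) {
            return add(newValue, subtract(oldValue, copy));
        }
        return subtract(oldValue, add(newValue, copy));
    }

    public static void main(String[] args) {
        Map<String, String> agg = new HashMap<>();
        agg.put("k", "v1");

        // Same key, changed value: the case the ticket wants to support.
        Map.Entry<String, String> oldValue = Map.entry("k", "v1");
        Map.Entry<String, String> newValue = Map.entry("k", "v2");

        System.out.println(applyUpdate(agg, oldValue, newValue, true));  // prints {k=v2}
        System.out.println(applyUpdate(agg, oldValue, newValue, false)); // prints {}
    }
}
```

With the subtractor first, the stale entry is removed and the new one is stored. With the adder first, the freshly written entry is deleted by the subtractor because both share the same key, so the update is lost.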
>
> See discussion at
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
> See also the more general point made at
> [https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined]
>
> {quote}If the adder and subtractor are non-commutative operations and the
> order in which they are executed can vary, you can end up with different
> results depending on the order of execution of adder and subtractor. An
> example of a useful non-commutative operation would be something like if
> we’re aggregating records into a Set:
> {quote}
>
> {{.aggregate[Set[Animal]](Set.empty)(}}
> {{  adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,}}
> {{  subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue}}
> {{)}}
> {quote}In this example, for duplicated events, if the adder is called before
> the subtractor you would end up removing the value entirely from the set
> (which would be problematic for most use-cases I imagine).
> {quote}
> As [~mjsax] notes on
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
>
> {quote}the implementation used the same order since 0.10.0 release and it was
> never changed
> {quote}
> so making this behavior part of the standard amounts to making official what
> has already been stable for a long time.
> Cost:
>  * Limits your options for the future. If you ever needed Kafka Streams to
> change the order of execution (or make that order indeterminate instead of
> its current hard-coded order), you would have to make that a breaking change.
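The duplicated-event case from the quoted Stack Overflow discussion can be reproduced the same way. The following is only a sketch in plain Java, under the assumption that a duplicate event means the old and new values are equal. `DuplicateEventOrderDemo`, `applyUpdate`, and `subtractorFirst` are hypothetical names, and a `Set<String>` stands in for the `Set[Animal]` of the quoted Scala snippet.

```java
import java.util.HashSet;
import java.util.Set;

public class DuplicateEventOrderDemo {

    // Applies one UPDATE to a copy of the set, in the requested order.
    // remove() plays the role of the subtractor, add() that of the adder.
    static Set<String> applyUpdate(Set<String> agg,
                                   String oldValue,
                                   String newValue,
                                   boolean subtractorFirst) {
        Set<String> result = new HashSet<>(agg);
        if (subtractorFirst) {
            result.remove(oldValue);
            result.add(newValue);
        } else {
            result.add(newValue);
            result.remove(oldValue);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> zoo = new HashSet<>();
        zoo.add("lion");

        // Duplicate event: the "new" value equals the value already stored.
        System.out.println(applyUpdate(zoo, "lion", "lion", true));  // prints [lion]
        System.out.println(applyUpdate(zoo, "lion", "lion", false)); // prints []
    }
}
```

Only the subtractor-first order leaves the duplicated value in the set, which matches the concern raised in the quote.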
> Benefit:
>  * Encourages wider use of the KGroupedTable#aggregate method (the current
> lack of a defined order prevents using aggregate with non-commutative
> adder/subtractor functions)
>  * Simplifies reasoning about how to use KGroupedTable#aggregate (knowing
> that a given order can be relied upon makes the method itself easier to
> understand)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)