[ https://issues.apache.org/jira/browse/KAFKA-12446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-12446:
------------------------------------
    Labels: kip  (was: )

> Define KGroupedTable#aggregate subtractor + adder order of execution
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12446
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12446
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ben Ellis
>            Assignee: Ben Ellis
>            Priority: Minor
>              Labels: kip
>
> Currently, when an update is processed by KGroupedTable#aggregate, the
> subtractor is called first, then the adder. But per the docs, the order of
> execution is not defined (i.e., it could change in future releases).
> [https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating]
> {quote}When subsequent non-null values are received for a key (e.g., UPDATE),
> then (1) the subtractor is called with the old value as stored in the table
> and (2) the adder is called with the new value of the input record that was
> just received. The order of execution for the subtractor and adder is not
> defined.
> {quote}
> This ticket proposes making the current order of execution part of the public
> contract.
> That would allow Kafka Streams DSL users the freedom to use aggregates such
> as:
> {code:java}
> aggregate(
>   HashMap::new,
>   (aggKey, newValue, aggValue) -> { // adder
>     aggValue.put(newValue.getKey(), newValue.getValue());
>     return aggValue;
>   },
>   (aggKey, oldValue, aggValue) -> { // subtractor
>     aggValue.remove(oldValue.getKey());
>     return aggValue;
>   }
> ){code}
> and handle updates where the key remains the same but the value changes.
> The Kafka Music Example at
> [https://github.com/confluentinc/kafka-streams-examples/blob/6.0.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java#L345]
> relies on the subtractor being called first.
>
> See discussion at
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
> See also the more general point made at
> [https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined]
>
> {quote}If the adder and subtractor are non-commutative operations and the
> order in which they are executed can vary, you can end up with different
> results depending on the order of execution of adder and subtractor. An
> example of a useful non-commutative operation would be something like if
> we’re aggregating records into a Set:
> {quote}
> {code:java}
> .aggregate[Set[Animal]](Set.empty)(
>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> )
> {code}
> {quote}In this example, for duplicated events, if the adder is called before
> the subtractor you would end up removing the value entirely from the set
> (which would be problematic for most use-cases I imagine).
> {quote}
> As [~mjsax] notes on
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
>
> {quote}the implementation used the same order since 0.10.0 release and it was
> never changed
> {quote}
> so making this behavior part of the standard amounts to making official what
> has already been stable for a long time.
> Cost:
> * Limits your options for the future. If you ever needed Kafka Streams to
> change the order of execution (or make that order indeterminate instead of
> its current hard coded order), you would have to make that a breaking change.
> Benefit:
> * Encourages wider use of the KGroupedTable#aggregate method (current lack
> of a defined order prevents using aggregate with non-commutative
> adder/subtractor functions)
> * Simplifies reasoning about how to use KGroupedTable#aggregate (knowing
> that a given order can be relied upon makes the method itself easier to
> understand)
>
>
> ----
> -- This message was sent by Atlassian Jira (v8.20.10#820010)
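
To make the ordering concern in the ticket concrete, here is a minimal sketch in plain Java. It does not use the Kafka Streams API; the class `AggregateOrderSketch`, the method `applyUpdate`, and the `song-1`/`v1`/`v2` values are hypothetical names chosen for illustration. It only imitates what a map-based adder and subtractor would do when a key's value is updated, once with the subtractor first (the order the ticket describes as current behavior) and once with the adder first.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregateOrderSketch {

    // Hypothetical stand-in for an UPDATE on one key: the aggregate starts out
    // holding the old value, then the adder and subtractor run in the given order.
    static Map<String, String> applyUpdate(String key, String oldValue,
                                           String newValue, boolean subtractorFirst) {
        Map<String, String> agg = new HashMap<>();
        agg.put(key, oldValue); // state before the update arrives

        if (subtractorFirst) {
            agg.remove(key);        // subtractor: drop the old entry
            agg.put(key, newValue); // adder: insert the new entry
        } else {
            agg.put(key, newValue); // adder: overwrite with the new entry
            agg.remove(key);        // subtractor: removes the entry just added
        }
        return agg;
    }

    public static void main(String[] args) {
        // Subtractor first: the updated value survives.
        System.out.println(applyUpdate("song-1", "v1", "v2", true));  // {song-1=v2}
        // Adder first: the key disappears from the aggregate.
        System.out.println(applyUpdate("song-1", "v1", "v2", false)); // {}
    }
}
```

Under these assumptions, the same update leaves the aggregate either correct or empty depending only on the call order, which is the non-commutativity the ticket wants the public contract to account for.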