[ 
https://issues.apache.org/jira/browse/KAFKA-12446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12446:
------------------------------------
    Labels: kip  (was: )

> Define KGroupedTable#aggregate subtractor + adder order of execution
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12446
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12446
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ben Ellis
>            Assignee: Ben Ellis
>            Priority: Minor
>              Labels: kip
>
> Currently, when an update is processed by KGroupedTable#aggregate, the 
> subtractor is called first, then the adder. But according to the docs, the 
> order of execution is not defined (i.e., it could change in future releases).
> [https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating]
> {quote}When subsequent non-null values are received for a key (e.g., UPDATE), 
> then (1) the subtractor is called with the old value as stored in the table 
> and (2) the adder is called with the new value of the input record that was 
> just received. The order of execution for the subtractor and adder is not 
> defined.
> {quote}
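The update path described in the quoted docs, in the order the current implementation uses (subtractor on the old value, then adder on the new value), can be sketched in plain Java as below. This is a hypothetical model, not the Kafka Streams source. The `Aggregator` interface is a local stand-in that mirrors the shape of the DSL's functional interface, and `applyChange` and `sumAfter` are illustrative names. The sketch also assumes that the old and new values of an update arrive together for the same grouping key.

```java
// Hypothetical model of how one change reaches a KGroupedTable aggregate.
// NOT the Kafka Streams implementation: it only mirrors the order described
// in this ticket (subtractor with the old value first, then adder with the new value).
class TableAggregateSketch {

    // Local stand-in for the DSL's Aggregator<K, V, VA> functional interface.
    @FunctionalInterface
    interface Aggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    // Applies one change (oldValue -> newValue) to the current aggregate.
    // A null oldValue models an INSERT; a null newValue models a DELETE.
    static <K, V, VA> VA applyChange(K key, V oldValue, V newValue, VA current,
                                     Aggregator<K, V, VA> adder,
                                     Aggregator<K, V, VA> subtractor) {
        VA agg = current;
        if (oldValue != null) {
            agg = subtractor.apply(key, oldValue, agg); // (1) remove the old value
        }
        if (newValue != null) {
            agg = adder.apply(key, newValue, agg);      // (2) add the new value
        }
        return agg;
    }

    // Per-group integer sum built on applyChange (the group key is hypothetical).
    static int sumAfter(int current, Integer oldValue, Integer newValue) {
        Aggregator<String, Integer, Integer> adder = (k, v, agg) -> agg + v;
        Aggregator<String, Integer, Integer> subtractor = (k, v, agg) -> agg - v;
        return applyChange("group", oldValue, newValue, current, adder, subtractor);
    }

    public static void main(String[] args) {
        int agg = 0;
        agg = sumAfter(agg, null, 5); // INSERT 5: 0 + 5 = 5
        agg = sumAfter(agg, 5, 7);    // UPDATE 5 -> 7: (5 - 5) + 7 = 7
        agg = sumAfter(agg, 7, null); // DELETE 7: 7 - 7 = 0
        System.out.println(agg);
    }
}
```

For a commutative pair like this sum, the order does not affect the result: (a - old) + new equals (a + new) - old. That is why leaving the order undefined only matters for non-commutative adder/subtractor pairs.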
> This ticket proposes making the current order of execution (subtractor first, 
> then adder) part of the public contract.
> That would let Kafka Streams DSL users safely use aggregates such as:
> {code:java}
> aggregate(
>   HashMap::new,
>   (aggKey, newValue, aggValue) -> { // adder: store the new entry
>     aggValue.put(newValue.getKey(), newValue.getValue());
>     return aggValue;
>   },
>   (aggKey, oldValue, aggValue) -> { // subtractor: drop the old entry
>     aggValue.remove(oldValue.getKey());
>     return aggValue;
>   }
> ){code}
> and correctly handle updates where the map key stays the same but its value 
> changes.
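The following sketch shows why the map-building aggregate above depends on the subtractor running first. It is a hypothetical, dependency-free simulation (not Kafka Streams code) that applies one UPDATE under both possible orders. The sample data ("user-1", "red", "blue") and the method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the map-building aggregate from this ticket,
// applying a single UPDATE whose map key stays the same under both orders.
class MapAggregateOrderDemo {

    // Adder from the ticket: store the entry carried by the new value.
    static Map<String, String> add(Map.Entry<String, String> newValue, Map<String, String> agg) {
        agg.put(newValue.getKey(), newValue.getValue());
        return agg;
    }

    // Subtractor from the ticket: drop the entry carried by the old value.
    static Map<String, String> subtract(Map.Entry<String, String> oldValue, Map<String, String> agg) {
        agg.remove(oldValue.getKey());
        return agg;
    }

    // Applies UPDATE oldValue -> newValue to a mutable copy of the aggregate.
    static Map<String, String> update(Map<String, String> start,
                                      Map.Entry<String, String> oldValue,
                                      Map.Entry<String, String> newValue,
                                      boolean subtractorFirst) {
        Map<String, String> agg = new HashMap<>(start);
        if (subtractorFirst) {
            return add(newValue, subtract(oldValue, agg)); // remove old, then put new
        }
        return subtract(oldValue, add(newValue, agg));     // put new, then remove same key
    }

    public static void main(String[] args) {
        Map<String, String> start = Map.of("user-1", "red");
        Map.Entry<String, String> oldValue = Map.entry("user-1", "red");
        Map.Entry<String, String> newValue = Map.entry("user-1", "blue");

        System.out.println(update(start, oldValue, newValue, true));  // {user-1=blue}
        System.out.println(update(start, oldValue, newValue, false)); // {}
    }
}
```

With adder-first ordering, the new entry overwrites the old one and the subtractor then deletes that same key. The update is silently lost.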
> The Kafka Music Example at
> [https://github.com/confluentinc/kafka-streams-examples/blob/6.0.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java#L345]
> relies on the subtractor being called first.
>  
> See discussion at 
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
> See also the more general point made at 
> [https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined]
>  
> {quote}If the adder and subtractor are non-commutative operations and the 
> order in which they are executed can vary, you can end up with different 
> results depending on the order of execution of adder and subtractor. An 
> example of a useful non-commutative operation would be something like if 
> we’re aggregating records into a Set:
> {quote}
> {code:java}
> .aggregate[Set[Animal]](Set.empty)(
>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> )
> {code}
> {quote}In this example, for duplicated events, if the adder is called before 
> the subtractor you would end up removing the value entirely from the set 
> (which would be problematic for most use-cases I imagine).
> {quote}
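The duplicated-event case from the quoted Scala example can be reproduced in Java. The sketch below is a hypothetical simulation, not Kafka Streams code. It assumes that a "duplicated event" reaches the aggregate as an UPDATE whose new value equals the old one. It also uses a mutable `HashSet` copy in place of the immutable Scala `Set`. The animal names are illustrative.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation of the Set-based aggregate for an UPDATE whose new
// value duplicates the old one (e.g. the same record re-sent upstream).
class SetAggregateOrderDemo {

    static Set<String> update(Set<String> start, String oldValue, String newValue,
                              boolean subtractorFirst) {
        Set<String> set = new HashSet<>(start); // mutable copy of the aggregate
        if (subtractorFirst) {
            set.remove(oldValue); // subtractor: setOfAnimals - animalValue
            set.add(newValue);    // adder: the animal is added back
        } else {
            set.add(newValue);    // adder: no-op, the animal is already present
            set.remove(oldValue); // subtractor: removes the only copy
        }
        return set;
    }

    public static void main(String[] args) {
        Set<String> zoo = Set.of("lion", "zebra");
        System.out.println(update(zoo, "lion", "lion", true).contains("lion"));  // true
        System.out.println(update(zoo, "lion", "lion", false).contains("lion")); // false
    }
}
```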
> As [~mjsax] notes on 
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
>  
> {quote}the implementation used the same order since 0.10.0 release and it was 
> never changed
> {quote}
> so making this behavior part of the public contract would simply formalize 
> what has already been stable for a long time.
> Cost:
>  * Limits the project's options in the future. If Kafka Streams ever needed to 
> change the order of execution (or make it nondeterministic instead of the 
> current hard-coded order), that would have to be a breaking change.
> Benefit:
>  * Encourages wider use of the KGroupedTable#aggregate method (the current lack 
> of a defined order prevents using aggregate with non-commutative 
> adder/subtractor functions)
>  * Simplifies reasoning about how to use KGroupedTable#aggregate (knowing 
> that a given order can be relied upon makes the method itself easier to 
> understand)
>  
>  
> ----
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
