[ 
https://issues.apache.org/jira/browse/KAFKA-12446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Ellis updated KAFKA-12446:
------------------------------
    Summary: Define KGroupedTable#aggregate subtractor + adder order of 
execution  (was: Make )

> Define KGroupedTable#aggregate subtractor + adder order of execution
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12446
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12446
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ben Ellis
>            Priority: Minor
>
> Currently, when an update is processed by KGroupedTable#aggregate, the 
> subtractor is called first, then the adder. However, per the docs, the order 
> of execution is not defined (i.e., it could change in future releases).
> [https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating]
> {quote}When subsequent non-null values are received for a key (e.g., UPDATE), 
> then (1) the subtractor is called with the old value as stored in the table 
> and (2) the adder is called with the new value of the input record that was 
> just received. The order of execution for the subtractor and adder is not 
> defined.
> {quote}
> This ticket proposes making the current order of execution (subtractor 
> first, then adder) part of the public contract.
> That would allow Kafka Streams DSL users to safely use aggregates such as:
> {code:java}
> aggregate(
>   HashMap::new,                         // initializer
>   (aggKey, newValue, aggValue) -> {     // adder
>     aggValue.put(newValue.getKey(), newValue.getValue());
>     return aggValue;
>   },
>   (aggKey, oldValue, aggValue) -> {     // subtractor
>     aggValue.remove(oldValue.getKey());
>     return aggValue;
>   }
> )
> {code}
> and correctly handle updates in which the key stays the same but the value 
> changes.
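To make the order dependence concrete, here is a minimal plain-Java sketch (no Kafka Streams dependency; Java 9+ for Map.entry) that applies the HashMap adder and subtractor from the example above to a single UPDATE in both possible orders. The class and method names, the String-valued Map.Entry records, and the sample values are hypothetical and chosen only for illustration; the sketch does not reproduce Kafka Streams' internal processing, it only replays the two call orders on one aggregate.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregateOrderDemo {

    // Adder from the example: put the entry carried by the new value.
    static Map<String, String> adder(Map.Entry<String, String> newValue, Map<String, String> agg) {
        agg.put(newValue.getKey(), newValue.getValue());
        return agg;
    }

    // Subtractor from the example: remove the entry carried by the old value.
    static Map<String, String> subtractor(Map.Entry<String, String> oldValue, Map<String, String> agg) {
        agg.remove(oldValue.getKey());
        return agg;
    }

    // Replays one UPDATE (oldValue -> newValue) in the chosen order.
    public static Map<String, String> applyUpdate(boolean subtractFirst,
                                                  Map.Entry<String, String> oldValue,
                                                  Map.Entry<String, String> newValue) {
        Map<String, String> agg = new HashMap<>();
        adder(oldValue, agg); // earlier INSERT that put the old value into the aggregate
        if (subtractFirst) {
            subtractor(oldValue, agg);
            adder(newValue, agg);
        } else {
            adder(newValue, agg);      // overwrites the entry for the same key
            subtractor(oldValue, agg); // then deletes that same key
        }
        return agg;
    }

    public static void main(String[] args) {
        Map.Entry<String, String> oldValue = Map.entry("song-1", "play-count=1");
        Map.Entry<String, String> newValue = Map.entry("song-1", "play-count=2");
        System.out.println("subtract, then add: " + applyUpdate(true, oldValue, newValue));
        System.out.println("add, then subtract: " + applyUpdate(false, oldValue, newValue));
    }
}
```

Running main prints `{song-1=play-count=2}` for subtract-then-add and `{}` for add-then-subtract: when the inner key is unchanged, only the current order keeps the updated entry.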
> The Kafka Music Example at
> [https://github.com/confluentinc/kafka-streams-examples/blob/6.0.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java#L345]
> relies on the subtractor being called first.
>  
> See the discussion at 
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
> See also the more general point made at 
> [https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined]
>  
> {quote}If the adder and subtractor are non-commutative operations and the 
> order in which they are executed can vary, you can end up with different 
> results depending on the order of execution of adder and subtractor. An 
> example of a useful non-commutative operation would be something like if 
> we’re aggregating records into a Set:
> {quote}
>  
> {code:scala}
> .aggregate[Set[Animal]](Set.empty)(
>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> )
> {code}
> {quote}In this example, for duplicated events, if the adder is called before 
> the subtractor you would end up removing the value entirely from the set 
> (which would be problematic for most use-cases I imagine).
> {quote}
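The duplicate-event case from the quote can be checked the same way. The sketch below is written in Java with a mutable HashSet rather than the quoted Scala immutable Set, and it assumes that a "duplicated event" means an UPDATE whose old and new values are identical. The class name, method names, and the "zebra" value are hypothetical; this is an illustration of the two call orders, not of Kafka Streams internals.

```java
import java.util.HashSet;
import java.util.Set;

public class SetAggregateOrderDemo {

    // Adder mirroring the Scala example: add the animal to the set.
    static Set<String> adder(String animal, Set<String> animals) {
        animals.add(animal);
        return animals;
    }

    // Subtractor mirroring the Scala example: remove the animal from the set.
    static Set<String> subtractor(String animal, Set<String> animals) {
        animals.remove(animal);
        return animals;
    }

    // Replays a duplicate UPDATE (old value == new value) in the chosen order.
    public static Set<String> duplicateUpdate(boolean subtractFirst, String animal) {
        Set<String> animals = new HashSet<>();
        adder(animal, animals); // the animal is already in the set before the update
        if (subtractFirst) {
            subtractor(animal, animals);
            adder(animal, animals);
        } else {
            adder(animal, animals);      // no-op: the animal is already present
            subtractor(animal, animals); // removes the animal entirely
        }
        return animals;
    }

    public static void main(String[] args) {
        System.out.println("subtract, then add: " + duplicateUpdate(true, "zebra"));
        System.out.println("add, then subtract: " + duplicateUpdate(false, "zebra"));
    }
}
```

Running main prints `[zebra]` for subtract-then-add and `[]` for add-then-subtract, which is the loss of the value described in the quote.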
> As [~mjsax] notes on 
> [https://github.com/confluentinc/kafka-streams-examples/issues/380]
>  
> {quote}the implementation used the same order since 0.10.0 release and it was 
> never changed
> {quote}
> so making this behavior part of the public contract would formalize what 
> has already been stable since the 0.10.0 release.
> Cost:
>  * Limits future options. If Kafka Streams ever needed to change the order of 
> execution (or make it nondeterministic instead of the current hard-coded 
> order), that would have to be a breaking change.
> Benefit:
>  * Encourages wider use of the KGroupedTable#aggregate method (current lack 
> of a defined order prevents using aggregate with non-commutative 
> adder/subtractor functions)
>  * Simplifies reasoning about how to use KGroupedTable#aggregate (knowing 
> that a given order can be relied upon makes the method itself easier to 
> understand)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
