Currently, the initializer does not get access to the key, because the
key must be protected from modification.

Thus, at the moment the initial value must be the same for all keys.

We are aware that this limits some use cases, and there is already a
proposal to expose the key in some of the APIs:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner

It might make sense to include the `Initializer` interface in KIP-149,
too.

At the moment, you would need to write some custom code to tackle this
use case.
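
One possible shape for such custom code, as a rough sketch only: the adder
does receive the key, so the initializer can return a sentinel and the adder
can swap it for a key-dependent starting value on first use. The interfaces
below are hypothetical stand-ins that only mirror the shape of the callbacks
(the sketch does not depend on Kafka Streams), and `UNINITIALIZED`,
`initialValueFor`, and `run` are names made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyAwareInitSketch {

    // Hypothetical stand-ins mirroring the shape of the initializer and
    // adder callbacks; these are NOT the real Kafka Streams interfaces.
    interface Initializer<VA> { VA apply(); }
    interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }

    // Sentinel meaning "no key-specific initial value applied yet".
    // Assumption: a real aggregate never equals Long.MIN_VALUE.
    static final long UNINITIALIZED = Long.MIN_VALUE;

    // Hypothetical key-dependent starting value, for illustration only.
    static long initialValueFor(String key) {
        return key.startsWith("vip-") ? 100L : 0L;
    }

    // The initializer takes no arguments, so it can only return the sentinel.
    static final Initializer<Long> INIT = () -> UNINITIALIZED;

    // The adder sees the key, so it resolves the sentinel on first use.
    static final Aggregator<String, Long, Long> ADDER = (key, value, agg) -> {
        long base = (agg == UNINITIALIZED) ? initialValueFor(key) : agg;
        return base + value;
    };

    // Tiny simulation of per-key aggregation: the initializer is called the
    // first time a key is seen, then the adder is applied to every record.
    static Map<String, Long> run(String[] keys, long[] values) {
        Map<String, Long> store = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            Long agg = store.get(keys[i]);
            if (agg == null) {
                agg = INIT.apply();
            }
            store.put(keys[i], ADDER.apply(keys[i], values[i], agg));
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, Long> out = run(
            new String[] {"vip-alice", "bob", "vip-alice"},
            new long[] {5L, 2L, 1L});
        System.out.println("vip-alice=" + out.get("vip-alice"));
        System.out.println("bob=" + out.get("bob"));
    }
}
```

The obvious drawback is that the sentinel must never collide with a
legitimate aggregate value, so pick it with your value range in mind.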

-Matthias


On 5/3/17 10:38 PM, João Peixoto wrote:
> Looking at the aggregate documentation
> <http://docs.confluent.io/3.2.1/streams/developer-guide.html#aggregating> one
> of the required items is an "initializer", no arguments and returns a value.
> 
> Shouldn't this initializer follow an approach similar to Java's
> computeIfAbsent
> <https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#computeIfAbsent-K-java.util.function.Function->
> and pass the key being initialized to said "initializer"?
> 
> KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
>     (aggKey) -> 0L, // Difference here
>     (aggKey, newValue, aggValue) -> ...
> 
> The documentation says "When a *record key* is received for the first time,
> the initializer is called (and called before the adder).", so there may be
> use cases where the new value may be influenced by the key being
> initialized.
> 
