Looking at the aggregate documentation
<http://docs.confluent.io/3.2.1/streams/developer-guide.html#aggregating>, one
of the required items is an "initializer", which takes no arguments and
returns a value.

Shouldn't this initializer follow an approach similar to Java's
computeIfAbsent
<https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#computeIfAbsent-K-java.util.function.Function->
and pass the key being initialized to said "initializer"?
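
For reference, here is a minimal sketch of the computeIfAbsent behaviour I have in mind, using only java.util.Map. The class name, the initialFor helper and the "abc" key are made up for illustration and are not part of any Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentSketch {
    // Hypothetical key-dependent initial value: here, simply the key's length.
    static long initialFor(String key) {
        return key.length();
    }

    static Map<String, Long> demo() {
        Map<String, Long> totals = new HashMap<>();
        // The mapping function receives the key that is being initialized.
        totals.computeIfAbsent("abc", ComputeIfAbsentSketch::initialFor);
        // The key is already present, so this mapping function is not called
        // and the stored value is left unchanged.
        totals.computeIfAbsent("abc", k -> -1L);
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point is only that the function supplying the initial value gets to see the key.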

KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    (aggKey) -> 0L, // Difference here
    (aggKey, newValue, aggValue) -> ...

The documentation says "When a *record key* is received for the first time,
the initializer is called (and called before the adder).", so there may be
use cases where the initial aggregate value should depend on the key being
initialized.
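
To make the intended semantics concrete, here is a rough sketch in plain Java, assuming a toy in-memory map as the aggregation state. KeyedInitializer, Adder, the aggregate helper and the "vip-" key prefix are all hypothetical names invented for this example; none of this is Kafka Streams code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedInitializerSketch {
    // Hypothetical: an initializer that is handed the key being initialized.
    interface KeyedInitializer<K, A> { A apply(K key); }

    // Same argument order as the adder in the snippet above.
    interface Adder<K, V, A> { A apply(K key, V newValue, A aggValue); }

    // Toy in-memory stand-in for the aggregation state.
    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         KeyedInitializer<K, A> init,
                                         Adder<K, V, A> adder) {
        Map<K, A> store = new HashMap<>();
        for (Map.Entry<K, V> r : records) {
            K key = r.getKey();
            // First time a key is seen: the initializer runs before the adder.
            A current = store.containsKey(key) ? store.get(key) : init.apply(key);
            store.put(key, adder.apply(key, r.getValue(), current));
        }
        return store;
    }

    static Map<String, Long> demo() {
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleEntry<>("a", 1L));
        records.add(new AbstractMap.SimpleEntry<>("vip-b", 2L));
        records.add(new AbstractMap.SimpleEntry<>("a", 3L));
        return KeyedInitializerSketch.<String, Long, Long>aggregate(
            records,
            key -> key.startsWith("vip-") ? 100L : 0L, // initial value depends on the key
            (key, newValue, aggValue) -> aggValue + newValue);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch, keys with the made-up "vip-" prefix start from a different initial value than the others, which is the kind of use case a key-aware initializer would allow.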
