Thanks for the feedback. In the case of an aggregator I think it is simpler since we already have access to the key in the "Aggregator" implementation, meaning that we can already do something wrong with the current API if we're not paying attention.
At any rate I'll wait for new developments. On Thu, May 4, 2017 at 9:55 PM Matthias J. Sax <matth...@confluent.io> wrote: > Yes. Key can be of any type and we cannot enforce immutable types at API > level, and thus, it could get modified as a "side effect". > > The problem is, that if the key would be modified, it would corrupt data > partitioning and thus would lead to wrong result. > > It's not possible to modify the key via return -- the returned value > will only be the initial value for the aggregation. > > > -Matthias > > On 5/4/17 6:00 PM, João Peixoto wrote: > > So the reasoning is that the key may be a mutable object which in term > > could potentially cause disaster? > > > > Just clarifying because I think the initializer should only return a > value > > (as it does right now). > > On Thu, May 4, 2017 at 3:02 PM Matthias J. Sax <matth...@confluent.io> > > wrote: > > > >> Currently, you don't get any hold on the key, because the key must be > >> protected from modification. > >> > >> Thus, the initial value must be the same for all keys atm. > >> > >> We are aware that this limits some use cases and there is already a > >> proposal to add keys to some API. > >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner > >> > >> It might make sense to include the `Initializer` interface into KIP-149, > >> too. > >> > >> Atm, you would need to write some custom code to tackle this use case. > >> > >> -Matthias > >> > >> > >> On 5/3/17 10:38 PM, João Peixoto wrote: > >>> Looking at the aggregate documentation > >>> < > http://docs.confluent.io/3.2.1/streams/developer-guide.html#aggregating> > >> one > >>> of the required items is an "initializer", no arguments and returns a > >> value. > >>> > >>> Shouldn't this initializer follow a similar approach of Java's > >>> computIfAbsent > >>> < > >> > https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#computeIfAbsent-K-java.util.function.Function- > >>> > >>> and > >>> pass the key being initialized to said "initializer"? > >>> > >>> KTable<byte[], Long> aggregatedStream = groupedStream.aggregate( > >>> (aggKey) -> 0L, // Difference here > >>> (aggKey, newValue, aggValue) -> ... > >>> > >>> The documentation says "When a *record key* is received for the first > >> time, > >>> the initializer is called (and called before the adder).", so there may > >> be > >>> use cases where the new value may be influenced by the key being > >>> initialized. > >>> > >> > >> > > > >