The problem is that Kafka Streams needs to repartition the stream on the
groupBy key when doing aggregations. In your case, the original stream may
be read from a topic that is partitioned on "K", so it first has to be
repartitioned on "category" via an intermediate topic before the
aggregation can be executed.

Hence the old and new values may be sent to two different partitions of the
intermediate topic, and hence be processed by two different processes (this
won't happen in your application, since you mentioned that "category" will
never change). Since the library cannot tell whether the groupBy key will
ever change, it has to be conservative and apply the subtract / add steps
separately as it receives the old / new values.
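
To make the subtract / add sequence concrete, here is a minimal plain-Java
simulation of it. This is only a sketch and not the Kafka Streams API: the
class TableAggregateSketch, its Article type and the upsert method are all
hypothetical names, and everything runs in one process, whereas in Kafka
Streams the two updates travel through the intermediate topic. It reuses the
word-count example from the original question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a table aggregation; NOT the Kafka Streams API.
// All class and method names here are hypothetical.
class TableAggregateSketch {
    // Hypothetical value type standing in for a blog article.
    static final class Article {
        final String category;
        final String text;

        Article(String category, String text) {
            this.category = category;
            this.text = text;
        }

        int wordCount() {
            return text.split(" ").length;
        }
    }

    // Latest article per key (the "table").
    private final Map<String, Article> table = new HashMap<>();
    // Running word count per category (the aggregate).
    private final Map<String, Integer> counts = new HashMap<>();
    // Every aggregate update sent downstream, in order.
    final List<String> emitted = new ArrayList<>();

    // Insert or update one table row and adjust the aggregate.
    void upsert(String key, String category, String text) {
        Article newValue = new Article(category, text);
        Article oldValue = table.put(key, newValue);
        if (oldValue != null) {
            // Subtractor: remove the old value's effect under the OLD group key.
            apply(oldValue.category, -oldValue.wordCount());
        }
        // Adder: add the new value's effect under the NEW group key.
        apply(newValue.category, newValue.wordCount());
    }

    // Each step is applied and emitted on its own, because in general the
    // old and new group keys may differ and be handled independently.
    private void apply(String category, int delta) {
        int updated = counts.merge(category, delta, Integer::sum);
        emitted.add(category + "=" + updated);
    }

    public static void main(String[] args) {
        // Update that keeps the group key: the intermediate "kafka=2" shows up.
        TableAggregateSketch sameKey = new TableAggregateSketch();
        sameKey.upsert("K1", "kafka", "word1, word2, word3");
        sameKey.upsert("K2", "kafka", "word1, word2");
        sameKey.upsert("K1", "kafka", "word1, word2, word3, word4");
        System.out.println(sameKey.emitted); // [kafka=3, kafka=5, kafka=2, kafka=6]

        // Update that changes the group key: both emitted results are needed.
        TableAggregateSketch changedKey = new TableAggregateSketch();
        changedKey.upsert("K1", "kafka", "word1, word2, word3");
        changedKey.upsert("K2", "kafka", "word1, word2");
        changedKey.upsert("K1", "kafka2", "word1, word2, word3, word4");
        System.out.println(changedKey.emitted); // [kafka=3, kafka=5, kafka=2, kafka2=4]
    }
}
```

In the second run the subtractor's result is the final value for "kafka",
which is why it cannot simply be dropped in the general case.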


Guozhang


On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hi Guozhang,
>
> Thanks for responding.  Ah, I see what you're saying... in the case of
> an update to the KTable, the aggregator's subtractor result would be
> necessary if the group-by key changes in the update.
>
> It makes sense, but unfortunately the behavior leaves me feeling a
> little sketchy... when the group-by key doesn't change (which is
> guaranteed in my case), I'm outputting results that don't correspond
> at all to the inputs, temporarily.  It's immediately followed by a
> corrected result.
>
> Would it be a feasible optimization to not send the subtractor's
> result out of the aggregate, only in the case where the groupBy key
> does not change between the old record and the new record?
>
> Mathieu
>
> On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Hello Mathieu,
> >
> > Note that the semantics of KTable aggregations (i.e.
> > "KTable.groupBy.aggregate" as in 0.10.0) and KStream aggregations (i.e.
> > "KStream.aggregateByKey" as in 0.10.0) are different, in the sense that
> > when the table is updated (i.e. a new record with the same key "K1" is
> > received), the old record's effect on the aggregation needs to first be
> > subtracted before the new record's effect on the aggregation can be
> > added; whereas in the latter case there are no "old values" being
> > overridden, hence only the "adder" aggregator is needed.
> >
> > So suppose your updated record on K1 has a different "category", say:
> >
> > K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}
> >
> >
> > Then the aggregated result should be:
> >
> > {key: "kafka", value: 2}
> > {key: "kafka2", value: 4}
> >
> >
> > Does this make sense now?
> >
> > Guozhang
> >
> >
> > On Wed, Aug 17, 2016 at 7:59 AM, Mathieu Fenniak <
> > mathieu.fenn...@replicon.com> wrote:
> >
> >> Hello again, kafka-users,
> >>
> >> When I aggregate a KTable, a future input that updates a KTable's
> >> value for a specific key causes the aggregate's subtractor to be
> >> invoked, and then its adder.  This part is great, completely
> >> as-expected.
> >>
> >> But what I didn't expect is that the intermediate result of the
> >> subtractor would be sent downstream.  This value doesn't reflect the
> >> reality of the inputs to the aggregator, so sending it downstream is
> >> effectively sending "corrupt" data to the next processing node.  Is
> >> this the expected behavior, or is this a bug?
> >>
> >> Take for example, a table of blog articles and an aggregator that
> >> counts the number of words in each category of the blog:
> >>
> >> topic: articles
> >>   K1, {"category": "kafka", "text": "word1, word2, word3"}
> >>   K2, {"category": "kafka", "text": "word1, word2"}
> >>
> >> articles.groupBy((k,v) -> v.category)
> >>   .aggregate(() -> 0,
> >>     (k,v,t) -> t + v.text.split(" ").length,
> >>     (k,v,t) -> t - v.text.split(" ").length
> >>   )
> >>
> >> This aggregator will produce {key: "kafka", value: 3}, then {key:
> >> "kafka", value: 5}.  If I update one of the blog articles and send a
> >> new message to the articles topic:
> >>
> >>   K1, {"category": "kafka", "text": "word1, word2, word3, word4"}
> >>
> >> The aggregator will first produce {key: "kafka", value: 2} when the
> >> subtractor is called, then will produce {key: "kafka", value: 6} when
> >> the adder is called.  The subtractor's calculation does not actually
> >> match the reality; K1 was never deleted, it was just updated.
> >>
> >> Mathieu
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang
