The comment from Kasier Chen on the patch is correct:

1. Objects.equals() delegates to the equals() method of the value's class,
which may not be safe: for example, users can override equals() to compare
only a subset of the fields, whereas for Kafka Streams we need the two
objects to be exactly the same.

2. ChangedSer / Deser is used to serialize the Changed<> type, which has only
one of the old / new values non-null. You need to extend this encoder /
decoder in a backward-compatible way.
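
To illustrate point 1, here is a minimal sketch. The Article and
EqualsPitfall classes are hypothetical and not part of Kafka Streams, and
comparing serialized bytes is only one possible stricter check that I am
assuming for illustration, not something the patch does:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical value type; the name and fields are illustrative only.
final class Article {
    final String category;
    final String text;

    Article(String category, String text) {
        this.category = category;
        this.text = text;
    }

    // A user-defined equals() that compares only a subset of the fields.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Article)) return false;
        return category.equals(((Article) o).category);
    }

    @Override
    public int hashCode() {
        return category.hashCode();
    }
}

final class EqualsPitfall {
    // Stand-in for a real serializer (an assumption): encodes every field.
    static byte[] serialize(Article a) {
        return (a.category + "\u0000" + a.text).getBytes(StandardCharsets.UTF_8);
    }

    // Stricter comparison: two values match only if their encoded forms match.
    static boolean sameBytes(Article a, Article b) {
        return Arrays.equals(serialize(a), serialize(b));
    }

    public static void main(String[] args) {
        Article oldValue = new Article("kafka", "word1, word2, word3");
        Article newValue = new Article("kafka", "word1, word2, word3, word4");

        // Objects.equals() calls Article.equals(), which ignores "text".
        System.out.println(Objects.equals(oldValue, newValue));
        // The byte-level comparison sees that the values differ.
        System.out.println(sameBytes(oldValue, newValue));
    }
}
```

With an equals() like this one, a check based on Objects.equals() would
treat the update as "no change" and could suppress a record that does
affect the aggregate.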


Guozhang


On Wed, Aug 24, 2016 at 7:48 PM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hi Guozhang,
>
> I've been working around this issue by dropping down to the Processor
> API, but I was hoping you might be able to point out whether there is a
> flaw in this proposed change:
>
>     https://github.com/apache/kafka/compare/trunk...mfenniak:suppress-duplicate-repartition-output
>
> This adjusts KTableRepartitionMap so that if there's no change in the
> group-by key, the repartition processor just forwards the changed
> value onwards.  (This breaks a couple of tests that anticipate the
> exact existing output, so don't consider this a complete patch...)
>
> Mathieu
>
>
> On Fri, Aug 19, 2016 at 12:29 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> > Hi Mathieu,
> >
> > If you are only interested in the aggregate result "snapshot" but not its
> > change stream (note that KTable itself is not actually a "table" as in
> > RDBMS, but still a stream), you can try to use the queryable state feature
> > that is available in trunk, which will be available in the 0.10.1.0 release.
> >
> > In sum, it allows you to query any state "snapshot" that is used in
> > aggregation operators in real time with state store provided APIs such as
> > get-by-key, range queries on windows, etc. Details can be found in this KIP
> > (we are working on more docs / blog posts at the time):
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> >
> > Guozhang
> >
> >
> > On Thu, Aug 18, 2016 at 6:40 AM, Mathieu Fenniak <
> > mathieu.fenn...@replicon.com> wrote:
> >
> >> Hi Guozhang,
> >>
> >> Hm... I hadn't thought of the repartitioning involvement.
> >>
> >> I'm not confident I'm understanding completely, but I believe you're
> >> saying the decision to process data in this way is made before the
> >> data being processed is available, because the partition *may* change,
> >> because the groupBy key *may* change.
> >>
> >> I'm still feeling that I'm stuck getting corrupted output in the
> >> middle of an aggregation.
> >>
> >> It's especially problematic for me if the updates to the source KTable
> >> don't actually affect the results of the aggregation.  In the
> >> word-count example in my original e-mail, this might be similar to
> >> editing an unrelated field "author" in any article; doesn't actually
> >> affect the groupBy, doesn't affect the aggregation, but still results
> >> in the wrong output occurring temporarily.  (and inefficient
> >> processing)
> >>
> >> Are there any tools in Kafka Streams that might help me prevent
> >> downstream calculations if the relevant inputs haven't changed?  I was
> >> thinking I'd be able to use mapValues to pluck only relevant fields
> >> out of a KTable, materialize a new KTable (.through) from that, and
> >> then there'd be some state from which KS would be able to only invoke
> >> downstream nodes if data has changed... but it doesn't seem to work
> >> like that.
> >>
> >> Thanks so much for your responses Guozhang, I really appreciate your
> >> time to help me out.
> >>
> >> Mathieu
> >>
> >>
> >> On Wed, Aug 17, 2016 at 5:51 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > The problem is that Kafka Streams needs to repartition the streams based on
> >> > the groupBy keys when doing aggregations. For your case, the original
> >> > stream may be read from a topic that is partitioned on "K", and you need to
> >> > first repartition on "category" on an intermediate topic before the
> >> > aggregation can be executed.
> >> >
> >> > Hence the old and new value may be sent to two different partitions of the
> >> > intermediate topic, and hence be processed by two different processes (it
> >> > won't be the case in your application, since you mentioned the "category"
> >> > will never change). Since the library cannot tell if the groupBy key will
> >> > never change, it has to be conservative and do this subtract / add process
> >> > while receiving the old / new value.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <
> >> > mathieu.fenn...@replicon.com> wrote:
> >> >
> >> >> Hi Guozhang,
> >> >>
> >> >> Thanks for responding.  Ah, I see what you're saying... in the case of
> >> >> an update to the KTable, the aggregator's subtractor result would be
> >> >> necessary if the group-by key changes in the update.
> >> >>
> >> >> It makes sense, but unfortunately the behavior leaves me feeling a
> >> >> little sketchy... when the group-by key doesn't change (which is
> >> >> guaranteed in my case), I'm outputting results that don't correspond
> >> >> at all to the inputs, temporarily.  It's immediately followed by a
> >> >> corrected result.
> >> >>
> >> >> Would it be a feasible optimization to not send the subtractor's
> >> >> result out of the aggregate, only in the case where the groupBy key
> >> >> does not change between the old record and the new record?
> >> >>
> >> >> Mathieu
> >> >>
> >> >> On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> > Hello Mathieu,
> >> >> >
> >> >> > Note that the semantics of KTable aggregations (i.e. "KTable.groupBy.aggregate"
> >> >> > as in 0.10.0) and KStream aggregations (i.e. "KStream.aggregateByKey" as in
> >> >> > 0.10.0) are different, in the sense that when the table is updated (i.e. a
> >> >> > new record with the same key "K1" is received), the old record's effect on
> >> >> > the aggregation needs to first be subtracted before the new record's effect
> >> >> > on the aggregation can be added; whereas in the latter case there are no
> >> >> > "old values" being overridden, hence only the "adder" aggregator is
> >> >> > needed.
> >> >> >
> >> >> > So suppose your updated record on K1 is on a different "category", say:
> >> >> >
> >> >> > K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}
> >> >> >
> >> >> >
> >> >> > Then the aggregated result should be:
> >> >> >
> >> >> > {key: "kafka", value: 2}
> >> >> > {key: "kafka2", value: 4}
> >> >> >
> >> >> >
> >> >> > Does this make sense now?
> >> >> >
> >> >> > Guozhang
> >> >> >
> >> >> >
> >> >> > On Wed, Aug 17, 2016 at 7:59 AM, Mathieu Fenniak <
> >> >> > mathieu.fenn...@replicon.com> wrote:
> >> >> >
> >> >> >> Hello again, kafka-users,
> >> >> >>
> >> >> >> When I aggregate a KTable, a future input that updates a KTable's
> >> >> >> value for a specific key causes the aggregate's subtractor to be
> >> >> >> invoked, and then its adder.  This part is great, completely
> >> >> >> as-expected.
> >> >> >>
> >> >> >> But what I didn't expect is that the intermediate result of the
> >> >> >> subtractor would be sent downstream.  This value doesn't reflect the
> >> >> >> reality of the inputs to the aggregator, so sending it downstream is
> >> >> >> effectively sending "corrupt" data to the next processing node.  Is
> >> >> >> this the expected behavior, or is this a bug?
> >> >> >>
> >> >> >> Take for example, a table of blog articles and an aggregator that
> >> >> >> counts the number of words in each category of the blog:
> >> >> >>
> >> >> >> topic: articles
> >> >> >>   K1, {"category": "kafka", "text": "word1, word2, word3"}
> >> >> >>   K2, {"category": "kafka", "text": "word1, word2"}
> >> >> >>
> >> >> >> articles.groupBy((k,v) -> v.category)
> >> >> >>   .aggregate(() -> 0,
> >> >> >>     (k,v,t) -> t + v.text.split(" ").length,
> >> >> >>     (k,v,t) -> t - v.text.split(" ").length
> >> >> >>   )
> >> >> >>
> >> >> >> This aggregator will produce {key: "kafka", value: 3}, then {key:
> >> >> >> "kafka", value: 5}.  If I update one of the blog articles and send a
> >> >> >> new message to the articles topic:
> >> >> >>
> >> >> >>   K1, {"category": "kafka", "text": "word1, word2, word3, word4"}
> >> >> >>
> >> >> >> The aggregator will first produce {key: "kafka", value: 2} when the
> >> >> >> subtractor is called, then will produce {key: "kafka", value: 6} when
> >> >> >> the adder is called.  The subtractor's calculation does not actually
> >> >> >> match the reality; K1 was never deleted, it was just updated.
> >> >> >>
> >> >> >> Mathieu
> >> >> >>
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > -- Guozhang
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang
