The comment from Kasier Chen on the patch is correct:

1. Objects.equals() delegates to the equals() method of the value's class, which may not be safe: for example, users can override equals() to compare only a subset of the fields, whereas Kafka Streams needs the two objects to be exactly the same.
2. ChangedSer / Deser is used to serialize the Changed<> type, in which only one of the old / new values is non-null. You need to extend this encoder / decoder in a backward-compatible way.

Guozhang

On Wed, Aug 24, 2016 at 7:48 PM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:

> Hi Guozhang,
>
> I've been working around this issue by dropping down to the Processor
> API, but I was hoping you might be able to point out if there is a
> flaw in this proposed change:
>
> https://github.com/apache/kafka/compare/trunk...mfenniak:suppress-duplicate-repartition-output
>
> This adjusts KTableRepartitionMap so that if there's no change in the
> group-by key, the repartition processor just forwards the changed
> value onwards. (This breaks a couple of tests that anticipate the
> exact existing output, so don't consider this a complete patch...)
>
> Mathieu
>
>
> On Fri, Aug 19, 2016 at 12:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Hi Mathieu,
> >
> > If you are only interested in the aggregate result "snapshot" but not its
> > change stream (note that a KTable is not actually a "table" as in an
> > RDBMS, but still a stream), you can try the queryable state feature
> > that is available in trunk and will be part of the 0.10.1.0 release.
> >
> > In sum, it allows you to query, in real time, the "snapshot" of any state
> > used in aggregation operators, through APIs provided by the state store
> > such as get-by-key, range queries on windows, etc. Details can be found in
> > this KIP (we are working on more docs / blog posts at the moment):
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> >
> > Guozhang
> >
> >
> > On Thu, Aug 18, 2016 at 6:40 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >
> >> Hi Guozhang,
> >>
> >> Hm... I hadn't thought of the repartitioning involvement.
> >>
> >> I'm not confident I'm understanding completely, but I believe you're
> >> saying the decision to process data in this way is made before the
> >> data being processed is available, because the partition *may* change,
> >> because the groupBy key *may* change.
> >>
> >> I'm still feeling that I'm stuck getting corrupted output in the
> >> middle of an aggregation.
> >>
> >> It's especially problematic for me if the updates to the source KTable
> >> don't actually affect the results of the aggregation. In the
> >> word-count example in my original e-mail, this might be similar to
> >> editing an unrelated field "author" in any article; it doesn't actually
> >> affect the groupBy, doesn't affect the aggregation, but still results
> >> in the wrong output occurring temporarily (and in inefficient
> >> processing).
> >>
> >> Are there any tools in Kafka Streams that might help me prevent
> >> downstream calculations if the relevant inputs haven't changed? I was
> >> thinking I'd be able to use mapValues to pluck only relevant fields
> >> out of a KTable, materialize a new KTable (.through) from that, and
> >> then there'd be some state from which KS would be able to only invoke
> >> downstream nodes if data has changed... but it doesn't seem to work
> >> like that.
> >>
> >> Thanks so much for your responses Guozhang, I really appreciate your
> >> time to help me out.
> >>
> >> Mathieu
> >>
> >>
> >> On Wed, Aug 17, 2016 at 5:51 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > The problem is that Kafka Streams needs to repartition the streams based on
> >> > the groupBy keys when doing aggregations. For your case, the original
> >> > stream may be read from a topic that is partitioned on "K", and you need to
> >> > first repartition on "category" on an intermediate topic before the
> >> > aggregation can be executed.
> >> >
> >> > Hence the old and new values may be sent to two different partitions of the
> >> > intermediate topic, and hence be processed by two different processes (it
> >> > won't be the case in your application, since you mentioned the "category"
> >> > will never change). Since the library cannot tell if the groupBy key will
> >> > never change, it has to be conservative and do this subtract / add process
> >> > while receiving the old / new value.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >> >
> >> >> Hi Guozhang,
> >> >>
> >> >> Thanks for responding. Ah, I see what you're saying... in the case of
> >> >> an update to the KTable, the aggregator's subtractor result would be
> >> >> necessary if the group-by key changes in the update.
> >> >>
> >> >> It makes sense, but unfortunately the behavior leaves me feeling a
> >> >> little sketchy... when the group-by key doesn't change (which is
> >> >> guaranteed in my case), I'm outputting results that don't correspond
> >> >> at all to the inputs, temporarily. It's immediately followed by a
> >> >> corrected result.
> >> >>
> >> >> Would it be a feasible optimization to not send the subtractor's
> >> >> result out of the aggregate, only in the case where the groupBy key
> >> >> does not change between the old record and the new record?
> >> >>
> >> >> Mathieu
> >> >>
> >> >> On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> > Hello Mathieu,
> >> >> >
> >> >> > Note that the semantics of KTable aggregations (i.e. "KTable.groupBy.aggregate"
> >> >> > as in 0.10.0) and KStream aggregations (i.e. "KStream.aggregateByKey" as in
> >> >> > 0.10.0) are different, in the sense that when the table is updated (i.e.
> >> >> > a new record with the same key "K1" is received), the old record's effect on
> >> >> > the aggregation needs to first be subtracted before the new record's effect
> >> >> > on the aggregation can be added; whereas in the latter case there are no
> >> >> > "old values" being overridden, hence only the "adder" aggregator is
> >> >> > needed.
> >> >> >
> >> >> > So suppose your updated record on K1 is on a different "category", say:
> >> >> >
> >> >> > K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}
> >> >> >
> >> >> >
> >> >> > Then the aggregated result should be:
> >> >> >
> >> >> > {key: "kafka", value: 2}
> >> >> > {key: "kafka2", value: 4}
> >> >> >
> >> >> >
> >> >> > Does this make sense now?
> >> >> >
> >> >> > Guozhang
> >> >> >
> >> >> >
> >> >> > On Wed, Aug 17, 2016 at 7:59 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >> >> >
> >> >> >> Hello again, kafka-users,
> >> >> >>
> >> >> >> When I aggregate a KTable, a future input that updates a KTable's
> >> >> >> value for a specific key causes the aggregate's subtractor to be
> >> >> >> invoked, and then its adder. This part is great, completely
> >> >> >> as-expected.
> >> >> >>
> >> >> >> But what I didn't expect is that the intermediate result of the
> >> >> >> subtractor would be sent downstream. This value doesn't reflect the
> >> >> >> reality of the inputs to the aggregator, so sending it downstream is
> >> >> >> effectively sending "corrupt" data to the next processing node. Is
> >> >> >> this the expected behavior, or is this a bug?
> >> >> >>
> >> >> >> Take for example, a table of blog articles and an aggregator that
> >> >> >> counts the number of words in each category of the blog:
> >> >> >>
> >> >> >> topic: articles
> >> >> >>   K1, {"category": "kafka", "text": "word1, word2, word3"}
> >> >> >>   K2, {"category": "kafka", "text": "word1, word2"}
> >> >> >>
> >> >> >> articles.groupBy((k,v) -> v.category)
> >> >> >>   .aggregate(() -> 0,
> >> >> >>     (k,v,t) -> t + v.text.split(" ").length,
> >> >> >>     (k,v,t) -> t - v.text.split(" ").length
> >> >> >>   )
> >> >> >>
> >> >> >> This aggregator will produce {key: "kafka", value: 3}, then {key:
> >> >> >> "kafka", value: 5}. If I update one of the blog articles and send a
> >> >> >> new message to the articles topic:
> >> >> >>
> >> >> >> K1, {"category": "kafka", "text": "word1, word2, word3, word4"}
> >> >> >>
> >> >> >> The aggregator will first produce {key: "kafka", value: 2} when the
> >> >> >> subtractor is called, then will produce {key: "kafka", value: 6} when
> >> >> >> the adder is called. The subtractor's calculation does not actually
> >> >> >> match the reality; K1 was never deleted, it was just updated.
> >> >> >>
> >> >> >> Mathieu
> >> >> >>
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > -- Guozhang
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
>
--
-- Guozhang
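
For readers following the thread, the subtract-then-add behavior discussed above can be reproduced with a small plain-Java simulation. This is only a sketch under stated assumptions: it does not use Kafka Streams at all, and the names TableAggregateSketch, Article, upsert and downstream are made up for illustration. It mimics a table aggregation that, on an update to an existing key, first emits the subtractor's result and then the adder's result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation; none of these names are Kafka Streams APIs. */
public class TableAggregateSketch {

    /** Hypothetical article value with the two fields used in the thread. */
    public static final class Article {
        final String category;
        final String text;

        public Article(String category, String text) {
            this.category = category;
            this.text = text;
        }

        int wordCount() {
            // Same tokenization as the example in the thread.
            return text.split(" ").length;
        }
    }

    private final Map<String, Article> table = new HashMap<>();   // latest value per article key
    private final Map<String, Integer> totals = new HashMap<>();  // word count per category
    private final List<String> downstream = new ArrayList<>();    // every result sent downstream

    /** Inserts or updates one article and emits the resulting aggregate values. */
    public void upsert(String key, Article newValue) {
        Article oldValue = table.put(key, newValue);
        if (oldValue != null) {
            // Subtractor: remove the old value's contribution and emit the
            // intermediate result, even if the category did not change.
            emit(oldValue.category, -oldValue.wordCount());
        }
        // Adder: add the new value's contribution and emit again.
        emit(newValue.category, newValue.wordCount());
    }

    private void emit(String category, int delta) {
        int updated = totals.merge(category, delta, Integer::sum);
        downstream.add(category + "=" + updated);
    }

    public List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        TableAggregateSketch sketch = new TableAggregateSketch();
        sketch.upsert("K1", new Article("kafka", "word1, word2, word3"));
        sketch.upsert("K2", new Article("kafka", "word1, word2"));
        sketch.upsert("K1", new Article("kafka", "word1, word2, word3, word4"));
        // prints [kafka=3, kafka=5, kafka=2, kafka=6]
        System.out.println(sketch.downstream());
    }
}
```

The third upsert shows the transient kafka=2 value that Mathieu describes. If the last update instead moved K1 to category "kafka2", the same two steps would yield kafka=2 followed by kafka2=4, which is the case where the subtractor's output is actually needed.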