please remove me from this group

On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <[email protected]> wrote:
> Hi Adam,
>
> give me some time, will make such a chart. Last time I didn't get along
> well with giphy and ruined all your charts.
> Hopefully I can get it done today
>
> On 08.09.2018 16:00, Adam Bellemare wrote:
> > Hi Jan
> >
> > I have included a diagram of what I attempted on the KIP.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> >
> > I attempted this back at the start of my own implementation of this
> > solution, and since I could not get it to work I have since discarded the
> > code. At this point in time, if you wish to continue pursuing your
> > groupBy solution, I ask that you please create a diagram on the KIP
> > carefully explaining your solution. Please feel free to use the image I
> > just posted as a starting point. I am having trouble understanding your
> > explanations but I think that a carefully constructed diagram will clear up
> > any misunderstandings. Alternately, please post a comprehensive PR with
> > your solution. I can only guess at what you mean, and since I value my own
> > time as much as you value yours, I believe it is your responsibility to
> > provide an implementation instead of me trying to guess.
> >
> > Adam
> >
> > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <[email protected]> wrote:
> >
> >> Hi James,
> >>
> >> nice to see you being interested. Kafka Streams at this point supports
> >> all sorts of joins as long as both streams have the same key.
> >> Adam is currently implementing a join where a KTable and a KTable can have
> >> a one-to-many relationship (1:n). We exploit that RocksDB is a
> >> datastore that keeps data sorted (or at least exposes an API to access the
> >> stored data in a sorted fashion).
> >>
> >> I think the technical caveats are well understood now and we are basically
> >> down to philosophy and API design (when Adam sees my newest message).
> >> I have a lengthy track record of losing those kinda arguments within the
> >> streams community and I have no clue why. So I literally can't wait for you
> >> to churn through this thread and give your opinion on how we should design
> >> the return type of the oneToManyJoin and how much power we want to give to
> >> the user vs "simplicity" (where simplicity isn't really that, as users still
> >> need to understand it, I argue)
> >>
> >> waiting for you to join in on the discussion
> >>
> >> Best Jan
> >>
> >> On 07.09.2018 15:49, James Kwan wrote:
> >>
> >>> I am new to this group and I found this subject interesting. Sounds like
> >>> you guys want to implement a join table of two streams? Is there somewhere
> >>> I can see the original requirement or proposal?
> >>>
> >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <[email protected]> wrote:
> >>>>
> >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
> >>>>
> >>>>> I'm currently testing using a Windowed Store to store the highwater mark.
> >>>>> By all indications this should work fine, with the caveat being that it can
> >>>>> only resolve out-of-order arrival for up to the size of the window (ie:
> >>>>> 24h, 72h, etc). This would remove the possibility of it being unbounded in
> >>>>> size.
> >>>>>
> >>>>> With regards to Jan's suggestion, I believe this is where we will have to
> >>>>> remain in disagreement. While I do not disagree with your statement about
> >>>>> there likely being additional joins done in a real-world workflow, I do not
> >>>>> see how you can conclusively deal with out-of-order arrival of foreign-key
> >>>>> changes and subsequent joins.
> >>>>> I have attempted what I think you have
> >>>>> proposed (without a high-water mark, using groupBy and reduce) and found that if
> >>>>> the foreign key changes too quickly, or the load on a stream thread is too
> >>>>> high, the joined messages will arrive out-of-order and be incorrectly
> >>>>> propagated, such that an intermediate event is represented as the final
> >>>>> event.
> >>>>>
> >>>> Can you shed some light on your groupBy implementation? There must be
> >>>> some sort of flaw in it.
> >>>> I have a suspicion where it is, I would just like to confirm. The idea
> >>>> is bulletproof and it must be
> >>>> an implementation mess-up. I would like to clarify before we draw a
> >>>> conclusion.
> >>>>
> >>>>> Repartitioning the scattered events back to their original
> >>>>> partitions is the only way I know how to conclusively deal with
> >>>>> out-of-order events in a given time frame, and to ensure that the data is
> >>>>> eventually consistent with the input events.
> >>>>>
> >>>>> If you have some code to share that illustrates your approach, I would be
> >>>>> very grateful as it would remove any misunderstandings that I may have.
> >>>>>
> >>>> ah okay you were looking for my code. I don't have something easily
> >>>> readable here as it's bloated with OO-patterns.
> >>>>
> >>>> it's trivial anyhow:
> >>>>
> >>>> @Override
> >>>> public T apply(K aggKey, V value, T aggregate)
> >>>> {
> >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
> >>>>     U toModifyKey = mapper.apply(value);
> >>>>     // this is the place where people are actually gonna have issues
> >>>>     // and why you probably couldn't do it. we would need to find a solution here.
> >>>>     // I didn't realize that yet.
> >>>>     // we propagate the field in the joiner, so that we can pick
> >>>>     // it up in an aggregate. Probably you have not thought of this in your
> >>>>     // approach, right?
> >>>>     // I am very open to finding a generic solution here. In my
> >>>>     // honest opinion KTableImpl.GroupBy is broken in that it loses the keys
> >>>>     // and only maintains the aggregate key.
> >>>>     // I abstracted it away back then, way before I was thinking
> >>>>     // of oneToMany join. That is why I didn't realize its significance here.
> >>>>     // Opinions?
> >>>>
> >>>>     for (V m : current)
> >>>>     {
> >>>>         currentStateAsMap.put(mapper.apply(m), m);
> >>>>     }
> >>>>     if (isAdder)
> >>>>     {
> >>>>         currentStateAsMap.put(toModifyKey, value);
> >>>>     }
> >>>>     else
> >>>>     {
> >>>>         currentStateAsMap.remove(toModifyKey);
> >>>>         if (currentStateAsMap.isEmpty()) {
> >>>>             return null;
> >>>>         }
> >>>>     }
> >>>>     return asAggregateType(currentStateAsMap);
> >>>> }
> >>>>
> >>>>> Thanks,
> >>>>> Adam
> >>>>>
> >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <[email protected]> wrote:
> >>>>>
> >>>>>> Thanks Adam for bringing Matthias up to speed!
> >>>>>>
> >>>>>> about the differences. I think re-keying back should be optional at best.
> >>>>>> I would say we return a KScatteredTable with reshuffle() returning
> >>>>>> KTable<originalKey,Joined> to make the backwards repartitioning optional.
> >>>>>> I am also very much in favour of doing the out-of-order processing using
> >>>>>> group by instead of high-water mark tracking.
> >>>>>> Just because unbounded growth is just scary + it saves us the header stuff.
> >>>>>>
> >>>>>> I think the abstraction of always repartitioning back is just not so
> >>>>>> strong. Like, the work has been done before we partition back, and grouping
> >>>>>> by something else afterwards is really common.
> >>>>>>
> >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
> >>>>>>
> >>>>>>> Hi Matthias
> >>>>>>>
> >>>>>>> Thank you for your feedback, I do appreciate it!
> >>>>>>>
> >>>>>>>> While name spacing would be possible, it would require deserializing
> >>>>>>>> user headers, which implies a runtime overhead. I would suggest to not
> >>>>>>>> namespace for now to avoid the overhead. If this becomes a problem in
> >>>>>>>> the future, we can still add name spacing later on.
> >>>>>>>>
> >>>>>>> Agreed. I will go with using a reserved string and document it.
> >>>>>>>
> >>>>>>>> My main concern about the design is the type of the result KTable: If I
> >>>>>>>> understood the proposal correctly,
> >>>>>>>>
> >>>>>>> In your example, you have table1 and table2 swapped. Here is how it works
> >>>>>>> currently:
> >>>>>>>
> >>>>>>> 1) table1 has the records that contain the foreign key within their value.
> >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
> >>>>>>> table2 input stream: <A,X>, <B,Y>
> >>>>>>>
> >>>>>>> 2) A Value mapper is required to extract the foreign key.
> >>>>>>> table1 foreign key mapper: ( value => value.fk )
> >>>>>>>
> >>>>>>> The mapper is applied to each element in table1, and a new combined key is
> >>>>>>> made:
> >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
> >>>>>>>
> >>>>>>> 3) The rekeyed events are copartitioned with table2:
> >>>>>>>
> >>>>>>> a) Stream Thread with Partition 0:
> >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> >>>>>>> Table2: <A,X>
> >>>>>>>
> >>>>>>> b) Stream Thread with Partition 1:
> >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> >>>>>>> Table2: <B,Y>
> >>>>>>>
> >>>>>>> 4) From here, they can be joined together locally by applying the joiner
> >>>>>>> function.
> >>>>>>>
> >>>>>>> At this point, Jan's design and my design deviate.
> >>>>>>> My design goes on to
> >>>>>>> repartition the data post-join and resolve out-of-order arrival of records,
> >>>>>>> finally returning the data keyed on just the original key. I do not expose the
> >>>>>>> CombinedKey or any of the internals outside of the joinOnForeignKey
> >>>>>>> function. This does make for a larger footprint, but it removes all agency
> >>>>>>> for resolving out-of-order arrivals and handling CombinedKeys from the
> >>>>>>> user. I believe that this makes the function much easier to use.
> >>>>>>>
> >>>>>>> Let me know if this helps resolve your questions, and please feel free to
> >>>>>>> add anything else on your mind.
> >>>>>>>
> >>>>>>> Thanks again,
> >>>>>>> Adam
> >>>>>>>
> >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I am just catching up on this thread. I did not read everything so far,
> >>>>>>>> but want to share a couple of initial thoughts:
> >>>>>>>>
> >>>>>>>> Headers: I think there is a fundamental difference between header usage
> >>>>>>>> in this KIP and KIP-258. For 258, we add headers to changelog topics that
> >>>>>>>> are owned by Kafka Streams and nobody else is supposed to write into
> >>>>>>>> them. In fact, no user headers are written into the changelog topic and
> >>>>>>>> thus, there are no conflicts.
> >>>>>>>>
> >>>>>>>> Nevertheless, I don't see a big issue with using headers within Streams.
> >>>>>>>> As long as we document it, we can have some "reserved" header keys that
> >>>>>>>> users are not allowed to use when processing data with Kafka Streams.
> >>>>>>>> IMHO, this should be ok.
> >>>>>>>>
> >>>>>>>>> I think there is a safe way to avoid conflicts, since these headers are
> >>>>>>>>> only needed in internal topics (I think):
> >>>>>>>>> For internal and changelog topics, we can namespace all headers:
> >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
> >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
> >>>>>>>>>
> >>>>>>>> While name spacing would be possible, it would require deserializing
> >>>>>>>> user headers, which implies a runtime overhead. I would suggest to not
> >>>>>>>> namespace for now to avoid the overhead. If this becomes a problem in
> >>>>>>>> the future, we can still add name spacing later on.
> >>>>>>>>
> >>>>>>>> My main concern about the design is the type of the result KTable: If I
> >>>>>>>> understood the proposal correctly,
> >>>>>>>>
> >>>>>>>> KTable<K1,V1> table1 = ...
> >>>>>>>> KTable<K2,V2> table2 = ...
> >>>>>>>>
> >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >>>>>>>>
> >>>>>>>> implies that the `joinedTable` has the same key as the left input table.
> >>>>>>>> IMHO, this does not work because if table2 contains multiple rows that
> >>>>>>>> join with a record in table1 (which is the main purpose of a foreign key
> >>>>>>>> join), the result table would only contain a single join result, but not
> >>>>>>>> multiple.
> >>>>>>>>
> >>>>>>>> Example:
> >>>>>>>>
> >>>>>>>> table1 input stream: <A,X>
> >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >>>>>>>>
> >>>>>>>> We use the table2 value as a foreign key to the table1 key (ie, "A" joins). If the
> >>>>>>>> result key is the same as the key of table1, this implies that the
> >>>>>>>> result can either be <A, join(X,1)> or <A, join(X,2)> but not both.
> >>>>>>>> Because they share the same key, whatever result record we emit later
> >>>>>>>> overwrites the previous result.
> >>>>>>>>
> >>>>>>>> This is the reason why Jan originally proposed to use a combination of
> >>>>>>>> both primary keys of the input tables as key of the output table. This
> >>>>>>>> makes the keys of the output table unique and we can store both in the
> >>>>>>>> output table:
> >>>>>>>>
> >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> >>>>>>>>
> >>>>>>>> Thoughts?
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >>>>>>>>
> >>>>>>>>> Just one remark here.
> >>>>>>>>> The high-watermark could be disregarded. The decision about the forward
> >>>>>>>>> depends on the size of the aggregated map.
> >>>>>>>>> Only 1 element long maps would be unpacked and forwarded. 0 element maps
> >>>>>>>>> would be published as delete. Any other count
> >>>>>>>>> of map entries is in "waiting for correct deletes to arrive"-state.
> >>>>>>>>>
> >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> >>>>>>>>>
> >>>>>>>>>> It does look like I could replace the second repartition store and
> >>>>>>>>>> highwater store with a groupBy and reduce. However, it looks like I would
> >>>>>>>>>> still need to store the highwater value within the materialized store, to
> >>>>>>>>>> compare the arrival of out-of-order records (assuming my understanding of
> >>>>>>>>>> THIS is correct...). This in effect is the same as the design I have now,
> >>>>>>>>>> just with the two tables merged together.
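
A note on the combined-key walkthrough in Adam's 05.09.2018 message (steps 1 to 4): the rekey-then-join flow can be sketched in plain Java, with no Kafka dependency. This is only a sketch under assumptions, not the KIP-213 implementation. The class and method names (ForeignKeyJoinSketch, Left, rekey, join, keyByOriginal) are hypothetical, a TreeMap stands in for the sorted store, keys are plain Strings, and the combined key is simply "<fk>-<pk>", which assumes the foreign key contains no '-'.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative sketch only: plain collections stand in for KTables and state
// stores. All names here are hypothetical, not the KIP-213 API.
final class ForeignKeyJoinSketch {

    // table1 value: carries the foreign key (fk) inside the value.
    static final class Left {
        final String fk;
        final int bar;
        Left(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Step 2: rekey table1 to "<fk>-<pk>". A sorted map mimics a store that
    // can be range-scanned by key prefix.
    static TreeMap<String, Left> rekey(Map<String, Left> table1,
                                       Function<Left, String> fkMapper) {
        TreeMap<String, Left> rekeyed = new TreeMap<>();
        table1.forEach((pk, v) -> rekeyed.put(fkMapper.apply(v) + "-" + pk, v));
        return rekeyed;
    }

    // Steps 3-4: for each table2 record, scan the matching prefix and apply
    // the joiner. The result is keyed on the combined key, so several table1
    // records pointing at the same table2 key do not overwrite each other.
    static Map<String, String> join(Map<String, Left> table1,
                                    Map<String, String> table2,
                                    Function<Left, String> fkMapper,
                                    BiFunction<Left, String, String> joiner) {
        TreeMap<String, Left> rekeyed = rekey(table1, fkMapper);
        Map<String, String> joined = new TreeMap<>();
        table2.forEach((fk, right) -> {
            String prefix = fk + "-";
            // All keys in [prefix, prefix + '\uffff') share this foreign key.
            rekeyed.subMap(prefix, prefix + Character.MAX_VALUE)
                   .forEach((combinedKey, left) ->
                       joined.put(combinedKey, joiner.apply(left, right)));
        });
        return joined;
    }

    // Optional last step (the part Adam's design always performs): key the
    // result on table1's original key again. Assumes the fk contains no '-'.
    static Map<String, String> keyByOriginal(Map<String, String> joined) {
        Map<String, String> out = new TreeMap<>();
        joined.forEach((ck, v) -> out.put(ck.substring(ck.indexOf('-') + 1), v));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Left> table1 = new LinkedHashMap<>();
        table1.put("a", new Left("A", 1));
        table1.put("b", new Left("A", 2));
        table1.put("c", new Left("B", 3));
        Map<String, String> table2 = new LinkedHashMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");

        Map<String, String> joined =
            join(table1, table2, l -> l.fk, (l, r) -> l.bar + ":" + r);
        System.out.println(joined);
        System.out.println(keyByOriginal(joined));
    }
}
```

The sketch ignores everything the thread is actually arguing about (repartitioning across stream threads, out-of-order arrival, high-water marks); it only shows why the combined key keeps 1:n results distinct and what re-keying back to the original key looks like.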
