Hi Jan,

I have included a diagram of what I attempted on the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate

I attempted this at the start of my own implementation of this solution, and since I could not get it to work, I have since discarded the code. At this point, if you wish to continue pursuing your groupBy solution, I ask that you please create a diagram on the KIP carefully explaining it. Please feel free to use the image I just posted as a starting point. I am having trouble understanding your explanations, but I think that a carefully constructed diagram will clear up any misunderstandings. Alternatively, please post a comprehensive PR with your solution. I can only guess at what you mean, and since I value my own time as much as you value yours, I believe it is your responsibility to provide an implementation instead of me trying to guess.

Adam

On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> Hi James,
>
> Nice to see you being interested. Kafka Streams at this point supports all sorts of joins as long as both streams have the same key.
> Adam is currently implementing a join where a KTable and a KTable can have a one-to-many relationship (1:n). We exploit the fact that RocksDB is a datastore that keeps data sorted (or at least exposes an API to access the stored data in a sorted fashion).
>
> I think the technical caveats are well understood now, and we are basically down to philosophy and API design (once Adam sees my newest message).
> I have a lengthy track record of losing these kinds of arguments within the Streams community, and I have no clue why.
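To make the point about sorted storage concrete, here is a minimal Java sketch that uses a java.util.TreeMap as a stand-in for RocksDB's sorted key space. It assumes combined keys of the form "<foreignKey>-<primaryKey>" (the A-a notation Adam uses further down in this thread) and that neither part ever contains '-'; the class and method names are hypothetical. A range scan over the "A-" prefix returns every left-table record that references A, which is what makes the 1:n lookup cheap.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: TreeMap stands in for a sorted key-value store such as RocksDB.
// Assumes keys are "<foreignKey>-<primaryKey>" and that neither part contains '-'.
class PrefixScanSketch {

    static String combinedKey(String foreignKey, String primaryKey) {
        return foreignKey + "-" + primaryKey;
    }

    // Returns all entries whose key starts with "<foreignKey>-".
    // '.' is the character right after '-', so "<foreignKey>." is an exclusive upper bound.
    static NavigableMap<String, String> scanByForeignKey(
            NavigableMap<String, String> store, String foreignKey) {
        return store.subMap(foreignKey + "-", true, foreignKey + ".", false);
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put(combinedKey("A", "a"), "(fk=A,bar=1)");
        store.put(combinedKey("A", "b"), "(fk=A,bar=2)");
        store.put(combinedKey("B", "c"), "(fk=B,bar=3)");
        // An update <A,X> on the other table only needs to visit the "A-" range.
        System.out.println(scanByForeignKey(store, "A")); // {A-a=(fk=A,bar=1), A-b=(fk=A,bar=2)}
    }
}
```

A production encoding would need to be unambiguous (for example, length-prefixed foreign keys), since a foreign key such as "A-x" would otherwise fall inside the "A-" range.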
> So I literally can't wait for you to churn through this thread and give your opinion on how we should design the return type of the oneToManyJoin, and on how much power we want to give the user vs. "simplicity" (which, I argue, isn't really simplicity, as users still need to understand it).
>
> Waiting for you to join in on the discussion.
>
> Best,
> Jan
>
> On 07.09.2018 15:49, James Kwan wrote:
>
>> I am new to this group and I found this subject interesting. It sounds like you guys want to implement a join table of two streams? Is there somewhere I can see the original requirement or proposal?
>>
>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>
>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>
>>>> I'm currently testing using a Windowed Store to store the high-water mark. By all indications this should work fine, with the caveat that it can only resolve out-of-order arrival for up to the size of the window (i.e. 24h, 72h, etc.). This would remove the possibility of it being unbounded in size.
>>>>
>>>> With regard to Jan's suggestion, I believe this is where we will have to remain in disagreement. While I do not disagree with your statement about there likely being additional joins done in a real-world workflow, I do not see how you can conclusively deal with out-of-order arrival of foreign-key changes and subsequent joins. I have attempted what I think you have proposed (without a high-water mark, using groupBy and reduce) and found that if the foreign key changes too quickly, or the load on a stream thread is too high, the joined messages will arrive out of order and be incorrectly propagated, such that an intermediate event is represented as the final event.
>>>
>>> Can you shed some light on your groupBy implementation? There must be some sort of flaw in it.
>>> I have a suspicion where it is; I would just like to confirm. The idea is bulletproof, so it must be an implementation mess-up. I would like to clarify before we draw a conclusion.
>>>
>>>> Repartitioning the scattered events back to their original partitions is the only way I know how to conclusively deal with out-of-order events in a given time frame, and to ensure that the data is eventually consistent with the input events.
>>>>
>>>> If you have some code to share that illustrates your approach, I would be very grateful, as it would remove any misunderstandings that I may have.
>>>
>>> Ah, okay, you were looking for my code. I don't have anything easily readable here, as it's bloated with OO patterns.
>>>
>>> It's trivial anyhow:
>>>
>>> @Override
>>> public T apply(K aggKey, V value, T aggregate)
>>> {
>>>     Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
>>>     U toModifyKey = mapper.apply(value);
>>>     // This is the place where people are actually going to have issues,
>>>     // and why you probably couldn't do it. We would need to find a
>>>     // solution here. I didn't realize that yet.
>>>     // We propagate the field in the joiner, so that we can pick it up
>>>     // in an aggregate. Probably you have not thought of this in your
>>>     // approach, right?
>>>     // I am very open to finding a generic solution here. In my honest
>>>     // opinion, KTableImpl.GroupBy is broken in that it loses the keys
>>>     // and only maintains the aggregate key.
>>>     // I abstracted it away back then, way before I was thinking of the
>>>     // oneToMany join. That is why I didn't realize its significance here.
>>>     // Opinions?
>>>
>>>     for (V m : current)
>>>     {
>>>         currentStateAsMap.put(mapper.apply(m), m);
>>>     }
>>>     if (isAdder)
>>>     {
>>>         currentStateAsMap.put(toModifyKey, value);
>>>     }
>>>     else
>>>     {
>>>         currentStateAsMap.remove(toModifyKey);
>>>         if (currentStateAsMap.isEmpty()) {
>>>             return null;
>>>         }
>>>     }
>>>     return asAggregateType(currentStateAsMap);
>>> }
>>>
>>>> Thanks,
>>>>
>>>> Adam
>>>>
>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>
>>>>> Thanks, Adam, for bringing Matthias up to speed!
>>>>>
>>>>> About the differences: I think re-keying back should be optional at best. I would say we return a KScatteredTable, with reshuffle() returning KTable<originalKey,Joined>, to make the backwards repartitioning optional.
>>>>> I am also strongly in favour of doing the out-of-order processing using groupBy instead of high-water-mark tracking, because unbounded growth is just scary, and it saves us the header handling.
>>>>>
>>>>> I think the abstraction of always repartitioning back is just not that strong: the work has already been done before we partition back, and grouping by something else afterwards is really common.
>>>>>
>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>> Thank you for your feedback, I do appreciate it!
>>>>>>
>>>>>>> While name spacing would be possible, it would require deserializing user headers, which implies a runtime overhead. I would suggest not namespacing for now to avoid the overhead. If this becomes a problem in the future, we can still add name spacing later on.
>>>>>>
>>>>>> Agreed. I will go with using a reserved string and document it.
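A minimal sketch of the "reserved string" convention agreed on here, using only the JDK. Header keys are treated as plain strings, and both the class name and RESERVED_PREFIX are hypothetical, not actual Kafka Streams constants. The plan in the thread is to document the reserved string; an explicit check like this one is an optional extra.

```java
import java.util.List;

// Hypothetical sketch of a reserved header-key convention; the prefix is illustrative only.
class ReservedHeaders {

    static final String RESERVED_PREFIX = "__streams.fk.";

    static boolean isReserved(String headerKey) {
        return headerKey.startsWith(RESERVED_PREFIX);
    }

    // Rejects user-supplied header keys that collide with the reserved namespace.
    // Only the key strings are inspected; no header values are touched.
    static void checkUserHeaderKeys(List<String> headerKeys) {
        for (String key : headerKeys) {
            if (isReserved(key)) {
                throw new IllegalArgumentException(
                        "Header key '" + key + "' is reserved for internal use");
            }
        }
    }
}
```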
>>>>>>
>>>>>>> My main concern about the design is the type of the result KTable: If I understood the proposal correctly,
>>>>>>
>>>>>> In your example, you have table1 and table2 swapped. Here is how it works currently:
>>>>>>
>>>>>> 1) table1 has the records that contain the foreign key within their value.
>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>>
>>>>>> 2) A value mapper is required to extract the foreign key.
>>>>>> table1 foreign key mapper: ( value => value.fk )
>>>>>>
>>>>>> The mapper is applied to each element in table1, and a new combined key is made:
>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>>>>>
>>>>>> 3) The rekeyed events are copartitioned with table2:
>>>>>>
>>>>>> a) Stream thread with partition 0:
>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>>>> Table2: <A,X>
>>>>>>
>>>>>> b) Stream thread with partition 1:
>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>> Table2: <B,Y>
>>>>>>
>>>>>> 4) From here, they can be joined together locally by applying the joiner function.
>>>>>>
>>>>>> At this point, Jan's design and my design deviate. My design goes on to repartition the data post-join and resolve out-of-order arrival of records, finally returning the data keyed by just the original key. I do not expose the CombinedKey or any of the internals outside of the joinOnForeignKey function. This does make for a larger footprint, but it removes from the user any responsibility for resolving out-of-order arrivals and handling CombinedKeys. I believe that this makes the function much easier to use.
>>>>>>
>>>>>> Let me know if this helps resolve your questions, and please feel free to add anything else on your mind.
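Steps 1 to 4 above can be simulated in memory roughly as follows. This is a sketch, not the KIP-213 implementation: no Kafka APIs are used, the class and method names are hypothetical, and partitionFor uses String.hashCode only as a stand-in (Kafka's default partitioner hashes the serialized key with murmur2). Because table1 records are routed by their foreign key and table2 by its own key, each partition always finds its matching table2 row locally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical in-memory simulation of steps 1-4; no Kafka APIs are used.
class ForeignKeyJoinSketch {

    // Step 1: a table1 value that carries its foreign key.
    static final class Left {
        final String fk;
        final int bar;
        Left(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    // Stand-in partitioner; Kafka's default partitioner uses murmur2 on serialized keys.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    static <V1, V2, R> Map<String, R> join(Map<String, V1> table1, Map<String, V2> table2,
                                           Function<V1, String> fkMapper,
                                           BiFunction<V1, V2, R> joiner, int numPartitions) {
        List<Map<String, V1>> left = new ArrayList<>();
        List<Map<String, V2>> right = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            left.add(new TreeMap<>());
            right.add(new HashMap<>());
        }
        // table2 is keyed by the foreign key already, so it is partitioned by its own key.
        for (Map.Entry<String, V2> e : table2.entrySet()) {
            right.get(partitionFor(e.getKey(), numPartitions)).put(e.getKey(), e.getValue());
        }
        // Steps 2-3: extract the foreign key, build "<fk>-<pk>", and route by the fk only.
        for (Map.Entry<String, V1> e : table1.entrySet()) {
            String fk = fkMapper.apply(e.getValue());
            left.get(partitionFor(fk, numPartitions)).put(fk + "-" + e.getKey(), e.getValue());
        }
        // Step 4: each partition joins locally; the matching table2 row is co-located.
        Map<String, R> results = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            for (Map.Entry<String, V1> e : left.get(p).entrySet()) {
                V2 match = right.get(p).get(fkMapper.apply(e.getValue()));
                if (match != null) { // inner-join semantics: skip records without a match
                    results.put(e.getKey(), joiner.apply(e.getValue(), match));
                }
            }
        }
        return results;
    }
}
```

With the example data above and a joiner such as (l, r) -> r + l.bar, the result is {A-a=X1, A-b=X2, B-c=Y3} for any partition count. Keeping the combined key is what lets A-a and A-b coexist; the design described above additionally rekeys these results back to a, b, and c.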
>>>>>>
>>>>>> Thanks again,
>>>>>> Adam
>>>>>>
>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am just catching up on this thread. I did not read everything so far, but I want to share a couple of initial thoughts:
>>>>>>>
>>>>>>> Headers: I think there is a fundamental difference between header usage in this KIP and KIP-258. For 258, we add headers to changelog topics that are owned by Kafka Streams, and nobody else is supposed to write into them. In fact, no user headers are written into the changelog topic, and thus there are no conflicts.
>>>>>>>
>>>>>>> Nevertheless, I don't see a big issue with using headers within Streams. As long as we document it, we can have some "reserved" header keys that users are not allowed to use when processing data with Kafka Streams. IMHO, this should be ok.
>>>>>>>
>>>>>>>> I think there is a safe way to avoid conflicts, since these headers are only needed in internal topics (I think):
>>>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>>>
>>>>>>> While name spacing would be possible, it would require deserializing user headers, which implies a runtime overhead. I would suggest not namespacing for now to avoid the overhead. If this becomes a problem in the future, we can still add name spacing later on.
>>>>>>>
>>>>>>> My main concern about the design is the type of the result KTable: If I understood the proposal correctly,
>>>>>>>
>>>>>>> KTable<K1,V1> table1 = ...
>>>>>>> KTable<K2,V2> table2 = ...
>>>>>>>
>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>>>>
>>>>>>> implies that the `joinedTable` has the same key as the left input table. IMHO, this does not work, because if table2 contains multiple rows that join with a record in table1 (which is the main purpose of a foreign-key join), the result table would only contain a single join result, not multiple.
>>>>>>>
>>>>>>> Example:
>>>>>>>
>>>>>>> table1 input stream: <A,X>
>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>>>>
>>>>>>> We use the table2 value as a foreign key to the table1 key (i.e. "A" joins). If the result key is the same as the key of table1, this implies that the result can be either <A, join(X,1)> or <A, join(X,2)>, but not both. Because they share the same key, whichever result record we emit later overwrites the previous one.
>>>>>>>
>>>>>>> This is the reason why Jan originally proposed to use a combination of both primary keys of the input tables as the key of the output table. This makes the keys of the output table unique, and we can store both in the output table:
>>>>>>>
>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>>>>>>
>>>>>>> Thoughts?
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>>>>
>>>>>>>> Just one remark here.
>>>>>>>> The high-water mark could be disregarded. The decision about whether to forward depends on the size of the aggregated map. Only single-element maps would be unpacked and forwarded. Zero-element maps would be published as deletes. Any other count of map entries is in the "waiting for correct deletes to arrive" state.
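Jan's map-size rule above can be sketched as follows, assuming (as his annotations on the aggregator earlier in the thread point out) that each joined value still carries the foreign key it was joined on. The class name and the Action enum are hypothetical, and the map lives in memory rather than in a state store; one instance stands for the aggregate of a single original key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the map-size forwarding rule; not a Kafka Streams API.
// One instance holds the aggregate for a single original key.
class MapSizeForwarding<FK, V> {

    enum Action { FORWARD, DELETE, WAIT }

    // Joined values for this original key, indexed by the foreign key that produced them.
    private final Map<FK, V> byForeignKey = new HashMap<>();

    // Adder: a join result computed for foreignKey arrived.
    Action add(FK foreignKey, V joined) {
        byForeignKey.put(foreignKey, joined);
        return decide();
    }

    // Subtractor: the retraction of the result for foreignKey arrived.
    Action remove(FK foreignKey) {
        byForeignKey.remove(foreignKey);
        return decide();
    }

    // Only meaningful right after FORWARD: the single entry to unpack and emit.
    V singleValue() {
        return byForeignKey.values().iterator().next();
    }

    private Action decide() {
        switch (byForeignKey.size()) {
            case 0:
                return Action.DELETE;  // publish as a delete (tombstone)
            case 1:
                return Action.FORWARD; // unpack the single entry and forward it
            default:
                return Action.WAIT;    // waiting for the correct deletes to arrive
        }
    }
}
```

For a foreign-key change from A to B where the new result overtakes the retraction, add("B", ...) returns WAIT and the late remove("A") returns FORWARD with the B result. The sketch only encodes the rule as stated; whether it covers every arrival order is the open question in this thread.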
>>>>>>>>
>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>>>>
>>>>>>>>> It does look like I could replace the second repartition store and high-water store with a groupBy and reduce. However, it looks like I would still need to store the high-water value within the materialized store, to compare the arrival of out-of-order records (assuming my understanding of THIS is correct...). This in effect is the same as the design I have now, just with the two tables merged together.
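For comparison, the high-water check discussed in this thread can be sketched roughly as below. It assumes each joined result carries the offset of the table1 update that triggered it, and that those offsets only grow per original key; the class name is hypothetical. The plain HashMap here grows without bound, which is Jan's objection; the windowed store Adam mentions would instead expire entries, at the cost of only resolving out-of-order arrivals within the window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: drop joined results that are older than the newest one already
// forwarded for the same original key. Assumes every result carries the offset of the
// table1 update that produced it.
class HighWaterMarkFilter<K> {

    // Highest triggering offset forwarded so far, per original key.
    // Unbounded here; a windowed store would expire old entries instead.
    private final Map<K, Long> highWater = new HashMap<>();

    // Returns true if the result should be forwarded, false if it is stale.
    boolean accept(K originalKey, long triggeringOffset) {
        Long current = highWater.get(originalKey);
        if (current != null && triggeringOffset < current) {
            return false; // an older foreign-key state arrived late; do not let it win
        }
        highWater.put(originalKey, triggeringOffset);
        return true;
    }
}
```

If key a is updated at offset 5 (fk=A) and offset 6 (fk=B), and the fk=B result arrives first, accept("a", 6) returns true and the late accept("a", 5) returns false, so the intermediate event is not represented as the final one.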