Hi Jan

Check for "highwaterMat" in the PR. I only changed the state store, not the ProcessorSupplier.

Thanks,
Adam

On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> On 24.09.2018 16:26, Adam Bellemare wrote:
>
>> @Guozhang
>>
>> Thanks for the information. This is indeed something that will be extremely useful for this KIP.
>>
>> @Jan
>> Thanks for your explanations. That being said, I will not be moving ahead with an implementation using the reshuffle/groupBy solution you propose. However, if you wish to implement it yourself off of my current PR and submit it as a competing alternative, I would be more than happy to help vet it as an alternate solution. As it stands right now, I do not really have more time to invest in alternatives without a strong indication from the binding voters as to which they would prefer.
>>
> Hey, no worries at all. I personally gave up on the Streams DSL some time ago; otherwise I would have pushed this KIP through already. I am currently reimplementing my own DSL based on the PAPI.
>
>> I will look at finishing up my PR with the windowed state store in the next week or so, exercising it via tests, and then I will come back for final discussions. In the meantime, I hope that any of the binding voters could take a look at the KIP in the wiki. I have updated it according to the latest plan:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>
>> I have also updated the KIP PR to use a windowed store. This could be replaced by the results of KIP-258 whenever they are completed.
>> https://github.com/apache/kafka/pull/5527
>>
>> Thanks,
>>
>> Adam
>>
> Is the HighWatermarkResolverProccessorsupplier already updated in the PR? I expected it to change to Windowed<K>,Long. Am I missing something?
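Jan's question above is whether the highwater resolver now tracks Windowed<K>,Long. As a rough illustration of what a time-bounded highwater mark does (not the code in PR 5527), here is a minimal in-memory sketch. The class name BoundedHighWaterMarks, its retention parameter and the use of a plain HashMap instead of a windowed state store are assumptions made for illustration. It keeps the highest source offset seen per original primary key and forgets keys that have not been updated within the retention period.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Hypothetical time-bounded highwater mark: remembers the highest source offset
 * seen per original primary key, but only for keys updated within retentionMs.
 * Illustrative only; not the KIP-213 PR code.
 */
final class BoundedHighWaterMarks {

    /** Highest offset forwarded for a key, plus the stream time of that update. */
    private static final class Mark {
        final long highestOffset;
        final long lastUpdateTime;

        Mark(long highestOffset, long lastUpdateTime) {
            this.highestOffset = highestOffset;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    private final long retentionMs;
    private final Map<String, Mark> marks = new HashMap<>();

    BoundedHighWaterMarks(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /**
     * Returns true if a result derived from sourceOffset is newer than anything
     * already forwarded for the key (and records it), false for a late arrival.
     */
    boolean shouldForward(String key, long sourceOffset, long streamTimeMs) {
        expire(streamTimeMs);
        Mark current = marks.get(key);
        if (current != null && sourceOffset <= current.highestOffset) {
            return false; // a newer update for this key was already forwarded
        }
        marks.put(key, new Mark(sourceOffset, streamTimeMs));
        return true;
    }

    /** Number of keys currently tracked. */
    int size() {
        return marks.size();
    }

    /** Forgets keys whose last update is older than the retention period. */
    private void expire(long streamTimeMs) {
        Iterator<Map.Entry<String, Mark>> it = marks.entrySet().iterator();
        while (it.hasNext()) {
            if (streamTimeMs - it.next().getValue().lastUpdateTime > retentionMs) {
                it.remove();
            }
        }
    }
}
```

Once a key has aged out of the retention period, a late record carrying an old offset is forwarded again, which is the "only within the window" limitation discussed in this thread. A real windowed store would drop whole segments rather than scanning every entry on each call as this sketch does.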
>
>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is for corresponding changelog mechanisms. But as part of KIP-258 we do want to have "handling out-of-order data for source KTable", such that instead of blindly applying the updates to the materialized store (i.e. following offset ordering), we will reject updates that are older than the current key's timestamp (i.e. following timestamp ordering).
>>>
>>> Guozhang
>>>
>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Hello Adam,
>>>>
>>>> Thanks for the explanation. Regarding the final step (i.e. the high watermark store, now altered to be replaced with a window store), I think another ongoing KIP may actually help:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>
>>>> This is for adding the timestamp into a key-value store (i.e. only for non-windowed KTables), and one of its uses, as described in https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then "reject" updates from the source topics if their timestamp is smaller than the current key's latest update timestamp. I think it is very similar to what you have in mind for high-watermark-based filtering; you only need to make sure that the timestamps of the joining records are correctly inherited through the whole topology to the final stage.
>>>>
>>>> Note that this KIP is for key-value stores and hence non-windowed KTables only, but for windowed KTables we do not really have good support for their joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107), so I think we can just consider non-windowed KTable-KTable non-key joins for now, in which case KIP-258 should help.
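The timestamp-ordering rule Guozhang describes for KIP-258 (reject an update that is older than the key's latest update) can be illustrated with a small sketch. It is a hypothetical in-memory stand-in, not the KIP-258 store API: TimestampOrderedStore and its put/get methods are invented for illustration, updates with an equal timestamp are assumed to be accepted, and deletes (tombstones) are not modelled.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical key-value store that follows timestamp ordering instead of
 * offset ordering: an update is rejected if its timestamp is older than the
 * timestamp already stored for that key.
 */
final class TimestampOrderedStore<K, V> {

    /** A stored value together with the timestamp of the update that wrote it. */
    private static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Stamped<V>> store = new HashMap<>();

    /** Applies the update and returns true, or returns false if it is out of order. */
    boolean put(K key, V value, long timestamp) {
        Stamped<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // older than the key's latest update: reject
        }
        store.put(key, new Stamped<>(value, timestamp));
        return true;
    }

    /** Returns the latest accepted value for the key, or null if there is none. */
    V get(K key) {
        Stamped<V> current = store.get(key);
        return current == null ? null : current.value;
    }
}
```

Applied to the join output, this only works if every joined record carries the timestamp of the update that produced it, which is the "correctly inherited through the whole topology" condition in the message.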
>>>>
>>>> Guozhang
>>>>
>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>
>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>>>>>
>>>>>> Hi Guozhang
>>>>>>
>>>>>> The current highwater mark implementation would grow endlessly, based on the primary key of the original event. It is a pair of (<this table primary key>, <highest offset seen for that key>). This is used to differentiate between late arrivals and new updates. My newest proposal would be to replace it with a Windowed state store of Duration N. This would allow the same behaviour, but cap the size based on time. This should allow for all late-arriving events to be processed, and should be customizable by the user to tailor to their own needs (i.e. perhaps just a 10-minute window, or perhaps 7 days...).
>>>>>>
>>>>> Hi Adam, using time-based retention can do the trick here, even though I would still like to see the automatic repartitioning made optional, since I would just reshuffle again. With a windowed store I am a little bit sceptical about how to determine the window: essentially, one could run into problems when a rapid change happens near a window border. I will check your implementation in detail; if it is problematic, we could still check _all_ windows on read without too bad a performance impact, I guess. I will let you know whether the implementation is correct as is. I would not like to assume that offset(A) < offset(B) => timestamp(A) < timestamp(B); I don't think we can expect that.
>>>>>
>>>>>> @Jan
>>>>>> I believe I understand what you mean now - thanks for the diagram, it did really help.
>>>>>> You are correct that I do not have the original primary key available, and I can see that if it were available then you would be able to add and remove events from the Map. That being said, I encourage you to finish your diagrams / charts just for clarity for everyone else.
>>>>>>
>>>>> Yeah, 100%. This giphy thing is just really hard work, but I understand the benefits for the rest. Sorry about the original primary key: we have implemented join and groupBy on our own in PAPI and are basically not using any DSL (just the abstraction). I completely missed that it is not there in the original DSL and just assumed it. Total brain mess-up on my end. I will finish the chart as soon as I get a quiet evening this week.
>>>>>
>>>>>> My follow-up question for you is: won't the Map stay inside the State Store indefinitely after all of the changes have propagated? Isn't this effectively the same as a highwater mark state store?
>>>>>>
>>>>> The thing is that if the map is empty, the subtractor is going to return `null` and the key is removed from the keyspace. But there is going to be a store, 100%. The good thing is that I can use this store directly for materialize() / enableSendingOldValues(): it is a regular store, satisfying all guarantees needed for a further groupBy / join. The windowed store does not keep the values, so for the next stateful operation we would need to instantiate an extra store, or have the window store keep the values as well.
>>>>>
>>>>> Long story short: if we can flip in a custom groupBy before repartitioning to the original primary key, I think it would help users big time in building efficient apps.
>>>>> Given the original primary key issue, I understand that we do not have a solid foundation to build on. Leaving the primary-key carry-along to the user is very unfortunate. I can understand the decision going that way, but I do not think it is a good decision.
>>>>>
>>>>>> Thanks
>>>>>> Adam
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <dumbreprajakta...@gmail.com> wrote:
>>>>>>
>>>>>> please remove me from this group
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>
>>>>>> > Hi Adam,
>>>>>> >
>>>>>> > give me some time, I will make such a chart. Last time I didn't get along well with giphy and ruined all your charts.
>>>>>> > Hopefully I can get it done today.
>>>>>> >
>>>>>> > On 08.09.2018 16:00, Adam Bellemare wrote:
>>>>>> > > Hi Jan
>>>>>> > >
>>>>>> > > I have included a diagram of what I attempted on the KIP.
>>>>>> > >
>>>>>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
>>>>>> > >
>>>>>> > > I attempted this back at the start of my own implementation of this solution, and since I could not get it to work I have since discarded the code. At this point in time, if you wish to continue pursuing your groupBy solution, I ask that you please create a diagram on the KIP carefully explaining your solution.
>>>>>> > > Please feel free to use the image I just posted as a starting point. I am having trouble understanding your explanations, but I think that a carefully constructed diagram will clear up any misunderstandings. Alternatively, please post a comprehensive PR with your solution. I can only guess at what you mean, and since I value my own time as much as you value yours, I believe it is your responsibility to provide an implementation instead of me trying to guess.
>>>>>> > >
>>>>>> > > Adam
>>>>>> > >
>>>>>> > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>> > >
>>>>>> > >> Hi James,
>>>>>> > >>
>>>>>> > >> nice to see you being interested. Kafka Streams at this point supports all sorts of joins as long as both streams have the same key.
>>>>>> > >> Adam is currently implementing a join where a KTable and a KTable can have a one-to-many relationship (1:n). We exploit the fact that RocksDB is a datastore that keeps data sorted (or at least exposes an API to access the stored data in a sorted fashion).
>>>>>> > >>
>>>>>> > >> I think the technical caveats are well understood now, and we are basically down to philosophy and API design (once Adam sees my newest message).
>>>>>> > >> I have a lengthy track record of losing these kinds of arguments within the Streams community, and I have no clue why.
>>>>>> > >> So I literally can't wait for you to churn through this thread and give your opinion on how we should design the return type of the oneToManyJoin, and how much power we want to give to the user vs. "simplicity" (where simplicity isn't really that, as users still need to understand it, I argue).
>>>>>> > >>
>>>>>> > >> Waiting for you to join in on the discussion,
>>>>>> > >>
>>>>>> > >> Best
>>>>>> > >> Jan
>>>>>> > >>
>>>>>> > >> On 07.09.2018 15:49, James Kwan wrote:
>>>>>> > >>
>>>>>> > >>> I am new to this group and I found this subject interesting. Sounds like you guys want to implement a join table of two streams? Is there somewhere I can see the original requirement or proposal?
>>>>>> > >>>
>>>>>> > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>> > >>>>
>>>>>> > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>>>> > >>>>
>>>>>> > >>>>> I'm currently testing using a Windowed Store to store the highwater mark. By all indications this should work fine, with the caveat being that it can only resolve out-of-order arrival for up to the size of the window (i.e. 24h, 72h, etc.). This would remove the possibility of it being unbounded in size.
>>>>>> > >>>>>
>>>>>> > >>>>> With regards to Jan's suggestion, I believe this is where we will have to remain in disagreement.
>>>>>> > >>>>> While I do not disagree with your statement about there likely being additional joins done in a real-world workflow, I do not see how you can conclusively deal with out-of-order arrival of foreign-key changes and subsequent joins. I have attempted what I think you have proposed (without a high-water mark, using groupBy and reduce) and found that if the foreign key changes too quickly, or the load on a stream thread is too high, the joined messages will arrive out-of-order and be incorrectly propagated, such that an intermediate event is represented as the final event.
>>>>>> > >>>>>
>>>>>> > >>>> Can you shed some light on your groupBy implementation? There must be some sort of flaw in it.
>>>>>> > >>>> I have a suspicion where it is; I would just like to confirm. The idea is bulletproof, so it must be an implementation mess-up. I would like to clarify before we draw a conclusion.
>>>>>> > >>>>
>>>>>> > >>>>> Repartitioning the scattered events back to their original partitions is the only way I know how to conclusively deal with out-of-order events in a given time frame, and to ensure that the data is eventually consistent with the input events.
>>>>>> > >>>>>
>>>>>> > >>>>> If you have some code to share that illustrates your approach, I would be very grateful, as it would remove any misunderstandings that I may have.
>>>>>> > >>>>>
>>>>>> > >>>> Ah okay, you were looking for my code.
>>>>>> > >>>> I don't have something easily readable here, as it is bloated with OO patterns.
>>>>>> > >>>>
>>>>>> > >>>> It's trivial anyhow:
>>>>>> > >>>>
>>>>>> > >>>> @Override
>>>>>> > >>>> public T apply(K aggKey, V value, T aggregate)
>>>>>> > >>>> {
>>>>>> > >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); // imaginary
>>>>>> > >>>>     U toModifyKey = mapper.apply(value);
>>>>>> > >>>>     // This is the place where people are actually going to have issues, and why you probably couldn't do it. We would need to find a solution here. I didn't realize that yet.
>>>>>> > >>>>     // We propagate the field in the joiner, so that we can pick it up in an aggregate. Probably you have not thought of this in your approach, right?
>>>>>> > >>>>     // I am very open to finding a generic solution here. In my honest opinion, this is broken in KTableImpl.GroupBy: it loses the keys and only maintains the aggregate key.
>>>>>> > >>>>     // I abstracted it away back then, way before I was thinking of the oneToMany join. That is why I didn't realize its significance here.
>>>>>> > >>>>     // Opinions?
>>>>>> > >>>>
>>>>>> > >>>>     for (V m : current)
>>>>>> > >>>>     {
>>>>>> > >>>>         currentStateAsMap.put(mapper.apply(m), m);
>>>>>> > >>>>     }
>>>>>> > >>>>     if (isAdder)
>>>>>> > >>>>     {
>>>>>> > >>>>         currentStateAsMap.put(toModifyKey, value);
>>>>>> > >>>>     }
>>>>>> > >>>>     else
>>>>>> > >>>>     {
>>>>>> > >>>>         currentStateAsMap.remove(toModifyKey);
>>>>>> > >>>>         if (currentStateAsMap.isEmpty()) {
>>>>>> > >>>>             return null;
>>>>>> > >>>>         }
>>>>>> > >>>>     }
>>>>>> > >>>>     return asAggregateType(currentStateAsMap);
>>>>>> > >>>> }
>>>>>> > >>>>
>>>>>> > >>>>> Thanks,
>>>>>> > >>>>> Adam
>>>>>> > >>>>>
>>>>>> > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>> > >>>>>
>>>>>> > >>>>>> Thanks Adam for bringing Matthias up to speed!
>>>>>> > >>>>>> About the differences: I think re-keying back should be optional at best.
>>>>>> > >>>>>> I would say we return a KScatteredTable with reshuffle() returning KTable<originalKey,Joined> to make the backwards repartitioning optional.
>>>>>> > >>>>>> I am also very much in favour of doing the out-of-order processing using groupBy instead of high-water mark tracking, simply because unbounded growth is scary, and it saves us the header stuff.
>>>>>> > >>>>>>
>>>>>> > >>>>>> I think the abstraction of always repartitioning back is just not so strong. The work has already been done before we partition back, and grouping by something else afterwards is really common.
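Jan's aggregator relies on imaginary helpers (asMap, asAggregateType, mapper), so here is a self-contained sketch of the same adder/subtractor idea. Assumptions: the aggregate for one result key is a java.util.Map from a "source key" (for example the foreign key that produced the join result) to the joined value, and that source key can be read from the joined value because the joiner propagated it, as Jan suggests. The class name SourceKeyMapAggregator and the convention that null means "no aggregate" are hypothetical; in Kafka Streams the two methods would play the roles of the adder and subtractor passed to KGroupedTable#aggregate.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch of the map-based adder/subtractor from the thread.
 * The aggregate maps a source key (assumed to be carried inside each joined
 * value) to the joined value; a null aggregate means "no row for this key".
 */
final class SourceKeyMapAggregator<V> {

    private final Function<V, String> sourceKeyOf;

    SourceKeyMapAggregator(Function<V, String> sourceKeyOf) {
        this.sourceKeyOf = sourceKeyOf;
    }

    /** Adder: insert or overwrite the entry for the value's source key. */
    Map<String, V> add(Map<String, V> aggregate, V value) {
        Map<String, V> next = aggregate == null ? new HashMap<>() : new HashMap<>(aggregate);
        next.put(sourceKeyOf.apply(value), value);
        return next;
    }

    /** Subtractor: remove the entry; returning null lets the table delete the key. */
    Map<String, V> subtract(Map<String, V> aggregate, V value) {
        if (aggregate == null) {
            return null;
        }
        Map<String, V> next = new HashMap<>(aggregate);
        next.remove(sourceKeyOf.apply(value));
        return next.isEmpty() ? null : next;
    }
}
```

For a primary key whose foreign key moves from A to B, the add for B and the subtract for A may arrive in either order; with this structure both orders end with a map holding only the B entry (the subtract-first order briefly passes through null). Copying the map on every call keeps the inputs unmodified, which is simpler to reason about than mutating the stored aggregate.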
>>>>>> > >>>>>>
>>>>>> > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>>>> > >>>>>>
>>>>>> > >>>>>>> Hi Matthias
>>>>>> > >>>>>>> Thank you for your feedback, I do appreciate it!
>>>>>> > >>>>>>>
>>>>>> > >>>>>>>> While namespacing would be possible, it would require deserializing user headers, which implies a runtime overhead. I would suggest not to namespace for now to avoid the overhead. If this becomes a problem in the future, we can still add namespacing later on.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Agreed. I will go with using a reserved string and document it.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>>> My main concern about the design is the type of the result KTable: If I understood the proposal correctly,
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> In your example, you have table1 and table2 swapped. Here is how it works currently:
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 1) table1 has the records that contain the foreign key within their value.
>>>>>> > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>>>>> > >>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 2) A value mapper is required to extract the foreign key.
>>>>>> > >>>>>>> table1 foreign key mapper: ( value => value.fk )
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> The mapper is applied to each element in table1, and a new combined key is made:
>>>>>> > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 3) The rekeyed events are co-partitioned with table2:
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> a) Stream Thread with Partition 0:
>>>>>> > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>>>> > >>>>>>> Table2: <A,X>
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> b) Stream Thread with Partition 1:
>>>>>> > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>> > >>>>>>> Table2: <B,Y>
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 4) From here, they can be joined together locally by applying the joiner function.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> At this point, Jan's design and my design deviate. My design goes on to repartition the data post-join and resolve out-of-order arrival of records, finally returning the data keyed by just the original key. I do not expose the CombinedKey or any of the internals outside of the joinOnForeignKey function. This does make for a larger footprint, but it removes from the user all agency for resolving out-of-order arrivals and handling CombinedKeys. I believe that this makes the function much easier to use.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Let me know if this helps resolve your questions, and please feel free to add anything else on your mind.
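Steps 1 to 4 above can be traced with a small in-memory sketch. Everything here is hypothetical: ForeignKeyJoinSketch, the '|' separator in the combined key (assumed never to appear in a key), and partitionFor, which uses String.hashCode() as a stand-in for Kafka's partitioner. A TreeMap plays the role of the sorted RocksDB store, so all table1 rows for one foreign key are found with a single range scan over the combined-key prefix. Results are keyed by the original table1 key, which stays unique because each table1 row carries exactly one foreign key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory walk-through of the foreign-key join steps. */
final class ForeignKeyJoinSketch {

    /** A table1 value: the foreign key plus a payload, as in <a,(fk=A,bar=1)>. */
    static final class Left {
        final String fk;
        final int bar;

        Left(String fk, int bar) {
            this.fk = fk;
            this.bar = bar;
        }
    }

    // Separator for the combined key; assumes no key contains '|'.
    static final char SEP = '|';

    /** Step 2: combine foreign key and primary key, e.g. "A|a". */
    static String combinedKey(String fk, String pk) {
        return fk + SEP + pk;
    }

    /** Stand-in partitioner (Kafka's default partitioner uses murmur2 instead). */
    static int partitionFor(String fk, int numPartitions) {
        return Math.floorMod(fk.hashCode(), numPartitions);
    }

    /** Step 3: rekey table1 by combined key and co-partition it by foreign key. */
    static List<NavigableMap<String, Left>> rekeyAndPartition(Map<String, Left> table1, int numPartitions) {
        List<NavigableMap<String, Left>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new TreeMap<>());
        }
        for (Map.Entry<String, Left> e : table1.entrySet()) {
            String fk = e.getValue().fk;
            partitions.get(partitionFor(fk, numPartitions)).put(combinedKey(fk, e.getKey()), e.getValue());
        }
        return partitions;
    }

    /**
     * Step 4: join one table2 record (fk -> rightValue) locally by scanning the
     * sorted range of combined keys that start with fk + SEP.
     */
    static Map<String, String> joinLocally(NavigableMap<String, Left> partition, String fk, String rightValue) {
        Map<String, String> results = new TreeMap<>();
        String from = fk + SEP;              // smallest key with this prefix
        String to = fk + (char) (SEP + 1);   // first key past the prefix range
        for (Map.Entry<String, Left> e : partition.subMap(from, true, to, false).entrySet()) {
            String pk = e.getKey().substring(from.length());
            results.put(pk, "(" + rightValue + "," + e.getValue().bar + ")");
        }
        return results;
    }
}
```

With the thread's example data, joining <A,X> on A's partition yields a=(X,1) and b=(X,2), and joining <B,Y> yields c=(Y,3). The combined key is internal to the scan; only the original key appears in the result, matching the design Adam describes.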
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Thanks again,
>>>>>> > >>>>>>> Adam
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>> > >>>>>>>
>>>>>> > >>>>>>>> Hi,
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> I am just catching up on this thread. I did not read everything so far, but want to share a couple of initial thoughts:
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> Headers: I think there is a fundamental difference between header usage in this KIP and KIP-258. For 258, we add headers to changelog topics that are owned by Kafka Streams, and nobody else is supposed to write into them. In fact, no user headers are written into the changelog topic, and thus there are no conflicts.
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> Nevertheless, I don't see a big issue with using headers within Streams. As long as we document it, we can have some "reserved" header keys that users are not allowed to use when processing data with Kafka Streams. IMHO, this should be ok.
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>>> I think there is a safe way to avoid conflicts, since these headers are only needed in internal topics (I think):
>>>>>> > >>>>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>> > >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>> > >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> While namespacing would be possible, it would require deserializing user headers, which implies a runtime overhead. I would suggest not to namespace for now to avoid the overhead. If this becomes a problem in the future, we can still add namespacing later on.
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> My main concern about the design is the type of the result KTable: If I understood the proposal correctly,
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> KTable<K1,V1> table1 = ...
>>>>>> > >>>>>>>> KTable<K2,V2> table2 = ...
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> implies that the `joinedTable` has the same key as the left input table. IMHO, this does not work, because if table2 contains multiple rows that join with a record in table1 (which is the main purpose of a foreign key join), the result table would only contain a single join result, not multiple.
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> Example:
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> table1 input stream: <A,X>
>>>>>> > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> We use the table2 value as a foreign key to the table1 key (i.e., "A" joins). If the result key is the same as the key of table1, this implies that the result can either be <A, join(X,1)> or <A, join(X,2)>, but not both.
>>>>>> > >>>>>>>> Because they share the same key, whatever result record we emit later overwrites the previous result.
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> This is the reason why Jan originally proposed to use a combination of both primary keys of the input tables as the key of the output table. This makes the keys of the output table unique, and we can store both in the output table:
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> Thoughts?
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> -Matthias
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>>> > >>>>>>>>
>>>>>> > >>>>>>>>> Just one remark here.
>>>>>> > >>>>>>>>> The high-watermark could be disregarded. The decision about the forward depends on the size of the aggregated map.
>>>>>> > >>>>>>>>> Only 1-element maps would be unpacked and forwarded. 0-element maps would be published as deletes. Any other count of map entries is in the "waiting for correct deletes to arrive" state.
>>>>>> > >>>>>>>>>
>>>>>> > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>> > >>>>>>>>>
>>>>>> > >>>>>>>>>> It does look like I could replace the second repartition store and highwater store with a groupBy and reduce. However, it looks like I would still need to store the highwater value within the materialized store, to compare the arrival of out-of-order records (assuming my understanding of THIS is correct...). This in effect is the same as the design I have now, just with the two tables merged together.
>>>>
>>>> --
>>>> -- Guozhang
>>>
>>> --
>>> -- Guozhang