@Guozhang Thanks for the information. This is indeed something that will be extremely useful for this KIP.
@Jan Thanks for your explanations. I will not be moving ahead with an implementation using the reshuffle/groupBy solution you propose. That being said, if you wish to implement it yourself off of my current PR and submit it as a competitive alternative, I would be more than happy to help vet it as an alternate solution. As it stands right now, I do not really have more time to invest into alternatives without a strong indication from the binding voters as to which they would prefer. I will look at finishing up my PR with the windowed state store in the next week or so, exercising it via tests, and then I will come back for final discussions. In the meantime, I hope that any of the binding voters could take a look at the KIP in the wiki. I have updated it according to the latest plan: https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable I have also updated the KIP PR to use a windowed store. This could be replaced by the results of KIP-258 whenever they are completed. https://github.com/apache/kafka/pull/5527 Thanks, Adam On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <[email protected]> wrote: > Correction on my previous email: KAFKA-5533 is the wrong link, as it is for > corresponding changelog mechanisms. But as part of KIP-258 we do want to > have "handling out-of-order data for source KTable" such that instead of > blindly apply the updates to the materialized store, i.e. following offset > ordering, we will reject updates that are older than the current key's > timestamps, i.e. following timestamp ordering. > > > Guozhang > > On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <[email protected]> > wrote: > > > Hello Adam, > > > > Thanks for the explanation. Regarding the final step (i.e.
the high > > watermark store, now altered to be replaced with a window store), I think > > another current on-going KIP may actually help: > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB > > > > > > This is for adding the timestamp into a key-value store (i.e. only for > > non-windowed KTable), and then one of its usage, as described in > > https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then > > "reject" updates from the source topics if its timestamp is smaller than > > the current key's latest update timestamp. I think it is very similar to > > what you have in mind for high watermark based filtering, while you only > > need to make sure that the timestamps of the joining records are > correctly > > inherited though the whole topology to the final stage. > > > > Note that this KIP is for key-value store and hence non-windowed KTables > > only, but for windowed KTables we do not really have a good support for > > their joins anyways (https://issues.apache.org/jira/browse/KAFKA-7107) I > > think we can just consider non-windowed KTable-KTable non-key joins for > > now. In which case, KIP-258 should help. > > > > > > > > Guozhang > > > > > > > > On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <[email protected]> > > wrote: > > > >> > >> On 11.09.2018 18:00, Adam Bellemare wrote: > >> > >>> Hi Guozhang > >>> > >>> Current highwater mark implementation would grow endlessly based on > >>> primary key of original event. It is a pair of (<this table primary > key>, > >>> <highest offset seen for that key>). This is used to differentiate > between > >>> late arrivals and new updates. My newest proposal would be to replace > it > >>> with a Windowed state store of Duration N. This would allow the same > >>> behaviour, but cap the size based on time. 
This should allow for all > >>> late-arriving events to be processed, and should be customizable by the > >>> user to tailor to their own needs (ie: perhaps just 10 minutes of > window, > >>> or perhaps 7 days...). > >>> > >> Hi Adam, using time based retention can do the trick here. Even if I > >> would still like to see the automatic repartitioning optional since I > would > >> just reshuffle again. With windowed store I am a little bit sceptical > about > >> how to determine the window. So esentially one could run into problems > when > >> the rapid change happens near a window border. I will check you > >> implementation in detail, if its problematic, we could still check _all_ > >> windows on read with not to bad performance impact I guess. Will let you > >> know if the implementation would be correct as is. I wouldn't not like > to > >> assume that: offset(A) < offset(B) => timestamp(A) < timestamp(B). I > think > >> we can't expect that. > >> > >>> > >>> > >>> @Jan > >>> I believe I understand what you mean now - thanks for the diagram, it > >>> did really help. You are correct that I do not have the original > primary > >>> key available, and I can see that if it was available then you would be > >>> able to add and remove events from the Map. That being said, I > encourage > >>> you to finish your diagrams / charts just for clarity for everyone > else. > >>> > >>> Yeah 100%, this giphy thing is just really hard work. But I understand > >> the benefits for the rest. Sorry about the original primary key, We have > >> join and Group by implemented our own in PAPI and basically not using > any > >> DSL (Just the abstraction). Completely missed that in original DSL its > not > >> there and just assumed it. total brain mess up on my end. Will finish > the > >> chart as soon as i get a quite evening this week. > >> > >> My follow up question for you is, won't the Map stay inside the State > >>> Store indefinitely after all of the changes have propagated? 
Isn't this > >>> effectively the same as a highwater mark state store? > >>> > >> Thing is that if the map is empty, substractor is gonna return `null` > and > >> the key is removed from the keyspace. But there is going to be a store > >> 100%, the good thing is that I can use this store directly for > >> materialize() / enableSendingOldValues() is a regular store, satisfying > >> all gurantees needed for further groupby / join. The Windowed store is > not > >> keeping the values, so for the next statefull operation we would > >> need to instantiate an extra store. or we have the window store also > have > >> the values then. > >> > >> Long story short. if we can flip in a custom group by before > >> repartitioning to the original primary key i think it would help the > users > >> big time in building efficient apps. Given the original primary key > issue I > >> understand that we do not have a solid foundation to build on. > >> Leaving primary key carry along to the user. very unfortunate. I could > >> understand the decision goes like that. I do not think its a good > decision. > >> > >> > >>> > >>> > >>> > >>> Thanks > >>> Adam > >>> > >>> > >>> > >>> > >>> > >>> > >>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre < > >>> [email protected] <mailto:[email protected]>> > wrote: > >>> > >>> please remove me from this group > >>> > >>> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak > >>> <[email protected] <mailto:[email protected]>> > >>> > >>> wrote: > >>> > >>> > Hi Adam, > >>> > > >>> > give me some time, will make such a chart. last time i didn't > >>> get along > >>> > well with giphy and ruined all your charts. > >>> > Hopefully i can get it done today > >>> > > >>> > On 08.09.2018 16:00, Adam Bellemare wrote: > >>> > > Hi Jan > >>> > > > >>> > > I have included a diagram of what I attempted on the KIP. 
> >>> > > > >>> > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su > >>> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining > >>> inKTable-GroupBy+Reduce/Aggregate > >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S > >>> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin > >>> ginKTable-GroupBy+Reduce/Aggregate> > >>> > > > >>> > > I attempted this back at the start of my own implementation of > >>> this > >>> > > solution, and since I could not get it to work I have since > >>> discarded the > >>> > > code. At this point in time, if you wish to continue pursuing > >>> for your > >>> > > groupBy solution, I ask that you please create a diagram on > >>> the KIP > >>> > > carefully explaining your solution. Please feel free to use > >>> the image I > >>> > > just posted as a starting point. I am having trouble > >>> understanding your > >>> > > explanations but I think that a carefully constructed diagram > >>> will clear > >>> > up > >>> > > any misunderstandings. Alternately, please post a > >>> comprehensive PR with > >>> > > your solution. I can only guess at what you mean, and since I > >>> value my > >>> > own > >>> > > time as much as you value yours, I believe it is your > >>> responsibility to > >>> > > provide an implementation instead of me trying to guess. > >>> > > > >>> > > Adam > >>> > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> > > > >>> > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak > >>> <[email protected] <mailto:[email protected]>> > >>> > >>> > > wrote: > >>> > > > >>> > >> Hi James, > >>> > >> > >>> > >> nice to see you beeing interested. kafka streams at this > >>> point supports > >>> > >> all sorts of joins as long as both streams have the same key. > >>> > >> Adam is currently implementing a join where a KTable and a > >>> KTable can > >>> > have > >>> > >> a one to many relation ship (1:n). 
We exploit that rocksdb is > a > >>> > >> datastore that keeps data sorted (At least exposes an API to > >>> access the > >>> > >> stored data in a sorted fashion). > >>> > >> > >>> > >> I think the technical caveats are well understood now and we > are > >>> > basically > >>> > >> down to philosophy and API Design ( when Adam sees my newest > >>> message). > >>> > >> I have a lengthy track record of loosing those kinda > >>> arguments within > >>> > the > >>> > >> streams community and I have no clue why. So I literally > >>> can't wait for > >>> > you > >>> > >> to churn through this thread and give you opinion on how we > >>> should > >>> > design > >>> > >> the return type of the oneToManyJoin and how many power we > >>> want to give > >>> > to > >>> > >> the user vs "simplicity" (where simplicity isn't really that > >>> as users > >>> > still > >>> > >> need to understand it I argue) > >>> > >> > >>> > >> waiting for you to join in on the discussion > >>> > >> > >>> > >> Best Jan > >>> > >> > >>> > >> > >>> > >> > >>> > >> On 07.09.2018 15:49, James Kwan wrote: > >>> > >> > >>> > >>> I am new to this group and I found this subject > >>> interesting. Sounds > >>> > like > >>> > >>> you guys want to implement a join table of two streams? Is > >>> there > >>> > somewhere > >>> > >>> I can see the original requirement or proposal? > >>> > >>> > >>> > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak > >>> <[email protected] <mailto:[email protected]>> > >>> > >>> > >>>> wrote: > >>> > >>>> > >>> > >>>> > >>> > >>>> On 05.09.2018 22:17, Adam Bellemare wrote: > >>> > >>>> > >>> > >>>>> I'm currently testing using a Windowed Store to store the > >>> highwater > >>> > >>>>> mark. > >>> > >>>>> By all indications this should work fine, with the caveat > >>> being that > >>> > it > >>> > >>>>> can > >>> > >>>>> only resolve out-of-order arrival for up to the size of > >>> the window > >>> > (ie: > >>> > >>>>> 24h, 72h, etc). 
This would remove the possibility of it > being > >>> > unbounded > >>> > >>>>> in > >>> > >>>>> size. > >>> > >>>>> > >>> > >>>>> With regards to Jan's suggestion, I believe this is where > >>> we will > >>> > have > >>> > >>>>> to > >>> > >>>>> remain in disagreement. While I do not disagree with your > >>> statement > >>> > >>>>> about > >>> > >>>>> there likely to be additional joins done in a real-world > >>> workflow, I > >>> > do > >>> > >>>>> not > >>> > >>>>> see how you can conclusively deal with out-of-order arrival > >>> of > >>> > >>>>> foreign-key > >>> > >>>>> changes and subsequent joins. I have attempted what I > >>> think you have > >>> > >>>>> proposed (without a high-water, using groupBy and reduce) > >>> and found > >>> > >>>>> that if > >>> > >>>>> the foreign key changes too quickly, or the load on a > >>> stream thread > >>> > is > >>> > >>>>> too > >>> > >>>>> high, the joined messages will arrive out-of-order and be > >>> incorrectly > >>> > >>>>> propagated, such that an intermediate event is represented > >>> as the > >>> > final > >>> > >>>>> event. > >>> > >>>>> > >>> > >>>> Can you shed some light on your groupBy implementation. > >>> There must be > >>> > >>>> some sort of flaw in it. > >>> > >>>> I have a suspicion where it is, I would just like to > >>> confirm. The idea > >>> > >>>> is bullet proof and it must be > >>> > >>>> an implementation mess up. I would like to clarify before > >>> we draw a > >>> > >>>> conclusion. > >>> > >>>> > >>> > >>>> Repartitioning the scattered events back to their > original > >>> > >>>>> partitions is the only way I know how to conclusively deal > >>> with > >>> > >>>>> out-of-order events in a given time frame, and to ensure > >>> that the > >>> > data > >>> > >>>>> is > >>> > >>>>> eventually consistent with the input events. 
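[Editor's note] The repartition-and-compare step Adam describes can be sketched in plain Java. This is only an illustration of the idea, not the PR's code: the class and method names are made up, and the in-memory map stands in for the windowed highwater store (which is what would bound its size via retention).

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of out-of-order resolution after repartitioning joined records
// back to their original primary key: forward a record only if its source
// offset is at least the highest offset already seen for that key, so a
// late-arriving intermediate result can never overwrite the final one.
// A real implementation would keep this map in a windowed state store so
// entries expire after the retention period instead of growing forever.
public class HighwaterResolver {
    private final Map<String, Long> highwater = new HashMap<>();

    /** Returns true if the record should be forwarded downstream. */
    public boolean shouldForward(String primaryKey, long sourceOffset) {
        Long seen = highwater.get(primaryKey);
        if (seen != null && sourceOffset < seen) {
            return false; // late arrival of an intermediate result: drop it
        }
        highwater.put(primaryKey, sourceOffset);
        return true;
    }
}
```

With a window of duration N, only records arriving more than N out of order would be misjudged, which is the trade-off discussed above.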
> >>> > >>>>> > >>> > >>>>> If you have some code to share that illustrates your > >>> approach, I > >>> > would > >>> > >>>>> be > >>> > >>>>> very grateful as it would remove any misunderstandings > >>> that I may > >>> > have. > >>> > >>>>> > >>> > >>>> ah okay you were looking for my code. I don't have > >>> something easily > >>> > >>>> readable here as its bloated with OO-patterns. > >>> > >>>> > >>> > >>>> its anyhow trivial: > >>> > >>>> > >>> > >>>> @Override > >>> > >>>> public T apply(K aggKey, V value, T aggregate) > >>> > >>>> { > >>> > >>>> Map<U, V> currentStateAsMap = asMap(aggregate); << > >>> imaginary > >>> > >>>> U toModifyKey = mapper.apply(value); > >>> > >>>> << this is the place where people actually > >>> gonna have > >>> > issues > >>> > >>>> and why you probably couldn't do it. we would need to find > >>> a solution > >>> > here. > >>> > >>>> I didn't realize that yet. > >>> > >>>> << we propagate the field in the joiner, so > >>> that we can > >>> > pick > >>> > >>>> it up in an aggregate. Probably you have not thought of > >>> this in your > >>> > >>>> approach right? > >>> > >>>> << I am very open to find a generic solution > >>> here. In my > >>> > >>>> honest opinion this is broken in KTableImpl.GroupBy that it > >>> looses > >>> > the keys > >>> > >>>> and only maintains the aggregate key. > >>> > >>>> << I abstracted it away back then way before i > >>> was > >>> > thinking > >>> > >>>> of oneToMany join. That is why I didn't realize its > >>> significance here. > >>> > >>>> << Opinions? 
> >>> > >>>> > >>> > >>>> for (V m : current) > >>> > >>>> { > >>> > >>>> currentStateAsMap.put(mapper.apply(m), m); > >>> > >>>> } > >>> > >>>> if (isAdder) > >>> > >>>> { > >>> > >>>> currentStateAsMap.put(toModifyKey, value); > >>> > >>>> } > >>> > >>>> else > >>> > >>>> { > >>> > >>>> currentStateAsMap.remove(toModifyKey); > >>> > >>>> if(currentStateAsMap.isEmpty()){ > >>> > >>>> return null; > >>> > >>>> } > >>> > >>>> } > >>> > >>>> retrun asAggregateType(currentStateAsMap) > >>> > >>>> } > >>> > >>>> > >>> > >>>> > >>> > >>>> > >>> > >>>> > >>> > >>>> > >>> > >>>> Thanks, > >>> > >>>>> Adam > >>> > >>>>> > >>> > >>>>> > >>> > >>>>> > >>> > >>>>> > >>> > >>>>> > >>> > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak < > >>> > [email protected] <mailto:[email protected]>> > >>> > >>> > >>>>> wrote: > >>> > >>>>> > >>> > >>>>> Thanks Adam for bringing Matthias to speed! > >>> > >>>>>> about the differences. I think re-keying back should be > >>> optional at > >>> > >>>>>> best. > >>> > >>>>>> I would say we return a KScatteredTable with reshuffle() > >>> returning > >>> > >>>>>> KTable<originalKey,Joined> to make the backwards > >>> repartitioning > >>> > >>>>>> optional. > >>> > >>>>>> I am also in a big favour of doing the out of order > >>> processing using > >>> > >>>>>> group > >>> > >>>>>> by instead high water mark tracking. > >>> > >>>>>> Just because unbounded growth is just scary + It saves us > >>> the header > >>> > >>>>>> stuff. > >>> > >>>>>> > >>> > >>>>>> I think the abstraction of always repartitioning back is > >>> just not so > >>> > >>>>>> strong. Like the work has been done before we partition > >>> back and > >>> > >>>>>> grouping > >>> > >>>>>> by something else afterwards is really common. 
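[Editor's note] Jan's `apply` pseudocode above compiles to roughly the following sketch. Concrete collection types are assumed, the undefined `current` loop is folded into the map copy, the `retrun` typo is fixed, and `keyExtractor` plays the role of his `mapper`; `asMap`/`asAggregateType` disappear because the aggregate is modeled directly as a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Compilable sketch of Jan's adder/subtractor: the aggregate is a map from
// the original primary key to the latest joined value for that key. The
// subtractor returns null once the map is empty, which deletes the key
// from the keyspace, so no separate highwater store is needed.
public class GroupByAggregate<U, V> {
    private final Function<V, U> keyExtractor; // Jan's "mapper"

    public GroupByAggregate(Function<V, U> keyExtractor) {
        this.keyExtractor = keyExtractor;
    }

    public Map<U, V> apply(V value, Map<U, V> aggregate, boolean isAdder) {
        Map<U, V> state = aggregate == null ? new HashMap<>() : new HashMap<>(aggregate);
        U toModifyKey = keyExtractor.apply(value);
        if (isAdder) {
            state.put(toModifyKey, value);
        } else {
            state.remove(toModifyKey);
            if (state.isEmpty()) {
                return null; // tombstone: removes the key from the keyspace
            }
        }
        return state;
    }
}
```

The open problem Jan flags remains: `keyExtractor` needs the original primary key of each element, which the DSL's `KTableImpl.GroupBy` does not carry along.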
> >>> > >>>>>> > >>> > >>>>>> > >>> > >>>>>> > >>> > >>>>>> > >>> > >>>>>> > >>> > >>>>>> > >>> > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote: > >>> > >>>>>> > >>> > >>>>>> Hi Matthias > >>> > >>>>>>> Thank you for your feedback, I do appreciate it! > >>> > >>>>>>> > >>> > >>>>>>> While name spacing would be possible, it would require to > >>> > deserialize > >>> > >>>>>>> > >>> > >>>>>>>> user headers what implies a runtime overhead. I would > >>> suggest to > >>> > no > >>> > >>>>>>>> namespace for now to avoid the overhead. If this > becomes a > >>> > problem in > >>> > >>>>>>>> the future, we can still add name spacing later on. > >>> > >>>>>>>> > >>> > >>>>>>>> Agreed. I will go with using a reserved string and > >>> document it. > >>> > >>>>>>> > >>> > >>>>>>> > >>> > >>>>>>> My main concern about the design it the type of the > >>> result KTable: > >>> > If > >>> > >>>>>>> I > >>> > >>>>>>> understood the proposal correctly, > >>> > >>>>>>> > >>> > >>>>>>> > >>> > >>>>>>> In your example, you have table1 and table2 swapped. > >>> Here is how it > >>> > >>>>>>> works > >>> > >>>>>>> currently: > >>> > >>>>>>> > >>> > >>>>>>> 1) table1 has the records that contain the foreign key > >>> within their > >>> > >>>>>>> value. > >>> > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, > >>> > >>>>>>> <c,(fk=B,bar=3)> > >>> > >>>>>>> table2 input stream: <A,X>, <B,Y> > >>> > >>>>>>> > >>> > >>>>>>> 2) A Value mapper is required to extract the foreign key. 
> >>> > >>>>>>> table1 foreign key mapper: ( value => value.fk > >>> <http://value.fk> ) > >>> > >>> > >>>>>>> > >>> > >>>>>>> The mapper is applied to each element in table1, and a > >>> new combined > >>> > >>>>>>> key is > >>> > >>>>>>> made: > >>> > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, > >>> <B-c, > >>> > >>>>>>> (fk=B,bar=3)> > >>> > >>>>>>> > >>> > >>>>>>> 3) The rekeyed events are copartitioned with table2: > >>> > >>>>>>> > >>> > >>>>>>> a) Stream Thread with Partition 0: > >>> > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, > >>> (fk=A,bar=2)> > >>> > >>>>>>> Table2: <A,X> > >>> > >>>>>>> > >>> > >>>>>>> b) Stream Thread with Partition 1: > >>> > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)> > >>> > >>>>>>> Table2: <B,Y> > >>> > >>>>>>> > >>> > >>>>>>> 4) From here, they can be joined together locally by > >>> applying the > >>> > >>>>>>> joiner > >>> > >>>>>>> function. > >>> > >>>>>>> > >>> > >>>>>>> > >>> > >>>>>>> > >>> > >>>>>>> At this point, Jan's design and my design deviate. My > >>> design goes > >>> > on > >>> > >>>>>>> to > >>> > >>>>>>> repartition the data post-join and resolve out-of-order > >>> arrival of > >>> > >>>>>>> records, > >>> > >>>>>>> finally returning the data keyed just the original key. > >>> I do not > >>> > >>>>>>> expose > >>> > >>>>>>> the > >>> > >>>>>>> CombinedKey or any of the internals outside of the > >>> joinOnForeignKey > >>> > >>>>>>> function. This does make for larger footprint, but it > >>> removes all > >>> > >>>>>>> agency > >>> > >>>>>>> for resolving out-of-order arrivals and handling > >>> CombinedKeys from > >>> > the > >>> > >>>>>>> user. I believe that this makes the function much easier > >>> to use. > >>> > >>>>>>> > >>> > >>>>>>> Let me know if this helps resolve your questions, and > >>> please feel > >>> > >>>>>>> free to > >>> > >>>>>>> add anything else on your mind. 
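[Editor's note] Steps 1-3 above can be sketched in plain Java. The `CombinedKey` name follows the KIP; the partitioner is only illustrative of the key point that partitioning on the foreign-key prefix alone routes each rekeyed record to the partition holding the matching table2 row.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of the rekeying step: each table1 record <k, v> becomes
// <CombinedKey(fk(v), k), v>, after which the records can be
// copartitioned with table2 and joined locally.
public class Rekey {
    public record CombinedKey(String foreignKey, String primaryKey) {}

    public static <V> List<Map.Entry<CombinedKey, V>> rekey(
            List<Map.Entry<String, V>> table1, Function<V, String> fkMapper) {
        List<Map.Entry<CombinedKey, V>> out = new ArrayList<>();
        for (Map.Entry<String, V> e : table1) {
            out.add(new SimpleEntry<>(
                    new CombinedKey(fkMapper.apply(e.getValue()), e.getKey()),
                    e.getValue()));
        }
        return out;
    }

    /** Partition on the foreign-key prefix only, mirroring table2's partitioner. */
    public static int partition(CombinedKey key, int numPartitions) {
        return Math.floorMod(key.foreignKey().hashCode(), numPartitions);
    }
}
```

Records with the same foreign key hash to the same partition as the table2 row they join against, regardless of their original primary key.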
> >>> > >>>>>>> > >>> > >>>>>>> Thanks again, > >>> > >>>>>>> Adam > >>> > >>>>>>> > >>> > >>>>>>> > >>> > >>>>>>> > >>> > >>>>>>> > >>> > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax < > >>> > >>>>>>> [email protected] <mailto:[email protected]>> > >>> > >>> > >>>>>>> wrote: > >>> > >>>>>>> > >>> > >>>>>>> Hi, > >>> > >>>>>>> > >>> > >>>>>>>> I am just catching up on this thread. I did not read > >>> everything so > >>> > >>>>>>>> far, > >>> > >>>>>>>> but want to share couple of initial thoughts: > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> Headers: I think there is a fundamental difference > >>> between header > >>> > >>>>>>>> usage > >>> > >>>>>>>> in this KIP and KP-258. For 258, we add headers to > >>> changelog topic > >>> > >>>>>>>> that > >>> > >>>>>>>> are owned by Kafka Streams and nobody else is supposed > >>> to write > >>> > into > >>> > >>>>>>>> them. In fact, no user header are written into the > >>> changelog topic > >>> > >>>>>>>> and > >>> > >>>>>>>> thus, there are not conflicts. > >>> > >>>>>>>> > >>> > >>>>>>>> Nevertheless, I don't see a big issue with using > >>> headers within > >>> > >>>>>>>> Streams. > >>> > >>>>>>>> As long as we document it, we can have some "reserved" > >>> header keys > >>> > >>>>>>>> and > >>> > >>>>>>>> users are not allowed to use when processing data with > >>> Kafka > >>> > Streams. > >>> > >>>>>>>> IMHO, this should be ok. > >>> > >>>>>>>> > >>> > >>>>>>>> I think there is a safe way to avoid conflicts, since > >>> these > >>> > headers > >>> > >>>>>>>> are > >>> > >>>>>>>> > >>> > >>>>>>>>> only needed in internal topics (I think): > >>> > >>>>>>>>> For internal and changelog topics, we can namespace > >>> all headers: > >>> > >>>>>>>>> * user-defined headers are namespaced as "external." + > >>> headerKey > >>> > >>>>>>>>> * internal headers are namespaced as "internal." 
+ > >>> headerKey > >>> > >>>>>>>>> > >>> > >>>>>>>>> While name spacing would be possible, it would require > to > >>> > >>>>>>>> deserialize > >>> > >>>>>>>> user headers what implies a runtime overhead. I would > >>> suggest to > >>> > no > >>> > >>>>>>>> namespace for now to avoid the overhead. If this > becomes a > >>> > problem in > >>> > >>>>>>>> the future, we can still add name spacing later on. > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> My main concern about the design it the type of the > >>> result KTable: > >>> > >>>>>>>> If I > >>> > >>>>>>>> understood the proposal correctly, > >>> > >>>>>>>> > >>> > >>>>>>>> KTable<K1,V1> table1 = ... > >>> > >>>>>>>> KTable<K2,V2> table2 = ... > >>> > >>>>>>>> > >>> > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...); > >>> > >>>>>>>> > >>> > >>>>>>>> implies that the `joinedTable` has the same key as the > >>> left input > >>> > >>>>>>>> table. > >>> > >>>>>>>> IMHO, this does not work because if table2 contains > >>> multiple rows > >>> > >>>>>>>> that > >>> > >>>>>>>> join with a record in table1 (what is the main purpose > of > >>> a > >>> > foreign > >>> > >>>>>>>> key > >>> > >>>>>>>> join), the result table would only contain a single > >>> join result, > >>> > but > >>> > >>>>>>>> not > >>> > >>>>>>>> multiple. > >>> > >>>>>>>> > >>> > >>>>>>>> Example: > >>> > >>>>>>>> > >>> > >>>>>>>> table1 input stream: <A,X> > >>> > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)> > >>> > >>>>>>>> > >>> > >>>>>>>> We use table2 value a foreign key to table1 key (ie, > >>> "A" joins). > >>> > If > >>> > >>>>>>>> the > >>> > >>>>>>>> result key is the same key as key of table1, this > >>> implies that the > >>> > >>>>>>>> result can either be <A, join(X,1)> or <A, join(X,2)> > >>> but not > >>> > both. > >>> > >>>>>>>> Because the share the same key, whatever result record > >>> we emit > >>> > later, > >>> > >>>>>>>> overwrite the previous result. 
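[Editor's note] Matthias's overwrite argument is easy to demonstrate with a plain map standing in for the result KTable (names are illustrative): keyed on table1's key alone, the two join results collide; keyed on the combined key, both survive.

```java
import java.util.HashMap;
import java.util.Map;

// Demonstrates why the result table cannot be keyed on table1's key alone:
// two join results sharing the key "A" overwrite each other, while the
// combined keys "A-a" and "A-b" keep both rows, as in Jan's proposal.
public class KeyCollision {
    public static Map<String, String> store(String[][] results) {
        Map<String, String> table = new HashMap<>();
        for (String[] kv : results) {
            table.put(kv[0], kv[1]); // a later record overwrites an equal key
        }
        return table;
    }
}
```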
> >>> > >>>>>>>> > >>> > >>>>>>>> This is the reason why Jan originally proposed to use a > >>> > combination > >>> > >>>>>>>> of > >>> > >>>>>>>> both primary keys of the input tables as key of the > >>> output table. > >>> > >>>>>>>> This > >>> > >>>>>>>> makes the keys of the output table unique and we can > >>> store both in > >>> > >>>>>>>> the > >>> > >>>>>>>> output table: > >>> > >>>>>>>> > >>> > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)> > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> Thoughts? > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> -Matthias > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> > >>> > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote: > >>> > >>>>>>>> > >>> > >>>>>>>> Just on remark here. > >>> > >>>>>>>>> The high-watermark could be disregarded. The decision > >>> about the > >>> > >>>>>>>>> forward > >>> > >>>>>>>>> depends on the size of the aggregated map. > >>> > >>>>>>>>> Only 1 element long maps would be unpacked and > >>> forwarded. 0 > >>> > element > >>> > >>>>>>>>> maps > >>> > >>>>>>>>> would be published as delete. Any other count > >>> > >>>>>>>>> of map entries is in "waiting for correct deletes to > >>> > arrive"-state. > >>> > >>>>>>>>> > >>> > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote: > >>> > >>>>>>>>> > >>> > >>>>>>>>> It does look like I could replace the second > >>> repartition store > >>> > and > >>> > >>>>>>>>>> highwater store with a groupBy and reduce. However, > >>> it looks > >>> > like > >>> > >>>>>>>>>> I > >>> > >>>>>>>>>> would > >>> > >>>>>>>>>> still need to store the highwater value within the > >>> materialized > >>> > >>>>>>>>>> store, > >>> > >>>>>>>>>> > >>> > >>>>>>>>>> to > >>> > >>>>>>>>> compare the arrival of out-of-order records (assuming > my > >>> > >>>>>>>>> understanding > >>> > >>>>>>>>> of > >>> > >>>>>>>>> THIS is correct...). 
This in effect is the same as the > >>> design I > >>> > have > >>> > >>>>>>>>> now, > >>> > >>>>>>>>> just with the two tables merged together. > >>> > >>>>>>>>> > >>> > > >>> > > >>> > >>> > >>> > >> > > > > > > -- > > -- Guozhang > > > > > > -- > -- Guozhang >
