On 10.12.2018 07:42, Guozhang Wang wrote:
> Hello Adam / Jan / John,
>
> Sorry for being late on this thread! I've finally got some time this weekend to clean up a load of tasks on my queue (actually I've also realized there are a bunch of other things I need to enqueue while cleaning them up --- sth I need to improve on my side). So here are my thoughts:
>
> Regarding the APIs: I like the API as currently written in the KIP. More generally I'd prefer to keep 1) the one-to-many join functionality as well as 2) join types other than inner as separate KIPs, since 1) may be worth a general API refactoring that can benefit not only foreign-key joins but co-located joins as well (e.g. an extended proposal of https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup), and I'm not sure if other join types would actually be needed (maybe left join still makes sense), so it's better to wait-for-people-to-ask-and-add than add-sth-that-no-one-uses.
>
> Regarding whether we enforce steps 3) / 4) v.s. introducing a KScatteredTable for users to inject their own optimization: I'd prefer to do the current option as-is, and my main rationale is the room for optimization inside the Streams internals and the succinctness of the API. For advanced users who may indeed prefer KScatteredTable and do their own optimization, while it is too much work to use the Processor API directly, I think we can still extend the current API to support it in the future if it becomes necessary.
no internal optimization potential. it's a myth ¯\_(ツ)_/¯ :-)

> Another note about step 4) resolving out-of-order data: as I mentioned before, I think with KIP-258 (embedded timestamp with key-value store) we can actually make this step simpler than the current proposal. In fact, we can just keep a single final-result store with timestamps and reject values that have a smaller timestamp, is that right?

Which output is correct should at least be decided based on the offset of the original message.

> That's all I have in mind now. Again, great appreciation to Adam for making such HUGE progress on this KIP!
>
> Guozhang
>
> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
>
>> If they don't find the time: they usually take the opposite path from me :D so the answer would be clear.
>>
>> Hence my suggestion to vote.
>>
>> On 04.12.2018 21:06, Adam Bellemare wrote:
>>> Hi Guozhang and Matthias
>>>
>>> I know both of you are quite busy, but we've gotten this KIP to a point where we need more guidance on the API (perhaps a bit of a tie-breaker, if you will). If there is anyone else you think should look at this, please tag them accordingly.
>>>
>>> The scenario is as such:
>>>
>>> Current Option:
>>> API: https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
>>> 1) Rekey the data to CombinedKey, and shuffle it to the partition with the foreignKey (repartition 1)
>>> 2) Join the data
>>> 3) Shuffle the data back to the original node (repartition 2)
>>> 4) Resolve out-of-order arrival / race condition due to foreign-key changes.
>>>
>>> Alternate Option:
>>> Perform #1 and #2 above, and return a KScatteredTable.
>>> - It would be keyed on a wrapped key function: <CombinedKey<KO, K>, VR> (KO = Other Table Key, K = This Table Key, VR = Joined Result)
>>> - KScatteredTable.resolve() would perform #3 and #4, but otherwise a user would be able to perform additional functions directly from the KScatteredTable (TBD - currently out of scope).
>>> - John's analysis two emails up is accurate as to the tradeoffs.
>>>
>>> Current Option is coded as-is. Alternate Option is possible, but will require implementation details to be exposed in the API and some exposure of new data structures in the API (ie: CombinedKey).
>>>
>>> I appreciate any insight into this.
>>>
>>> Thanks.
>>>
>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>
>>>> Hi John
>>>>
>>>> Thanks for your feedback and assistance. I think your summary is accurate from my perspective. Additionally, I would like to add that there is a risk of inconsistent final states without performing the resolution. This is a major concern for me, as most of the data I have dealt with is produced by relational databases. We have seen a number of cases where a user in the Rails UI has modified the field (foreign key), realized they made a mistake, and then updated the field again with a new key. The events are propagated out as they are produced, and as such we have had real-world cases where these inconsistencies were propagated downstream as the final values due to the race conditions in the fanout of the data.
>>>>
>>>> This solution that I propose values correctness of the final result over other factors.
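For readers skimming the thread, here is roughly what the two options would look like at the call site. A minimal sketch only: the joinOnForeignKey / resolve() names come from the KIP discussion, but the exact signatures and the Order / Customer / EnrichedOrder types are illustrative assumptions, not settled API.

    // Current Option: a single operator; steps 1-4 all happen internally.
    KTable<Integer, Order> orders = builder.table("orders");          // value carries customerId, the FK
    KTable<String, Customer> customers = builder.table("customers");
    KTable<Integer, EnrichedOrder> joined = orders.joinOnForeignKey(
        customers,
        order -> order.customerId,                                    // step 1: rekey to CombinedKey on the FK
        (order, customer) -> new EnrichedOrder(order, customer),      // step 2: join
        Materialized.as("joined-orders"));                            // steps 3-4: shuffle back + resolve

    // Alternate Option: stop after step 2 and return a scattered table;
    // the user opts into steps 3-4 explicitly via resolve().
    KScatteredTable<CombinedKey<String, Integer>, EnrichedOrder> scattered =
        orders.joinOnForeignKey(customers, order -> order.customerId,
            (order, customer) -> new EnrichedOrder(order, customer));
    KTable<Integer, EnrichedOrder> resolved = scattered.resolve();    // steps 3-4 on demand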
>>>> We could always move this function over to using a KScatteredTable implementation in the future, and simply deprecate this join API in time. I think I would like to hear more from some of the other major committers on which course of action they would think is best before any more coding is done.
>>>>
>>>> Thanks again
>>>>
>>>> Adam
>>>>
>>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <j...@confluent.io> wrote:
>>>>
>>>>> Hi Jan and Adam,
>>>>>
>>>>> Wow, thanks for doing that test, Adam. Those results are encouraging.
>>>>>
>>>>> Thanks for your performance experience as well, Jan. I agree that avoiding unnecessary join outputs is especially important when the fan-out is so high. I suppose this could also be built into the implementation we're discussing, but it wouldn't have to be specified in the KIP (since it's an API-transparent optimization).
>>>>>
>>>>> As far as whether or not to re-repartition the data, I didn't bring it up because it sounded like the two of you agreed to leave the KIP as-is, despite the disagreement.
>>>>>
>>>>> If you want my opinion, I feel like both approaches are reasonable. It sounds like Jan values more the potential for developers to optimize their topologies to re-use the intermediate nodes, whereas Adam places more value on having a single operator that people can use without extra steps at the end.
>>>>>
>>>>> Personally, although I do find it exceptionally annoying when a framework gets in my way when I'm trying to optimize something, it seems better to go for a single operation.
>>>>> * Encapsulating the internal transitions gives us significant latitude in the implementation (for example, joining only at the end, not in the middle, to avoid extra data copying and out-of-order resolution; how we represent the first repartition keys (combined keys vs. value vectors), etc.). If we publish something like a KScatteredTable with the right-partitioned joined data, then the API pretty much locks in the implementation as well.
>>>>> * The API seems simpler to understand and use. I do mean "seems"; if anyone wants to make the case that KScatteredTable is actually simpler, I think hypothetical usage code would help. From a relational algebra perspective, it seems like KTable.join(KTable) should produce a new KTable in all cases.
>>>>> * That said, there might still be room in the API for a different operation like what Jan has proposed to scatter a KTable, and then do things like join, re-group, etc. from there... I'm not sure; I haven't thought through all the consequences yet.
>>>>>
>>>>> This is all just my opinion after thinking over the discussion so far...
>>>>> -John
>>>>>
>>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>>>
>>>>>> Updated the PR to take into account John's feedback.
>>>>>>
>>>>>> I did some preliminary testing for the performance of the prefixScan. I have attached the file, but I will also include the text in the body here for archival purposes (I am not sure what happens to attached files). I also updated the PR and the KIP accordingly.
>>>>>>
>>>>>> Summary: It scales exceptionally well for scanning large numbers of records.
>>>>>> As Jan mentioned previously, the real issue would be more around processing the resulting records after obtaining them. For instance, it takes approximately ~80-120 mS to flush the buffer and a further ~35-85 mS to scan 27.5M records, obtaining matches for 2.5M of them. Iterating through the records just to generate a simple count takes ~40 times longer than the flush + scan combined.
>>>>>>
>>>>>> ============================================================================================
>>>>>> Setup:
>>>>>> ============================================================================================
>>>>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
>>>>>> CPU: i7 2.2 GHz.
>>>>>>
>>>>>> Note: I am using a slightly-modified, directly-accessible Kafka Streams RocksDB implementation (RocksDB.java, basically just avoiding the ProcessorContext). There are no modifications to the default RocksDB values provided in the 2.1/trunk release.
>>>>>>
>>>>>> keysize = 128 bytes
>>>>>> valsize = 512 bytes
>>>>>>
>>>>>> Step 1: Write X positive matching events (key = prefix + left-padded auto-incrementing integer)
>>>>>> Step 2: Write 10X negative matching events (key = left-padded auto-incrementing integer)
>>>>>> Step 3: Perform flush
>>>>>> Step 4: Perform prefixScan
>>>>>> Step 5: Iterate through the returned Iterator and validate the count of expected events.
>>>>>>
>>>>>> ============================================================================================
>>>>>> Results:
>>>>>> ============================================================================================
>>>>>> X = 1k (11k events total)
>>>>>> Flush Time = 39 mS
>>>>>> Scan Time = 7 mS
>>>>>> 6.9 MB disk
>>>>>> --------------------------------------------------------------------------------------------
>>>>>> X = 10k (110k events total)
>>>>>> Flush Time = 45 mS
>>>>>> Scan Time = 8 mS
>>>>>> 127 MB
>>>>>> --------------------------------------------------------------------------------------------
>>>>>> X = 100k (1.1M events total)
>>>>>> Test1:
>>>>>> Flush Time = 60 mS
>>>>>> Scan Time = 12 mS
>>>>>> 678 MB
>>>>>>
>>>>>> Test2:
>>>>>> Flush Time = 45 mS
>>>>>> Scan Time = 7 mS
>>>>>> 576 MB
>>>>>> --------------------------------------------------------------------------------------------
>>>>>> X = 1M (11M events total)
>>>>>> Test1:
>>>>>> Flush Time = 52 mS
>>>>>> Scan Time = 19 mS
>>>>>> 7.2 GB
>>>>>>
>>>>>> Test2:
>>>>>> Flush Time = 84 mS
>>>>>> Scan Time = 34 mS
>>>>>> 9.1 GB
>>>>>> --------------------------------------------------------------------------------------------
>>>>>> X = 2.5M (27.5M events total)
>>>>>> Test1:
>>>>>> Flush Time = 82 mS
>>>>>> Scan Time = 63 mS
>>>>>> 17 GB - 276 sst files
>>>>>>
>>>>>> Test2:
>>>>>> Flush Time = 116 mS
>>>>>> Scan Time = 35 mS
>>>>>> 23 GB - 361 sst files
>>>>>>
>>>>>> Test3:
>>>>>> Flush Time = 103 mS
>>>>>> Scan Time = 82 mS
>>>>>> 19 GB - 300 sst files
>>>>>> --------------------------------------------------------------------------------------------
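For anyone wanting to reproduce numbers in this ballpark, a stand-alone skeleton of the procedure above might look as follows. This is not the actual test harness (which uses a modified Streams RocksDBStore); it is a rough sketch against the plain RocksJava API, with the counts, value sizes, and path as placeholders.

    import org.rocksdb.*;
    import java.nio.charset.StandardCharsets;

    public class PrefixScanBench {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            byte[] prefix = "fk-A".getBytes(StandardCharsets.UTF_8);
            try (Options opts = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(opts, "/tmp/prefix-bench")) {
                // Step 1: X positive events (prefix + left-padded auto-incrementing integer)
                for (int i = 0; i < 1_000; i++)
                    db.put(("fk-A" + String.format("%019d", i)).getBytes(StandardCharsets.UTF_8), new byte[512]);
                // Step 2: 10X negative events (left-padded integer only)
                for (int i = 0; i < 10_000; i++)
                    db.put(String.format("%019d", i).getBytes(StandardCharsets.UTF_8), new byte[512]);
                db.flush(new FlushOptions().setWaitForFlush(true));      // Step 3: flush
                long matches = 0;
                try (RocksIterator it = db.newIterator()) {              // Steps 4-5: prefix scan + count
                    for (it.seek(prefix); it.isValid() && startsWith(it.key(), prefix); it.next())
                        matches++;
                }
                System.out.println("matched " + matches);
            }
        }
        static boolean startsWith(byte[] key, byte[] prefix) {
            if (key.length < prefix.length) return false;
            for (int i = 0; i < prefix.length; i++)
                if (key[i] != prefix[i]) return false;
            return true;
        }
    }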
>>>>>> I had to limit my testing on my laptop to X = 2.5M events. I tried to go to X = 10M (110M events) but RocksDB was going into the 100GB+ range and my laptop ran out of disk. More extensive testing could be done, but I suspect that it would be in line with what we're seeing in the results above.
>>>>>>
>>>>>> At this point in time, I think the only major discussion point is really around what Jan and I have disagreed on: repartitioning back + resolving potential out-of-order issues, or leaving that up to the client to handle.
>>>>>>
>>>>>> Thanks folks,
>>>>>>
>>>>>> Adam
>>>>>>
>>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>
>>>>>>> On 29.11.2018 15:14, John Roesler wrote:
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Sorry that this discussion petered out... I think the 2.1 release caused an extended distraction that pushed it off everyone's radar (which was precisely Adam's concern). Personally, I've also had some extended distractions of my own that kept (and continue to keep) me preoccupied.
>>>>>>>>
>>>>>>>> However, calling for a vote did wake me up, so I guess Jan was on the right track!
>>>>>>>>
>>>>>>>> I've gone back and reviewed the whole KIP document and the prior discussion, and I'd like to offer a few thoughts:
>>>>>>>>
>>>>>>>> API Thoughts:
>>>>>>>>
>>>>>>>> 1. If I read the KIP right, you are proposing a many-to-one join. Could we consider naming it manyToOneJoin? Or, if you prefer, flip the design around and make it a oneToManyJoin?
>>>>>>>>
>>>>>>>> The proposed name "joinOnForeignKey" disguises the join type, and it seems like it might trick some people into using it for a one-to-one join. This would work, of course, but it would be super inefficient compared to a simple rekey-and-join.
>>>>>>>>
>>>>>>>> 2. I might have missed it, but I don't think it's specified whether it's an inner, outer, or left join. I'm guessing an outer join, as (neglecting IQ), the rest can be achieved by filtering or by handling it in the ValueJoiner.
>>>>>>>>
>>>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right.
>>>>>>>> 3a. Regarding Serialized: There are a few different paradigms in play in the Streams API, so it's confusing, but instead of three Serialized args, I think it would be better to have one that allows (optionally) setting the 4 incoming serdes. The result serde is defined by the Materialized. The incoming serdes can be optional because they might already be available on the source KTables, or the default serdes from the config might be applicable.
>>>>>>>>
>>>>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't allow setting one, and it seems like it might actually be harmful, since the rekey operation needs to produce results that are co-partitioned with the "other" KTable.
>>>>>>>>
>>>>>>>> 4. I'm fine with the "reserved word" header, but I didn't actually follow what Matthias meant about namespacing requiring "deserializing" the record header. The headers are already Strings, so I don't think that deserialization is required.
>>>>>>>> If we applied the namespace at source nodes and stripped it at sink nodes, this would be practically no overhead. The advantage of the namespace idea is that no public API change wrt headers needs to happen, and no restrictions need to be placed on users' headers.
>>>>>>>>
>>>>>>>> (Although I'm wondering if we can get away without the header at all... stay tuned)
>>>>>>>>
>>>>>>>> 5. I also didn't follow the discussion about the HWM table growing without bound. As I read it, the HWM table is effectively implementing OCC to resolve the problem you noted with disordering when the rekey is reversed... particularly notable when the FK changes. As such, it only needs to track the most recent "version" (the offset in the source partition) of each key. Therefore, it should have the same number of keys as the source table at all times.
>>>>>>>>
>>>>>>>> I see that you are aware of KIP-258, which I think might be relevant in a couple of ways. One: it's just about storing the timestamp in the state store, but the ultimate idea is to effectively use the timestamp as an OCC "version" to drop disordered updates. You wouldn't want to use the timestamp for this operation, but if you were to use a similar mechanism to store the source offset in the store alongside the re-keyed values, then you could avoid a separate table.
>>>>>>>>
>>>>>>>> 6. You and Jan have been thinking about this for a long time, so I've probably missed something here, but I'm wondering if we can avoid the HWM tracking at all and resolve out-of-order during a final join instead...
>>>>>>>>
>>>>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other data)) to a right table (Letter K: (some data)).
>>>>>>>>
>>>>>>>> Left table:
>>>>>>>> 1: (A, xyz)
>>>>>>>> 2: (B, asd)
>>>>>>>>
>>>>>>>> Right table:
>>>>>>>> A: EntityA
>>>>>>>> B: EntityB
>>>>>>>>
>>>>>>>> We could do a rekey as you proposed with a combined key, but not propagating the value at all:
>>>>>>>> Rekey table:
>>>>>>>> A-1: (dummy value)
>>>>>>>> B-2: (dummy value)
>>>>>>>>
>>>>>>>> Which we then join with the right table to produce:
>>>>>>>> A-1: EntityA
>>>>>>>> B-2: EntityB
>>>>>>>>
>>>>>>>> Which gets rekeyed back:
>>>>>>>> 1: A, EntityA
>>>>>>>> 2: B, EntityB
>>>>>>>>
>>>>>>>> And finally we do the actual join:
>>>>>>>> Result table:
>>>>>>>> 1: ((A, xyz), EntityA)
>>>>>>>> 2: ((B, asd), EntityB)
>>>>>>>>
>>>>>>>> The thing is that in that last join, we have the opportunity to compare the current FK in the left table with the incoming PK of the right table. If they don't match, we just drop the event, since it must be outdated.
>>>>>>>>
>>>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1: B, xyz), ultimately yielding a conundrum about whether the final state should be (1: null) or (1: joined-on-B). With the algorithm above, you would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B, EntityB)). It seems like this does give you enough information to make the right choice, regardless of disordering.
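To make point 6 concrete, here is a small sketch of that comparison step. Everything below (the class, the Map standing in for the left table's state store, the names) is illustrative rather than code from the PR; the point is only that a stale join result can be detected by comparing the foreign key it was joined on against the left row's current foreign key.

    import java.util.Map;
    import java.util.function.Function;

    class FinalJoinResolver<K, V, KO, VO> {
        private final Map<K, V> leftStore;            // stand-in for the left KTable's state store
        private final Function<V, KO> fkExtractor;

        FinalJoinResolver(Map<K, V> leftStore, Function<V, KO> fkExtractor) {
            this.leftStore = leftStore;
            this.fkExtractor = fkExtractor;
        }

        /** Returns the right-hand value if still valid, or null if the result is stale. */
        VO resolve(K key, KO joinedFk, VO rightValue) {
            V current = leftStore.get(key);
            if (current == null || !fkExtractor.apply(current).equals(joinedFk)) {
                return null;    // FK changed (or row deleted) since the join: drop as outdated
            }
            return rightValue;  // FK still matches: safe to emit
        }
    }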
>>>>>>> Will check Adam's patch, but this should work. As mentioned often, I am not convinced on partitioning back for the user automatically. I think this is the real performance eater ;)
>>>>>>>
>>>>>>>> 7. Last thought... I'm a little concerned about the performance of the range scans when records change in the right table. You've said that you've been using the algorithm you presented in production for a while. Can you give us a sense of the performance characteristics you've observed?
>>>>>>>
>>>>>>> Make it work, make it fast, make it beautiful. The topmost thing here is / was correctness. In practice I do not measure the performance of the range scan. The usual cases I run this with emit 500k - 1M rows on a left-hand-side change. The range scan is just the work you gotta do, also when you pack your data into different formats; usually the RocksDB performance is tied tightly to the size of the data, and we can't really change that. It is more important for users to prevent useless updates to begin with. My left hand side is guarded to drop changes that are not going to change my join output.
>>>>>>>
>>>>>>> Usually it's: drop unused fields and then don't forward if old.equals(new).
>>>>>>>
>>>>>>> Regarding the performance of creating an iterator for smaller fanouts, users can still just do a group-by first anyways.
>>>>>>>
>>>>>>>> I could only think of one alternative, but I'm not sure if it's better or worse... If the first re-key only needs to preserve the original key, as I proposed in #6, then we could store a vector of keys in the value:
>>>>>>>>
>>>>>>>> Left table:
>>>>>>>> 1: A,...
>>>>>>>> 2: B,...
>>>>>>>> 3: A,...
>>>>>>>>
>>>>>>>> Gets re-keyed:
>>>>>>>> A: [1, 3]
>>>>>>>> B: [2]
>>>>>>>>
>>>>>>>> Then, the rhs part of the join would only need a regular single-key lookup. Of course we have to deal with the problem of large values, as there's no bound on the number of lhs records that can reference rhs records. Offhand, I'd say we could page the values, so when one row is past the threshold, we append the key for the next page. Then in most cases, it would be a single key lookup, but for large fan-out updates, it would be one per (max value size)/(avg lhs key size).
>>>>>>>>
>>>>>>>> This seems more complex, though... Plus, I think there's some extra tracking we'd need to do to know when to emit a retraction. For example, when record 1 is deleted, the re-key table would just have (A: [3]). Some kind of tombstone is needed so that the join result for 1 can also be retracted.
>>>>>>>>
>>>>>>>> That's all!
>>>>>>>>
>>>>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the discussion has been slow.
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>
>>>>>>>>> I'd say you can just call the vote.
>>>>>>>>>
>>>>>>>>> That happens all the time, and if something comes up, it just goes back to discuss.
>>>>>>>>>
>>>>>>>>> I would not expect too much attention with yet another email in this thread.
>>>>>>>>> best Jan
>>>>>>>>>
>>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
>>>>>>>>>> Hello Contributors
>>>>>>>>>>
>>>>>>>>>> I know that 2.1 is about to be released, but I do need to bump this to keep visibility up. I am still intending to push this through once contributor feedback is given.
>>>>>>>>>>
>>>>>>>>>> Main points that need addressing:
>>>>>>>>>> 1) Is there any way (or benefit) to structuring the current singular graph node into multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy on how the optimizations are supposed to work, so I would appreciate any help on this aspect.
>>>>>>>>>>
>>>>>>>>>> 2) Overall strategy for joining + resolving. This thread has much discourse between Jan and me between the current highwater mark proposal and a groupBy + reduce proposal. I am of the opinion that we need to strictly handle any chance of out-of-order data and leave none of it up to the consumer. Any comments or suggestions here would also help.
>>>>>>>>>>
>>>>>>>>>> 3) Anything else that you see that would prevent this from moving to a vote?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> Adam
>>>>>>>>>>
>>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jan
>>>>>>>>>>>
>>>>>>>>>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you actually only need to specify the number of segments you want and how large they are. To the best of my understanding, what happens is that the segments are automatically rolled over as new data with new timestamps is created. We use this exact functionality in some of the work done internally at my company. For reference, this is the hopping windowed store.
>>>>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
>>>>>>>>>>>
>>>>>>>>>>> In the code that I have provided, there are going to be two 24h segments. When a record is put into the windowStore, it will be inserted at time T in both segments. The two segments will always overlap by 12h. As time goes on and new records are added (say at time T+12h+), the oldest segment will be automatically deleted and a new segment created. The records are by default inserted with the context.timestamp(), such that it is the record time, not the clock time, which is used.
>>>>>>>>>>>
>>>>>>>>>>> To the best of my understanding, the timestamps are retained when restoring from the changelog.
>>>>>>>>>>>
>>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a segment level, instead of at an individual record level.
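For reference, a store along those lines could be declared roughly like this (using the 2.1-era long-based builder API; the store name, serdes, and segment count below are placeholders, not the PR's actual values):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowStore;
    import java.util.concurrent.TimeUnit;

    public class HighwaterStoreExample {
        public static StoreBuilder<WindowStore<String, Long>> highwaterStore() {
            long retention = TimeUnit.HOURS.toMillis(24);   // total retention period
            long windowSize = TimeUnit.HOURS.toMillis(24);  // window each put is inserted into
            int segments = 2;                               // segments roll over by record time
            return Stores.windowStoreBuilder(
                Stores.persistentWindowStore("highwater-mark", retention, segments, windowSize, false),
                Serdes.String(),  // key: this-table primary key
                Serdes.Long());   // value: highest offset seen for that key
        }
    }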
>>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Will that work? I expected it to blow up with ClassCastException or similar.
>>>>>>>>>>>>
>>>>>>>>>>>> You either would have to specify the window you fetch/put, or iterate across all windows the key was found in, right?
>>>>>>>>>>>>
>>>>>>>>>>>> I just hope the window store doesn't check stream-time under the hood; that would be a questionable interface.
>>>>>>>>>>>>
>>>>>>>>>>>> If it does: did you see my comment on checking all the windows earlier? That would be needed to actually give reasonable time guarantees.
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>>
>>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
>>>>>>>>>>>>> Hi Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>> Check for " highwaterMat " in the PR. I only changed the state store, not the ProcessorSupplier.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the information. This is indeed something that will be extremely useful for this KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>> Thanks for your explanations. That being said, I will not be moving ahead with an implementation using the reshuffle/groupBy solution you propose. That said, if you wish to implement it yourself off of my current PR and submit it as a competitive alternative, I would be more than happy to help vet that as an alternate solution. As it stands right now, I do not really have more time to invest into alternatives without there being a strong indication from the binding voters which they would prefer.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey, total no worries. I think I personally gave up on the Streams DSL for some time already, otherwise I would have pulled this KIP through already. I am currently reimplementing my own DSL based on the PAPI.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I will look at finishing up my PR with the windowed state store in the next week or so, exercising it via tests, and then I will come back for final discussions. In the meantime, I hope that any of the binding voters could take a look at the KIP in the wiki. I have updated it according to the latest plan:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This could be replaced by the results of KIP-258 whenever they are completed.
>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in the PR? I expected it to change to <Windowed<K>, Long>. Am I missing something?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is for corresponding changelog mechanisms. But as part of KIP-258 we do want to have "handling out-of-order data for source KTable" such that instead of blindly applying the updates to the materialized store, i.e. following offset ordering, we will reject updates that are older than the current key's timestamp, i.e. following timestamp ordering.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the high watermark store, now altered to be replaced with a window store), I think another current on-going KIP may actually help:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store (i.e. only for non-windowed KTable), and then one of its usages, as described in https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then "reject" updates from the source topics if their timestamp is smaller than the current key's latest update timestamp. I think it is very similar to what you have in mind for high-watermark-based filtering, while you only need to make sure that the timestamps of the joining records are correctly inherited through the whole topology to the final stage.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Note that this KIP is for key-value stores and hence non-windowed KTables only, but for windowed KTables we do not really have good support for their joins anyways (https://issues.apache.org/jira/browse/KAFKA-7107), so I think we can just consider non-windowed KTable-KTable non-key joins for now. In which case, KIP-258 should help.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
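The rejection rule described here boils down to a few lines. The class below is purely illustrative of the semantics (it is not KIP-258's implementation): keep a (value, timestamp) pair per key and drop any update that is older than what the store already holds.

    import java.util.HashMap;
    import java.util.Map;

    class TimestampedStore<K, V> {
        private static final class Entry<T> {
            final T value; final long ts;
            Entry(T value, long ts) { this.value = value; this.ts = ts; }
        }
        private final Map<K, Entry<V>> store = new HashMap<>();

        /** Applies the update only if it is not older than the current entry. */
        boolean putIfNewer(K key, V value, long timestamp) {
            Entry<V> current = store.get(key);
            if (current != null && timestamp < current.ts) {
                return false;   // out-of-order update: reject instead of applying blindly
            }
            store.put(key, new Entry<>(value, timestamp));
            return true;
        }
    }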
>>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Guozhang
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The current highwater mark implementation would grow endlessly based on the primary key of the original event. It is a pair of (<this table primary key>, <highest offset seen for that key>). This is used to differentiate between late arrivals and new updates. My newest proposal would be to replace it with a windowed state store of Duration N. This would allow the same behaviour, but cap the size based on time. This should allow for all late-arriving events to be processed, and should be customizable by the user to tailor to their own needs (ie: perhaps just 10 minutes of window, or perhaps 7 days...).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Adam, using time-based retention can do the trick here. Even so, I would still like to see the automatic repartitioning be optional, since I would just reshuffle again. With the windowed store I am a little bit sceptical about how to determine the window. Essentially one could run into problems when a rapid change happens near a window border. I will check your implementation in detail; if it's problematic, we could still check _all_ windows on read with a not-too-bad performance impact, I guess. Will let you know if the implementation would be correct as is. I would not like to assume that: offset(A) < offset(B) => timestamp(A) < timestamp(B). I think we can't expect that.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks for the diagram, it did really help. You are correct that I do not have the original primary key available, and I can see that if it was available then you would be able to add and remove events from the Map.
>>>>>>>>>>>>>>>>>>> That being said, I encourage you to finish your diagrams / charts, just for clarity for everyone else.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But I understand the benefits for the rest. Sorry about the original primary key; we have our own join and group-by implemented in the PAPI and are basically not using the DSL (just the abstraction). Completely missed that in the original DSL it's not there, and just assumed it. Total brain mess-up on my end. Will finish the chart as soon as I get a quiet evening this week.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My follow-up question for you is, won't the Map stay inside the State Store indefinitely after all of the changes have propagated? Isn't this effectively the same as a highwater mark state store?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thing is that if the map is empty, the subtractor is gonna return `null` and the key is removed from the keyspace. But there is going to be a store, 100%; the good thing is that I can use this store directly for materialize() / enableSendingOldValues(). It is a regular store, satisfying all guarantees needed for a further groupBy / join. The windowed store is not keeping the values, so for the next stateful operation we would need to instantiate an extra store. Or we have the window store also keep the values, then.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Long story short: if we can flip in a custom group-by before repartitioning to the original primary key, I think it would help the users big time in building efficient apps. Given the original primary key issue, I understand that we do not have a solid foundation to build on. Leaving the primary-key carry-along to the user is very unfortunate. I can understand how the decision went that way; I do not think it's a good decision.
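As a sketch of the adder/subtractor semantics Jan describes above (illustrative only, not his PAPI code): the aggregate is a map keyed by the original primary key, and the subtractor returning null is what removes the key from the keyspace.

    import java.util.HashMap;
    import java.util.Map;

    class MapAggregate {
        /** Adder: record the value under its original primary key. */
        static <U, V> Map<U, V> add(U primaryKey, V value, Map<U, V> agg) {
            Map<U, V> next = (agg == null) ? new HashMap<>() : new HashMap<>(agg);
            next.put(primaryKey, value);
            return next;
        }
        /** Subtractor: remove the entry; an empty map becomes null, deleting the key. */
        static <U, V> Map<U, V> subtract(U primaryKey, Map<U, V> agg) {
            Map<U, V> next = new HashMap<>(agg);
            next.remove(primaryKey);
            return next.isEmpty() ? null : next;
        }
    }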
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <dumbreprajakta...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> please remove me from this group
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > Hi Adam,
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > give me some time, will make such a chart. Last time I didn't get along well with giphy and ruined all your charts. Hopefully I can get it done today.
>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>> > On 08.09.2018 16:00, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>> > > Hi Jan
>>>>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>>>>> > > I have included a diagram of what I attempted on the KIP.
>>>>>>>>>>>>>>>>>>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
>>>>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>>>>> > > I attempted this back at the start of my own implementation of this solution, and since I could not get it to work I have since discarded the code. At this point in time, if you wish to continue pursuing your groupBy solution, I ask that you please create a diagram on the KIP carefully explaining your solution. Please feel free to use the image I just posted as a starting point. I am having trouble understanding your explanations, but I think that a carefully constructed diagram will clear up any misunderstandings. Alternately, please post a comprehensive PR with your solution. I can only guess at what you mean, and since I value my own time as much as you value yours, I believe it is your responsibility to provide an implementation instead of me trying to guess.
>>>>>>>>>>>>>>>>>>> > > Adam
>>>>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>>>>> > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>>>>> > >> Hi James,
>>>>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>>>>> > >> nice to see you being interested. Kafka Streams at this point supports all sorts of joins as long as both streams have the same key. Adam is currently implementing a join where a KTable and a KTable can have a one-to-many relationship (1:n). We exploit that RocksDB is a datastore that keeps data sorted (at least it exposes an API to access the stored data in a sorted fashion).
>>>>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>>>>> > >> I think the technical caveats are well understood now and we are basically down to philosophy and API design (when Adam sees my newest message). I have a lengthy track record of losing those kinda arguments within the streams community and I have no clue why. So I literally can't wait for you to churn through this thread and give your opinion on how we should design the return type of the oneToManyJoin and how much power we want to give to the user vs "simplicity" (where simplicity isn't really that, as users still need to understand it, I argue).
>>>>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>>>>> > >> waiting for you to join in on the discussion
>>>>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>>>>> > >> Best Jan
>>>>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>>>>> > >> On 07.09.2018 15:49, James Kwan wrote:
>>>>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>>>>> > >>> I am new to this group and I found this subject interesting. Sounds like you guys want to implement a join table of two streams? Is there somewhere I can see the original requirement or proposal?
>>>>>>>>>>>>>>>>>>> > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>>>>> > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>>>>> > >>>>> I'm currently testing using a Windowed Store to store the highwater mark. By all indications this should work fine, with the caveat being that it can only resolve out-of-order arrival for up to the size of the window (ie: 24h, 72h, etc). This would remove the possibility of it being unbounded in size.
>>>>>>>>>>>>>>>>>>> > >>>>>
>>>>>>>>>>>>>>>>>>> > >>>>> With regards to Jan's suggestion, I believe this is where we will have to remain in disagreement. While I do not disagree with your statement about there likely being additional joins done in a real-world workflow, I do not see how you can conclusively deal with out-of-order arrival of foreign-key changes and subsequent joins. I have attempted what I think you have proposed (without a high-water, using groupBy and reduce) and found that if the foreign key changes too quickly, or the load on a stream thread is too high, the joined messages will arrive out-of-order and be incorrectly propagated, such that an intermediate event is represented as the final event.
>>>>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>>>>> > >>>> Can you shed some light on your groupBy implementation? There must be some sort of flaw in it. I have a suspicion where it is; I would just like to confirm. The idea is bulletproof and it must be an implementation mess-up. I would like to clarify before we draw a conclusion.
>>>>>>>>>>>>>>>>>>> > >>>>> Repartitioning the scattered events back to their original partitions is the only way I know how to conclusively deal with out-of-order events in a given time frame, and to ensure that the data is eventually consistent with the input events.
>>>>>>>>>>>>>>>>>>> > >>>>>
>>>>>>>>>>>>>>>>>>> > >>>>> If you have some code to share that illustrates your approach, I would be very grateful as it would remove any misunderstandings that I may have.
>>>>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>>>>> > >>>> ah okay, you were looking for my code. I don't have something easily readable here, as it's bloated with OO patterns. It's anyhow trivial:
>>>>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>>>>> > >>>> @Override
>>>>>>>>>>>>>>>>>>> > >>>> public T apply(K aggKey, V value, T aggregate)
>>>>>>>>>>>>>>>>>>> > >>>> {
>>>>>>>>>>>>>>>>>>> > >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); << imaginary
>>>>>>>>>>>>>>>>>>> > >>>>     U toModifyKey = mapper.apply(value);
>>>>>>>>>>>>>>>>>>> > >>>>     << this is the place where people are actually gonna have issues and why you probably couldn't do it. we would need to find a solution here. I didn't realize that yet.
>>>>>>>>>>>>>>>>>>> > >>>>     << we propagate the field in the joiner, so that we can pick it up in an aggregate. Probably you have not thought of this in your approach, right?
>>>>>>>>>>>>>>>>>>> > >>>>     << I am very open to finding a generic solution here. In my honest opinion this is broken in KTableImpl.GroupBy, in that it loses the keys and only maintains the aggregate key.
>>>>>>>>>>>>>>>>>>> > >>>>     << I abstracted it away back then, way before I was thinking of the oneToMany join. That is why I didn't realize its significance here.
>>>>>>>>>>>>>>>>>>> > >>>>     << Opinions?
>>>>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>>>>> > >>>>     for (V m : current)
>>>>>>>>>>>>>>>>>>> > >>>>     {
>>>>>>>>>>>>>>>>>>> > >>>>         currentStateAsMap.put(mapper.apply(m), m);
>>>>>>>>>>>>>>>>>>> > >>>>     }
>>>>>>>>>>>>>>>>>>> > >>>>     if (isAdder)
>>>>>>>>>>>>>>>>>>> > >>>>     {
>>>>>>>>>>>>>>>>>>> > >>>>         currentStateAsMap.put(toModifyKey, value);
>>>>>>>>>>>>>>>>>>> > >>>>     }
>>>>>>>>>>>>>>>>>>> > >>>>     else
>>>>>>>>>>>>>>>>>>> > >>>>     {
>>>>>>>>>>>>>>>>>>> > >>>>         currentStateAsMap.remove(toModifyKey);
>>>>>>>>>>>>>>>>>>> > >>>>         if (currentStateAsMap.isEmpty()) {
>>>>>>>>>>>>>>>>>>> > >>>>             return null;
>>>>>>>>>>>>>>>>>>> > >>>>         }
>>>>>>>>>>>>>>>>>>> > >>>>     }
>>>>>>>>>>>>>>>>>>> > >>>>     return asAggregateType(currentStateAsMap);
>>>>>>>>>>>>>>>>>>> > >>>> }
>>>>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>>>>> > >>>>> Thanks,
>>>>>>>>>>>>>>>>>>> > >>>>> Adam
>>>>>>>>>>>>>>>>>>> > >>>>>
>>>>>>>>>>>>>>>>>>> > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>> > >>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>> Thanks Adam for bringing Matthias up to speed about the differences! I think re-keying back should be optional at best. I would say we return a KScatteredTable, with reshuffle() returning KTable<originalKey,Joined> to make the backwards repartitioning optional. I am also in big favour of doing the out-of-order processing using group-by instead of high-water-mark tracking, just because unbounded growth is scary + it saves us the header stuff.
>>>>>>>>>>>>>>>>>>> > >>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>> I think the abstraction of always repartitioning back is just not so strong. Like, the work has been done before we partition back, and grouping by something else afterwards is really common.
>>>>>>>>>>>>>>>>>>> > >>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>> > >>>>>>> Hi Matthias
>>>>>>>>>>>>>>>>>>> > >>>>>>> Thank you for your feedback, I do appreciate it!
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>>> While name spacing would be possible, it would require deserializing user headers, which implies a runtime overhead.
>>>>>>>>>>>>>>>>>>> > >>>>>>>> I would suggest no namespacing for now to avoid the overhead. If this becomes a problem in the future, we can still add name spacing later on.
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> Agreed. I will go with using a reserved string and document it.
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>>> My main concern about the design is the type of the result KTable: If I understood the proposal correctly,
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> In your example, you have table1 and table2 swapped. Here is how it works currently:
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> 1) table1 has the records that contain the foreign key within their value.
>>>>>>>>>>>>>>>>>>> > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>> > >>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> 2) A value mapper is required to extract the foreign key.
>>>>>>>>>>>>>>>>>>> > >>>>>>> table1 foreign key mapper: ( value => value.fk )
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> The mapper is applied to each element in table1, and a new combined key is made:
>>>>>>>>>>>>>>>>>>> > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> 3) The rekeyed events are copartitioned with table2:
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> a) Stream Thread with Partition 0:
>>>>>>>>>>>>>>>>>>> > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>>>>>>>>>>>>>>>>> > >>>>>>> Table2: <A,X>
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> b) Stream Thread with Partition 1:
>>>>>>>>>>>>>>>>>>> > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>> > >>>>>>> Table2: <B,Y>
>>>>>>>>>>>>>>>>>>> > >>>>>>>
>>>>>>>>>>>>>>>>>>> > >>>>>>> 4) From here, they can be joined together locally by applying the joiner function.
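A compact sketch of steps 1-3 above; the CombinedKey class mirrors the KIP's internal type, but this code is illustrative only:

    import java.util.function.Function;

    class CombinedKey<KO, K> {
        final KO foreignKey;  // partitioning is driven by this part only
        final K primaryKey;
        CombinedKey(KO foreignKey, K primaryKey) {
            this.foreignKey = foreignKey;
            this.primaryKey = primaryKey;
        }
        @Override public String toString() { return foreignKey + "-" + primaryKey; }
    }

    class RekeyExample {
        public static void main(String[] args) {
            // table1 record <a, (fk=A, bar=1)> with mapper ( value => value.fk )
            Function<String[], String> fkMapper = value -> value[0];
            String key = "a";
            String[] value = {"A", "1"};   // (fk=A, bar=1)
            CombinedKey<String, String> rekeyed = new CombinedKey<>(fkMapper.apply(value), key);
            System.out.println(rekeyed);   // "A-a": now co-partitioned with table2's <A,X>
        }
    }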
>>>
>>> At this point, Jan's design and my design deviate. My design goes on
>>> to repartition the data post-join and resolve out-of-order arrival of
>>> records, finally returning the data keyed on just the original key. I
>>> do not expose the CombinedKey or any of the internals outside of the
>>> joinOnForeignKey function. This does make for a larger footprint, but
>>> it removes all agency for resolving out-of-order arrivals and handling
>>> CombinedKeys from the user. I believe that this makes the function
>>> much easier to use.
>>>
>>> Let me know if this helps resolve your questions, and please feel free
>>> to add anything else on your mind.
>>>
>>> Thanks again,
>>> Adam
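To make the shape of Adam's proposal concrete, a sketch of what a call site might look like. joinOnForeignKey is the operator name used in this thread, but the exact signature shown here is illustrative, not final; Table1Value is the hypothetical record type from the sketch above, and the tables are assumed to be built elsewhere with appropriate serdes:

    KTable<String, Table1Value> table1 = ...
    KTable<String, String> table2 = ...

    // Rekeys table1 by the extracted foreign key, joins against table2,
    // repartitions back, and resolves out-of-order arrivals before
    // returning a KTable keyed on table1's original key.
    KTable<String, String> joined = table1.joinOnForeignKey(
            table2,
            value -> value.fk(),               // foreign-key extractor
            (v1, v2) -> v1.bar() + "-" + v2);  // joiner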
>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am just catching up on this thread. I did not read everything so
>>>> far, but want to share a couple of initial thoughts:
>>>>
>>>> Headers: I think there is a fundamental difference between header
>>>> usage in this KIP and KIP-258. For 258, we add headers to the
>>>> changelog topic that are owned by Kafka Streams, and nobody else is
>>>> supposed to write into them. In fact, no user headers are written
>>>> into the changelog topic and thus there are no conflicts.
>>>>
>>>> Nevertheless, I don't see a big issue with using headers within
>>>> Streams. As long as we document it, we can have some "reserved"
>>>> header keys that users are not allowed to use when processing data
>>>> with Kafka Streams. IMHO, this should be ok.
>>>>
>>>>> I think there is a safe way to avoid conflicts, since these headers
>>>>> are only needed in internal topics (I think):
>>>>> For internal and changelog topics, we can namespace all headers:
>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>
>>>> While name spacing would be possible, it would require deserializing
>>>> user headers, which implies a runtime overhead. I would suggest not
>>>> to namespace for now to avoid the overhead. If this becomes a problem
>>>> in the future, we can still add name spacing later on.
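For illustration, a small sketch of the reserved-key approach against Kafka's public Headers API. The key name "__kip213.propagated" is invented for this sketch; the KIP would document the actual reserved string:

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.header.internals.RecordHeaders;

    public class ReservedHeaderSketch {
        // Hypothetical reserved key, documented as off-limits to users.
        static final String PROPAGATED = "__kip213.propagated";

        public static void main(String[] args) {
            Headers headers = new RecordHeaders();
            // Streams-internal metadata written under the reserved key on
            // the internal/changelog topic...
            headers.add(new RecordHeader(PROPAGATED, new byte[] {1}));

            // ...and read back when the record is consumed downstream.
            Header h = headers.lastHeader(PROPAGATED);
            System.out.println(h != null ? "internal header present"
                                         : "no internal header");
        }
    }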
>>>>
>>>> My main concern about the design is the type of the result KTable: If
>>>> I understood the proposal correctly,
>>>>
>>>> KTable<K1,V1> table1 = ...
>>>> KTable<K2,V2> table2 = ...
>>>>
>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>
>>>> implies that the `joinedTable` has the same key as the left input
>>>> table. IMHO, this does not work, because if table2 contains multiple
>>>> rows that join with a record in table1 (which is the main purpose of
>>>> a foreign key join), the result table would contain only a single
>>>> join result, not multiple.
>>>>
>>>> Example:
>>>>
>>>> table1 input stream: <A,X>
>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>
>>>> We use the table2 value as a foreign key to the table1 key (ie, "A"
>>>> joins). If the result key is the same as the key of table1, this
>>>> implies that the result can be either <A, join(X,1)> or <A,
>>>> join(X,2)>, but not both. Because they share the same key, whichever
>>>> result record we emit later overwrites the previous result.
>>>>
>>>> This is the reason why Jan originally proposed to use a combination
>>>> of both primary keys of the input tables as the key of the output
>>>> table. This makes the keys of the output table unique, and we can
>>>> store both in the output table:
>>>>
>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>>>
>>>> Thoughts?
>>>>
>>>> -Matthias
>>>>
>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>
>>>>> Just one remark here.
>>>>> The high-watermark could be disregarded. The decision about the
>>>>> forward depends on the size of the aggregated map:
>>>>> only maps with exactly one element would be unpacked and forwarded;
>>>>> zero-element maps would be published as a delete; any other count of
>>>>> map entries is in a "waiting for correct deletes to arrive" state.
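A sketch of the forwarding decision Jan describes, under the assumption that the grouped state is a map from primary key to pending join result (all names hypothetical):

    import java.util.Map;

    public class ForwardDecisionSketch {
        enum Decision { FORWARD_VALUE, FORWARD_DELETE, WAIT }

        // Jan's rule: a one-entry map is a settled result and is forwarded;
        // an empty map means the record was deleted; anything larger means
        // deletes for stale foreign keys have not arrived yet, so hold back.
        static Decision decide(Map<String, String> aggregatedMap) {
            if (aggregatedMap.isEmpty()) return Decision.FORWARD_DELETE;
            if (aggregatedMap.size() == 1) return Decision.FORWARD_VALUE;
            return Decision.WAIT;
        }

        public static void main(String[] args) {
            System.out.println(decide(Map.of()));            // FORWARD_DELETE
            System.out.println(decide(Map.of("a", "X-1")));  // FORWARD_VALUE
            System.out.println(decide(Map.of("a", "X-1",
                                             "b", "Y-1")));  // WAIT
        }
    }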
>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>
>>>>>> It does look like I could replace the second repartition store and
>>>>>> highwater store with a groupBy and reduce. However, it looks like I
>>>>>> would still need to store the highwater value within the
>>>>>> materialized store to compare the arrival of out-of-order records
>>>>>> (assuming my understanding of THIS is correct...). This is in
>>>>>> effect the same as the design I have now, just with the two tables
>>>>>> merged together.
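For illustration, a sketch of the highwater comparison Adam refers to, under the assumption that the materialized value carries the offset of the source update that produced it (the wrapper type and the offset-based rule are assumptions for this sketch):

    public class HighwaterSketch {
        // Hypothetical wrapper stored in the materialized store: the joined
        // value plus the source offset that produced it.
        record Resolved(String joinedValue, long sourceOffset) {}

        // Out-of-order resolution: accept a new result only if it was
        // produced from an equal-or-later source record than the stored one.
        static Resolved resolve(Resolved stored, Resolved incoming) {
            if (stored == null || incoming.sourceOffset() >= stored.sourceOffset()) {
                return incoming;   // newer (or first) result wins
            }
            return stored;         // late arrival from a stale key: dropped
        }

        public static void main(String[] args) {
            Resolved first = resolve(null, new Resolved("X-1", 10));
            Resolved late  = resolve(first, new Resolved("Y-1", 7));
            System.out.println(late.joinedValue()); // still "X-1"
        }
    }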