On 10.12.2018 07:42, Guozhang Wang wrote:
> Hello Adam / Jan / John,
>
> Sorry for being late on this thread! I've finally got some time this
> weekend to clean up a load of tasks on my queue (actually I've also realized
> there are a bunch of other things I need to enqueue while cleaning them up,
> something I need to improve on my side). So here are my thoughts:
>
> Regarding the APIs: I like the API currently written in the KIP. More
> generally, I'd prefer to keep 1) one-to-many join functionality and
> 2) join types other than inner as separate KIPs, since 1) may be worth
> a general API refactoring that can benefit not only foreign-key joins but
> co-located joins as well (e.g. an extended proposal of
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup),
> and I'm not sure other join types would actually be needed (maybe left
> join still makes sense), so it's better to wait-for-people-to-ask-and-add
> than add-something-that-no-one-uses.
>
> Regarding whether we enforce steps 3) / 4) vs. introducing a
> KScatteredTable for users to inject their own optimization: I'd prefer to
> keep the current option as-is, and my main rationale is the room for
> optimization inside the Streams internals, plus API succinctness. For
> advanced users who may indeed prefer KScatteredTable and want to do their
> own optimization, but for whom using the Processor API directly is too much
> work, I think we can still extend the current API to support it in the
> future if it becomes necessary.

No internal optimization potential. It's a myth.

¯\_(ツ)_/¯

:-)

>
> Another note about step 4), resolving out-of-order data: as I mentioned
> before, I think with KIP-258 (embedded timestamps in the key-value store) we
> can actually make this step simpler than the current proposal. In fact, we
> can just keep a single final-result store with timestamps and reject values
> that have a smaller timestamp, is that right?

Which output is correct should at least be decided based on the offset of
the original message.
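A minimal sketch of the final-result store discussed above, assuming an in-memory `HashMap` stands in for the state store. The class name `VersionedResultStore` is hypothetical. The `version` argument can be a record timestamp (Guozhang's KIP-258 suggestion) or the source offset (Jan's point); the sketch deliberately does not pick one.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical final-result store (not a Kafka Streams class): keeps the latest joined
 * value per key together with the "version" that produced it and rejects older versions.
 * The version may be a record timestamp or a source offset; this sketch does not decide.
 */
public class VersionedResultStore<K, V> {
    private static final class Versioned<T> {
        final T value;
        final long version;

        Versioned(T value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<K, Versioned<V>> store = new HashMap<>();

    /** Applies the update and returns true, or returns false if it is older than the stored one. */
    public boolean put(K key, V value, long version) {
        Versioned<V> current = store.get(key);
        if (current != null && version < current.version) {
            return false; // smaller version than what we already hold: drop it
        }
        store.put(key, new Versioned<>(value, version));
        return true;
    }

    public V get(K key) {
        Versioned<V> current = store.get(key);
        return current == null ? null : current.value;
    }

    public static void main(String[] args) {
        VersionedResultStore<Integer, String> results = new VersionedResultStore<>();
        results.put(1, "joined-on-B", 7L); // the newer result happens to arrive first
        results.put(1, "joined-on-A", 5L); // the older result arrives late and is rejected
        System.out.println(results.get(1)); // joined-on-B
    }
}
```

With timestamps as the version, two updates in the same millisecond compare equal and the later arrival wins, which is one argument for using the source offset instead.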

>
>
> That's all I have in mind now. Again, great appreciation to Adam for making
> such HUGE progress on this KIP!
>
>
> Guozhang
>
> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
>> If they don't find the time:
>> They usually take the opposite path from me :D
>> so the answer would be clear.
>>
>> Hence my suggestion to vote.
>>
>>
>> On 04.12.2018 21:06, Adam Bellemare wrote:
>>> Hi Guozhang and Matthias
>>>
>>> I know both of you are quite busy, but we've gotten this KIP to a point
>>> where we need more guidance on the API (perhaps a bit of a tie-breaker, if
>>> you will). If there is anyone else you think should look at this,
>>> please tag them accordingly.
>>>
>>> The scenario is as such:
>>>
>>> Current Option:
>>> API:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
>>> 1) Rekey the data to CombinedKey, and shuffle it to the partition with the
>>> foreignKey (repartition 1)
>>> 2) Join the data
>>> 3) Shuffle the data back to the original node (repartition 2)
>>> 4) Resolve out-of-order arrival / race condition due to foreign-key changes.
>>>
>>> Alternate Option:
>>> Perform #1 and #2 above, and return a KScatteredTable.
>>> - It would be keyed on a wrapped key: <CombinedKey<KO, K>, VR>
>>> (KO = Other Table Key, K = This Table Key, VR = Joined Result)
>>> - KScatteredTable.resolve() would perform #3 and #4, but otherwise a user
>>> would be able to perform additional functions directly on the
>>> KScatteredTable (TBD - currently out of scope).
>>> - John's analysis two emails up is accurate as to the tradeoffs.
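To make the `<CombinedKey<KO, K>, VR>` keying concrete, here is a minimal sketch of what such a key could look like, assuming keys are ordered by the foreign key first and the primary key second. The class and method names are hypothetical, and a `TreeMap` stands in for the sorted RocksDB store. The point it illustrates is that all records referencing one foreign key form one contiguous range, which is what makes the range (prefix) scan possible.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical CombinedKey<KO, K>: foreign key first, then this table's primary key.
 * Sorting by KO first keeps every record that references the same foreign key adjacent.
 */
public final class CombinedKey<KO extends Comparable<KO>, K extends Comparable<K>>
        implements Comparable<CombinedKey<KO, K>> {
    final KO foreignKey;
    final K primaryKey; // null only in the lower bound built by lookupByForeignKey

    public CombinedKey(KO foreignKey, K primaryKey) {
        this.foreignKey = foreignKey;
        this.primaryKey = primaryKey;
    }

    @Override
    public int compareTo(CombinedKey<KO, K> other) {
        int byForeignKey = foreignKey.compareTo(other.foreignKey);
        if (byForeignKey != 0) {
            return byForeignKey;
        }
        // A null primary key sorts before every real one, so it works as a range start.
        if (primaryKey == null) {
            return other.primaryKey == null ? 0 : -1;
        }
        if (other.primaryKey == null) {
            return 1;
        }
        return primaryKey.compareTo(other.primaryKey);
    }

    /** Range scan: every (primary key, value) whose combined key starts with fk. */
    public static <F extends Comparable<F>, P extends Comparable<P>, V> Map<P, V> lookupByForeignKey(
            NavigableMap<CombinedKey<F, P>, V> store, F fk) {
        Map<P, V> matches = new LinkedHashMap<>();
        for (Map.Entry<CombinedKey<F, P>, V> entry
                : store.tailMap(new CombinedKey<F, P>(fk, null), true).entrySet()) {
            if (entry.getKey().foreignKey.compareTo(fk) != 0) {
                break; // walked past the last key for this foreign key
            }
            matches.put(entry.getKey().primaryKey, entry.getValue());
        }
        return matches;
    }

    public static void main(String[] args) {
        NavigableMap<CombinedKey<String, Integer>, String> rekeyed = new TreeMap<>();
        rekeyed.put(new CombinedKey<>("A", 1), "xyz");
        rekeyed.put(new CombinedKey<>("B", 2), "asd");
        rekeyed.put(new CombinedKey<>("A", 3), "qwe");
        System.out.println(lookupByForeignKey(rekeyed, "A")); // {1=xyz, 3=qwe}
    }
}
```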
>>>
>>> The current option is coded as-is. The alternate option is possible, but it
>>> will require exposing implementation details in the API, including new
>>> data structures (i.e. CombinedKey).
>>>
>>> I appreciate any insight into this.
>>>
>>> Thanks.
>>>
>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <adam.bellem...@gmail.com>
>>> wrote:
>>>
>>>> Hi John
>>>>
>>>> Thanks for your feedback and assistance. I think your summary is accurate
>>>> from my perspective. Additionally, I would like to add that there is a risk
>>>> of inconsistent final states without performing the resolution. This is a
>>>> major concern for me, as most of the data I have dealt with is produced by
>>>> relational databases. We have seen a number of cases where a user in the
>>>> Rails UI has modified the field (foreign key), realized they made a
>>>> mistake, and then updated the field again with a new key. The events are
>>>> propagated out as they are produced, and as such we have had real-world
>>>> cases where these inconsistencies were propagated downstream as the final
>>>> values due to the race conditions in the fanout of the data.
>>>>
>>>> The solution that I propose values correctness of the final result over
>>>> other factors.
>>>>
>>>> We could always move this function over to a KScatteredTable
>>>> implementation in the future, and simply deprecate this join API in
>>>> time. I would like to hear more from some of the other major
>>>> committers on which course of action they think is best before any
>>>> more coding is done.
>>>>
>>>> Thanks again
>>>>
>>>> Adam
>>>>
>>>>
>>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <j...@confluent.io> wrote:
>>>>
>>>>> Hi Jan and Adam,
>>>>>
>>>>> Wow, thanks for doing that test, Adam. Those results are encouraging.
>>>>>
>>>>> Thanks for your performance experience as well, Jan. I agree that avoiding
>>>>> unnecessary join outputs is especially important when the fan-out is so
>>>>> high. I suppose this could also be built into the implementation we're
>>>>> discussing, but it wouldn't have to be specified in the KIP (since it's an
>>>>> API-transparent optimization).
>>>>>
>>>>> As far as whether or not to re-repartition the data, I didn't bring it up
>>>>> because it sounded like the two of you agreed to leave the KIP as-is,
>>>>> despite the disagreement.
>>>>>
>>>>> If you want my opinion, I feel like both approaches are reasonable.
>>>>> It sounds like Jan places more value on the potential for developers to
>>>>> optimize their topologies to re-use the intermediate nodes, whereas Adam
>>>>> places more value on having a single operator that people can use without
>>>>> extra steps at the end.
>>>>>
>>>>> Personally, although I do find it exceptionally annoying when a framework
>>>>> gets in my way when I'm trying to optimize something, it seems better to
>>>>> go for a single operation.
>>>>> * Encapsulating the internal transitions gives us significant latitude in
>>>>> the implementation (for example, joining only at the end, not in the
>>>>> middle, to avoid extra data copying and out-of-order resolution; how we
>>>>> represent the first repartition keys (combined keys vs. value vectors),
>>>>> etc.). If we publish something like a KScatteredTable with the
>>>>> right-partitioned joined data, then the API pretty much locks in the
>>>>> implementation as well.
>>>>> * The API seems simpler to understand and use. I do mean "seems"; if
>>>>> anyone wants to make the case that KScatteredTable is actually simpler, I
>>>>> think hypothetical usage code would help. From a relational algebra
>>>>> perspective, it seems like KTable.join(KTable) should produce a new KTable
>>>>> in all cases.
>>>>> * That said, there might still be room in the API for a different
>>>>> operation like what Jan has proposed to scatter a KTable, and then do
>>>>> things like join, re-group, etc. from there... I'm not sure; I haven't
>>>>> thought through all the consequences yet.
>>>>>
>>>>> This is all just my opinion after thinking over the discussion so far...
>>>>> -John
>>>>>
>>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <adam.bellem...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Updated the PR to take into account John's feedback.
>>>>>>
>>>>>> I did some preliminary testing for the performance of the prefixScan. I
>>>>>> have attached the file, but I will also include the text in the body here
>>>>>> for archival purposes (I am not sure what happens to attached files). I
>>>>>> also updated the PR and the KIP accordingly.
>>>>>>
>>>>>> Summary: It scales exceptionally well for scanning large numbers of
>>>>>> records. As Jan mentioned previously, the real issue would be more around
>>>>>> processing the resulting records after obtaining them. For instance, it
>>>>>> takes approximately 80-120 ms to flush the buffer and a further 35-85 ms
>>>>>> to scan 27.5M records, obtaining matches for 2.5M of them. Iterating
>>>>>> through the records just to generate a simple count takes ~40 times longer
>>>>>> than the flush + scan combined.
>>>>>>
>>>>>>
>>>>>> ============================================================================
>>>>>> Setup:
>>>>>> ============================================================================
>>>>>> Java 9 with default settings aside from a 512 MB heap (Xmx512m, Xms512m)
>>>>>> CPU: i7 2.2 GHz.
>>>>>>
>>>>>> Note: I am using a slightly-modified, directly-accessible Kafka Streams
>>>>>> RocksDB implementation (RocksDB.java, basically just avoiding the
>>>>>> ProcessorContext). There are no modifications to the default RocksDB
>>>>>> values provided in the 2.1/trunk release.
>>>>>>
>>>>>>
>>>>>> keysize = 128 bytes
>>>>>> valsize = 512 bytes
>>>>>>
>>>>>> Step 1:
>>>>>> Write X positive matching events (key = prefix + left-padded
>>>>>> auto-incrementing integer)
>>>>>> Step 2:
>>>>>> Write 10X negative matching events (key = left-padded auto-incrementing
>>>>>> integer)
>>>>>> Step 3:
>>>>>> Perform flush
>>>>>> Step 4:
>>>>>> Perform prefixScan
>>>>>> Step 5:
>>>>>> Iterate through the returned Iterator and validate the count of expected
>>>>>> events.
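A scaled-down, in-memory version of Steps 1-5, as a sketch only. A `TreeMap` stands in for RocksDB (both iterate keys in sorted order), Step 3 has no in-memory equivalent, and the prefix `fk-0001|`, the 12-digit padding and the class name are assumptions rather than the key format used in the test above. It shows why the prefix scan touches only the X matching keys no matter how many non-matching keys exist; it says nothing about RocksDB timings.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory stand-in for the prefixScan benchmark; the TreeMap plays the role of RocksDB. */
public class PrefixScanHarness {
    static final String PREFIX = "fk-0001|"; // hypothetical prefix shared by the matching keys

    static String pad(long i) {
        return String.format("%012d", i); // left-padded auto-incrementing integer
    }

    /** Steps 1 and 2: X matching keys (prefix + counter) and 10X non-matching keys (counter only). */
    static NavigableMap<String, byte[]> load(int x, int valueSize) {
        NavigableMap<String, byte[]> store = new TreeMap<>();
        for (int i = 0; i < x; i++) {
            store.put(PREFIX + pad(i), new byte[valueSize]);
        }
        for (int i = 0; i < 10 * x; i++) {
            store.put(pad(i), new byte[valueSize]);
        }
        return store;
    }

    /** Step 4: keys sharing a prefix are contiguous, so the scan is a single sub-range. */
    static NavigableMap<String, byte[]> prefixScan(NavigableMap<String, byte[]> store, String prefix) {
        // '\uffff' sorts after every character used in these keys, closing the range.
        return store.subMap(prefix, true, prefix + '\uffff', false);
    }

    public static void main(String[] args) {
        NavigableMap<String, byte[]> store = load(1_000, 512);
        // Step 3 (flush) has no equivalent for an in-memory map.
        int count = 0;
        for (Map.Entry<String, byte[]> ignored : prefixScan(store, PREFIX).entrySet()) {
            count++; // Step 5: iterate and count
        }
        System.out.println(count + " of " + store.size()); // 1000 of 11000
    }
}
```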
>>>>>>
>>>>>> ============================================================================
>>>>>> Results:
>>>>>> ============================================================================
>>>>>> X      Total events  Test  Flush time  Scan time  Disk
>>>>>> -----  ------------  ----  ----------  ---------  ---------------------
>>>>>> 1k     11k           1     39 ms       7 ms       6.9 MB
>>>>>> 10k    110k          1     45 ms       8 ms       127 MB
>>>>>> 100k   1.1M          1     60 ms       12 ms      678 MB
>>>>>> 100k   1.1M          2     45 ms       7 ms       576 MB
>>>>>> 1M     11M           1     52 ms       19 ms      7.2 GB
>>>>>> 1M     11M           2     84 ms       34 ms      9.1 GB
>>>>>> 2.5M   27.5M         1     82 ms       63 ms      17 GB (276 SST files)
>>>>>> 2.5M   27.5M         2     116 ms      35 ms      23 GB (361 SST files)
>>>>>> 2.5M   27.5M         3     103 ms      82 ms      19 GB (300 SST files)
>>>>>> ============================================================================
>>>>>>
>>>>>> I had to limit my testing on my laptop to X = 2.5M events. I tried to go
>>>>>> to X = 10M (110M events), but RocksDB was going into the 100GB+ range and
>>>>>> my laptop ran out of disk. More extensive testing could be done, but I
>>>>>> suspect that it would be in line with what we're seeing in the results
>>>>>> above.
>>>>>>
>>>>>> At this point in time, I think the only major discussion point is really
>>>>>> around what Jan and I have disagreed on: repartitioning back + resolving
>>>>>> potential out-of-order issues, or leaving that up to the client to handle.
>>>>>>
>>>>>>
>>>>>> Thanks folks,
>>>>>>
>>>>>> Adam
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <jan.filip...@trivago.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 29.11.2018 15:14, John Roesler wrote:
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Sorry that this discussion petered out... I think the 2.1 release caused an
>>>>>>>> extended distraction that pushed it off everyone's radar (which was
>>>>>>>> precisely Adam's concern). Personally, I've also had some extended
>>>>>>>> distractions of my own that kept (and continue to keep) me preoccupied.
>>>>>>>>
>>>>>>>> However, calling for a vote did wake me up, so I guess Jan was on the right
>>>>>>>> track!
>>>>>>>>
>>>>>>>> I've gone back and reviewed the whole KIP document and the prior
>>>>>>>> discussion, and I'd like to offer a few thoughts:
>>>>>>>>
>>>>>>>> API Thoughts:
>>>>>>>>
>>>>>>>> 1. If I read the KIP right, you are proposing a many-to-one join. Could we
>>>>>>>> consider naming it manyToOneJoin? Or, if you prefer, flip the design around
>>>>>>>> and make it a oneToManyJoin?
>>>>>>>>
>>>>>>>> The proposed name "joinOnForeignKey" disguises the join type, and it seems
>>>>>>>> like it might trick some people into using it for a one-to-one join. This
>>>>>>>> would work, of course, but it would be super inefficient compared to a
>>>>>>>> simple rekey-and-join.
>>>>>>>>
>>>>>>>> 2. I might have missed it, but I don't think it's specified whether it's an
>>>>>>>> inner, outer, or left join. I'm guessing an outer join, as (neglecting IQ)
>>>>>>>> the rest can be achieved by filtering or by handling it in the
>>>>>>>> ValueJoiner.
>>>>>>>>
>>>>>>>> 3. The arg list to joinOnForeignKey doesn't look quite right.
>>>>>>>> 3a. Regarding Serialized: There are a few different paradigms in play in
>>>>>>>> the Streams API, so it's confusing, but instead of three Serialized args, I
>>>>>>>> think it would be better to have one that allows (optionally) setting the 4
>>>>>>>> incoming serdes. The result serde is defined by the Materialized. The
>>>>>>>> incoming serdes can be optional because they might already be available on
>>>>>>>> the source KTables, or the default serdes from the config might be
>>>>>>>> applicable.
>>>>>>>>
>>>>>>>> 3b. Is the StreamPartitioner necessary? The other joins don't allow setting
>>>>>>>> one, and it seems like it might actually be harmful, since the rekey
>>>>>>>> operation needs to produce results that are co-partitioned with the "other"
>>>>>>>> KTable.
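A small sketch of the co-partitioning concern in 3b, assuming both sides use the same partition function. `partitionFor` below uses `hashCode()` purely for illustration (Kafka's default partitioner hashes the serialized key bytes with murmur2), and `customPartition` is a hypothetical user-supplied partitioner that routes by the primary key instead of the foreign key.

```java
/**
 * Sketch of why the rekey step must follow the other table's partitioning.
 * partitionFor() is a stand-in using hashCode(); Kafka's default partitioner instead
 * hashes the serialized key bytes with murmur2.
 */
public class CoPartitioning {
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Rekeyed record CombinedKey(fk, pk): routed by the foreign key only. */
    static int rekeyPartition(String foreignKey, int primaryKey, int numPartitions) {
        return partitionFor(foreignKey, numPartitions); // primaryKey deliberately ignored
    }

    /** Hypothetical user-supplied partitioner that routes by the primary key instead. */
    static int customPartition(String foreignKey, int primaryKey, int numPartitions) {
        return partitionFor(primaryKey, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 8;
        int otherTable = partitionFor("A", partitions); // where the other table's row "A" lives
        System.out.println(rekeyPartition("A", 42, partitions) == otherTable);  // true
        System.out.println(customPartition("A", 42, partitions) == otherTable); // false (2 vs 1)
    }
}
```

Any partitioner that looks at more than the foreign key can send a rekeyed record to a task that does not hold the matching "other" row, which is the harm 3b describes.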
>>>>>>>>
>>>>>>>> 4. I'm fine with the "reserved word" header, but I didn't actually follow
>>>>>>>> what Matthias meant about namespacing requiring "deserializing" the record
>>>>>>>> header. The headers are already Strings, so I don't think that
>>>>>>>> deserialization is required. If we applied the namespace at source nodes
>>>>>>>> and stripped it at sink nodes, this would be practically no overhead. The
>>>>>>>> advantage of the namespace idea is that no public API change wrt headers
>>>>>>>> needs to happen, and no restrictions need to be placed on users' headers.
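A minimal sketch of the namespacing idea in point 4. Headers are modelled as a plain list of (String key, byte[] value) pairs rather than Kafka's `Headers` type, so nothing here is the actual Streams API; the prefixes `user.` and `streams.` and all method names are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Sketch of header namespacing: user headers are prefixed at the source node, internal
 * headers use a separate prefix, and the user prefix is stripped again at the sink.
 * Both prefixes are hypothetical; header keys are Strings, so no deserialization is needed.
 */
public class HeaderNamespace {
    static final String USER_NS = "user.";        // hypothetical namespace for user headers
    static final String INTERNAL_NS = "streams."; // hypothetical namespace for internal headers

    static List<Map.Entry<String, byte[]>> atSource(List<Map.Entry<String, byte[]>> headers) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : headers) {
            out.add(new SimpleImmutableEntry<>(USER_NS + h.getKey(), h.getValue()));
        }
        return out;
    }

    static List<Map.Entry<String, byte[]>> withInternal(
            List<Map.Entry<String, byte[]>> headers, String name, byte[] value) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>(headers);
        out.add(new SimpleImmutableEntry<>(INTERNAL_NS + name, value)); // cannot collide with user.*
        return out;
    }

    /** Sink to a user-facing topic: drop internal headers, strip the user prefix. */
    static List<Map.Entry<String, byte[]>> atSink(List<Map.Entry<String, byte[]>> headers) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : headers) {
            if (h.getKey().startsWith(USER_NS)) {
                out.add(new SimpleImmutableEntry<>(h.getKey().substring(USER_NS.length()), h.getValue()));
            }
        }
        return out;
    }

    static List<String> keys(List<Map.Entry<String, byte[]>> headers) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : headers) {
            names.add(h.getKey());
        }
        return names;
    }

    public static void main(String[] args) {
        // A user header that happens to look like an internal one.
        List<Map.Entry<String, byte[]>> fromUser = new ArrayList<>();
        fromUser.add(new SimpleImmutableEntry<>("streams.offset", new byte[] {1}));
        List<Map.Entry<String, byte[]>> inside = withInternal(atSource(fromUser), "offset", new byte[] {2});
        System.out.println(keys(inside));         // [user.streams.offset, streams.offset]
        System.out.println(keys(atSink(inside))); // [streams.offset]
    }
}
```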
>>>>>>>>
>>>>>>>> (Although I'm wondering if we can get away without the header at all...
>>>>>>>> stay tuned)
>>>>>>>>
>>>>>>>> 5. I also didn't follow the discussion about the HWM table growing without
>>>>>>>> bound. As I read it, the HWM table is effectively implementing OCC
>>>>>>>> (optimistic concurrency control) to resolve the problem you noted with
>>>>>>>> disordering when the rekey is reversed... particularly notable when the FK
>>>>>>>> changes. As such, it only needs to track the most recent "version" (the
>>>>>>>> offset in the source partition) of each key. Therefore, it should have the
>>>>>>>> same number of keys as the source table at all times.
>>>>>>>>
>>>>>>>> I see that you are aware of KIP-258, which I think might be relevant in a
>>>>>>>> couple of ways. One: it's just about storing the timestamp in the state
>>>>>>>> store, but the ultimate idea is to effectively use the timestamp as an OCC
>>>>>>>> "version" to drop disordered updates. You wouldn't want to use the
>>>>>>>> timestamp for this operation, but if you were to use a similar mechanism to
>>>>>>>> store the source offset in the store alongside the re-keyed values, then
>>>>>>>> you could avoid a separate table.
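A minimal sketch of the HWM table read as OCC, assuming the version is the source-partition offset and an in-memory map stands in for the state store; the class name is hypothetical. It illustrates point 5: however many updates arrive, the table holds one entry per distinct source key. Deletes are not handled here; a real implementation would still have to decide when an entry may be removed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical HWM table: highest source offset seen per source-table key. */
public class HighWaterMarkTable<K> {
    private final Map<K, Long> highestOffset = new HashMap<>();

    /** True if offset is newer than anything seen for key (the update may be forwarded). */
    public boolean accept(K key, long offset) {
        Long seen = highestOffset.get(key);
        if (seen != null && offset <= seen) {
            return false; // stale or duplicate version: drop it
        }
        highestOffset.put(key, offset);
        return true;
    }

    public int size() {
        return highestOffset.size();
    }

    public static void main(String[] args) {
        HighWaterMarkTable<Integer> hwm = new HighWaterMarkTable<>();
        // 10,000 updates spread over 100 distinct source keys
        for (long offset = 0; offset < 10_000; offset++) {
            hwm.accept((int) (offset % 100), offset);
        }
        System.out.println(hwm.size()); // 100
    }
}
```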
>>>>>>>>
>>>>>>>> 6. You and Jan have been thinking about this for a long time, so I've
>>>>>>>> probably missed something here, but I'm wondering if we can avoid the HWM
>>>>>>>> tracking at all and resolve out-of-order during a final join instead...
>>>>>>>>
>>>>>>>> Let's say we're joining a left table (Integer K: Letter FK, (other data))
>>>>>>>> to a right table (Letter K: (some data)).
>>>>>>>>
>>>>>>>> Left table:
>>>>>>>> 1: (A, xyz)
>>>>>>>> 2: (B, asd)
>>>>>>>>
>>>>>>>> Right table:
>>>>>>>> A: EntityA
>>>>>>>> B: EntityB
>>>>>>>>
>>>>>>>> We could do a rekey as you proposed with a combined key, but without
>>>>>>>> propagating the value at all.
>>>>>>>> Rekey table:
>>>>>>>> A-1: (dummy value)
>>>>>>>> B-2: (dummy value)
>>>>>>>>
>>>>>>>> Which we then join with the right table to produce:
>>>>>>>> A-1: EntityA
>>>>>>>> B-2: EntityB
>>>>>>>>
>>>>>>>> Which gets rekeyed back:
>>>>>>>> 1: A, EntityA
>>>>>>>> 2: B, EntityB
>>>>>>>>
>>>>>>>> And finally we do the actual join:
>>>>>>>> Result table:
>>>>>>>> 1: ((A, xyz), EntityA)
>>>>>>>> 2: ((B, asd), EntityB)
>>>>>>>>
>>>>>>>> The thing is that in that last join, we have the opportunity to compare the
>>>>>>>> current FK in the left table with the incoming PK of the right table. If
>>>>>>>> they don't match, we just drop the event, since it must be outdated.
>>>>>>>>
>>>>>>>
>>>>>>>> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1:
>>>>>>>> B, xyz), ultimately yielding a conundrum about whether the final state
>>>>>>>> should be (1: null) or (1: joined-on-B). With the algorithm above, you
>>>>>>>> would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B,
>>>>>>>> EntityB)). It seems like this does give you enough information to make the
>>>>>>>> right choice, regardless of disordering.
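The algorithm from point 6 can be sketched in memory as follows, assuming each response carries the foreign key it was computed for. Class and method names are hypothetical, the repartition topics are modelled as plain method calls (so disordering is simulated by processing the two responses in reverse order), and the final step treats the operation as an inner join, which is an assumption since the join type was still open (see point 2).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory sketch of the "compare the FK in the final join" idea (names are hypothetical). */
public class ForeignKeyCheckJoin {
    /** Left-table value: the foreign key plus other data. */
    static final class Left {
        final String fk;
        final String data;

        Left(String fk, String data) {
            this.fk = fk;
            this.data = data;
        }
    }

    /** Re-keyed back to the left key: which FK the lookup used, and what the right table held. */
    static final class Response {
        final int pk;
        final String fk;
        final String right; // null if the right table had no row for fk

        Response(int pk, String fk, String right) {
            this.pk = pk;
            this.fk = fk;
            this.right = right;
        }
    }

    /** Right-hand side: join the subscription (fk, pk) with the right table. */
    static Response joinOnRight(Map<String, String> rightTable, String fk, int pk) {
        return new Response(pk, fk, rightTable.get(fk));
    }

    /** Final join on the left side: drop responses for an FK the left row no longer has. */
    static void finalJoin(Map<Integer, Left> leftTable, Map<Integer, String> results, Response r) {
        Left current = leftTable.get(r.pk);
        if (current == null || !current.fk.equals(r.fk)) {
            return; // outdated: the foreign key changed after this lookup was issued
        }
        if (r.right == null) {
            results.remove(r.pk); // inner-join semantics (assumed): no right row, no result
        } else {
            results.put(r.pk, current.fk + "," + current.data + " + " + r.right);
        }
    }

    public static void main(String[] args) {
        Map<String, String> right = Map.of("A", "EntityA", "B", "EntityB");
        Map<Integer, Left> left = new HashMap<>();
        left.put(1, new Left("A", "xyz"));
        Response viaA = joinOnRight(right, "A", 1); // lookup issued while 1 pointed at A
        left.put(1, new Left("B", "xyz"));          // 1 is updated to point at B
        Response viaB = joinOnRight(right, "B", 1);

        Map<Integer, String> results = new HashMap<>();
        for (Response r : List.of(viaB, viaA)) {    // responses arrive out of order
            finalJoin(left, results, r);
        }
        System.out.println(results); // {1=B,xyz + EntityB}
    }
}
```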
>>>>>>>
>>>>>>> Will check Adam's patch, but this should work. As mentioned often, I am
>>>>>>> not convinced about partitioning back for the user automatically. I think
>>>>>>> this is the real performance eater ;)
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 7. Last thought... I'm a little concerned about the performance of the
>>>>>>>> range scans when records change in the right table. You've said that you've
>>>>>>>> been using the algorithm you presented in production for a while. Can you
>>>>>>>> give us a sense of the performance characteristics you've observed?
>>>>>>>>
>>>>>>>
>>>>>>> Make it work, make it fast, make it beautiful. The topmost thing here is
>>>>>>> / was correctness. In practice I do not measure the performance of the
>>>>>>> range scan. The usual cases I run this with emit 500k - 1M rows
>>>>>>> on a left-hand-side change. The range scan is just the work you gotta
>>>>>>> do; also, when you pack your data into different formats, the RocksDB
>>>>>>> performance is usually tightly bound to the size of the data and we can't
>>>>>>> really change that. It is more important for users to prevent useless
>>>>>>> updates to begin with. My left-hand side is guarded to drop changes that
>>>>>>> are not going to change my join output.
>>>>>>>
>>>>>>> usually it's:
>>>>>>>
>>>>>>> drop unused fields and then don't forward if old.equals(new)
>>>>>>>
>>>>>>> Regarding the performance of creating an iterator for smaller
>>>>>>> fan-outs, users can still just do a groupBy first anyway.
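Jan's guard (drop unused fields, then don't forward if `old.equals(new)`) can be sketched as a small helper, assuming the old and new values are both available when an update is processed. The class name and the `String[]` record layout are hypothetical; the projection returns a `List` because its `equals` compares contents, whereas arrays compare by identity.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical guard placed in front of the join to suppress no-op updates. */
public class UnchangedUpdateGuard<V, P> {
    private final Function<V, P> project; // keeps only the fields the join actually uses

    public UnchangedUpdateGuard(Function<V, P> project) {
        this.project = project;
    }

    /** True if the projected value changed, i.e. the join output could change. */
    public boolean shouldForward(V oldValue, V newValue) {
        P before = oldValue == null ? null : project.apply(oldValue);
        P after = newValue == null ? null : project.apply(newValue);
        return !Objects.equals(before, after); // don't forward if old.equals(new)
    }

    public static void main(String[] args) {
        // value = {foreignKey, name, lastLogin}; the join only needs foreignKey and name
        UnchangedUpdateGuard<String[], List<String>> guard =
                new UnchangedUpdateGuard<>(v -> List.of(v[0], v[1]));
        String[] before = {"A", "alice", "1000"};
        String[] loginOnly = {"A", "alice", "2000"};
        String[] newForeignKey = {"B", "alice", "2000"};
        System.out.println(guard.shouldForward(before, loginOnly));        // false
        System.out.println(guard.shouldForward(loginOnly, newForeignKey)); // true
    }
}
```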
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> I could only think of one alternative, but I'm not sure if it's better or
>>>>>>>> worse... If the first re-key only needs to preserve the original key, as I
>>>>>>>> proposed in #6, then we could store a vector of keys in the value:
>>>>>>>>
>>>>>>>> Left table:
>>>>>>>> 1: A,...
>>>>>>>> 2: B,...
>>>>>>>> 3: A,...
>>>>>>>>
>>>>>>>> Gets re-keyed:
>>>>>>>> A: [1, 3]
>>>>>>>> B: [2]
>>>>>>>>
>>>>>>>> Then, the rhs part of the join would only need a regular single-key lookup.
>>>>>>>> Of course we have to deal with the problem of large values, as there's no
>>>>>>>> bound on the number of lhs records that can reference rhs records. Offhand,
>>>>>>>> I'd say we could page the values, so when one row is past the threshold, we
>>>>>>>> append the key for the next page. Then in most cases, it would be a single
>>>>>>>> key lookup, but for large fan-out updates, it would be one per (max value
>>>>>>>> size)/(avg lhs key size).
>>>>>>>>
>>>>>>>> This seems more complex, though... Plus, I think there's some extra
>>>>>>>> tracking we'd need to do to know when to emit a retraction. For example,
>>>>>>>> when record 1 is deleted, the re-key table would just have (A: [3]). Some
>>>>>>>> kind of tombstone is needed so that the join result for 1 can also be
>>>>>>>> retracted.
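A rough sketch of the key-vector alternative, including paging and the retraction problem. Assumptions: an in-memory map replaces the re-key table, a page is capped by a count of keys (`maxPerPage`) rather than a byte size, and the class and method names are hypothetical. The boolean returned by `remove` is the signal a caller would need to emit the tombstone, because after removal the index itself no longer records that key 1 ever referenced A.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical fk -> [left keys] index, split into pages of at most maxPerPage keys. */
public class KeyVectorIndex {
    private final int maxPerPage;
    private final Map<String, List<List<Integer>>> pagesByForeignKey = new TreeMap<>();

    public KeyVectorIndex(int maxPerPage) {
        this.maxPerPage = maxPerPage;
    }

    public void add(String fk, int leftKey) {
        List<List<Integer>> pages = pagesByForeignKey.computeIfAbsent(fk, k -> new ArrayList<>());
        if (pages.isEmpty() || pages.get(pages.size() - 1).size() >= maxPerPage) {
            pages.add(new ArrayList<>()); // current page is full: start the next one
        }
        pages.get(pages.size() - 1).add(leftKey);
    }

    /**
     * Removes leftKey from fk's pages. Returns true if it was present; the caller must then
     * emit a tombstone for leftKey's join result, since the index alone no longer shows it.
     */
    public boolean remove(String fk, int leftKey) {
        for (List<Integer> page : pagesByForeignKey.getOrDefault(fk, List.of())) {
            if (page.remove(Integer.valueOf(leftKey))) {
                return true;
            }
        }
        return false;
    }

    /** A right-table change for fk needs one lookup per page rather than a range scan. */
    public List<List<Integer>> lookup(String fk) {
        return pagesByForeignKey.getOrDefault(fk, List.of());
    }

    public static void main(String[] args) {
        KeyVectorIndex index = new KeyVectorIndex(2);
        index.add("A", 1);
        index.add("B", 2);
        index.add("A", 3);
        index.add("A", 5);
        System.out.println(index.lookup("A")); // [[1, 3], [5]]
        if (index.remove("A", 1)) {
            System.out.println("emit tombstone for 1");
        }
        System.out.println(index.lookup("A")); // [[3], [5]]
    }
}
```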
>>>>>>>>
>>>>>>>> That's all!
>>>>>>>>
>>>>>>>> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the
>>>>>>>> discussion has been slow.
>>>>>>>> -John
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I'd say you can just call the vote.
>>>>>>>>>
>>>>>>>>> That happens all the time, and if something comes up, it just goes back
>>>>>>>>> to discuss.
>>>>>>>>>
>>>>>>>>> I would not expect too much attention with another email in this
>>>>>>>>> thread.
>>>>>>>>>
>>>>>>>>> best Jan
>>>>>>>>>
>>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
>>>>>>>>>> Hello Contributors
>>>>>>>>>>
>>>>>>>>>> I know that 2.1 is about to be released, but I do need to bump this to keep
>>>>>>>>>> visibility up. I am still intending to push this through once contributor
>>>>>>>>>> feedback is given.
>>>>>>>>>>
>>>>>>>>>> Main points that need addressing:
>>>>>>>>>> 1) Any way (or benefit) in structuring the current singular graph node into
>>>>>>>>>> multiple nodes? It has a whopping 25 parameters right now. I am a bit fuzzy
>>>>>>>>>> on how the optimizations are supposed to work, so I would appreciate any
>>>>>>>>>> help on this aspect.
>>>>>>>>>>
>>>>>>>>>> 2) Overall strategy for joining + resolving. This thread has much discourse
>>>>>>>>>> between Jan and me about the current highwater mark proposal vs. a groupBy
>>>>>>>>>> + reduce proposal. I am of the opinion that we need to strictly handle any
>>>>>>>>>> chance of out-of-order data and leave none of it up to the consumer. Any
>>>>>>>>>> comments or suggestions here would also help.
>>>>>>>>>>
>>>>>>>>>> 3) Anything else that you see that would prevent this from moving to a
>>>>>>>>>> vote?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> Adam
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <adam.bellem...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jan
>>>>>>>>>>>
>>>>>>>>>>> With the Stores.windowStoreBuilder and Stores.persistentWindowStore, you
>>>>>>>>>>> actually only need to specify the number of segments you want and how large
>>>>>>>>>>> they are. To the best of my understanding, what happens is that the
>>>>>>>>>>> segments are automatically rolled over as new data with new timestamps is
>>>>>>>>>>> created. We use this exact functionality in some of the work done
>>>>>>>>>>> internally at my company. For reference, this is the hopping windowed store.
>>>>>>>>>>>
>>>>>>>>>>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
>>>>>>>>>>>
>>>>>>>>>>> In the code that I have provided, there are going to be two 24h segments.
>>>>>>>>>>> When a record is put into the windowStore, it will be inserted at time T in
>>>>>>>>>>> both segments. The two segments will always overlap by 12h. As time goes on
>>>>>>>>>>> and new records are added (say at time T+12h+), the oldest segment will be
>>>>>>>>>>> automatically deleted and a new segment created. The records are by default
>>>>>>>>>>> inserted with the context.timestamp(), such that it is the record time, not
>>>>>>>>>>> the clock time, which is used.
>>>>>>>>>>>
>>>>>>>>>>> To the best of my understanding, the timestamps are retained when
>>>>>>>>>>> restoring from the changelog.
>>>>>>>>>>>
>>>>>>>>>>> Basically, this is a heavy-handed way to deal with TTL at a segment level,
>>>>>>>>>>> instead of at an individual record level.
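To illustrate what "TTL at a segment level" means, here is a simplified model, not Kafka's actual window-store implementation. Records are bucketed by record timestamp into segments of `segmentIntervalMs`, only the newest `numSegments` segments relative to the highest timestamp seen are kept, and dropping a segment expires all of its records at once. It uses a single set of non-overlapping segments rather than the two overlapping hopping windows described above, and all names are hypothetical. Note that `get` has to check every live segment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Simplified segment-level TTL store keyed by record time (hypothetical, not a Kafka class). */
public class SegmentedTtlStore<K, V> {
    private final long segmentIntervalMs;
    private final int numSegments;
    private final TreeMap<Long, Map<K, V>> segments = new TreeMap<>();
    private long streamTimeMs = Long.MIN_VALUE; // highest record timestamp seen so far

    public SegmentedTtlStore(long segmentIntervalMs, int numSegments) {
        this.segmentIntervalMs = segmentIntervalMs;
        this.numSegments = numSegments;
    }

    /** Assumes non-negative record timestamps. */
    public void put(K key, V value, long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        long oldestLiveSegment = streamTimeMs / segmentIntervalMs - (numSegments - 1);
        segments.headMap(oldestLiveSegment, false).clear(); // roll over: drop whole expired segments
        long segmentId = recordTimestampMs / segmentIntervalMs;
        if (segmentId < oldestLiveSegment) {
            return; // the record is already older than the retention period
        }
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
    }

    /** Newest value for key; every live segment may have to be checked. */
    public V get(K key) {
        for (Map<K, V> segment : segments.descendingMap().values()) {
            V value = segment.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        SegmentedTtlStore<String, Long> store = new SegmentedTtlStore<>(12 * hour, 2);
        store.put("order-1", 10L, hour);
        System.out.println(store.get("order-1")); // 10
        store.put("order-2", 20L, 30 * hour);     // record time jumps ahead by 29 hours
        System.out.println(store.get("order-1")); // null: its segment was dropped
    }
}
```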
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Will that work? I expected it to blow up with ClassCastException or
>>>>>>>>>>>> similar.
>>>>>>>>>>>>
>>>>>>>>>>>> You would either have to specify the window you fetch/put or iterate
>>>>>>>>>>>> across all windows the key was found in, right?
>>>>>>>>>>>>
>>>>>>>>>>>> I just hope the window store doesn't check stream-time under the hood;
>>>>>>>>>>>> that would be a questionable interface.
>>>>>>>>>>>>
>>>>>>>>>>>> If it does: did you see my comment on checking all the windows earlier?
>>>>>>>>>>>> That would be needed to actually give reasonable time guarantees.
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
>>>>>>>>>>>>> Hi Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>> Check for "highwaterMat" in the PR. I only changed the state store, not
>>>>>>>>>>>>> the ProcessorSupplier.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the information. This is indeed something that will be
>>>>>>>>>>>>>>> extremely useful for this KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>> Thanks for your explanations. That being said, I will not be moving ahead
>>>>>>>>>>>>>>> with an implementation using the reshuffle/groupBy solution you propose.
>>>>>>>>>>>>>>> However, if you wish to implement it yourself off of my current PR
>>>>>>>>>>>>>>> and submit it as a competitive alternative, I would be more than happy to
>>>>>>>>>>>>>>> help vet that as an alternate solution. As it stands right now, I do not
>>>>>>>>>>>>>>> really have more time to invest into alternatives without there being a
>>>>>>>>>>>>>>> strong indication from the binding voters which they would prefer.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey, no worries at all. I think I personally gave up on the Streams DSL
>>>>>>>>>>>>>> some time ago, otherwise I would have pulled this KIP through already.
>>>>>>>>>>>>>> I am currently reimplementing my own DSL based on PAPI.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I will look at finishing up my PR with the windowed state store in the
>>>>>>>>>>>>>>> next week or so, exercising it via tests, and then I will come back for
>>>>>>>>>>>>>>> final discussions. In the meantime, I hope that any of the binding voters
>>>>>>>>>>>>>>> could take a look at the KIP in the wiki. I have updated it according to
>>>>>>>>>>>>>>> the latest plan:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have also updated the KIP PR to use a windowed store. This could be
>>>>>>>>>>>>>>> replaced by the results of KIP-258 whenever they are completed.
>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier already updated in the
>>>>>>>>>>>>>> PR? I expected it to change to Windowed<K>, Long. Am I missing something?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is
>>>>>>>>>>>>>>>> for corresponding changelog mechanisms. But as part of KIP-258 we do want
>>>>>>>>>>>>>>>> to have "handling out-of-order data for source KTable" such that, instead
>>>>>>>>>>>>>>>> of blindly applying the updates to the materialized store (i.e. following
>>>>>>>>>>>>>>>> offset ordering), we will reject updates that are older than the current
>>>>>>>>>>>>>>>> key's timestamp (i.e. following timestamp ordering).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Adam,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the high
>>>>>>>>>>>>>>>>> watermark store, now altered to be replaced with a window store), I
>>>>>>>>>>>>>>>>> think another currently ongoing KIP may actually help:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is for adding the timestamp into a key-value store (i.e. only for
>>>>>>>>>>>>>>>>> non-windowed KTable), and then one of its uses, as described in
>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then
>>>>>>>>>>>>>>>>> "reject" updates from the source topics if their timestamp is smaller
>>>>>>>>>>>>>>>>> than the current key's latest update timestamp. I think it is very
>>>>>>>>>>>>>>>>> similar to what you have in mind for high watermark based filtering,
>>>>>>>>>>>>>>>>> while you only need to make sure that the timestamps of the joining
>>>>>>>>>>>>>>>>> records are correctly inherited through the whole topology to the final
>>>>>>>>>>>>>>>>> stage.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Note that this KIP is for key-value stores and hence non-windowed
>>>>>>>>>>>>>>>>> KTables only, but for windowed KTables we do not really have good
>>>>>>>>>>>>>>>>> support for their joins anyway
>>>>>>>>>>>>>>>>> (https://issues.apache.org/jira/browse/KAFKA-7107), so I think we can
>>>>>>>>>>>>>>>>> just consider non-windowed KTable-KTable non-key joins for now. In
>>>>>>>>>>>>>>>>> which case, KIP-258 should help.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Guozhang
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The current high-water mark implementation would grow endlessly,
>>>>>>>>>>>>>>>>>>> based on the primary key of the original event. It is a pair of
>>>>>>>>>>>>>>>>>>> (<this table primary key>, <highest offset seen for that key>).
>>>>>>>>>>>>>>>>>>> This is used to differentiate between late arrivals and new
>>>>>>>>>>>>>>>>>>> updates. My newest proposal would be to replace it with a windowed
>>>>>>>>>>>>>>>>>>> state store of duration N. This would allow the same behaviour, but
>>>>>>>>>>>>>>>>>>> cap the size based on time. This should allow all late-arriving
>>>>>>>>>>>>>>>>>>> events within the window to be processed, and should be
>>>>>>>>>>>>>>>>>>> customizable by the user to tailor to their own needs (i.e.
>>>>>>>>>>>>>>>>>>> perhaps just 10 minutes of window, or perhaps 7 days...).
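A minimal sketch of the time-bounded high-water mark idea, using plain Java collections rather than a Kafka Streams window store. `BoundedHighWatermark` and its eviction rule (forget a key once it has not been updated within `retentionMs` of the highest timestamp seen so far) are illustrative assumptions. It also shows the caveat raised in this thread: once a key has been evicted, a very late record for it is accepted again.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch: per original primary key, remember the highest source offset
// seen, but forget keys that were not updated within retentionMs of stream time.
final class BoundedHighWatermark<K> {
    private static final class Mark {
        final long highestOffset;
        final long lastUpdateTime;

        Mark(long highestOffset, long lastUpdateTime) {
            this.highestOffset = highestOffset;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    private final Map<K, Mark> marks = new HashMap<>();
    private final long retentionMs;
    private long streamTime = Long.MIN_VALUE;

    BoundedHighWatermark(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Returns true if the record should be forwarded, false if it is a stale late arrival. */
    boolean accept(K key, long offset, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        evictExpired();
        Mark mark = marks.get(key);
        if (mark != null && offset < mark.highestOffset) {
            return false; // a result from a newer source offset was already forwarded
        }
        // equal offsets pass: assumed to be the same source record re-joined after a
        // right-side update
        long lastUpdate = mark == null ? timestamp : Math.max(mark.lastUpdateTime, timestamp);
        marks.put(key, new Mark(offset, lastUpdate));
        return true;
    }

    // Linear scan for clarity; a segmented window store would drop whole expired segments.
    private void evictExpired() {
        Iterator<Mark> it = marks.values().iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime < streamTime - retentionMs) {
                it.remove();
            }
        }
    }

    int size() {
        return marks.size();
    }
}
```

The retention therefore bounds both the state size and how late an out-of-order result can arrive and still be caught.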
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Adam, using time-based retention can do the trick here. Even so,
>>>>>>>>>>>>>>>>>> I would still like to see the automatic repartitioning made
>>>>>>>>>>>>>>>>>> optional, since I would just reshuffle again. With a windowed store
>>>>>>>>>>>>>>>>>> I am a little bit sceptical about how to determine the window, so
>>>>>>>>>>>>>>>>>> essentially one could run into problems when a rapid change happens
>>>>>>>>>>>>>>>>>> near a window border. I will check your implementation in detail;
>>>>>>>>>>>>>>>>>> if it's problematic, we could still check _all_ windows on read,
>>>>>>>>>>>>>>>>>> with a not-too-bad performance impact I guess. Will let you know if
>>>>>>>>>>>>>>>>>> the implementation would be correct as is. I would not like to
>>>>>>>>>>>>>>>>>> assume that offset(A) < offset(B) => timestamp(A) < timestamp(B).
>>>>>>>>>>>>>>>>>> I think we can't expect that.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @Jan
>>>>>>>>>>>>>>>>>>> I believe I understand what you mean now - thanks for the diagram,
>>>>>>>>>>>>>>>>>>> it really did help. You are correct that I do not have the
>>>>>>>>>>>>>>>>>>> original primary key available, and I can see that if it were
>>>>>>>>>>>>>>>>>>> available then you would be able to add and remove events from the
>>>>>>>>>>>>>>>>>>> Map. That being said, I encourage you to finish your diagrams /
>>>>>>>>>>>>>>>>>>> charts just for clarity for everyone else.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yeah, 100%; this giphy thing is just really hard work, but I
>>>>>>>>>>>>>>>>>> understand the benefits for the rest. Sorry about the original
>>>>>>>>>>>>>>>>>> primary key: we have implemented our own join and groupBy in the
>>>>>>>>>>>>>>>>>> PAPI and are basically not using any DSL (just the abstraction). I
>>>>>>>>>>>>>>>>>> completely missed that it is not there in the original DSL and just
>>>>>>>>>>>>>>>>>> assumed it. Total brain mess-up on my end. I will finish the chart
>>>>>>>>>>>>>>>>>> as soon as I get a quiet evening this week.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My follow-up question for you is: won't the Map stay inside the
>>>>>>>>>>>>>>>>>>> state store indefinitely after all of the changes have propagated?
>>>>>>>>>>>>>>>>>>> Isn't this effectively the same as a high-water mark state store?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thing is that if the map is empty, the subtractor is going to
>>>>>>>>>>>>>>>>>> return `null` and the key is removed from the keyspace. But there
>>>>>>>>>>>>>>>>>> is going to be a store, 100%. The good thing is that I can use this
>>>>>>>>>>>>>>>>>> store directly for materialize() / enableSendingOldValues(): it is
>>>>>>>>>>>>>>>>>> a regular store, satisfying all guarantees needed for further
>>>>>>>>>>>>>>>>>> groupBy / join. The windowed store is not keeping the values, so
>>>>>>>>>>>>>>>>>> for the next stateful operation we would need to instantiate an
>>>>>>>>>>>>>>>>>> extra store, or have the window store also hold the values.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Long story short: if we can flip in a custom groupBy before
>>>>>>>>>>>>>>>>>> repartitioning to the original primary key, I think it would help
>>>>>>>>>>>>>>>>>> users big time in building efficient apps. Given the original
>>>>>>>>>>>>>>>>>> primary key issue, I understand that we do not have a solid
>>>>>>>>>>>>>>>>>> foundation to build on. Leaving it to the user to carry the primary
>>>>>>>>>>>>>>>>>> key along is very unfortunate. I can understand the decision going
>>>>>>>>>>>>>>>>>> that way; I do not think it's a good decision.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Adam
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre
>>>>>>>>>>>>>>>>>>> <dumbreprajakta...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>           please remove me from this group
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>           On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
>>>>>>>>>>>>>>>>>>>           <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>           > Hi Adam,
>>>>>>>>>>>>>>>>>>>           >
>>>>>>>>>>>>>>>>>>>           > give me some time, I will make such a chart. Last time I
>>>>>>>>>>>>>>>>>>>           > didn't get along well with giphy and ruined all your
>>>>>>>>>>>>>>>>>>>           > charts. Hopefully I can get it done today.
>>>>>>>>>>>>>>>>>>>           >
>>>>>>>>>>>>>>>>>>>           > On 08.09.2018 16:00, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>           > > Hi Jan
>>>>>>>>>>>>>>>>>>>           > >
>>>>>>>>>>>>>>>>>>>           > > I have included a diagram of what I attempted on the KIP.
>>>>>>>>>>>>>>>>>>>           > >
>>>>>>>>>>>>>>>>>>>           > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
>>>>>>>>>>>>>>>>>>>           > >
>>>>>>>>>>>>>>>>>>>           > > I attempted this back at the start of my own
>>>>>>>>>>>>>>>>>>>           > > implementation of this solution, and since I could not
>>>>>>>>>>>>>>>>>>>           > > get it to work I have since discarded the code. At this
>>>>>>>>>>>>>>>>>>>           > > point in time, if you wish to continue pursuing your
>>>>>>>>>>>>>>>>>>>           > > groupBy solution, I ask that you please create a diagram
>>>>>>>>>>>>>>>>>>>           > > on the KIP carefully explaining your solution. Please
>>>>>>>>>>>>>>>>>>>           > > feel free to use the image I just posted as a starting
>>>>>>>>>>>>>>>>>>>           > > point. I am having trouble understanding your
>>>>>>>>>>>>>>>>>>>           > > explanations, but I think that a carefully constructed
>>>>>>>>>>>>>>>>>>>           > > diagram will clear up any misunderstandings.
>>>>>>>>>>>>>>>>>>>           > > Alternately, please post a comprehensive PR with your
>>>>>>>>>>>>>>>>>>>           > > solution. I can only guess at what you mean, and since I
>>>>>>>>>>>>>>>>>>>           > > value my own time as much as you value yours, I believe
>>>>>>>>>>>>>>>>>>>           > > it is your responsibility to provide an implementation
>>>>>>>>>>>>>>>>>>>           > > instead of me trying to guess.
>>>>>>>>>>>>>>>>>>>           > >
>>>>>>>>>>>>>>>>>>>           > > Adam
>>>>>>>>>>>>>>>>>>>           > >
>>>>>>>>>>>>>>>>>>>           > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
>>>>>>>>>>>>>>>>>>>           > > <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>           > >
>>>>>>>>>>>>>>>>>>>           > >> Hi James,
>>>>>>>>>>>>>>>>>>>           > >>
>>>>>>>>>>>>>>>>>>>           > >> nice to see you being interested. Kafka Streams at this
>>>>>>>>>>>>>>>>>>>           > >> point supports all sorts of joins as long as both streams
>>>>>>>>>>>>>>>>>>>           > >> have the same key. Adam is currently implementing a join
>>>>>>>>>>>>>>>>>>>           > >> where a KTable and a KTable can have a one-to-many
>>>>>>>>>>>>>>>>>>>           > >> relationship (1:n). We exploit that RocksDB is a
>>>>>>>>>>>>>>>>>>>           > >> datastore that keeps data sorted (at least it exposes an
>>>>>>>>>>>>>>>>>>>           > >> API to access the stored data in a sorted fashion).
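A minimal sketch of why a sorted store helps a one-to-many join, with `java.util.TreeMap` standing in for RocksDB. `CombinedKey` (ordered by foreign key first, then primary key) and `PrefixScan` are hypothetical names: every left-side row that references one foreign key is contiguous, so all of them are found with one seek plus a short scan when the right-side row for that key changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: TreeMap stands in for RocksDB's sorted key space.
final class CombinedKey implements Comparable<CombinedKey> {
    final String foreignKey;
    final String primaryKey;

    CombinedKey(String foreignKey, String primaryKey) {
        this.foreignKey = foreignKey;
        this.primaryKey = primaryKey;
    }

    // Order by foreign key first, so all rows for one foreign key are contiguous.
    @Override
    public int compareTo(CombinedKey other) {
        int byForeignKey = foreignKey.compareTo(other.foreignKey);
        return byForeignKey != 0 ? byForeignKey : primaryKey.compareTo(other.primaryKey);
    }
}

final class PrefixScan {
    /** Seeks to the first row for fk, then scans until the foreign key changes. */
    static List<String> valuesForForeignKey(NavigableMap<CombinedKey, String> store, String fk) {
        List<String> result = new ArrayList<>();
        // "" sorts before any other String, so this seeks to the first row for fk
        for (Map.Entry<CombinedKey, String> e : store.tailMap(new CombinedKey(fk, ""), true).entrySet()) {
            if (!e.getKey().foreignKey.equals(fk)) {
                break; // past the last row for fk
            }
            result.add(e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<CombinedKey, String> store = new TreeMap<>();
        store.put(new CombinedKey("A", "a"), "bar=1");
        store.put(new CombinedKey("A", "b"), "bar=2");
        store.put(new CombinedKey("B", "c"), "bar=3");
        // when the right-side row for A changes, only these left-side rows are re-joined
        System.out.println(valuesForForeignKey(store, "A")); // [bar=1, bar=2]
    }
}
```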
>>>>>>>>>>>>>>>>>>>           > >>
>>>>>>>>>>>>>>>>>>>           > >> I think the technical caveats are well understood now and
>>>>>>>>>>>>>>>>>>>           > >> we are basically down to philosophy and API design (when
>>>>>>>>>>>>>>>>>>>           > >> Adam sees my newest message). I have a lengthy track
>>>>>>>>>>>>>>>>>>>           > >> record of losing these kinds of arguments within the
>>>>>>>>>>>>>>>>>>>           > >> Streams community and I have no clue why. So I literally
>>>>>>>>>>>>>>>>>>>           > >> can't wait for you to churn through this thread and give
>>>>>>>>>>>>>>>>>>>           > >> your opinion on how we should design the return type of
>>>>>>>>>>>>>>>>>>>           > >> the oneToManyJoin and how much power we want to give to
>>>>>>>>>>>>>>>>>>>           > >> the user vs "simplicity" (where simplicity isn't really
>>>>>>>>>>>>>>>>>>>           > >> that, as users still need to understand it, I argue).
>>>>>>>>>>>>>>>>>>>           > >>
>>>>>>>>>>>>>>>>>>>           > >> Waiting for you to join in on the discussion.
>>>>>>>>>>>>>>>>>>>           > >>
>>>>>>>>>>>>>>>>>>>           > >> Best Jan
>>>>>>>>>>>>>>>>>>>           > >>
>>>>>>>>>>>>>>>>>>>           > >> On 07.09.2018 15:49, James Kwan wrote:
>>>>>>>>>>>>>>>>>>>           > >>
>>>>>>>>>>>>>>>>>>>           > >>> I am new to this group and I found this subject
>>>>>>>>>>>>>>>>>>>           > >>> interesting. Sounds like you guys want to implement a
>>>>>>>>>>>>>>>>>>>           > >>> join table of two streams? Is there somewhere I can see
>>>>>>>>>>>>>>>>>>>           > >>> the original requirement or proposal?
>>>>>>>>>>>>>>>>>>>           > >>>
>>>>>>>>>>>>>>>>>>>           > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
>>>>>>>>>>>>>>>>>>>           > >>> <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>           > >>>
>>>>>>>>>>>>>>>>>>>           > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>           > >>>>
>>>>>>>>>>>>>>>>>>>           > >>>>> I'm currently testing using a windowed store to store
>>>>>>>>>>>>>>>>>>>           > >>>>> the high-water mark. By all indications this should work
>>>>>>>>>>>>>>>>>>>           > >>>>> fine, with the caveat being that it can only resolve
>>>>>>>>>>>>>>>>>>>           > >>>>> out-of-order arrival for up to the size of the window
>>>>>>>>>>>>>>>>>>>           > >>>>> (i.e. 24h, 72h, etc.). This would remove the possibility
>>>>>>>>>>>>>>>>>>>           > >>>>> of it being unbounded in size.
>>>>>>>>>>>>>>>>>>>           > >>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>> With regards to Jan's suggestion, I believe this is
>>>>>>>>>>>>>>>>>>>           > >>>>> where we will have to remain in disagreement. While I do
>>>>>>>>>>>>>>>>>>>           > >>>>> not disagree with your statement that there are likely
>>>>>>>>>>>>>>>>>>>           > >>>>> to be additional joins done in a real-world workflow, I
>>>>>>>>>>>>>>>>>>>           > >>>>> do not see how you can conclusively deal with
>>>>>>>>>>>>>>>>>>>           > >>>>> out-of-order arrival of foreign-key changes and
>>>>>>>>>>>>>>>>>>>           > >>>>> subsequent joins. I have attempted what I think you have
>>>>>>>>>>>>>>>>>>>           > >>>>> proposed (without a high-water mark, using groupBy and
>>>>>>>>>>>>>>>>>>>           > >>>>> reduce) and found that if the foreign key changes too
>>>>>>>>>>>>>>>>>>>           > >>>>> quickly, or the load on a stream thread is too high, the
>>>>>>>>>>>>>>>>>>>           > >>>>> joined messages will arrive out-of-order and be
>>>>>>>>>>>>>>>>>>>           > >>>>> incorrectly propagated, such that an intermediate event
>>>>>>>>>>>>>>>>>>>           > >>>>> is represented as the final event.
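A minimal sketch of the race described above, using hypothetical names (`JoinResult`, `OutOfOrderDemo`) and plain Java collections. Record `a` changes its foreign key, so its two join results are computed on different partitions and can reach the final stage in the wrong order. Last-write-wins keeps the stale result; comparing the offset of the left-side update that produced each result (a high-water mark) keeps the newest one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: join results for original key "a" arriving out of order.
final class JoinResult {
    final String key;        // original primary key of the left-side record
    final long sourceOffset; // offset of the left-side update that produced this result
    final String value;

    JoinResult(String key, long sourceOffset, String value) {
        this.key = key;
        this.sourceOffset = sourceOffset;
        this.value = value;
    }
}

final class OutOfOrderDemo {
    /** Last-write-wins: whatever arrives last becomes the final value. */
    static Map<String, String> naive(List<JoinResult> arrivals) {
        Map<String, String> table = new HashMap<>();
        for (JoinResult r : arrivals) {
            table.put(r.key, r.value);
        }
        return table;
    }

    /** Skips results produced by an older left-side update than one already applied. */
    static Map<String, String> withHighWatermark(List<JoinResult> arrivals) {
        Map<String, String> table = new HashMap<>();
        Map<String, Long> highWatermark = new HashMap<>();
        for (JoinResult r : arrivals) {
            Long seen = highWatermark.get(r.key);
            if (seen != null && r.sourceOffset < seen) {
                continue; // stale intermediate result: do not overwrite the newer one
            }
            highWatermark.put(r.key, r.sourceOffset);
            table.put(r.key, r.value);
        }
        return table;
    }

    public static void main(String[] args) {
        // offset 1: a=(fk=A,bar=1) joined with X; offset 2: a=(fk=B,bar=1) joined with Y.
        // The newer result overtakes the older one on its way to the final stage.
        List<JoinResult> arrivals = List.of(
                new JoinResult("a", 2L, "(bar=1,Y)"),
                new JoinResult("a", 1L, "(bar=1,X)"));
        System.out.println(naive(arrivals).get("a"));             // (bar=1,X)
        System.out.println(withHighWatermark(arrivals).get("a")); // (bar=1,Y)
    }
}
```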
>>>>>>>>>>>>>>>>>>>           > >>>>>
>>>>>>>>>>>>>>>>>>>           > >>>> Can you shed some light on your groupBy implementation?
>>>>>>>>>>>>>>>>>>>           > >>>> There must be some sort of flaw in it. I have a suspicion
>>>>>>>>>>>>>>>>>>>           > >>>> where it is; I would just like to confirm. The idea is
>>>>>>>>>>>>>>>>>>>           > >>>> bulletproof and it must be an implementation mess-up. I
>>>>>>>>>>>>>>>>>>>           > >>>> would like to clarify before we draw a conclusion.
>>>>>>>>>>>>>>>>>>>           > >>>>
>>>>>>>>>>>>>>>>>>>           > >>>>> Repartitioning the scattered events back to their
>>>>>>>>>>>>>>>>>>>           > >>>>> original partitions is the only way I know how to
>>>>>>>>>>>>>>>>>>>           > >>>>> conclusively deal with out-of-order events in a given
>>>>>>>>>>>>>>>>>>>           > >>>>> time frame, and to ensure that the data is eventually
>>>>>>>>>>>>>>>>>>>           > >>>>> consistent with the input events.
>>>>>>>>>>>>>>>>>>>           > >>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>> If you have some code to share that illustrates your
>>>>>>>>>>>>>>>>>>>           > >>>>> approach, I would be very grateful as it would remove
>>>>>>>>>>>>>>>>>>>           > >>>>> any misunderstandings that I may have.
>>>>>>>>>>>>>>>>>>>           > >>>>>
>>>>>>>>>>>>>>>>>>>           > >>>> Ah okay, you were looking for my code. I don't have
>>>>>>>>>>>>>>>>>>>           > >>>> anything easily readable here, as it's bloated with OO
>>>>>>>>>>>>>>>>>>>           > >>>> patterns.
>>>>>>>>>>>>>>>>>>>           > >>>>
>>>>>>>>>>>>>>>>>>>           > >>>> It's trivial anyhow:
>>>>>>>>>>>>>>>>>>>           > >>>>
>>>>>>>>>>>>>>>>>>>           > >>>> @Override
>>>>>>>>>>>>>>>>>>>           > >>>> public T apply(K aggKey, V value, T aggregate)
>>>>>>>>>>>>>>>>>>>           > >>>> {
>>>>>>>>>>>>>>>>>>>           > >>>>     // << imaginary helpers: asCollection() and asAggregateType()
>>>>>>>>>>>>>>>>>>>           > >>>>     //    convert between the aggregate type T and its values V
>>>>>>>>>>>>>>>>>>>           > >>>>     Map<U, V> currentStateAsMap = new HashMap<>();
>>>>>>>>>>>>>>>>>>>           > >>>>     for (V m : asCollection(aggregate))
>>>>>>>>>>>>>>>>>>>           > >>>>     {
>>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.put(mapper.apply(m), m);
>>>>>>>>>>>>>>>>>>>           > >>>>     }
>>>>>>>>>>>>>>>>>>>           > >>>>
>>>>>>>>>>>>>>>>>>>           > >>>>     U toModifyKey = mapper.apply(value);
>>>>>>>>>>>>>>>>>>>           > >>>>     // << this is the place where people are actually going to
>>>>>>>>>>>>>>>>>>>           > >>>>     //    have issues, and why you probably couldn't do it. We
>>>>>>>>>>>>>>>>>>>           > >>>>     //    would need to find a solution here. I didn't realize
>>>>>>>>>>>>>>>>>>>           > >>>>     //    that yet.
>>>>>>>>>>>>>>>>>>>           > >>>>     // << we propagate the field in the joiner, so that we can
>>>>>>>>>>>>>>>>>>>           > >>>>     //    pick it up in an aggregate. Probably you have not
>>>>>>>>>>>>>>>>>>>           > >>>>     //    thought of this in your approach, right?
>>>>>>>>>>>>>>>>>>>           > >>>>     // << I am very open to finding a generic solution here. In
>>>>>>>>>>>>>>>>>>>           > >>>>     //    my honest opinion this is broken in KTableImpl.GroupBy,
>>>>>>>>>>>>>>>>>>>           > >>>>     //    in that it loses the keys and only maintains the
>>>>>>>>>>>>>>>>>>>           > >>>>     //    aggregate key.
>>>>>>>>>>>>>>>>>>>           > >>>>     // << I abstracted it away back then, way before I was
>>>>>>>>>>>>>>>>>>>           > >>>>     //    thinking of the oneToMany join. That is why I didn't
>>>>>>>>>>>>>>>>>>>           > >>>>     //    realize its significance here.
>>>>>>>>>>>>>>>>>>>           > >>>>     // << Opinions?
>>>>>>>>>>>>>>>>>>>           > >>>>
>>>>>>>>>>>>>>>>>>>           > >>>>     if (isAdder) // field: true for the adder, false for the subtractor
>>>>>>>>>>>>>>>>>>>           > >>>>     {
>>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.put(toModifyKey, value);
>>>>>>>>>>>>>>>>>>>           > >>>>     }
>>>>>>>>>>>>>>>>>>>           > >>>>     else
>>>>>>>>>>>>>>>>>>>           > >>>>     {
>>>>>>>>>>>>>>>>>>>           > >>>>         currentStateAsMap.remove(toModifyKey);
>>>>>>>>>>>>>>>>>>>           > >>>>         if (currentStateAsMap.isEmpty())
>>>>>>>>>>>>>>>>>>>           > >>>>         {
>>>>>>>>>>>>>>>>>>>           > >>>>             return null; // empty map: the key leaves the keyspace
>>>>>>>>>>>>>>>>>>>           > >>>>         }
>>>>>>>>>>>>>>>>>>>           > >>>>     }
>>>>>>>>>>>>>>>>>>>           > >>>>     return asAggregateType(currentStateAsMap);
>>>>>>>>>>>>>>>>>>>           > >>>> }
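A minimal, runnable sketch of the map-based adder/subtractor idea above, using plain Java collections instead of the Kafka Streams `Aggregator` interface; `MapAggregator` and the `"fk:value"` string encoding are hypothetical. For one original primary key, the aggregate maps each join result's foreign key to that result, so an add for the new foreign key and a remove for the old one produce the same final state in either order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch (not the Kafka Streams Aggregator API): the aggregate for one
// original primary key maps a join result's foreign key to that join result.
// Assumes updates for the same foreign key come from one partition and stay ordered.
final class MapAggregator<U, V> {
    private final Function<V, U> mapper; // extracts the foreign key from a join result

    MapAggregator(Function<V, U> mapper) {
        this.mapper = mapper;
    }

    /** Adder: returns a copy of the aggregate with value stored under its foreign key. */
    Map<U, V> add(V value, Map<U, V> aggregate) {
        Map<U, V> next = new HashMap<>();
        if (aggregate != null) {
            next.putAll(aggregate);
        }
        next.put(mapper.apply(value), value);
        return next;
    }

    /** Subtractor: removes value's foreign key; null means the key leaves the table. */
    Map<U, V> subtract(V value, Map<U, V> aggregate) {
        if (aggregate == null) {
            return null;
        }
        Map<U, V> next = new HashMap<>(aggregate);
        next.remove(mapper.apply(value));
        return next.isEmpty() ? null : next;
    }

    public static void main(String[] args) {
        MapAggregator<String, String> agg =
                new MapAggregator<>(v -> v.substring(0, v.indexOf(':')));
        Map<String, String> start = agg.add("A:X", null); // "a" joined via foreign key A
        // "a" moves from A to B: the remove (from A's partition) and the add
        // (from B's partition) may arrive in either order
        Map<String, String> removeFirst = agg.add("B:Y", agg.subtract("A:X", start));
        Map<String, String> addFirst = agg.subtract("A:X", agg.add("B:Y", start));
        System.out.println(removeFirst.equals(addFirst)); // true
        System.out.println(removeFirst);                   // {B=B:Y}
    }
}
```

In the remove-first order the aggregate briefly becomes `null`, i.e. a transient delete is emitted downstream before the new value arrives.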
>>>>>>>>>>>>>>>>>>>           > >>>>
>>>>>>>>>>>>>>>>>>>           > >>>>> Thanks,
>>>>>>>>>>>>>>>>>>>           > >>>>> Adam
>>>>>>>>>>>>>>>>>>>           > >>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak
>>>>>>>>>>>>>>>>>>>           > >>>>> <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>>>           > >>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>> Thanks Adam for bringing Matthias up to speed about the
>>>>>>>>>>>>>>>>>>>           > >>>>>> differences! I think re-keying back should be optional
>>>>>>>>>>>>>>>>>>>           > >>>>>> at best. I would say we return a KScatteredTable with
>>>>>>>>>>>>>>>>>>>           > >>>>>> reshuffle() returning KTable<originalKey,Joined> to make
>>>>>>>>>>>>>>>>>>>           > >>>>>> the backwards repartitioning optional. I am also very
>>>>>>>>>>>>>>>>>>>           > >>>>>> much in favour of doing the out-of-order processing
>>>>>>>>>>>>>>>>>>>           > >>>>>> using groupBy instead of high-water mark tracking, just
>>>>>>>>>>>>>>>>>>>           > >>>>>> because unbounded growth is scary, and it saves us the
>>>>>>>>>>>>>>>>>>>           > >>>>>> header stuff.
>>>>>>>>>>>>>>>>>>>           > >>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>> I think the abstraction of always repartitioning back is
>>>>>>>>>>>>>>>>>>>           > >>>>>> just not so strong: the work has been done before we
>>>>>>>>>>>>>>>>>>>           > >>>>>> partition back, and grouping by something else
>>>>>>>>>>>>>>>>>>>           > >>>>>> afterwards is really common.
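A minimal in-memory sketch of the `KScatteredTable` proposal, assuming hypothetical method names (`put`, `delete`, `reshuffle`, `countBy`); it is not a Kafka Streams type. Rows stay keyed by (foreign key, primary key), `reshuffle()` is the optional step that re-keys by the original primary key, and a different grouping can be applied directly without it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory stand-in for the proposed KScatteredTable (not a Kafka type).
final class KScatteredTable<FK, PK, J> {
    // Composite (foreignKey, primaryKey) key; Map.entry provides equals/hashCode.
    private final Map<Map.Entry<FK, PK>, J> rows = new HashMap<>();

    void put(FK fk, PK pk, J joined) {
        rows.put(Map.entry(fk, pk), joined);
    }

    void delete(FK fk, PK pk) {
        rows.remove(Map.entry(fk, pk));
    }

    /**
     * Optional re-keying back to the original primary key. Assumes each primary key has
     * one live row, i.e. rows for a previous foreign key were already deleted; resolving
     * out-of-order deletes is exactly the open question in this thread.
     */
    Map<PK, J> reshuffle() {
        Map<PK, J> byPrimaryKey = new HashMap<>();
        for (Map.Entry<Map.Entry<FK, PK>, J> row : rows.entrySet()) {
            byPrimaryKey.put(row.getKey().getValue(), row.getValue());
        }
        return byPrimaryKey;
    }

    /** Groups directly by some other attribute of the joined value, skipping reshuffle(). */
    <G> Map<G, Integer> countBy(Function<J, G> grouper) {
        Map<G, Integer> counts = new HashMap<>();
        for (J joined : rows.values()) {
            counts.merge(grouper.apply(joined), 1, Integer::sum);
        }
        return counts;
    }
}
```

The design choice illustrated is the one argued above: the cost of re-keying is paid only by callers who actually need results keyed by the original primary key.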
>>>>>>>>>>>>>>>>>>>           > >>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>           > >>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Hi Matthias
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Thank you for your feedback, I do appreciate it!
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> While namespacing would be possible, it would require
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> deserializing user headers, which implies a runtime
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> overhead. I would suggest no namespace for now to avoid
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> the overhead. If this becomes a problem in the future,
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> we can still add namespacing later on.
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Agreed. I will go with using a reserved string and
>>>>>>>>>>>>>>>>>>>           > >>>>>>> document it.
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> My main concern about the design is the type of the
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> result KTable: if I understood the proposal correctly,
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> In your example, you have table1 and table2 swapped.
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Here is how it works currently:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> 1) table1 has the records that contain the foreign key
>>>>>>>>>>>>>>>>>>>           > >>>>>>> within their value.
>>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> 2) A value mapper is required to extract the foreign key.
>>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 foreign key mapper: ( value => value.fk )
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> The mapper is applied to each element in table1, and a
>>>>>>>>>>>>>>>>>>>           > >>>>>>> new combined key is made:
>>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> 3) The rekeyed events are copartitioned with table2:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> a) Stream Thread with Partition 0:
>>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <A,X>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> b) Stream Thread with Partition 1:
>>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <B,Y>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> 4) From here, they can be joined together locally by
>>>>>>>>>>>>>>>>>>>           > >>>>>>> applying the joiner function.
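A minimal in-memory walk-through of steps 1-4 using the example data above. It assumes plain Java maps as stand-ins for topics and state stores, and `partitionFor` is a hypothetical partitioner (not Kafka's), so the partition numbers it produces need not match the Partition 0 / Partition 1 layout in the example; what matters is that each combined key lands on the same partition as its table2 key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory walk-through of steps 1-4 (not Kafka Streams code).
final class ForeignKeyJoinSteps {
    static final class Left {
        final String fk;
        final int bar;

        Left(String fk, int bar) {
            this.fk = fk;
            this.bar = bar;
        }
    }

    // Stand-in partitioner; Kafka's default partitioner hashes serialized keys differently.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    static List<String> run(int numPartitions) {
        // 1) table1 holds the foreign key inside its value; table2 is keyed by that key
        Map<String, Left> table1 = new TreeMap<>();
        table1.put("a", new Left("A", 1));
        table1.put("b", new Left("A", 2));
        table1.put("c", new Left("B", 3));
        Map<String, String> table2 = new TreeMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");

        List<Map<String, Left>> left = new ArrayList<>();
        List<Map<String, String>> right = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            left.add(new TreeMap<>());
            right.add(new TreeMap<>());
        }
        table2.forEach((k, v) -> right.get(partitionFor(k, numPartitions)).put(k, v));

        // 2) extract the fk and build the combined key "fk-pk" (a string, for readability);
        // 3) route it by fk so it lands on the partition that owns that table2 key
        table1.forEach((pk, value) ->
                left.get(partitionFor(value.fk, numPartitions)).put(value.fk + "-" + pk, value));

        // 4) each partition joins locally against its own slice of table2
        List<String> results = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            for (Map.Entry<String, Left> e : left.get(p).entrySet()) {
                String rightValue = right.get(p).get(e.getValue().fk);
                if (rightValue != null) {
                    results.add(e.getKey() + " -> (bar=" + e.getValue().bar + "," + rightValue + ")");
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        run(2).forEach(System.out::println);
    }
}
```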
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> At this point, Jan's design and my design deviate. My design goes on
>>>>>>>>>>>>>>>>>>>           > >>>>>>> to repartition the data post-join and resolve out-of-order arrival of
>>>>>>>>>>>>>>>>>>>           > >>>>>>> records, finally returning the data keyed by just the original key. I do
>>>>>>>>>>>>>>>>>>>           > >>>>>>> not expose the CombinedKey or any of the internals outside of the
>>>>>>>>>>>>>>>>>>>           > >>>>>>> joinOnForeignKey function. This does make for a larger footprint, but it
>>>>>>>>>>>>>>>>>>>           > >>>>>>> removes all agency for resolving out-of-order arrivals and handling
>>>>>>>>>>>>>>>>>>>           > >>>>>>> CombinedKeys from the user. I believe that this makes the function much
>>>>>>>>>>>>>>>>>>>           > >>>>>>> easier to use.
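A hedged sketch of the post-join out-of-order resolution described above, modeled with two plain maps (a final result store and a separate highwater store), both keyed by the original table1 key. It assumes every join result is tagged with the offset of the table1 update that produced it; the class name `OutOfOrderResolver` and that ordering field are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical post-join resolver, keyed by the original table1 key.
// Assumption: each join result carries the offset of the table1 update
// that triggered it; the thread does not fix the exact ordering field.
final class OutOfOrderResolver {
    private final Map<String, String> results = new HashMap<>();  // final result store
    private final Map<String, Long> highwater = new HashMap<>();  // separate highwater store

    // Applies the result unless a result from a newer table1 update is
    // already stored for this key. Returns true if the result was applied.
    boolean accept(String originalKey, long sourceOffset, String joined) {
        Long seen = highwater.get(originalKey);
        if (seen != null && sourceOffset < seen) {
            return false; // stale: it arrived after a newer result
        }
        highwater.put(originalKey, sourceOffset);
        results.put(originalKey, joined);
        return true;
    }

    String get(String originalKey) {
        return results.get(originalKey);
    }
}
```

For example, if `a` first points at `A` (offset 5) and is then updated to point at `B` (offset 7), the two join results are produced on different partitions and may reach the original-key partition in either order; if the offset-7 result arrives first, the late offset-5 result is rejected.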
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Let me know if this helps resolve your questions, and please feel free
>>>>>>>>>>>>>>>>>>>           > >>>>>>> to add anything else on your mind.
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Thanks again,
>>>>>>>>>>>>>>>>>>>           > >>>>>>> Adam
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> I am just catching up on this thread. I did not read everything so
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> far, but want to share a couple of initial thoughts:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> Headers: I think there is a fundamental difference between header
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> usage in this KIP and KIP-258. For KIP-258, we add headers to changelog
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> topics that are owned by Kafka Streams and nobody else is supposed to
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> write into them. In fact, no user headers are written into the
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> changelog topic and thus, there are no conflicts.
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> Nevertheless, I don't see a big issue with using headers within
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> Streams. As long as we document it, we can have some "reserved" header
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> keys that users are not allowed to use when processing data with Kafka
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> Streams. IMHO, this should be ok.
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> I think there is a safe way to avoid conflicts, since these headers
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> are only needed in internal topics (I think):
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> While name spacing would be possible, it would require deserializing
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> user headers, which implies a runtime overhead. I would suggest not to
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> namespace for now to avoid the overhead. If this becomes a problem in
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> the future, we can still add name spacing later on.
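To make the trade-off concrete, a minimal sketch contrasting the two header options: namespacing copies every header under an `external.` or `internal.` prefix on each record, while documented reserved keys leave user headers untouched and only reject collisions. The `Header` class here is a stand-in, not Kafka's `org.apache.kafka.common.header.Header`, and the reserved key set is a hypothetical parameter; the thread does not name concrete reserved keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical comparison of namespaced headers vs. reserved header keys.
final class HeaderKeys {

    // Minimal stand-in for a record header (a String key and a byte[] value).
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Option 1, namespacing: every header of every record written to an
    // internal topic is copied under a prefixed key (and has to be
    // un-prefixed again on read), which is where the runtime cost comes from.
    static List<Header> namespace(List<Header> user, List<Header> internal) {
        List<Header> out = new ArrayList<>();
        for (Header h : user) {
            out.add(new Header("external." + h.key, h.value));
        }
        for (Header h : internal) {
            out.add(new Header("internal." + h.key, h.value));
        }
        return out;
    }

    // Option 2, documented reserved keys: user headers pass through without
    // being copied; only a key that collides with a reserved name is rejected.
    static void checkUserHeaders(List<Header> user, Set<String> reserved) {
        for (Header h : user) {
            if (reserved.contains(h.key)) {
                throw new IllegalArgumentException("reserved header key: " + h.key);
            }
        }
    }
}
```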
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> My main concern about the design is the type of the result KTable:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> If I understood the proposal correctly,
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K1,V1> table1 = ...
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K2,V2> table2 = ...
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> implies that the `joinedTable` has the same key as the left input
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> table. IMHO, this does not work because if table2 contains multiple
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> rows that join with a record in table1 (which is the main purpose of a
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> foreign key join), the result table would only contain a single join
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> result, but not multiple.
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> Example:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> table1 input stream: <A,X>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> We use the table2 value as a foreign key to the table1 key (i.e., "A"
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> joins). If the result key is the same as the key of table1, this
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> implies that the result can either be <A, join(X,1)> or
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> <A, join(X,2)>, but not both. Because they share the same key,
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> whatever result record we emit later overwrites the previous result.
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> This is the reason why Jan originally proposed to use a combination
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> of both primary keys of the input tables as the key of the output
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> table. This makes the keys of the output table unique and we can
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> store both in the output table:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
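The overwrite problem can be reproduced with an ordinary map standing in for the output KTable: keying results by the table1 key alone keeps only the last match, whereas keying by both primary keys keeps every match. `ResultKeySketch` and its `T2Value` type are hypothetical names used only for this illustration; the map `put` plays the role of a KTable upsert.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the example: which results survive depending on
// the key chosen for the output table.
final class ResultKeySketch {

    // A table2 value: a reference to a table1 key plus a number, e.g. (A,1).
    static final class T2Value {
        final String fk;
        final int n;

        T2Value(String fk, int n) {
            this.fk = fk;
            this.n = n;
        }
    }

    // Inner foreign-key join; combinedKey selects the output key:
    // false -> table1 key only ("A"), true -> both primary keys ("A-a").
    static Map<String, String> join(Map<String, String> table1,
                                    Map<String, T2Value> table2,
                                    boolean combinedKey) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, T2Value> e : table2.entrySet()) {
            T2Value v = e.getValue();
            String left = table1.get(v.fk);
            if (left == null) {
                continue; // no matching table1 row
            }
            String key = combinedKey ? v.fk + "-" + e.getKey() : v.fk;
            // put() on an existing key overwrites, like an upsert into a KTable.
            out.put(key, "join(" + left + "," + v.n + ")");
        }
        return out;
    }
}
```

With table1 = {A: X} and table2 = {a: (A,1), b: (A,2)} processed in that order, the first variant ends with a single entry mapping A to join(X,2), while the second keeps A-a mapped to join(X,1) and A-b mapped to join(X,2).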
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> Just one remark here.
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> The high-watermark could be disregarded. The decision about the
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> forward depends on the size of the aggregated map. Only 1-element maps
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> would be unpacked and forwarded. 0-element maps would be published as
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> a delete. Any other count of map entries is in the "waiting for correct
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> deletes to arrive" state.
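Jan's rule reduces to a three-way decision on the size of the aggregated map. A minimal sketch, assuming that per original key the map holds one join result for each foreign key whose retraction (delete) has not arrived yet; `ForwardDecision` and its nested types are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch of the forward decision based on the aggregated map size.
// Assumption: per original key, the map holds one join result per foreign key
// that has not been retracted yet.
final class ForwardDecision {

    enum Kind { FORWARD, DELETE, WAIT }

    static final class Decision<V> {
        final Kind kind;
        final V value; // only set for FORWARD

        Decision(Kind kind, V value) {
            this.kind = kind;
            this.value = value;
        }
    }

    static <V> Decision<V> decide(Map<?, V> aggregated) {
        switch (aggregated.size()) {
            case 0:
                // Nothing joins any more: publish a delete.
                return new Decision<>(Kind.DELETE, null);
            case 1:
                // Exactly one result: unpack it and forward it.
                return new Decision<>(Kind.FORWARD, aggregated.values().iterator().next());
            default:
                // Stale entries still present: wait for their deletes to arrive.
                return new Decision<>(Kind.WAIT, null);
        }
    }
}
```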
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> It does look like I could replace the second repartition store and
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> highwater store with a groupBy and reduce. However, it looks like I
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> would still need to store the highwater value within the materialized
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> store, to compare the arrival of out-of-order records (assuming my
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> understanding of THIS is correct...). This in effect is the same as the
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> design I have now, just with the two tables merged together.
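Merging the two stores as described amounts to keeping the highwater next to the joined value and letting the reduce step keep whichever entry carries the larger highwater. A minimal sketch, assuming the highwater is the offset of the triggering table1 record; `HighwaterReduce` and `Versioned` are hypothetical names, and `reduce` only mirrors the shape of Kafka Streams' `Reducer<V>#apply(V value1, V value2)` without depending on it.

```java
import java.util.List;

// Hypothetical value for the single merged store: the joined value together
// with its highwater (assumed here to be the offset of the table1 update).
final class HighwaterReduce {

    static final class Versioned {
        final String value;
        final long highwater;

        Versioned(String value, long highwater) {
            this.value = value;
            this.highwater = highwater;
        }
    }

    // Same shape as a reducer apply(aggregate, newValue): keep whichever
    // side carries the larger highwater, so late, older results are dropped.
    static Versioned reduce(Versioned aggregate, Versioned next) {
        return next.highwater >= aggregate.highwater ? next : aggregate;
    }

    // Folds results in the order they arrived at the original-key partition.
    static Versioned fold(List<Versioned> arrivals) {
        Versioned current = null;
        for (Versioned v : arrivals) {
            current = (current == null) ? v : reduce(current, v);
        }
        return current;
    }
}
```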
>>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> -- Guozhang
