Actually, I've been thinking more about my feedback #1 on the method name,
and I'm not so sure it's a good idea.

Following SQL, the existing join methods are named according to handedness
(inner/outer/left). All these joins are the same cardinality (1:1 joins). I
think it would be a mistake to switch up the naming scheme and introduce a
new method named for cardinality, like "manyToOneJoin".

Can we actually just add a new overload to the existing joins? The javadoc
could explain that it's a many-to-one join, and it would be differentiated
by the presence of the "keyExtractor".

Just to be safe, if we rename the proposed "joinOnForeignKey" to just
"join", then we could rename "keyExtractor" to "foreignKeyExtractor" for
clarity.

Just to be unambiguous, including the other API feedback I gave, I'm
proposing something like this:

KTable<K, VR> join(KTable<KO, VO> other,
                   ValueMapper<V, KO> foreignKeyExtractor,
                   ValueJoiner<V, VO, VR> joiner,
                   ManyToOneJoined manyToOneJoined, // join config object
                  );

The ManyToOneJoined config object would allow setting all 4 serdes and
configuring materialization.

Thanks for your consideration,
-John

On Thu, Nov 29, 2018 at 8:14 AM John Roesler <j...@confluent.io> wrote:

> Hi all,
>
> Sorry that this discussion petered out... I think the 2.1 release caused
> an extended distraction that pushed it off everyone's radar (which was
> precisely Adam's concern). Personally, I've also had some extend
> distractions of my own that kept (and continue to keep) me preoccupied.
>
> However, calling for a vote did wake me up, so I guess Jan was on the
> right track!
>
> I've gone back and reviewed the whole KIP document and the prior
> discussion, and I'd like to offer a few thoughts:
>
> API Thoughts:
>
> 1. If I read the KIP right, you are proposing a many-to-one join. Could we
> consider naming it manyToOneJoin? Or, if you prefer, flip the design around
> and make it a oneToManyJoin?
>
> The proposed name "joinOnForeignKey" disguises the join type, and it seems
> like it might trick some people into using it for a one-to-one join. This
> would work, of course, but it would be super inefficient compared to a
> simple rekey-and-join.
>
> 2. I might have missed it, but I don't think it's specified whether it's
> an inner, outer, or left join. I'm guessing an outer join, as (neglecting
> IQ), the rest can be achieved by filtering or by handling it in the
> ValueJoiner.
>
> 3. The arg list to joinOnForeignKey doesn't look quite right.
> 3a. Regarding Serialized: There are a few different paradigms in play in
> the Streams API, so it's confusing, but instead of three Serialized args, I
> think it would be better to have one that allows (optionally) setting the 4
> incoming serdes. The result serde is defined by the Materialized. The
> incoming serdes can be optional because they might already be available on
> the source KTables, or the default serdes from the config might be
> applicable.
>
> 3b. Is the StreamPartitioner necessary? The other joins don't allow
> setting one, and it seems like it might actually be harmful, since the
> rekey operation needs to produce results that are co-partitioned with the
> "other" KTable.
>
> 4. I'm fine with the "reserved word" header, but I didn't actually follow
> what Matthias meant about namespacing requiring "deserializing" the record
> header. The headers are already Strings, so I don't think that
> deserialization is required. If we applied the namespace at source nodes
> and stripped it at sink nodes, this would be practically no overhead. The
> advantage of the namespace idea is that no public API change wrt headers
> needs to happen, and no restrictions need to be placed on users' headers.
>
> (Although I'm wondering if we can get away without the header at all...
> stay tuned)
>
> 5. I also didn't follow the discussion about the HWM table growing without
> bound. As I read it, the HWM table is effectively implementing OCC to
> resolve the problem you noted with disordering when the rekey is
> reversed... particularly notable when the FK changes. As such, it only
> needs to track the most recent "version" (the offset in the source
> partition) of each key. Therefore, it should have the same number of keys
> as the source table at all times.
>
> I see that you are aware of KIP-258, which I think might be relevant in a
> couple of ways. One: it's just about storing the timestamp in the state
> store, but the ultimate idea is to effectively use the timestamp as an OCC
> "version" to drop disordered updates. You wouldn't want to use the
> timestamp for this operation, but if you were to use a similar mechanism to
> store the source offset in the store alongside the re-keyed values, then
> you could avoid a separate table.
>
> 6. You and Jan have been thinking about this for a long time, so I've
> probably missed something here, but I'm wondering if we can avoid the HWM
> tracking at all and resolve out-of-order during a final join instead...
>
> Let's say we're joining a left table (Integer K: Letter FK, (other data))
> to a right table (Letter K: (some data)).
>
> Left table:
> 1: (A, xyz)
> 2: (B, asd)
>
> Right table:
> A: EntityA
> B: EntityB
>
> We could do a rekey as you proposed with a combined key, but not
> propagating the value at all..
> Rekey table:
> A-1: (dummy value)
> B-2: (dummy value)
>
> Which we then join with the right table to produce:
> A-1: EntityA
> B-2: EntityB
>
> Which gets rekeyed back:
> 1: A, EntityA
> 2: B, EntityB
>
> And finally we do the actual join:
> Result table:
> 1: ((A, xyz), EntityA)
> 2: ((B, asd), EntityB)
>
> The thing is that in that last join, we have the opportunity to compare
> the current FK in the left table with the incoming PK of the right table.
> If they don't match, we just drop the event, since it must be outdated.
>
> In your KIP, you gave an example in which (1: A, xyz) gets updated to (1:
> B, xyz), ultimately yielding a conundrum about whether the final state
> should be (1: null) or (1: joined-on-B). With the algorithm above, you
> would be considering (1: (B, xyz), (A, null)) vs (1: (B, xyz), (B,
> EntityB)). It seems like this does give you enough information to make the
> right choice, regardless of disordering.
>
>
> 7. Last thought... I'm a little concerned about the performance of the
> range scans when records change in the right table. You've said that you've
> been using the algorithm you presented in production for a while. Can you
> give us a sense of the performance characteristics you've observed?
>
> I could only think of one alternative, but I'm not sure if it's better or
> worse... If the first re-key only needs to preserve the original key, as I
> proposed in #6, then we could store a vector of keys in the value:
>
> Left table:
> 1: A,...
> 2: B,...
> 3: A,...
>
> Gets re-keyed:
> A: [1, 3]
> B: [2]
>
> Then, the rhs part of the join would only need a regular single-key
> lookup. Of course we have to deal with the problem of large values, as
> there's no bound on the number of lhs records that can reference rhs
> records. Offhand, I'd say we could page the values, so when one row is past
> the threshold, we append the key for the next page. Then in most cases, it
> would be a single key lookup, but for large fan-out updates, it would be
> one per (max value size)/(avg lhs key size).
>
> This seems more complex, though... Plus, I think there's some extra
> tracking we'd need to do to know when to emit a retraction. For example,
> when record 1 is deleted, the re-key table would just have (A: [3]). Some
> kind of tombstone is needed so that the join result for 1 can also be
> retracted.
>
> That's all!
>
> Thanks so much to both Adam and Jan for the thoughtful KIP. Sorry the
> discussion has been slow.
> -John
>
>
> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
>> Id say you can just call the vote.
>>
>> that happens all the time, and if something comes up, it just goes back
>> to discuss.
>>
>> would not expect to much attention with another another email in this
>> thread.
>>
>> best Jan
>>
>> On 09.10.2018 13:56, Adam Bellemare wrote:
>> > Hello Contributors
>> >
>> > I know that 2.1 is about to be released, but I do need to bump this to
>> keep
>> > visibility up. I am still intending to push this through once
>> contributor
>> > feedback is given.
>> >
>> > Main points that need addressing:
>> > 1) Any way (or benefit) in structuring the current singular graph node
>> into
>> > multiple nodes? It has a whopping 25 parameters right now. I am a bit
>> fuzzy
>> > on how the optimizations are supposed to work, so I would appreciate any
>> > help on this aspect.
>> >
>> > 2) Overall strategy for joining + resolving. This thread has much
>> discourse
>> > between Jan and I between the current highwater mark proposal and a
>> groupBy
>> > + reduce proposal. I am of the opinion that we need to strictly handle
>> any
>> > chance of out-of-order data and leave none of it up to the consumer. Any
>> > comments or suggestions here would also help.
>> >
>> > 3) Anything else that you see that would prevent this from moving to a
>> vote?
>> >
>> > Thanks
>> >
>> > Adam
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
>> adam.bellem...@gmail.com>
>> > wrote:
>> >
>> >> Hi Jan
>> >>
>> >> With the Stores.windowStoreBuilder and Stores.persistentWindowStore,
>> you
>> >> actually only need to specify the amount of segments you want and how
>> large
>> >> they are. To the best of my understanding, what happens is that the
>> >> segments are automatically rolled over as new data with new timestamps
>> are
>> >> created. We use this exact functionality in some of the work done
>> >> internally at my company. For reference, this is the hopping windowed
>> store.
>> >>
>> >>
>> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
>> >>
>> >> In the code that I have provided, there are going to be two 24h
>> segments.
>> >> When a record is put into the windowStore, it will be inserted at time
>> T in
>> >> both segments. The two segments will always overlap by 12h. As time
>> goes on
>> >> and new records are added (say at time T+12h+), the oldest segment
>> will be
>> >> automatically deleted and a new segment created. The records are by
>> default
>> >> inserted with the context.timestamp(), such that it is the record
>> time, not
>> >> the clock time, which is used.
>> >>
>> >> To the best of my understanding, the timestamps are retained when
>> >> restoring from the changelog.
>> >>
>> >> Basically, this is heavy-handed way to deal with TTL at a
>> segment-level,
>> >> instead of at an individual record level.
>> >>
>> >> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <jan.filip...@trivago.com
>> >
>> >> wrote:
>> >>
>> >>> Will that work? I expected it to blow up with ClassCastException or
>> >>> similar.
>> >>>
>> >>> You either would have to specify the window you fetch/put or iterate
>> >>> across all windows the key was found in right?
>> >>>
>> >>> I just hope the window-store doesn't check stream-time under the hoods
>> >>> that would be a questionable interface.
>> >>>
>> >>> If it does: did you see my comment on checking all the windows
>> earlier?
>> >>> that would be needed to actually give reasonable time gurantees.
>> >>>
>> >>> Best
>> >>>
>> >>>
>> >>>
>> >>> On 25.09.2018 13:18, Adam Bellemare wrote:
>> >>>> Hi Jan
>> >>>>
>> >>>> Check for  " highwaterMat " in the PR. I only changed the state
>> store,
>> >>> not
>> >>>> the ProcessorSupplier.
>> >>>>
>> >>>> Thanks,
>> >>>> Adam
>> >>>>
>> >>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
>> jan.filip...@trivago.com
>> >>>>
>> >>>> wrote:
>> >>>>
>> >>>>>
>> >>>>>
>> >>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
>> >>>>>
>> >>>>>> @Guozhang
>> >>>>>>
>> >>>>>> Thanks for the information. This is indeed something that will be
>> >>>>>> extremely
>> >>>>>> useful for this KIP.
>> >>>>>>
>> >>>>>> @Jan
>> >>>>>> Thanks for your explanations. That being said, I will not be moving
>> >>> ahead
>> >>>>>> with an implementation using reshuffle/groupBy solution as you
>> >>> propose.
>> >>>>>> That being said, if you wish to implement it yourself off of my
>> >>> current PR
>> >>>>>> and submit it as a competitive alternative, I would be more than
>> >>> happy to
>> >>>>>> help vet that as an alternate solution. As it stands right now, I
>> do
>> >>> not
>> >>>>>> really have more time to invest into alternatives without there
>> being
>> >>> a
>> >>>>>> strong indication from the binding voters which they would prefer.
>> >>>>>>
>> >>>>>>
>> >>>>> Hey, total no worries. I think I personally gave up on the streams
>> DSL
>> >>> for
>> >>>>> some time already, otherwise I would have pulled this KIP through
>> >>> already.
>> >>>>> I am currently reimplementing my own DSL based on PAPI.
>> >>>>>
>> >>>>>
>> >>>>>> I will look at finishing up my PR with the windowed state store in
>> the
>> >>>>>> next
>> >>>>>> week or so, exercising it via tests, and then I will come back for
>> >>> final
>> >>>>>> discussions. In the meantime, I hope that any of the binding voters
>> >>> could
>> >>>>>> take a look at the KIP in the wiki. I have updated it according to
>> the
>> >>>>>> latest plan:
>> >>>>>>
>> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
>> >>>>>> Support+non-key+joining+in+KTable
>> >>>>>>
>> >>>>>> I have also updated the KIP PR to use a windowed store. This could
>> be
>> >>>>>> replaced by the results of KIP-258 whenever they are completed.
>> >>>>>> https://github.com/apache/kafka/pull/5527
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>>
>> >>>>>> Adam
>> >>>>>>
>> >>>>>
>> >>>>> Is the HighWatermarkResolverProccessorsupplier already updated in
>> the
>> >>> PR?
>> >>>>> expected it to change to Windowed<K>,Long Missing something?
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com
>> >
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>> Correction on my previous email: KAFKA-5533 is the wrong link, as
>> it
>> >>> is
>> >>>>>>> for
>> >>>>>>> corresponding changelog mechanisms. But as part of KIP-258 we do
>> >>> want to
>> >>>>>>> have "handling out-of-order data for source KTable" such that
>> >>> instead of
>> >>>>>>> blindly apply the updates to the materialized store, i.e.
>> following
>> >>>>>>> offset
>> >>>>>>> ordering, we will reject updates that are older than the current
>> >>> key's
>> >>>>>>> timestamps, i.e. following timestamp ordering.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Guozhang
>> >>>>>>>
>> >>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <
>> wangg...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>> Hello Adam,
>> >>>>>>>>
>> >>>>>>>> Thanks for the explanation. Regarding the final step (i.e. the
>> high
>> >>>>>>>> watermark store, now altered to be replaced with a window
>> store), I
>> >>>>>>>> think
>> >>>>>>>> another current on-going KIP may actually help:
>> >>>>>>>>
>> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> >>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> This is for adding the timestamp into a key-value store (i.e.
>> only
>> >>> for
>> >>>>>>>> non-windowed KTable), and then one of its usage, as described in
>> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5533, is that we can
>> >>> then
>> >>>>>>>> "reject" updates from the source topics if its timestamp is
>> smaller
>> >>> than
>> >>>>>>>> the current key's latest update timestamp. I think it is very
>> >>> similar to
>> >>>>>>>> what you have in mind for high watermark based filtering, while
>> you
>> >>> only
>> >>>>>>>> need to make sure that the timestamps of the joining records are
>> >>>>>>>>
>> >>>>>>> correctly
>> >>>>>>>
>> >>>>>>>> inherited though the whole topology to the final stage.
>> >>>>>>>>
>> >>>>>>>> Note that this KIP is for key-value store and hence non-windowed
>> >>> KTables
>> >>>>>>>> only, but for windowed KTables we do not really have a good
>> support
>> >>> for
>> >>>>>>>> their joins anyways (
>> >>> https://issues.apache.org/jira/browse/KAFKA-7107)
>> >>>>>>>> I
>> >>>>>>>> think we can just consider non-windowed KTable-KTable non-key
>> joins
>> >>> for
>> >>>>>>>> now. In which case, KIP-258 should help.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <
>> >>> jan.filip...@trivago.com
>> >>>>>>>>>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi Guozhang
>> >>>>>>>>>>
>> >>>>>>>>>> Current highwater mark implementation would grow endlessly
>> based
>> >>> on
>> >>>>>>>>>> primary key of original event. It is a pair of (<this table
>> >>> primary
>> >>>>>>>>>>
>> >>>>>>>>> key>,
>> >>>>>>>
>> >>>>>>>> <highest offset seen for that key>). This is used to
>> differentiate
>> >>>>>>>>>>
>> >>>>>>>>> between
>> >>>>>>>
>> >>>>>>>> late arrivals and new updates. My newest proposal would be to
>> >>> replace
>> >>>>>>>>>>
>> >>>>>>>>> it
>> >>>>>>>
>> >>>>>>>> with a Windowed state store of Duration N. This would allow the
>> same
>> >>>>>>>>>> behaviour, but cap the size based on time. This should allow
>> for
>> >>> all
>> >>>>>>>>>> late-arriving events to be processed, and should be
>> customizable
>> >>> by
>> >>>>>>>>>> the
>> >>>>>>>>>> user to tailor to their own needs (ie: perhaps just 10 minutes
>> of
>> >>>>>>>>>>
>> >>>>>>>>> window,
>> >>>>>>>
>> >>>>>>>> or perhaps 7 days...).
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Adam, using time based retention can do the trick here. Even
>> >>> if I
>> >>>>>>>>> would still like to see the automatic repartitioning optional
>> >>> since I
>> >>>>>>>>>
>> >>>>>>>> would
>> >>>>>>>
>> >>>>>>>> just reshuffle again. With windowed store I am a little bit
>> >>> sceptical
>> >>>>>>>>>
>> >>>>>>>> about
>> >>>>>>>
>> >>>>>>>> how to determine the window. So esentially one could run into
>> >>> problems
>> >>>>>>>>>
>> >>>>>>>> when
>> >>>>>>>
>> >>>>>>>> the rapid change happens near a window border. I will check you
>> >>>>>>>>> implementation in detail, if its problematic, we could still
>> check
>> >>>>>>>>> _all_
>> >>>>>>>>> windows on read with not to bad performance impact I guess. Will
>> >>> let
>> >>>>>>>>> you
>> >>>>>>>>> know if the implementation would be correct as is. I wouldn't
>> not
>> >>> like
>> >>>>>>>>>
>> >>>>>>>> to
>> >>>>>>>
>> >>>>>>>> assume that: offset(A) < offset(B) => timestamp(A)  <
>> timestamp(B).
>> >>> I
>> >>>>>>>>>
>> >>>>>>>> think
>> >>>>>>>
>> >>>>>>>> we can't expect that.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> @Jan
>> >>>>>>>>>> I believe I understand what you mean now - thanks for the
>> >>> diagram, it
>> >>>>>>>>>> did really help. You are correct that I do not have the
>> original
>> >>>>>>>>>>
>> >>>>>>>>> primary
>> >>>>>>>
>> >>>>>>>> key available, and I can see that if it was available then you
>> >>> would be
>> >>>>>>>>>> able to add and remove events from the Map. That being said, I
>> >>>>>>>>>>
>> >>>>>>>>> encourage
>> >>>>>>>
>> >>>>>>>> you to finish your diagrams / charts just for clarity for
>> everyone
>> >>>>>>>>>>
>> >>>>>>>>> else.
>> >>>>>>>
>> >>>>>>>>
>> >>>>>>>>>> Yeah 100%, this giphy thing is just really hard work. But I
>> >>> understand
>> >>>>>>>>>>
>> >>>>>>>>> the benefits for the rest. Sorry about the original primary
>> key, We
>> >>>>>>>>> have
>> >>>>>>>>> join and Group by implemented our own in PAPI and basically not
>> >>> using
>> >>>>>>>>>
>> >>>>>>>> any
>> >>>>>>>
>> >>>>>>>> DSL (Just the abstraction). Completely missed that in original
>> DSL
>> >>> its
>> >>>>>>>>>
>> >>>>>>>> not
>> >>>>>>>
>> >>>>>>>> there and just assumed it. total brain mess up on my end. Will
>> >>> finish
>> >>>>>>>>>
>> >>>>>>>> the
>> >>>>>>>
>> >>>>>>>> chart as soon as i get a quite evening this week.
>> >>>>>>>>>
>> >>>>>>>>> My follow up question for you is, won't the Map stay inside the
>> >>> State
>> >>>>>>>>>
>> >>>>>>>>>> Store indefinitely after all of the changes have propagated?
>> Isn't
>> >>>>>>>>>> this
>> >>>>>>>>>> effectively the same as a highwater mark state store?
>> >>>>>>>>>>
>> >>>>>>>>>> Thing is that if the map is empty, substractor is gonna return
>> >>> `null`
>> >>>>>>>>>
>> >>>>>>>> and
>> >>>>>>>
>> >>>>>>>> the key is removed from the keyspace. But there is going to be a
>> >>> store
>> >>>>>>>>> 100%, the good thing is that I can use this store directly for
>> >>>>>>>>> materialize() / enableSendingOldValues() is a regular store,
>> >>> satisfying
>> >>>>>>>>> all gurantees needed for further groupby / join. The Windowed
>> >>> store is
>> >>>>>>>>>
>> >>>>>>>> not
>> >>>>>>>
>> >>>>>>>> keeping the values, so for the next statefull operation we would
>> >>>>>>>>> need to instantiate an extra store. or we have the window store
>> >>> also
>> >>>>>>>>>
>> >>>>>>>> have
>> >>>>>>>
>> >>>>>>>> the values then.
>> >>>>>>>>>
>> >>>>>>>>> Long story short. if we can flip in a custom group by before
>> >>>>>>>>> repartitioning to the original primary key i think it would help
>> >>> the
>> >>>>>>>>>
>> >>>>>>>> users
>> >>>>>>>
>> >>>>>>>> big time in building efficient apps. Given the original primary
>> key
>> >>>>>>>>>
>> >>>>>>>> issue I
>> >>>>>>>
>> >>>>>>>> understand that we do not have a solid foundation to build on.
>> >>>>>>>>> Leaving primary key carry along to the user. very unfortunate. I
>> >>> could
>> >>>>>>>>> understand the decision goes like that. I do not think its a
>> good
>> >>>>>>>>>
>> >>>>>>>> decision.
>> >>>>>>>
>> >>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks
>> >>>>>>>>>> Adam
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <
>> >>>>>>>>>> dumbreprajakta...@gmail.com <mailto:
>> dumbreprajakta...@gmail.com>>
>> >>>>>>>>>>
>> >>>>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>>
>> >>>>>>>>>>        please remove me from this group
>> >>>>>>>>>>
>> >>>>>>>>>>        On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
>> >>>>>>>>>>        <jan.filip...@trivago.com <mailto:
>> jan.filip...@trivago.com
>> >>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>        wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>        > Hi Adam,
>> >>>>>>>>>>        >
>> >>>>>>>>>>        > give me some time, will make such a chart. last time i
>> >>> didn't
>> >>>>>>>>>>        get along
>> >>>>>>>>>>        > well with giphy and ruined all your charts.
>> >>>>>>>>>>        > Hopefully i can get it done today
>> >>>>>>>>>>        >
>> >>>>>>>>>>        > On 08.09.2018 16:00, Adam Bellemare wrote:
>> >>>>>>>>>>        > > Hi Jan
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > > I have included a diagram of what I attempted on the
>> >>> KIP.
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        >
>> >>>>>>>>>>
>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su
>> >>>>>>>>>> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining
>> >>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate
>> >>>>>>>>>>        <
>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S
>> >>>>>>>>>> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin
>> >>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate>
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > > I attempted this back at the start of my own
>> >>> implementation
>> >>>>>>>>>> of
>> >>>>>>>>>>        this
>> >>>>>>>>>>        > > solution, and since I could not get it to work I
>> have
>> >>> since
>> >>>>>>>>>>        discarded the
>> >>>>>>>>>>        > > code. At this point in time, if you wish to continue
>> >>> pursuing
>> >>>>>>>>>>        for your
>> >>>>>>>>>>        > > groupBy solution, I ask that you please create a
>> >>> diagram on
>> >>>>>>>>>>        the KIP
>> >>>>>>>>>>        > > carefully explaining your solution. Please feel
>> free to
>> >>> use
>> >>>>>>>>>>        the image I
>> >>>>>>>>>>        > > just posted as a starting point. I am having trouble
>> >>>>>>>>>>        understanding your
>> >>>>>>>>>>        > > explanations but I think that a carefully
>> constructed
>> >>> diagram
>> >>>>>>>>>>        will clear
>> >>>>>>>>>>        > up
>> >>>>>>>>>>        > > any misunderstandings. Alternately, please post a
>> >>>>>>>>>>        comprehensive PR with
>> >>>>>>>>>>        > > your solution. I can only guess at what you mean,
>> and
>> >>> since I
>> >>>>>>>>>>        value my
>> >>>>>>>>>>        > own
>> >>>>>>>>>>        > > time as much as you value yours, I believe it is
>> your
>> >>>>>>>>>>        responsibility to
>> >>>>>>>>>>        > > provide an implementation instead of me trying to
>> guess.
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > > Adam
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
>> >>>>>>>>>>        <jan.filip...@trivago.com <mailto:
>> jan.filip...@trivago.com
>> >>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>        > > wrote:
>> >>>>>>>>>>        > >
>> >>>>>>>>>>        > >> Hi James,
>> >>>>>>>>>>        > >>
>> >>>>>>>>>>        > >> nice to see you beeing interested. kafka streams at
>> >>> this
>> >>>>>>>>>>        point supports
>> >>>>>>>>>>        > >> all sorts of joins as long as both streams have the
>> >>> same
>> >>>>>>>>>> key.
>> >>>>>>>>>>        > >> Adam is currently implementing a join where a
>> KTable
>> >>> and a
>> >>>>>>>>>>        KTable can
>> >>>>>>>>>>        > have
>> >>>>>>>>>>        > >> a one to many relation ship (1:n). We exploit that
>> >>> rocksdb
>> >>>>>>>>>> is
>> >>>>>>>>>>
>> >>>>>>>>> a
>> >>>>>>>
>> >>>>>>>>        > >> datastore that keeps data sorted (At least exposes an
>> >>> API to
>> >>>>>>>>>>        access the
>> >>>>>>>>>>        > >> stored data in a sorted fashion).
>> >>>>>>>>>>        > >>
>> >>>>>>>>>>        > >> I think the technical caveats are well understood
>> now
>> >>> and we
>> >>>>>>>>>>
>> >>>>>>>>> are
>> >>>>>>>
>> >>>>>>>>        > basically
>> >>>>>>>>>>        > >> down to philosophy and API Design ( when Adam sees
>> my
>> >>> newest
>> >>>>>>>>>>        message).
>> >>>>>>>>>>        > >> I have a lengthy track record of loosing those
>> kinda
>> >>>>>>>>>>        arguments within
>> >>>>>>>>>>        > the
>> >>>>>>>>>>        > >> streams community and I have no clue why. So I
>> >>> literally
>> >>>>>>>>>>        can't wait for
>> >>>>>>>>>>        > you
>> >>>>>>>>>>        > >> to churn through this thread and give you opinion
>> on
>> >>> how we
>> >>>>>>>>>>        should
>> >>>>>>>>>>        > design
>> >>>>>>>>>>        > >> the return type of the oneToManyJoin and how many
>> >>> power we
>> >>>>>>>>>>        want to give
>> >>>>>>>>>>        > to
>> >>>>>>>>>>        > >> the user vs "simplicity" (where simplicity isn't
>> >>> really that
>> >>>>>>>>>>        as users
>> >>>>>>>>>>        > still
>> >>>>>>>>>>        > >> need to understand it I argue)
>> >>>>>>>>>>        > >>
>> >>>>>>>>>>        > >> waiting for you to join in on the discussion
>> >>>>>>>>>>        > >>
>> >>>>>>>>>>        > >> Best Jan
>> >>>>>>>>>>        > >>
>> >>>>>>>>>>        > >>
>> >>>>>>>>>>        > >>
>> >>>>>>>>>>        > >> On 07.09.2018 15:49, James Kwan wrote:
>> >>>>>>>>>>        > >>
>> >>>>>>>>>>        > >>> I am new to this group and I found this subject
>> >>>>>>>>>>        interesting.  Sounds
>> >>>>>>>>>>        > like
>> >>>>>>>>>>        > >>> you guys want to implement a join table of two
>> >>> streams? Is
>> >>>>>>>>>> there
>> >>>>>>>>>>        > somewhere
>> >>>>>>>>>>        > >>> I can see the original requirement or proposal?
>> >>>>>>>>>>        > >>>
>> >>>>>>>>>>        > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
>> >>>>>>>>>>        <jan.filip...@trivago.com <mailto:
>> jan.filip...@trivago.com
>> >>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>        > >>>> wrote:
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>>> I'm currently testing using a Windowed Store to
>> >>> store the
>> >>>>>>>>>>        highwater
>> >>>>>>>>>>        > >>>>> mark.
>> >>>>>>>>>>        > >>>>> By all indications this should work fine, with
>> the
>> >>> caveat
>> >>>>>>>>>>        being that
>> >>>>>>>>>>        > it
>> >>>>>>>>>>        > >>>>> can
>> >>>>>>>>>>        > >>>>> only resolve out-of-order arrival for up to the
>> >>> size of
>> >>>>>>>>>>        the window
>> >>>>>>>>>>        > (ie:
>> >>>>>>>>>>        > >>>>> 24h, 72h, etc). This would remove the
>> possibility
>> >>> of it
>> >>>>>>>>>>
>> >>>>>>>>> being
>> >>>>>>>
>> >>>>>>>>        > unbounded
>> >>>>>>>>>>        > >>>>> in
>> >>>>>>>>>>        > >>>>> size.
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>>> With regards to Jan's suggestion, I believe
>> this is
>> >>> where
>> >>>>>>>>>>        we will
>> >>>>>>>>>>        > have
>> >>>>>>>>>>        > >>>>> to
>> >>>>>>>>>>        > >>>>> remain in disagreement. While I do not disagree
>> >>> with your
>> >>>>>>>>>>        statement
>> >>>>>>>>>>        > >>>>> about
>> >>>>>>>>>>        > >>>>> there likely to be additional joins done in a
>> >>> real-world
>> >>>>>>>>>>        workflow, I
>> >>>>>>>>>>        > do
>> >>>>>>>>>>        > >>>>> not
>> >>>>>>>>>>        > >>>>> see how you can conclusively deal with
>> out-of-order
>> >>>>>>>>>> arrival
>> >>>>>>>>>> of
>> >>>>>>>>>>        > >>>>> foreign-key
>> >>>>>>>>>>        > >>>>> changes and subsequent joins. I have attempted
>> what
>> >>> I
>> >>>>>>>>>>        think you have
>> >>>>>>>>>>        > >>>>> proposed (without a high-water, using groupBy
>> and
>> >>> reduce)
>> >>>>>>>>>>        and found
>> >>>>>>>>>>        > >>>>> that if
>> >>>>>>>>>>        > >>>>> the foreign key changes too quickly, or the
>> load on
>> >>> a
>> >>>>>>>>>>        stream thread
>> >>>>>>>>>>        > is
>> >>>>>>>>>>        > >>>>> too
>> >>>>>>>>>>        > >>>>> high, the joined messages will arrive
>> out-of-order
>> >>> and be
>> >>>>>>>>>>        incorrectly
>> >>>>>>>>>>        > >>>>> propagated, such that an intermediate event is
>> >>>>>>>>>> represented
>> >>>>>>>>>>        as the
>> >>>>>>>>>>        > final
>> >>>>>>>>>>        > >>>>> event.
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>> Can you shed some light on your groupBy
>> >>> implementation.
>> >>>>>>>>>>        There must be
>> >>>>>>>>>>        > >>>> some sort of flaw in it.
>> >>>>>>>>>>        > >>>> I have a suspicion where it is, I would just
>> like to
>> >>>>>>>>>>        confirm. The idea
>> >>>>>>>>>>        > >>>> is bullet proof and it must be
>> >>>>>>>>>>        > >>>> an implementation mess up. I would like to
>> clarify
>> >>> before
>> >>>>>>>>>>        we draw a
>> >>>>>>>>>>        > >>>> conclusion.
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>>    Repartitioning the scattered events back to
>> their
>> >>>>>>>>>>
>> >>>>>>>>> original
>> >>>>>>>
>> >>>>>>>>        > >>>>> partitions is the only way I know how to
>> conclusively
>> >>> deal
>> >>>>>>>>>>        with
>> >>>>>>>>>>        > >>>>> out-of-order events in a given time frame, and
>> to
>> >>> ensure
>> >>>>>>>>>>        that the
>> >>>>>>>>>>        > data
>> >>>>>>>>>>        > >>>>> is
>> >>>>>>>>>>        > >>>>> eventually consistent with the input events.
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>>> If you have some code to share that illustrates
>> your
>> >>>>>>>>>>        approach, I
>> >>>>>>>>>>        > would
>> >>>>>>>>>>        > >>>>> be
>> >>>>>>>>>>        > >>>>> very grateful as it would remove any
>> >>> misunderstandings
>> >>>>>>>>>>        that I may
>> >>>>>>>>>>        > have.
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>> ah okay you were looking for my code. I don't
>> have
>> >>>>>>>>>>        something easily
>> >>>>>>>>>>        > >>>> readable here as its bloated with OO-patterns.
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>> its anyhow trivial:
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>> @Override
>> >>>>>>>>>>        > >>>>      public T apply(K aggKey, V value, T
>> aggregate)
>> >>>>>>>>>>        > >>>>      {
>> >>>>>>>>>>        > >>>>          Map<U, V> currentStateAsMap =
>> >>> asMap(aggregate);
>> >>>>>>>>>> <<
>> >>>>>>>>>>        imaginary
>> >>>>>>>>>>        > >>>>          U toModifyKey = mapper.apply(value);
>> >>>>>>>>>>        > >>>>              << this is the place where people
>> >>> actually
>> >>>>>>>>>>        gonna have
>> >>>>>>>>>>        > issues
>> >>>>>>>>>>        > >>>> and why you probably couldn't do it. we would
>> need
>> >>> to find
>> >>>>>>>>>>        a solution
>> >>>>>>>>>>        > here.
>> >>>>>>>>>>        > >>>> I didn't realize that yet.
>> >>>>>>>>>>        > >>>>              << we propagate the field in the
>> >>> joiner, so
>> >>>>>>>>>>        that we can
>> >>>>>>>>>>        > pick
>> >>>>>>>>>>        > >>>> it up in an aggregate. Probably you have not
>> thought
>> >>> of
>> >>>>>>>>>>        this in your
>> >>>>>>>>>>        > >>>> approach right?
>> >>>>>>>>>>        > >>>>              << I am very open to find a generic
>> >>> solution
>> >>>>>>>>>>        here. In my
>> >>>>>>>>>>        > >>>> honest opinion this is broken in
>> KTableImpl.GroupBy
>> >>> that
>> >>>>>>>>>> it
>> >>>>>>>>>>        looses
>> >>>>>>>>>>        > the keys
>> >>>>>>>>>>        > >>>> and only maintains the aggregate key.
>> >>>>>>>>>>        > >>>>              << I abstracted it away back then
>> way
>> >>> before
>> >>>>>>>>>> i
>> >>>>>>>>>> was
>> >>>>>>>>>>        > thinking
>> >>>>>>>>>>        > >>>> of oneToMany join. That is why I didn't realize
>> its
>> >>>>>>>>>>        significance here.
>> >>>>>>>>>>        > >>>>              << Opinions?
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>>          for (V m : current)
>> >>>>>>>>>>        > >>>>          {
>> >>>>>>>>>>        > >>>> currentStateAsMap.put(mapper.apply(m), m);
>> >>>>>>>>>>        > >>>>          }
>> >>>>>>>>>>        > >>>>          if (isAdder)
>> >>>>>>>>>>        > >>>>          {
>> >>>>>>>>>>        > >>>> currentStateAsMap.put(toModifyKey, value);
>> >>>>>>>>>>        > >>>>          }
>> >>>>>>>>>>        > >>>>          else
>> >>>>>>>>>>        > >>>>          {
>> >>>>>>>>>>        > >>>> currentStateAsMap.remove(toModifyKey);
>> >>>>>>>>>>        > >>>> if(currentStateAsMap.isEmpty()){
>> >>>>>>>>>>        > >>>>                  return null;
>> >>>>>>>>>>        > >>>>              }
>> >>>>>>>>>>        > >>>>          }
>> >>>>>>>>>>        > >>>>          retrun
>> asAggregateType(currentStateAsMap)
>> >>>>>>>>>>        > >>>>      }
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>>
>> >>>>>>>>>>        > >>>> Thanks,
>> >>>>>>>>>>        > >>>>> Adam
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <
>> >>>>>>>>>>        > jan.filip...@trivago.com <mailto:
>> jan.filip...@trivago.com
>> >>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>        > >>>>> wrote:
>> >>>>>>>>>>        > >>>>>
>> >>>>>>>>>>        > >>>>> Thanks Adam for bringing Matthias to speed!
>> >>>>>>>>>>        > >>>>>> about the differences. I think re-keying back
>> >>> should be
>> >>>>>>>>>>        optional at
>> >>>>>>>>>>        > >>>>>> best.
>> >>>>>>>>>>        > >>>>>> I would say we return a KScatteredTable with
>> >>> reshuffle()
>> >>>>>>>>>>        returning
>> >>>>>>>>>>        > >>>>>> KTable<originalKey,Joined> to make the
>> backwards
>> >>>>>>>>>>        repartitioning
>> >>>>>>>>>>        > >>>>>> optional.
>> >>>>>>>>>>        > >>>>>> I am also in a big favour of doing the out of
>> order
>> >>>>>>>>>>        processing using
>> >>>>>>>>>>        > >>>>>> group
>> >>>>>>>>>>        > >>>>>> by instead high water mark tracking.
>> >>>>>>>>>>        > >>>>>> Just because unbounded growth is just scary +
>> It
>> >>> saves
>> >>>>>>>>>> us
>> >>>>>>>>>>        the header
>> >>>>>>>>>>        > >>>>>> stuff.
>> >>>>>>>>>>        > >>>>>>
>> >>>>>>>>>>        > >>>>>> I think the abstraction of always
>> repartitioning
>> >>> back is
>> >>>>>>>>>>        just not so
>> >>>>>>>>>>        > >>>>>> strong. Like the work has been done before we
>> >>> partition
>> >>>>>>>>>>        back and
>> >>>>>>>>>>        > >>>>>> grouping
>> >>>>>>>>>>        > >>>>>> by something else afterwards is really common.
>> >>>>>>>>>>        > >>>>>>
>> >>>>>>>>>>        > >>>>>>
>> >>>>>>>>>>        > >>>>>>
>> >>>>>>>>>>        > >>>>>>
>> >>>>>>>>>>        > >>>>>>
>> >>>>>>>>>>        > >>>>>>
>> >>>>>>>>>>        > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>> >>>>>>>>>>        > >>>>>>
>> >>>>>>>>>>        > >>>>>> Hi Matthias
>> >>>>>>>>>>        > >>>>>>> Thank you for your feedback, I do appreciate
>> it!
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> While name spacing would be possible, it would
>> >>> require
>> >>>>>>>>>> to
>> >>>>>>>>>>        > deserialize
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>> user headers what implies a runtime
>> overhead. I
>> >>> would
>> >>>>>>>>>>        suggest to
>> >>>>>>>>>>        > no
>> >>>>>>>>>>        > >>>>>>>> namespace for now to avoid the overhead. If
>> this
>> >>>>>>>>>>
>> >>>>>>>>> becomes a
>> >>>>>>>
>> >>>>>>>>        > problem in
>> >>>>>>>>>>        > >>>>>>>> the future, we can still add name spacing
>> later
>> >>> on.
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> Agreed. I will go with using a reserved
>> string
>> >>> and
>> >>>>>>>>>>        document it.
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> My main concern about the design it the type
>> of
>> >>> the
>> >>>>>>>>>>        result KTable:
>> >>>>>>>>>>        > If
>> >>>>>>>>>>        > >>>>>>> I
>> >>>>>>>>>>        > >>>>>>> understood the proposal correctly,
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> In your example, you have table1 and table2
>> >>> swapped.
>> >>>>>>>>>>        Here is how it
>> >>>>>>>>>>        > >>>>>>> works
>> >>>>>>>>>>        > >>>>>>> currently:
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> 1) table1 has the records that contain the
>> >>> foreign key
>> >>>>>>>>>>        within their
>> >>>>>>>>>>        > >>>>>>> value.
>> >>>>>>>>>>        > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>,
>> >>>>>>>>>> <b,(fk=A,bar=2)>,
>> >>>>>>>>>>        > >>>>>>> <c,(fk=B,bar=3)>
>> >>>>>>>>>>        > >>>>>>> table2 input stream: <A,X>, <B,Y>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> 2) A Value mapper is required to extract the
>> >>> foreign
>> >>>>>>>>>> key.
>> >>>>>>>>>>        > >>>>>>> table1 foreign key mapper: ( value =>
>> value.fk
>> >>>>>>>>>>        <http://value.fk> )
>> >>>>>>>>>>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> The mapper is applied to each element in
>> table1,
>> >>> and a
>> >>>>>>>>>>        new combined
>> >>>>>>>>>>        > >>>>>>> key is
>> >>>>>>>>>>        > >>>>>>> made:
>> >>>>>>>>>>        > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b,
>> >>>>>>>>>> (fk=A,bar=2)>,
>> >>>>>>>>>>        <B-c,
>> >>>>>>>>>>        > >>>>>>> (fk=B,bar=3)>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> 3) The rekeyed events are copartitioned with
>> >>> table2:
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> a) Stream Thread with Partition 0:
>> >>>>>>>>>>        > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>,
>> <A-b,
>> >>>>>>>>>>        (fk=A,bar=2)>
>> >>>>>>>>>>        > >>>>>>> Table2: <A,X>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> b) Stream Thread with Partition 1:
>> >>>>>>>>>>        > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>> >>>>>>>>>>        > >>>>>>> Table2: <B,Y>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> 4) From here, they can be joined together
>> locally
>> >>> by
>> >>>>>>>>>>        applying the
>> >>>>>>>>>>        > >>>>>>> joiner
>> >>>>>>>>>>        > >>>>>>> function.
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> At this point, Jan's design and my design
>> >>> deviate. My
>> >>>>>>>>>>        design goes
>> >>>>>>>>>>        > on
>> >>>>>>>>>>        > >>>>>>> to
>> >>>>>>>>>>        > >>>>>>> repartition the data post-join and resolve
>> >>> out-of-order
>> >>>>>>>>>>        arrival of
>> >>>>>>>>>>        > >>>>>>> records,
>> >>>>>>>>>>        > >>>>>>> finally returning the data keyed just the
>> >>> original key.
>> >>>>>>>>>>        I do not
>> >>>>>>>>>>        > >>>>>>> expose
>> >>>>>>>>>>        > >>>>>>> the
>> >>>>>>>>>>        > >>>>>>> CombinedKey or any of the internals outside
>> of the
>> >>>>>>>>>>        joinOnForeignKey
>> >>>>>>>>>>        > >>>>>>> function. This does make for larger footprint,
>> >>> but it
>> >>>>>>>>>>        removes all
>> >>>>>>>>>>        > >>>>>>> agency
>> >>>>>>>>>>        > >>>>>>> for resolving out-of-order arrivals and
>> handling
>> >>>>>>>>>>        CombinedKeys from
>> >>>>>>>>>>        > the
>> >>>>>>>>>>        > >>>>>>> user. I believe that this makes the function
>> much
>> >>>>>>>>>> easier
>> >>>>>>>>>>        to use.
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> Let me know if this helps resolve your
>> questions,
>> >>> and
>> >>>>>>>>>>        please feel
>> >>>>>>>>>>        > >>>>>>> free to
>> >>>>>>>>>>        > >>>>>>> add anything else on your mind.
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> Thanks again,
>> >>>>>>>>>>        > >>>>>>> Adam
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J.
>> Sax <
>> >>>>>>>>>>        > >>>>>>> matth...@confluent.io <mailto:
>> >>> matth...@confluent.io>>
>> >>>>>>>>>>
>> >>>>>>>>>>        > >>>>>>> wrote:
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>> Hi,
>> >>>>>>>>>>        > >>>>>>>
>> >>>>>>>>>>        > >>>>>>>> I am just catching up on this thread. I did
>> not
>> >>> read
>> >>>>>>>>>>        everything so
>> >>>>>>>>>>        > >>>>>>>> far,
>> >>>>>>>>>>        > >>>>>>>> but want to share couple of initial thoughts:
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> Headers: I think there is a fundamental
>> >>> difference
>> >>>>>>>>>>        between header
>> >>>>>>>>>>        > >>>>>>>> usage
>> >>>>>>>>>>        > >>>>>>>> in this KIP and KP-258. For 258, we add
>> headers
>> >>> to
>> >>>>>>>>>>        changelog topic
>> >>>>>>>>>>        > >>>>>>>> that
>> >>>>>>>>>>        > >>>>>>>> are owned by Kafka Streams and nobody else is
>> >>> supposed
>> >>>>>>>>>>        to write
>> >>>>>>>>>>        > into
>> >>>>>>>>>>        > >>>>>>>> them. In fact, no user header are written
>> into
>> >>> the
>> >>>>>>>>>>        changelog topic
>> >>>>>>>>>>        > >>>>>>>> and
>> >>>>>>>>>>        > >>>>>>>> thus, there are not conflicts.
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> Nevertheless, I don't see a big issue with
>> using
>> >>>>>>>>>>        headers within
>> >>>>>>>>>>        > >>>>>>>> Streams.
>> >>>>>>>>>>        > >>>>>>>> As long as we document it, we can have some
>> >>> "reserved"
>> >>>>>>>>>>        header keys
>> >>>>>>>>>>        > >>>>>>>> and
>> >>>>>>>>>>        > >>>>>>>> users are not allowed to use when processing
>> >>> data with
>> >>>>>>>>>>        Kafka
>> >>>>>>>>>>        > Streams.
>> >>>>>>>>>>        > >>>>>>>> IMHO, this should be ok.
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> I think there is a safe way to avoid
>> conflicts,
>> >>> since
>> >>>>>>>>>> these
>> >>>>>>>>>>        > headers
>> >>>>>>>>>>        > >>>>>>>> are
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>> only needed in internal topics (I think):
>> >>>>>>>>>>        > >>>>>>>>> For internal and changelog topics, we can
>> >>> namespace
>> >>>>>>>>>>        all headers:
>> >>>>>>>>>>        > >>>>>>>>> * user-defined headers are namespaced as
>> >>> "external."
>> >>>>>>>>>> +
>> >>>>>>>>>>        headerKey
>> >>>>>>>>>>        > >>>>>>>>> * internal headers are namespaced as
>> >>> "internal." +
>> >>>>>>>>>>        headerKey
>> >>>>>>>>>>        > >>>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>> While name spacing would be possible, it
>> would
>> >>>>>>>>>> require
>> >>>>>>>>>>
>> >>>>>>>>> to
>> >>>>>>>
>> >>>>>>>>        > >>>>>>>> deserialize
>> >>>>>>>>>>        > >>>>>>>> user headers what implies a runtime
>> overhead. I
>> >>> would
>> >>>>>>>>>>        suggest to
>> >>>>>>>>>>        > no
>> >>>>>>>>>>        > >>>>>>>> namespace for now to avoid the overhead. If
>> this
>> >>>>>>>>>>
>> >>>>>>>>> becomes a
>> >>>>>>>
>> >>>>>>>>        > problem in
>> >>>>>>>>>>        > >>>>>>>> the future, we can still add name spacing
>> later
>> >>> on.
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> My main concern about the design it the type
>> of
>> >>> the
>> >>>>>>>>>>        result KTable:
>> >>>>>>>>>>        > >>>>>>>> If I
>> >>>>>>>>>>        > >>>>>>>> understood the proposal correctly,
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> KTable<K1,V1> table1 = ...
>> >>>>>>>>>>        > >>>>>>>> KTable<K2,V2> table2 = ...
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> KTable<K1,V3> joinedTable =
>> >>> table1.join(table2,...);
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> implies that the `joinedTable` has the same
>> key
>> >>> as the
>> >>>>>>>>>>        left input
>> >>>>>>>>>>        > >>>>>>>> table.
>> >>>>>>>>>>        > >>>>>>>> IMHO, this does not work because if table2
>> >>> contains
>> >>>>>>>>>>        multiple rows
>> >>>>>>>>>>        > >>>>>>>> that
>> >>>>>>>>>>        > >>>>>>>> join with a record in table1 (what is the
>> main
>> >>> purpose
>> >>>>>>>>>>
>> >>>>>>>>> of
>> >>>>>>>
>> >>>>>>>> a
>> >>>>>>>>>>        > foreign
>> >>>>>>>>>>        > >>>>>>>> key
>> >>>>>>>>>>        > >>>>>>>> join), the result table would only contain a
>> >>> single
>> >>>>>>>>>>        join result,
>> >>>>>>>>>>        > but
>> >>>>>>>>>>        > >>>>>>>> not
>> >>>>>>>>>>        > >>>>>>>> multiple.
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> Example:
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> table1 input stream: <A,X>
>> >>>>>>>>>>        > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> We use table2 value a foreign key to table1
>> key
>> >>> (ie,
>> >>>>>>>>>>        "A" joins).
>> >>>>>>>>>>        > If
>> >>>>>>>>>>        > >>>>>>>> the
>> >>>>>>>>>>        > >>>>>>>> result key is the same key as key of table1,
>> this
>> >>>>>>>>>>        implies that the
>> >>>>>>>>>>        > >>>>>>>> result can either be <A, join(X,1)> or <A,
>> >>> join(X,2)>
>> >>>>>>>>>>        but not
>> >>>>>>>>>>        > both.
>> >>>>>>>>>>        > >>>>>>>> Because the share the same key, whatever
>> result
>> >>> record
>> >>>>>>>>>>        we emit
>> >>>>>>>>>>        > later,
>> >>>>>>>>>>        > >>>>>>>> overwrite the previous result.
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> This is the reason why Jan originally
>> proposed
>> >>> to use
>> >>>>>>>>>> a
>> >>>>>>>>>>        > combination
>> >>>>>>>>>>        > >>>>>>>> of
>> >>>>>>>>>>        > >>>>>>>> both primary keys of the input tables as key
>> of
>> >>> the
>> >>>>>>>>>>        output table.
>> >>>>>>>>>>        > >>>>>>>> This
>> >>>>>>>>>>        > >>>>>>>> makes the keys of the output table unique
>> and we
>> >>> can
>> >>>>>>>>>>        store both in
>> >>>>>>>>>>        > >>>>>>>> the
>> >>>>>>>>>>        > >>>>>>>> output table:
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b,
>> >>> join(X,2)>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> Thoughts?
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> -Matthias
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>> >>>>>>>>>>        > >>>>>>>>
>> >>>>>>>>>>        > >>>>>>>> Just on remark here.
>> >>>>>>>>>>        > >>>>>>>>> The high-watermark could be disregarded. The
>> >>> decision
>> >>>>>>>>>>        about the
>> >>>>>>>>>>        > >>>>>>>>> forward
>> >>>>>>>>>>        > >>>>>>>>> depends on the size of the aggregated map.
>> >>>>>>>>>>        > >>>>>>>>> Only 1 element long maps would be unpacked
>> and
>> >>>>>>>>>>        forwarded. 0
>> >>>>>>>>>>        > element
>> >>>>>>>>>>        > >>>>>>>>> maps
>> >>>>>>>>>>        > >>>>>>>>> would be published as delete. Any other
>> count
>> >>>>>>>>>>        > >>>>>>>>> of map entries is in "waiting for correct
>> >>> deletes to
>> >>>>>>>>>>        > arrive"-state.
>> >>>>>>>>>>        > >>>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>> >>>>>>>>>>        > >>>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>> It does look like I could replace the second
>> >>>>>>>>>>        repartition store
>> >>>>>>>>>>        > and
>> >>>>>>>>>>        > >>>>>>>>>> highwater store with a groupBy and reduce.
>> >>> However,
>> >>>>>>>>>>        it looks
>> >>>>>>>>>>        > like
>> >>>>>>>>>>        > >>>>>>>>>> I
>> >>>>>>>>>>        > >>>>>>>>>> would
>> >>>>>>>>>>        > >>>>>>>>>> still need to store the highwater value
>> within
>> >>> the
>> >>>>>>>>>>        materialized
>> >>>>>>>>>>        > >>>>>>>>>> store,
>> >>>>>>>>>>        > >>>>>>>>>>
>> >>>>>>>>>>        > >>>>>>>>>> to
>> >>>>>>>>>>        > >>>>>>>>> compare the arrival of out-of-order records
>> >>> (assuming
>> >>>>>>>>>>
>> >>>>>>>>> my
>> >>>>>>>
>> >>>>>>>>        > >>>>>>>>> understanding
>> >>>>>>>>>>        > >>>>>>>>> of
>> >>>>>>>>>>        > >>>>>>>>> THIS is correct...). This in effect is the
>> same
>> >>> as
>> >>>>>>>>>> the
>> >>>>>>>>>>        design I
>> >>>>>>>>>>        > have
>> >>>>>>>>>>        > >>>>>>>>> now,
>> >>>>>>>>>>        > >>>>>>>>> just with the two tables merged together.
>> >>>>>>>>>>        > >>>>>>>>>
>> >>>>>>>>>>        >
>> >>>>>>>>>>        >
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>> --
>> >>>>>>>> -- Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> --
>> >>>>>>> -- Guozhang
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>

Reply via email to