Hi John and Guozhang

Ah yes, I lost that in the mix! Thanks for the convergent solutions - I do
think that the attachment that John included makes for a better design. It
should also help with overall performance, as very high-cardinality
foreign-keyed data (say, millions of events with the same entity) will be
able to leverage multiple nodes for the join instead of having it all
performed on one node. There is still a bottleneck in the right table
having to propagate all those events, but with slimmer structures, less IO,
and no need to perform the join there, I think the throughput will be much
higher in those scenarios.

Okay, I am convinced. I will update the KIP with a Gliffy version of John's
diagram and make sure that the example flow matches it. Then I can go back
to working on the PR to match the diagram.
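
For reference while reworking the example flow, here is a minimal
single-process sketch of the design as I read it from John's description.
It is not the PR code. The class and method names, the "fk|payload" LHS
value format, the "fk:pk" composite store key (which assumes keys contain
no ':'), and the removal of a stale subscription on a foreign-key change
are all assumptions made for illustration. Plain Java collections stand in
for the KTables, the subscription store, and the two repartition topics.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of the "register interest, broadcast back" join. */
final class FkJoinSketch {
    // LHS table: primary key -> value, where the value is "fk|payload" (assumed format).
    private final Map<String, String> lhsTable = new HashMap<>();
    // RHS table: foreign key -> value.
    private final Map<String, String> rhsTable = new HashMap<>();
    // Subscription store on the RHS side, keyed "fk:pk". Only keys are stored,
    // never LHS values, and the sort order allows a prefix range scan per fk.
    private final TreeMap<String, Boolean> subscriptions = new TreeMap<>();
    // Join results emitted by the LHS-side joiner, in emission order.
    final List<String> output = new ArrayList<>();

    static String extractFk(String lhsValue) {
        return lhsValue.substring(0, lhsValue.indexOf('|'));
    }

    /** LHS update: only the (fk, pk) pair travels to the RHS side. */
    void updateLhs(String pk, String value) {
        String old = lhsTable.put(pk, value);
        if (old != null) {
            // Assumption: an FK change withdraws the previous interest.
            subscriptions.remove(extractFk(old) + ":" + pk);
        }
        String fk = extractFk(value);
        subscriptions.put(fk + ":" + pk, Boolean.TRUE);
        // A new interest makes the RHS send its current state back.
        onRhsResponse(pk, fk, rhsTable.get(fk));
    }

    /** RHS update: range-scan the subscriptions and broadcast to interested LHS keys. */
    void updateRhs(String fk, String value) {
        rhsTable.put(fk, value);
        String prefix = fk + ":";
        for (String key : subscriptions.subMap(prefix, prefix + Character.MAX_VALUE).keySet()) {
            onRhsResponse(key.substring(prefix.length()), fk, value);
        }
    }

    /** LHS-side joiner: validate the FK against the current LHS row, then join. */
    void onRhsResponse(String pk, String fk, String rhsValue) {
        String lhsValue = lhsTable.get(pk);
        if (lhsValue == null || rhsValue == null || !extractFk(lhsValue).equals(fk)) {
            return; // no match (inner join) or a stale response for an old FK
        }
        output.add(pk + "=" + lhsValue + "+" + rhsValue);
    }
}
```

In this model the join itself only ever runs next to the LHS table, and a
response that arrives for a foreign key the LHS row no longer holds is
simply dropped, which is the part I want the example flow to show clearly.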

Thanks both of you for all the help - very much appreciated.

Adam

On Mon, Dec 17, 2018 at 6:39 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi John,
>
> Just made a pass on your diagram (nice hand-drawing btw!), and obviously we
> are thinking about the same thing :) A neat difference that I like, is that
> in the pre-join repartition topic we can still send messages in the format
> of `K=k, V=(i=2)` while using "i" as the partition key in StreamsPartition,
> this way we do not need to even augment the key for the repartition topic,
> but just do a projection on the foreign key part but trim all other fields:
> as long as we still materialize the store as `A-2` co-located with the
> right KTable, that is fine.
>
> As I mentioned in my previous email, I also think this has a few advantages
> on saving over-the-wire bytes as well as disk bytes.
>
> Guozhang
>
>
> On Mon, Dec 17, 2018 at 3:17 PM John Roesler <j...@confluent.io> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for taking a look! I think Adam's already addressed your questions
> > as well as I could have.
> >
> > Hi Adam,
> >
> > Thanks for updating the KIP. It looks great, especially how all the
> > need-to-know information is right at the top, followed by the details.
> >
> > Also, thanks for that high-level diagram. Actually, now that I'm looking
> > at it, I think part of my proposal got lost in translation, although I do
> > think that what you have there is also correct.
> >
> > I sketched up a crude diagram based on yours and attached it to the KIP
> > (I'm not sure if attached or inline images work on the mailing list):
> >
> https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
> > . It's also attached to this email for convenience.
> >
> > Hopefully, you can see how it's intended to line up, and which parts are
> > modified.
> > At a high level, instead of performing the join on the right-hand side,
> > we're essentially just registering interest, like "LHS key A wishes to
> > receive updates for RHS key 2". Then, when there is a new "interest" or any
> > updates to the RHS records, it "broadcasts" its state back to the LHS
> > records who are interested in it.
> >
> > Thus, instead of sending the LHS values to the RHS joiner workers and then
> > sending the join results back to the LHS worker to be co-partitioned and
> > validated, we instead only send the LHS *keys* to the RHS workers and then
> > only the RHS k/v back to be joined by the LHS worker.
> >
> > I've been considering both your diagram and mine, and I *think* what I'm
> > proposing has a few advantages.
> >
> > Here are some points of interest as you look at the diagram:
> > * When we extract the foreign key and send it to the Pre-Join Repartition
> > Topic, we can send only the FK/PK pair. There's no need to worry about
> > custom partitioner logic, since we can just use the foreign key plainly as
> > the repartition record key. Also, we save on transmitting the LHS value,
> > since we only send its key in this step.
> > * We also only need to store the RHSKey:LHSKey mapping in the
> > MaterializedSubscriptionStore, saving on disk. We can use the same rocks
> > key format you proposed and the same algorithm involving range scans when
> > the RHS records get updated.
> > * Instead of joining on the right side, all we do is compose a
> > re-repartition record so we can broadcast the RHS k/v pair back to the
> > original LHS partition. (this is what the "rekey" node is doing)
> > * Then, there is a special kind of Joiner that's co-resident in the same
> > StreamTask as the LHS table, subscribed to the Post-Join Repartition Topic.
> > ** This Joiner is *not* triggered directly by any changes in the LHS
> > KTable. Instead, LHS events indirectly trigger the join via the whole
> > lifecycle.
> > ** For each event arriving from the Post-Join Repartition Topic, the
> > Joiner looks up the corresponding record in the LHS KTable. It validates
> > the FK as you noted, discarding any inconsistent events. Otherwise, it
> > unpacks the RHS K/V pair and invokes the ValueJoiner to obtain the join
> > result
> > ** Note that the Joiner itself is stateless, so materializing the join
> > result is optional, just as with the 1:1 joins.
> >
> > So in summary:
> > * instead of transmitting the LHS keys and values to the right and the
> > JoinResult back to the left, we only transmit the LHS keys to the right and
> > the RHS values to the left. Assuming the average RHS value is smaller
> > than or equal to the average join result size, it's a clear win on broker
> > traffic. I think this is actually a reasonable assumption, which we can
> > discuss more if you're suspicious.
> > * we only need one copy of the data (the left and right tables need to be
> > materialized) and one extra copy of the PK:FK pairs in the Materialized
> > Subscription Store. Materializing the join result is optional, just as with
> > the existing 1:1 joins.
> > * we still need the fancy range-scan algorithm on the right to locate all
> > interested LHS keys when a RHS value is updated, but we don't need a custom
> > partitioner for either repartition topic (this is of course a modification
> > we could make to your version as well)
> >
> > How does this sound to you? (And did I miss anything?)
> > -John
> >
> > On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <adam.bellem...@gmail.com
> >
> > wrote:
> >
> >> Hi John & Guozhang
> >>
> >> @John & @Guozhang Wang <wangg...@gmail.com> - I have cleaned up the
> KIP,
> >> pruned much of what I wrote and put a simplified diagram near the top to
> >> illustrate the workflow. I encapsulated Jan's content at the bottom of
> the
> >> document. I believe it is simpler to read by far now.
> >>
> >> @Guozhang Wang <wangg...@gmail.com>:
> >> > #1: rekey left table
> >> >   -> source from the left upstream, send to rekey-processor to
> generate
> >> combined key, and then sink to copartition topic.
> >> Correct.
> >>
> >> > #2: first-join with right table
> >> >   -> source from the right table upstream, materialize the right
> table.
> >> >   -> source from the co-partition topic, materialize the rekeyed left
> >> table, join with the right table, rekey back, and then sink to the
> >> rekeyed-back topic.
> >> Almost - I cleared up the KIP. We do not rekey back yet, as I need the
> >> Foreign-Key value generated in #1 above to compare in the resolution
> >> stage.
> >>
> >> > #3: second join
> >> >    -> source from the rekeyed-back topic, materialize the rekeyed back
> >> table.
> >> >   -> source from the left upstream, materialize the left table, join
> >> with
> >> the rekeyed back table.
> >> Almost - As each event comes in, we just run it through a stateful
> >> processor that checks the original ("This") KTable for the key. The
> value
> >> payload then has the foreignKeyExtractor applied again as in Part #1
> >> above,
> >> and gets the current foreign key. Then we compare it to the joined event
> >> that we are currently resolving. If they have the same foreign-key,
> >> propagate the result out. If they don't, throw the event away.
> >>
> >> The end result is that we do need to materialize 2 additional tables
> >> (left/this-combinedkey table, and the final Joined table) as I've
> >> illustrated in the updated KIP. I hope the diagram clears it up a lot
> >> better. Please let me know.
> >>
> >> Thanks again
> >> Adam
> >>
> >>
> >>
> >>
> >> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >> > John,
> >> >
> >> > Thanks a lot for the suggestions on refactoring the wiki, I agree with
> >> you
> >> > that we should consider the KIP proposal to be easily understood by
> >> anyone
> >> > in the future to read, and hence should provide a good summary on the
> >> > user-facing interfaces, as well as rejected alternatives to represent
> >> > briefly "how we came a long way to this conclusion, and what we have
> >> > argued, disagreed, and agreed about, etc" so that readers do not need
> to
> >> > dig into the DISCUSS thread to get all the details. We can, of course,
> >> keep
> >> > the implementation details like "workflows" on the wiki page as a
> >> addendum
> >> > section since it also has correlations.
> >> >
> >> > Regarding your proposal on comment 6): that's a very interesting idea!
> >> > Just to clarify that I understand it fully correctly: the proposal's
> >> > resulting topology is still the same as the current proposal, where we
> >> > will have 3 sub-topologies for this operator:
> >> >
> >> > #1: rekey left table
> >> >    -> source from the left upstream, send to rekey-processor to
> generate
> >> > combined key, and then sink to copartition topic.
> >> >
> >> > #2: first-join with right table
> >> >    -> source from the right table upstream, materialize the right
> table.
> >> >    -> source from the co-partition topic, materialize the rekeyed left
> >> > table, join with the right table, rekey back, and then sink to the
> >> > rekeyed-back topic.
> >> >
> >> > #3: second join
> >> >    -> source from the rekeyed-back topic, materialize the rekeyed back
> >> > table.
> >> >    -> source from the left upstream, materialize the left table, join
> >> with
> >> > the rekeyed back table.
> >> >
> >> > Sub-topology #1 and #3 may be merged to a single sub-topology since
> >> both of
> >> > them read from the left table source stream. In this workflow, we need
> >> to
> >> > materialize 4 tables (left table in #3, right table in #2, rekeyed
> left
> >> > table in #2, rekeyed-back table in #3), and 2 repartition topics
> >> > (copartition topic, rekeyed-back topic).
> >> >
> >> > Compared with Adam's current proposal in the workflow overview, it has
> >> the
> >> > same num.materialize tables (left table, rekeyed left table, right
> >> table,
> >> > out-of-ordering resolver table), and same num.internal topics (two).
> The
> >> > advantage is that on the copartition topic, we can save bandwidth by
> not
> >> > sending value, and in #2 the rekeyed left table is smaller since we do
> >> not
> >> > have any values to materialize. Is that right?
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> >
> >> > On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io>
> wrote:
> >> >
> >> > > Hi Adam,
> >> > >
> >> > > Given that the committers are all pretty busy right now, I think
> that
> >> it
> >> > > would help if you were to refactor the KIP a little to reduce the
> >> > workload
> >> > > for reviewers.
> >> > >
> >> > > I'd recommend the following changes:
> >> > > * relocate all internal details to a section at the end called
> >> something
> >> > > like "Implementation Notes" or something like that.
> >> > > * rewrite the rest of the KIP to be as succinct as possible and mention
> >> > > only publicly-facing API changes.
> >> > > ** for example, the interface that you've already listed there, as
> >> well
> >> > as
> >> > > a textual description of the guarantees we'll be providing (join
> >> result
> >> > is
> >> > > copartitioned with the LHS, and the join result is guaranteed
> correct)
> >> > >
> >> > > A good target would be that the whole main body of the KIP,
> including
> >> > > Status, Motivation, Proposal, Justification, and Rejected
> Alternatives
> >> > all
> >> > > fit "above the fold" (i.e., all fit on the screen at a comfortable
> >> zoom
> >> > > level).
> >> > > I think the only real Rejected Alternative that bears mention at
> this
> >> > point
> >> > > is KScatteredTable, which you could just include the executive
> >> summary on
> >> > > (no implementation details), and link to extra details in the
> >> > > Implementation Notes section.
> >> > >
> >> > > Taking a look at the wiki page, ~90% of the text there is internal
> >> > detail,
> >> > > which is useful for the dubious, but doesn't need to be ratified in
> a
> >> > vote
> >> > > (and would be subject to change without notice in the future
> anyway).
> >> > > There's also a lot of conflicting discussion, as you've very
> >> respectfully
> >> > > tried to preserve the original proposal from Jan while adding your
> >> own.
> >> > > Isolating all this information in a dedicated section at the bottom
> >> frees
> >> > > the voters up to focus on the public API part of the proposal, which
> >> is
> >> > > really all they need to consider.
> >> > >
> >> > > Plus, it'll be clear to future readers which parts of the document
> are
> >> > > enduring, and which parts are a snapshot of our implementation
> >> thinking
> >> > at
> >> > > the time.
> >> > >
> >> > > I'm suggesting this because I suspect that the others haven't made
> >> time
> >> > to
> >> > > review it partly because it seems daunting. If it seems like it
> would
> >> be
> >> > a
> >> > > huge time investment to review, people will just keep putting it
> off.
> >> But
> >> > > if the KIP is a single page, then they'll be more inclined to give
> it
> >> a
> >> > > read.
> >> > >
> >> > > Honestly, I don't think the KIP itself is that controversial (apart
> >> from
> >> > > the scattered table thing (sorry, Jan) ). Most of the discussion has
> >> been
> >> > > around the implementation, which we can continue more effectively in
> >> a PR
> >> > > once the KIP has passed.
> >> > >
> >> > > How does that sound?
> >> > > -John
> >> > >
> >> > > On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <
> >> adam.bellem...@gmail.com
> >> > >
> >> > > wrote:
> >> > >
> >> > > > 1) I believe that the resolution mechanism John has proposed is
> >> > > > sufficient - it is clean and easy and doesn't require additional
> >> > > > RocksDB stores, which reduces the footprint greatly. I don't think we
> >> > > > need to resolve based on timestamp or offset anymore, but if we decide
> >> > > > to do so, that would be within the bounds of the existing API.
> >> > > >
> >> > > > 2) Is the current API sufficient, or does it need to be altered to
> >> go
> >> > > back
> >> > > > to vote?
> >> > > >
> >> > > > 3) KScatteredTable implementation can always be added in a future
> >> > > revision.
> >> > > > This API does not rule it out. This implementation of this
> function
> >> > would
> >> > > > simply be replaced with `KScatteredTable.resolve()` while still
> >> > > maintaining
> >> > > > the existing API, thereby giving both features as Jan outlined
> >> earlier.
> >> > > > Would this work?
> >> > > >
> >> > > >
> >> > > > Thanks Guozhang, John and Jan
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io>
> >> > wrote:
> >> > > >
> >> > > > > Hi, all,
> >> > > > >
> >> > > > > >> In fact, we
> >> > > > > >> can just keep a single final-result store with timestamps and
> >> > reject
> >> > > > > values
> >> > > > > >> that have a smaller timestamp, is that right?
> >> > > > >
> >> > > > > > Which is the correct output should at least be decided on the
> >> > offset
> >> > > of
> >> > > > > > the original message.
> >> > > > >
> >> > > > > Thanks for this point, Jan.
> >> > > > >
> >> > > > > KIP-258 is merely to allow embedding the record timestamp  in
> the
> >> k/v
> >> > > > > store,
> >> > > > > as well as providing a storage-format upgrade path.
> >> > > > >
> >> > > > > I might have missed it, but I think we have yet to discuss whether
> >> > > > > it's safe or desirable just to swap topic-ordering out for
> >> > > > > timestamp-ordering. This is a very deep topic, and I think it would
> >> > > > > only pollute the current discussion.
> >> > > > >
> >> > > > > What Adam has proposed is safe, given the *current* ordering
> >> > semantics
> >> > > > > of the system. If we can agree on his proposal, I think we can
> >> merge
> >> > > the
> >> > > > > feature well before the conversation about timestamp ordering
> even
> >> > > takes
> >> > > > > place, much less reaches a conclusion. In the mean time, it
> would
> >> > seem
> >> > > to
> >> > > > > be unfortunate to have one join operator with different ordering
> >> > > > semantics
> >> > > > > from every other KTable operator.
> >> > > > >
> >> > > > > If and when that timestamp discussion takes place, many (all?)
> >> KTable
> >> > > > > operations
> >> > > > > will need to be updated, rendering the many:one join a small
> >> marginal
> >> > > > cost.
> >> > > > >
> >> > > > > And, just to plug it again, I proposed an algorithm above that I
> >> > > believe
> >> > > > > provides
> >> > > > > correct ordering without any additional metadata, and regardless
> >> of
> >> > the
> >> > > > > ordering semantics. I didn't bring it up further, because I felt
> >> the
> >> > > KIP
> >> > > > > only needs
> >> > > > > to agree on the public API, and we can discuss the
> implementation
> >> at
> >> > > > > leisure in
> >> > > > > a PR...
> >> > > > >
> >> > > > > Thanks,
> >> > > > > -John
> >> > > > >
> >> > > > >
> >> > > > > On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <
> >> > jan.filip...@trivago.com
> >> > > >
> >> > > > > wrote:
> >> > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On 10.12.2018 07:42, Guozhang Wang wrote:
> >> > > > > > > Hello Adam / Jan / John,
> >> > > > > > >
> >> > > > > > > Sorry for being late on this thread! I've finally got some
> >> time
> >> > > this
> >> > > > > > > weekend to cleanup a load of tasks on my queue (actually
> I've
> >> > also
> >> > > > > > realized
> >> > > > > > > there are a bunch of other things I need to enqueue while
> >> > cleaning
> >> > > > them
> >> > > > > > up
> >> > > > > > > --- sth I need to improve on my side). So here are my
> >> thoughts:
> >> > > > > > >
> >> > > > > > > Regarding the APIs: I like the current written API in the KIP.
> >> > > > > > > More generally I'd prefer to keep the 1) one-to-many join
> >> > > > > > > functionalities as well as 2) other join types than inner as
> >> > > > > > > separate KIPs, since 1) may be worth a general API refactoring
> >> > > > > > > that can benefit not only foreign-key joins but collocate
> >> > > > > > > joins as well (e.g. an extended proposal of
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >> > > > > > > ),
> >> > > > > > > and I'm not sure if other join types would actually be
> needed
> >> > > (maybe
> >> > > > > left
> >> > > > > > > join still makes sense), so it's better to
> >> > > > > wait-for-people-to-ask-and-add
> >> > > > > > > than add-sth-that-no-one-uses.
> >> > > > > > >
> >> > > > > > > Regarding whether we enforce step 3) / 4) v.s. introducing a
> >> > > > > > > KScatteredTable for users to inject their own optimization:
> >> I'd
> >> > > > prefer
> >> > > > > to
> >> > > > > > > do the current option as-is, and my main rationale is for
> >> > > > optimization
> >> > > > > > > rooms inside the Streams internals and the API succinctness.
> >> For
> >> > > > > advanced
> >> > > > > > > users who may indeed prefer KScatteredTable and do their own
> >> > > > > > optimization,
> >> > > > > > > while it is too much of the work to use Processor API
> >> directly, I
> >> > > > think
> >> > > > > > we
> >> > > > > > > can still extend the current API to support it in the future
> >> if
> >> > it
> >> > > > > > becomes
> >> > > > > > > necessary.
> >> > > > > >
> >> > > > > > no internal optimization potential. it's a myth
> >> > > > > >
> >> > > > > > ¯\_(ツ)_/¯
> >> > > > > >
> >> > > > > > :-)
> >> > > > > >
> >> > > > > > >
> >> > > > > > > Another note about step 4) resolving out-of-ordering data,
> as
> >> I
> >> > > > > mentioned
> >> > > > > > > before I think with KIP-258 (embedded timestamp with
> key-value
> >> > > store)
> >> > > > > we
> >> > > > > > > can actually make this step simpler than the current
> >> proposal. In
> >> > > > fact,
> >> > > > > > we
> >> > > > > > > can just keep a single final-result store with timestamps
> and
> >> > > reject
> >> > > > > > values
> >> > > > > > > that have a smaller timestamp, is that right?
> >> > > > > >
> >> > > > > > Which is the correct output should at least be decided on the
> >> > offset
> >> > > of
> >> > > > > > the original message.
> >> > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > That's all I have in mind now. Again, great appreciation to
> >> Adam
> >> > to
> >> > > > > make
> >> > > > > > > such HUGE progress on this KIP!
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Guozhang
> >> > > > > > >
> >> > > > > > > On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <
> >> > > > jan.filip...@trivago.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > >> If they don't find the time:
> >> > > > > > >> They usually take the opposite path from me :D
> >> > > > > > >> so the answer would be clear.
> >> > > > > > >>
> >> > > > > > >> hence my suggestion to vote.
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> On 04.12.2018 21:06, Adam Bellemare wrote:
> >> > > > > > >>> Hi Guozhang and Matthias
> >> > > > > > >>>
> >> > > > > > >>> I know both of you are quite busy, but we've gotten this
> KIP
> >> > to a
> >> > > > > point
> >> > > > > > >>> where we need more guidance on the API (perhaps a bit of a
> >> > > > > tie-breaker,
> >> > > > > > >> if
> >> > > > > > >>> you will). If you have anyone else you may think should
> >> look at
> >> > > > this,
> >> > > > > > >>> please tag them accordingly.
> >> > > > > > >>>
> >> > > > > > >>> The scenario is as such:
> >> > > > > > >>>
> >> > > > > > >>> Current Option:
> >> > > > > > >>> API:
> >> > > > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> >> > > > > > >>> 1) Rekey the data to CombinedKey, and shuffles it to the
> >> > > partition
> >> > > > > with
> >> > > > > > >> the
> >> > > > > > >>> foreignKey (repartition 1)
> >> > > > > > >>> 2) Join the data
> >> > > > > > >>> 3) Shuffle the data back to the original node (repartition
> >> 2)
> >> > > > > > >>> 4) Resolve out-of-order arrival / race condition due to
> >> > > foreign-key
> >> > > > > > >> changes.
> >> > > > > > >>>
> >> > > > > > >>> Alternate Option:
> >> > > > > > >>> Perform #1 and #2 above, and return a KScatteredTable.
> >> > > > > > >>> - It would be keyed on a wrapped key function:
> >> <CombinedKey<KO,
> >> > > K>,
> >> > > > > VR>
> >> > > > > > >> (KO
> >> > > > > > >>> = Other Table Key, K = This Table Key, VR = Joined Result)
> >> > > > > > >>> - KScatteredTable.resolve() would perform #3 and #4 but
> >> > > otherwise a
> >> > > > > > user
> >> > > > > > >>> would be able to perform additional functions directly
> from
> >> the
> >> > > > > > >>> KScatteredTable (TBD - currently out of scope).
> >> > > > > > >>> - John's analysis 2-emails up is accurate as to the
> >> tradeoffs.
> >> > > > > > >>>
> >> > > > > > >>> Current Option is coded as-is. Alternate option is
> possible,
> >> > but
> >> > > > will
> >> > > > > > >>> require for implementation details to be made in the API
> and
> >> > some
> >> > > > > > >> exposure
> >> > > > > > >>> of new data structures into the API (ie: CombinedKey).
> >> > > > > > >>>
> >> > > > > > >>> I appreciate any insight into this.
> >> > > > > > >>>
> >> > > > > > >>> Thanks.
> >> > > > > > >>>
> >> > > > > > >>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <
> >> > > > > > adam.bellem...@gmail.com>
> >> > > > > > >>> wrote:
> >> > > > > > >>>
> >> > > > > > >>>> Hi John
> >> > > > > > >>>>
> >> > > > > > >>>> Thanks for your feedback and assistance. I think your
> >> summary
> >> > is
> >> > > > > > >> accurate
> >> > > > > > >>>> from my perspective. Additionally, I would like to add
> that
> >> > > there
> >> > > > > is a
> >> > > > > > >> risk
> >> > > > > > >>>> of inconsistent final states without performing the
> >> > resolution.
> >> > > > This
> >> > > > > > is
> >> > > > > > >> a
> >> > > > > > >>>> major concern for me as most of the data I have dealt
> with
> >> is
> >> > > > > produced
> >> > > > > > >> by
> >> > > > > > >>>> relational databases. We have seen a number of cases
> where
> >> a
> >> > > user
> >> > > > in
> >> > > > > > the
> >> > > > > > >>>> Rails UI has modified the field (foreign key), realized
> >> they
> >> > > made
> >> > > > a
> >> > > > > > >>>> mistake, and then updated the field again with a new key.
> >> The
> >> > > > events
> >> > > > > > are
> >> > > > > > >>>> propagated out as they are produced, and as such we have
> >> had
> >> > > > > > real-world
> >> > > > > > >>>> cases where these inconsistencies were propagated
> >> downstream
> >> > as
> >> > > > the
> >> > > > > > >> final
> >> > > > > > >>>> values due to the race conditions in the fanout of the
> >> data.
> >> > > > > > >>>>
> >> > > > > > >>>> This solution that I propose values correctness of the
> >> final
> >> > > > result
> >> > > > > > over
> >> > > > > > >>>> other factors.
> >> > > > > > >>>>
> >> > > > > > >>>> We could always move this function over to using a
> >> > > > > > >>>> KScatteredTable implementation in the future, and simply
> >> > > > > > >>>> deprecate this join API in time. I think I would like to hear
> >> > > > > > >>>> more from some of the other major committers on which course
> >> > > > > > >>>> of action they would think is best before any more coding is
> >> > > > > > >>>> done.
> >> > > > > > >>>>
> >> > > > > > >>>> Thanks again
> >> > > > > > >>>>
> >> > > > > > >>>> Adam
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <
> >> > j...@confluent.io>
> >> > > > > > wrote:
> >> > > > > > >>>>
> >> > > > > > >>>>> Hi Jan and Adam,
> >> > > > > > >>>>>
> >> > > > > > >>>>> Wow, thanks for doing that test, Adam. Those results are
> >> > > > > encouraging.
> >> > > > > > >>>>>
> >> > > > > > >>>>> Thanks for your performance experience as well, Jan. I
> >> agree
> >> > > that
> >> > > > > > >> avoiding
> >> > > > > > >>>>> unnecessary join outputs is especially important when
> the
> >> > > fan-out
> >> > > > > is
> >> > > > > > so
> >> > > > > > >>>>> high. I suppose this could also be built into the
> >> > > implementation
> >> > > > > > we're
> >> > > > > > >>>>> discussing, but it wouldn't have to be specified in the
> >> KIP
> >> > > > (since
> >> > > > > > >> it's an
> >> > > > > > >>>>> API-transparent optimization).
> >> > > > > > >>>>>
> >> > > > > > >>>>> As far as whether or not to re-repartition the data, I
> >> didn't
> >> > > > bring
> >> > > > > > it
> >> > > > > > >> up
> >> > > > > > >>>>> because it sounded like the two of you agreed to leave
> the
> >> > KIP
> >> > > > > as-is,
> >> > > > > > >>>>> despite the disagreement.
> >> > > > > > >>>>>
> >> > > > > > >>>>> If you want my opinion, I feel like both approaches are
> >> > > > reasonable.
> >> > > > > > >>>>> It sounds like Jan values more the potential for
> >> developers
> >> > to
> >> > > > > > optimize
> >> > > > > > >>>>> their topologies to re-use the intermediate nodes,
> whereas
> >> > Adam
> >> > > > > > places
> >> > > > > > >>>>> more
> >> > > > > > >>>>> value on having a single operator that people can use
> >> without
> >> > > > extra
> >> > > > > > >> steps
> >> > > > > > >>>>> at the end.
> >> > > > > > >>>>>
> >> > > > > > >>>>> Personally, although I do find it exceptionally annoying
> >> > when a
> >> > > > > > >> framework
> >> > > > > > >>>>> gets in my way when I'm trying to optimize something, it
> >> > seems
> >> > > > > better
> >> > > > > > >> to
> >> > > > > > >>>>> go
> >> > > > > > >>>>> for a single operation.
> >> > > > > > >>>>> * Encapsulating the internal transitions gives us
> >> significant
> >> > > > > > latitude
> >> > > > > > >> in
> >> > > > > > >>>>> the implementation (for example, joining only at the
> end,
> >> not
> >> > > in
> >> > > > > the
> >> > > > > > >>>>> middle
> >> > > > > > >>>>> to avoid extra data copying and out-of-order resolution;
> >> how
> >> > we
> >> > > > > > >> represent
> >> > > > > > >>>>> the first repartition keys (combined keys vs. value
> >> vectors),
> >> > > > > etc.).
> >> > > > > > >> If we
> >> > > > > > >>>>> publish something like a KScatteredTable with the
> >> > > > right-partitioned
> >> > > > > > >> joined
> >> > > > > > >>>>> data, then the API pretty much locks in the
> >> implementation as
> >> > > > well.
> >> > > > > > >>>>> * The API seems simpler to understand and use. I do mean
> >> > > "seems";
> >> > > > > if
> >> > > > > > >>>>> anyone
> >> > > > > > >>>>> wants to make the case that KScatteredTable is actually
> >> > > simpler,
> >> > > > I
> >> > > > > > >> think
> >> > > > > > >>>>> hypothetical usage code would help. From a relational
> >> algebra
> >> > > > > > >> perspective,
> >> > > > > > >>>>> it seems like KTable.join(KTable) should produce a new
> >> KTable
> >> > > in
> >> > > > > all
> >> > > > > > >>>>> cases.
> >> > > > > > >>>>> * That said, there might still be room in the API for a
> >> > > different
> >> > > > > > >>>>> operation
> >> > > > > > >>>>> like what Jan has proposed to scatter a KTable, and then
> >> do
> >> > > > things
> >> > > > > > like
> >> > > > > > >>>>> join, re-group, etc from there... I'm not sure; I
> haven't
> >> > > thought
> >> > > > > > >> through
> >> > > > > > >>>>> all the consequences yet.
> >> > > > > > >>>>>
> >> > > > > > >>>>> This is all just my opinion after thinking over the
> >> > discussion
> >> > > so
> >> > > > > > >> far...
> >> > > > > > >>>>> -John
> >> > > > > > >>>>>
> >> > > > > > >>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <
> >> > > > > > >> adam.bellem...@gmail.com>
> >> > > > > > >>>>> wrote:
> >> > > > > > >>>>>
> >> > > > > > >>>>>> Updated the PR to take into account John's feedback.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> I did some preliminary testing for the performance of
> the
> >> > > > > > prefixScan.
> >> > > > > > >> I
> >> > > > > > >>>>>> have attached the file, but I will also include the
> text
> >> in
> >> > > the
> >> > > > > body
> >> > > > > > >>>>> here
> >> > > > > > >>>>>> for archival purposes (I am not sure what happens to
> >> > attached
> >> > > > > > files).
> >> > > > > > >> I
> >> > > > > > >>>>>> also updated the PR and the KIP accordingly.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Summary: It scales exceptionally well for scanning large
> >> > > > > > >>>>>> numbers of records. As Jan mentioned previously, the real
> >> > > > > > >>>>>> issue would be more around processing the resulting records
> >> > > > > > >>>>>> after obtaining them. For instance, it takes approximately
> >> > > > > > >>>>>> 80-120 ms to flush the buffer and a further 35-85 ms to scan
> >> > > > > > >>>>>> 27.5M records, obtaining matches for 2.5M of them. Iterating
> >> > > > > > >>>>>> through the records just to generate a simple count takes
> >> > > > > > >>>>>> ~40 times longer than the flush + scan combined.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> ============================================================================================
> >> > > > > > >>>>>> Setup:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> ============================================================================================
> >> > > > > > >>>>>> Java 9 with default settings aside from a 512 MB heap
> >> > > (Xmx512m,
> >> > > > > > >> Xms512m)
> >> > > > > > >>>>>> CPU: i7 2.2 GHz.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Note: I am using a slightly-modified,
> directly-accessible
> >> > > Kafka
> >> > > > > > >> Streams
> >> > > > > > >>>>>> RocksDB
> >> > > > > > >>>>>> implementation (RocksDB.java, basically just avoiding
> the
> >> > > > > > >>>>>> ProcessorContext).
> >> > > > > > >>>>>> There are no modifications to the default RocksDB
> values
> >> > > > provided
> >> > > > > in
> >> > > > > > >> the
> >> > > > > > >>>>>> 2.1/trunk release.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> keysize = 128 bytes
> >> > > > > > >>>>>> valsize = 512 bytes
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Step 1:
> >> > > > > > >>>>>> Write X positive matching events: (key = prefix +
> >> > left-padded
> >> > > > > > >>>>>> auto-incrementing integer)
> >> > > > > > >>>>>> Step 2:
> >> > > > > > >>>>>> Write 10X negative matching events (key = left-padded
> >> > > > > > >> auto-incrementing
> >> > > > > > >>>>>> integer)
> >> > > > > > >>>>>> Step 3:
> >> > > > > > >>>>>> Perform flush
> >> > > > > > >>>>>> Step 4:
> >> > > > > > >>>>>> Perform prefixScan
> >> > > > > > >>>>>> Step 5:
> >> > > > > > >>>>>> Iterate through return Iterator and validate the count
> of
> >> > > > expected
> >> > > > > > >>>>> events.
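
The five benchmark steps above can be sketched as follows. This is a minimal illustration, not the code used for the measurements: a TreeMap stands in for the sorted RocksDB key space, there is no flush step, and the class and method names (PrefixScanSketch, countWithPrefix, populate) are hypothetical.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class PrefixScanSketch {
    // Sorted in-memory map standing in for RocksDB's sorted key space (assumption).
    private final NavigableMap<String, byte[]> store = new TreeMap<>();

    void put(String key, byte[] value) {
        store.put(key, value);
    }

    // Seek to the first key >= prefix, then walk forward while keys still match.
    int countWithPrefix(String prefix) {
        int count = 0;
        for (String key : store.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break; // keys are sorted, so no later key can carry the prefix
            }
            count++;
        }
        return count;
    }

    // Steps 1 and 2: x keys carrying the prefix, 10x keys without it.
    // The prefix must not consist of digits only, or step 2 keys could match it.
    static PrefixScanSketch populate(String prefix, int x) {
        PrefixScanSketch sketch = new PrefixScanSketch();
        byte[] value = new byte[512]; // valsize from the setup; contents are irrelevant
        for (int i = 0; i < x; i++) {
            sketch.put(prefix + String.format("%010d", i), value);
        }
        for (int i = 0; i < 10 * x; i++) {
            sketch.put(String.format("%010d", i), value);
        }
        return sketch;
    }

    public static void main(String[] args) {
        // Steps 4 and 5: scan by prefix and validate the number of matches.
        PrefixScanSketch sketch = populate("fk-", 1_000);
        System.out.println(sketch.countWithPrefix("fk-"));
    }
}
```

The scan cost here depends on the number of matching keys rather than on the total store size, which is the property the measurements are probing.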
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> ============================================================================================
> >> > > > > > >>>>>> Results:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> ============================================================================================
> >> > > > > > >>>>>> X = 1k (11k events total)
> >> > > > > > >>>>>> Flush Time = 39 mS
> >> > > > > > >>>>>> Scan Time = 7 mS
> >> > > > > > >>>>>> 6.9 MB disk
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> --------------------------------------------------------------------------------------------
> >> > > > > > >>>>>> X = 10k (110k events total)
> >> > > > > > >>>>>> Flush Time = 45 mS
> >> > > > > > >>>>>> Scan Time = 8 mS
> >> > > > > > >>>>>> 127 MB
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> --------------------------------------------------------------------------------------------
> >> > > > > > >>>>>> X = 100k (1.1M events total)
> >> > > > > > >>>>>> Test1:
> >> > > > > > >>>>>> Flush Time = 60 mS
> >> > > > > > >>>>>> Scan Time = 12 mS
> >> > > > > > >>>>>> 678 MB
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Test2:
> >> > > > > > >>>>>> Flush Time = 45 mS
> >> > > > > > >>>>>> Scan Time = 7 mS
> >> > > > > > >>>>>> 576 MB
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> --------------------------------------------------------------------------------------------
> >> > > > > > >>>>>> X = 1M (11M events total)
> >> > > > > > >>>>>> Test1:
> >> > > > > > >>>>>> Flush Time = 52 mS
> >> > > > > > >>>>>> Scan Time = 19 mS
> >> > > > > > >>>>>> 7.2 GB
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Test2:
> >> > > > > > >>>>>> Flush Time = 84 mS
> >> > > > > > >>>>>> Scan Time = 34 mS
> >> > > > > > >>>>>> 9.1 GB
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> --------------------------------------------------------------------------------------------
> >> > > > > > >>>>>> X = 2.5M (27.5M events total)
> >> > > > > > >>>>>> Test1:
> >> > > > > > >>>>>> Flush Time = 82 mS
> >> > > > > > >>>>>> Scan Time = 63 mS
> >> > > > > > >>>>>> 17GB - 276 sst files
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Test2:
> >> > > > > > >>>>>> Flush Time = 116 mS
> >> > > > > > >>>>>> Scan Time = 35 mS
> >> > > > > > >>>>>> 23GB - 361 sst files
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Test3:
> >> > > > > > >>>>>> Flush Time = 103 mS
> >> > > > > > >>>>>> Scan Time = 82 mS
> >> > > > > > >>>>>> 19 GB - 300 sst files
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> --------------------------------------------------------------------------------------------
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> I had to limit my testing on my laptop to X = 2.5M
> >> events. I
> >> > > > tried
> >> > > > > > to
> >> > > > > > >> go
> >> > > > > > >>>>>> to X = 10M (110M events) but RocksDB was going into the
> >> > 100GB+
> >> > > > > range
> >> > > > > > >>>>> and my
> >> > > > > > >>>>>> laptop ran out of disk. More extensive testing could be
> >> done
> >> > > > but I
> >> > > > > > >>>>> suspect
> >> > > > > > >>>>>> that it would be in line with what we're seeing in the
> >> > results
> >> > > > > > above.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> At this point in time, I think the only major
> discussion
> >> > point
> >> > > > is
> >> > > > > > >> really
> >> > > > > > >>>>>> around what Jan and I have disagreed on: repartitioning
> >> > back +
> >> > > > > > >> resolving
> >> > > > > > >>>>>> potential out of order issues or leaving that up to the
> >> > client
> >> > > > to
> >> > > > > > >>>>> handle.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Thanks folks,
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Adam
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <
> >> > > > > > jan.filip...@trivago.com
> >> > > > > > >>>
> >> > > > > > >>>>>> wrote:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> On 29.11.2018 15:14, John Roesler wrote:
> >> > > > > > >>>>>>>> Hi all,
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Sorry that this discussion petered out... I think the
> >> 2.1
> >> > > > > release
> >> > > > > > >>>>>>> caused an
> >> > > > > > >>>>>>>> extended distraction that pushed it off everyone's
> >> radar
> >> > > > (which
> >> > > > > > was
> >> > > > > > >>>>>>>> precisely Adam's concern). Personally, I've also had
> >> some
> >> > > > extended
> >> > > > > > >>>>>>>> distractions of my own that kept (and continue to
> >> keep) me
> >> > > > > > >>>>> preoccupied.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> However, calling for a vote did wake me up, so I
> guess
> >> Jan
> >> > > was
> >> > > > > on
> >> > > > > > >> the
> >> > > > > > >>>>>>> right
> >> > > > > > >>>>>>>> track!
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> I've gone back and reviewed the whole KIP document
> and
> >> the
> >> > > > prior
> >> > > > > > >>>>>>>> discussion, and I'd like to offer a few thoughts:
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> API Thoughts:
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> 1. If I read the KIP right, you are proposing a
> >> > many-to-one
> >> > > > > join.
> >> > > > > > >>>>> Could
> >> > > > > > >>>>>>> we
> >> > > > > > >>>>>>>> consider naming it manyToOneJoin? Or, if you prefer,
> >> flip
> >> > > the
> >> > > > > > design
> >> > > > > > >>>>>>> around
> >> > > > > > >>>>>>>> and make it a oneToManyJoin?
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> The proposed name "joinOnForeignKey" disguises the
> join
> >> > > type,
> >> > > > > and
> >> > > > > > it
> >> > > > > > >>>>>>> seems
> >> > > > > > >>>>>>>> like it might trick some people into using it for a
> >> > > one-to-one
> >> > > > > > join.
> >> > > > > > >>>>>>> This
> >> > > > > > >>>>>>>> would work, of course, but it would be super
> >> inefficient
> >> > > > > compared
> >> > > > > > to
> >> > > > > > >>>>> a
> >> > > > > > >>>>>>>> simple rekey-and-join.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> 2. I might have missed it, but I don't think it's
> >> > specified
> >> > > > > > whether
> >> > > > > > >>>>>>> it's an
> >> > > > > > >>>>>>>> inner, outer, or left join. I'm guessing an outer
> >> join, as
> >> > > > > > >>>>> (neglecting
> >> > > > > > >>>>>>> IQ),
> >> > > > > > >>>>>>>> the rest can be achieved by filtering or by handling
> >> it in
> >> > > the
> >> > > > > > >>>>>>> ValueJoiner.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> 3. The arg list to joinOnForeignKey doesn't look
> quite
> >> > > right.
> >> > > > > > >>>>>>>> 3a. Regarding Serialized: There are a few different
> >> > > paradigms
> >> > > > in
> >> > > > > > >>>>> play in
> >> > > > > > >>>>>>>> the Streams API, so it's confusing, but instead of
> >> three
> >> > > > > > Serialized
> >> > > > > > >>>>>>> args, I
> >> > > > > > >>>>>>>> think it would be better to have one that allows
> >> > > (optionally)
> >> > > > > > >> setting
> >> > > > > > >>>>>>> the 4
> >> > > > > > >>>>>>>> incoming serdes. The result serde is defined by the
> >> > > > > Materialized.
> >> > > > > > >> The
> >> > > > > > >>>>>>>> incoming serdes can be optional because they might
> >> already
> >> > > be
> >> > > > > > >>>>> available
> >> > > > > > >>>>>>> on
> >> > > > > > >>>>>>>> the source KTables, or the default serdes from the
> >> config
> >> > > > might
> >> > > > > be
> >> > > > > > >>>>>>>> applicable.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> 3b. Is the StreamPartitioner necessary? The other
> joins
> >> > > don't
> >> > > > > > allow
> >> > > > > > >>>>>>> setting
> >> > > > > > >>>>>>>> one, and it seems like it might actually be harmful,
> >> since
> >> > > the
> >> > > > > > rekey
> >> > > > > > >>>>>>>> operation needs to produce results that are
> >> co-partitioned
> >> > > > with
> >> > > > > > the
> >> > > > > > >>>>>>> "other"
> >> > > > > > >>>>>>>> KTable.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> 4. I'm fine with the "reserved word" header, but I
> >> didn't
> >> > > > > actually
> >> > > > > > >>>>>>> follow
> >> > > > > > >>>>>>>> what Matthias meant about namespacing requiring
> >> > > > "deserializing"
> >> > > > > > the
> >> > > > > > >>>>>>> record
> >> > > > > > >>>>>>>> header. The headers are already Strings, so I don't
> >> think
> >> > > that
> >> > > > > > >>>>>>>> deserialization is required. If we applied the
> >> namespace
> >> > at
> >> > > > > source
> >> > > > > > >>>>> nodes
> >> > > > > > >>>>>>>> and stripped it at sink nodes, this would be
> >> practically
> >> > no
> >> > > > > > >> overhead.
> >> > > > > > >>>>>>> The
> >> > > > > > >>>>>>>> advantage of the namespace idea is that no public API
> >> > change
> >> > > > wrt
> >> > > > > > >>>>> headers
> >> > > > > > >>>>>>>> needs to happen, and no restrictions need to be
> placed
> >> on
> >> > > > users'
> >> > > > > > >>>>>>> headers.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> (Although I'm wondering if we can get away without
> the
> >> > > header
> >> > > > at
> >> > > > > > >>>>> all...
> >> > > > > > >>>>>>>> stay tuned)
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> 5. I also didn't follow the discussion about the HWM
> >> table
> >> > > > > growing
> >> > > > > > >>>>>>> without
> >> > > > > > >>>>>>>> bound. As I read it, the HWM table is effectively
> >> > > implementing
> >> > > > > OCC
> >> > > > > > >> to
> >> > > > > > >>>>>>>> resolve the problem you noted with disordering when
> the
> >> > > rekey
> >> > > > is
> >> > > > > > >>>>>>>> reversed... particularly notable when the FK changes.
> >> As
> >> > > such,
> >> > > > > it
> >> > > > > > >>>>> only
> >> > > > > > >>>>>>>> needs to track the most recent "version" (the offset
> in
> >> > the
> >> > > > > source
> >> > > > > > >>>>>>>> partition) of each key. Therefore, it should have the
> >> same
> >> > > > > number
> >> > > > > > of
> >> > > > > > >>>>>>> keys
> >> > > > > > >>>>>>>> as the source table at all times.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> I see that you are aware of KIP-258, which I think
> >> might
> >> > be
> >> > > > > > relevant
> >> > > > > > >>>>> in
> >> > > > > > >>>>>>> a
> >> > > > > > >>>>>>>> couple of ways. One: it's just about storing the
> >> timestamp
> >> > > in
> >> > > > > the
> >> > > > > > >>>>> state
> >> > > > > > >>>>>>>> store, but the ultimate idea is to effectively use
> the
> >> > > > timestamp
> >> > > > > > as
> >> > > > > > >>>>> an
> >> > > > > > >>>>>>> OCC
> >> > > > > > >>>>>>>> "version" to drop disordered updates. You wouldn't
> >> want to
> >> > > use
> >> > > > > the
> >> > > > > > >>>>>>>> timestamp for this operation, but if you were to use
> a
> >> > > similar
> >> > > > > > >>>>>>> mechanism to
> >> > > > > > >>>>>>>> store the source offset in the store alongside the
> >> > re-keyed
> >> > > > > > values,
> >> > > > > > >>>>> then
> >> > > > > > >>>>>>>> you could avoid a separate table.
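
A minimal sketch of the idea in point 5, assuming the source-partition offset is stored next to each re-keyed value and used as the OCC "version". The names (OffsetVersionedStoreSketch, putIfNewer) are hypothetical and the map is only a stand-in for a state store.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetVersionedStoreSketch {
    // A value kept together with the source offset that produced it.
    static final class Versioned {
        final String value;
        final long offset;

        Versioned(String value, long offset) {
            this.value = value;
            this.offset = offset;
        }
    }

    // One entry per key, so the store has as many keys as the source table.
    private final Map<String, Versioned> store = new HashMap<>();

    // Applies the update only if it is newer than what is stored.
    // Returns false when the update is dropped as out of order.
    boolean putIfNewer(String key, String value, long sourceOffset) {
        Versioned current = store.get(key);
        if (current != null && current.offset >= sourceOffset) {
            return false;
        }
        store.put(key, new Versioned(value, sourceOffset));
        return true;
    }

    String get(String key) {
        Versioned current = store.get(key);
        return current == null ? null : current.value;
    }

    public static void main(String[] args) {
        OffsetVersionedStoreSketch store = new OffsetVersionedStoreSketch();
        store.putIfNewer("1", "joined-on-B", 7L);
        // An older result arriving late is ignored.
        System.out.println(store.putIfNewer("1", "joined-on-A", 5L));
        System.out.println(store.get("1"));
    }
}
```

Because the version travels with the value, no separate high-water-mark table is needed in this sketch.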
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> 6. You and Jan have been thinking about this for a
> long
> >> > > time,
> >> > > > so
> >> > > > > > >> I've
> >> > > > > > >>>>>>>> probably missed something here, but I'm wondering if
> we
> >> > can
> >> > > > > avoid
> >> > > > > > >> the
> >> > > > > > >>>>>>> HWM
> >> > > > > > >>>>>>>> tracking at all and resolve out-of-order during a
> final
> >> > join
> >> > > > > > >>>>> instead...
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Let's say we're joining a left table (Integer K:
> Letter
> >> > FK,
> >> > > > > (other
> >> > > > > > >>>>>>> data))
> >> > > > > > >>>>>>>> to a right table (Letter K: (some data)).
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Left table:
> >> > > > > > >>>>>>>> 1: (A, xyz)
> >> > > > > > >>>>>>>> 2: (B, asd)
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Right table:
> >> > > > > > >>>>>>>> A: EntityA
> >> > > > > > >>>>>>>> B: EntityB
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> We could do a rekey as you proposed with a combined
> >> key,
> >> > but
> >> > > > not
> >> > > > > > >>>>>>>> propagating the value at all..
> >> > > > > > >>>>>>>> Rekey table:
> >> > > > > > >>>>>>>> A-1: (dummy value)
> >> > > > > > >>>>>>>> B-2: (dummy value)
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Which we then join with the right table to produce:
> >> > > > > > >>>>>>>> A-1: EntityA
> >> > > > > > >>>>>>>> B-2: EntityB
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Which gets rekeyed back:
> >> > > > > > >>>>>>>> 1: A, EntityA
> >> > > > > > >>>>>>>> 2: B, EntityB
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> And finally we do the actual join:
> >> > > > > > >>>>>>>> Result table:
> >> > > > > > >>>>>>>> 1: ((A, xyz), EntityA)
> >> > > > > > >>>>>>>> 2: ((B, asd), EntityB)
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> The thing is that in that last join, we have the
> >> > opportunity
> >> > > > to
> >> > > > > > >>>>> compare
> >> > > > > > >>>>>>> the
> >> > > > > > >>>>>>>> current FK in the left table with the incoming PK of
> >> the
> >> > > right
> >> > > > > > >>>>> table. If
> >> > > > > > >>>>>>>> they don't match, we just drop the event, since it
> >> must be
> >> > > > > > outdated.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>> In your KIP, you gave an example in which (1: A, xyz)
> >> gets
> >> > > > > updated
> >> > > > > > >> to
> >> > > > > > >>>>>>> (1:
> >> > > > > > >>>>>>>> B, xyz), ultimately yielding a conundrum about
> whether
> >> the
> >> > > > final
> >> > > > > > >>>>> state
> >> > > > > > >>>>>>>> should be (1: null) or (1: joined-on-B). With the
> >> > algorithm
> >> > > > > above,
> >> > > > > > >>>>> you
> >> > > > > > >>>>>>>> would be considering (1: (B, xyz), (A, null)) vs (1:
> >> (B,
> >> > > xyz),
> >> > > > > (B,
> >> > > > > > >>>>>>>> EntityB)). It seems like this does give you enough
> >> > > information
> >> > > > > to
> >> > > > > > >>>>> make
> >> > > > > > >>>>>>> the
> >> > > > > > >>>>>>>> right choice, regardless of disordering.
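
The final-join check described in point 6 can be sketched like this. It is an illustration under assumptions: the left table is a plain map, the rekeyed-back event is passed as three arguments, and the names (FkCheckJoinSketch, onRightEvent) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class FkCheckJoinSketch {
    // Left-table row: the foreign key plus the remaining data.
    static final class Left {
        final String fk;
        final String data;

        Left(String fk, String data) {
            this.fk = fk;
            this.data = data;
        }
    }

    private final Map<Integer, Left> leftTable = new HashMap<>();

    void putLeft(int key, String fk, String data) {
        leftTable.put(key, new Left(fk, data));
    }

    // Handles an event rekeyed back to the left key. The event carries the
    // primary key of the right row it was joined against. If that key no
    // longer matches the current FK of the left row, the event is outdated.
    Optional<String> onRightEvent(int leftKey, String rightPk, String rightValue) {
        Left left = leftTable.get(leftKey);
        if (left == null || !left.fk.equals(rightPk)) {
            return Optional.empty(); // stale or orphaned event: drop it
        }
        return Optional.of("((" + left.fk + ", " + left.data + "), " + rightValue + ")");
    }

    public static void main(String[] args) {
        FkCheckJoinSketch join = new FkCheckJoinSketch();
        join.putLeft(1, "B", "xyz"); // row 1 was updated from FK A to FK B
        System.out.println(join.onRightEvent(1, "A", null).isPresent());
        System.out.println(join.onRightEvent(1, "B", "EntityB").orElse("dropped"));
    }
}
```

The order in which the two events arrive does not matter here, since the event for the old FK is rejected either way.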
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Will check Adam's patch, but this should work. As
> >> mentioned
> >> > > > often
> >> > > > > I
> >> > > > > > am
> >> > > > > > >>>>>>> not convinced on partitioning back for the user
> >> > > automatically.
> >> > > > I
> >> > > > > > >> think
> >> > > > > > >>>>>>> this is the real performance eater ;)
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> 7. Last thought... I'm a little concerned about the
> >> > > > performance
> >> > > > > of
> >> > > > > > >>>>> the
> >> > > > > > >>>>>>>> range scans when records change in the right table.
> >> You've
> >> > > > said
> >> > > > > > that
> >> > > > > > >>>>>>> you've
> >> > > > > > >>>>>>>> been using the algorithm you presented in production
> >> for a
> >> > > > > while.
> >> > > > > > >> Can
> >> > > > > > >>>>>>> you
> >> > > > > > >>>>>>>> give us a sense of the performance characteristics
> >> you've
> >> > > > > > observed?
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Make it work, make it fast, make it beautiful. The
> >> topmost
> >> > > > thing
> >> > > > > > here
> >> > > > > > >>>>> is
> >> > > > > > >>>>>>> / was correctness. In practice I do not measure the
> >> > > performance
> >> > > > > of
> >> > > > > > >> the
> >> > > > > > >>>>>>> range scan. The usual cases I run this with emit
> >> 500k -
> >> > > 1M
> >> > > > > rows
> >> > > > > > >>>>>>> on a left hand side change. The range scan is just the
> >> work
> >> > > you
> >> > > > > > gotta
> >> > > > > > >>>>>>> do, also when you pack your data into different
> formats,
> >> > > > usually
> >> > > > > > the
> >> > > > > > >>>>>>> RocksDB performance is tightly tied to the size of the
> data
> >> and
> >> > > we
> >> > > > > > can't
> >> > > > > > >>>>>>> really change that. It is more important for users to
> >> > prevent
> >> > > > > > useless
> >> > > > > > >>>>>>> updates to begin with. My left hand side is guarded to
> >> drop
> >> > > > > changes
> >> > > > > > >>>>> that
> >> > > > > > >>>>>>> are not going to change my join output.
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> usually it's:
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> drop unused fields and then don't forward if
> >> > old.equals(new)
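
A minimal sketch of the guard described above, assuming the record has already been projected down to the fields the join uses. The names (ChangeGuardSketch, shouldForward) are hypothetical, and the map stands in for whatever store holds the previous value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class ChangeGuardSketch {
    // Last projected value forwarded per key (stand-in for a state store).
    private final Map<String, String> lastForwarded = new HashMap<>();

    // Returns true only when the projected value differs from the previous one,
    // i.e. when forwarding could actually change the join output.
    boolean shouldForward(String key, String projected) {
        String old = lastForwarded.get(key);
        if (Objects.equals(old, projected)) {
            return false; // old.equals(new): nothing to propagate
        }
        lastForwarded.put(key, projected);
        return true;
    }

    public static void main(String[] args) {
        ChangeGuardSketch guard = new ChangeGuardSketch();
        System.out.println(guard.shouldForward("1", "A")); // first value for the key
        System.out.println(guard.shouldForward("1", "A")); // unchanged after projection
        System.out.println(guard.shouldForward("1", "B")); // FK changed
    }
}
```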
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Regarding the performance of creating an iterator
> for
> >> > > > smaller
> >> > > > > > >>>>>>> fanouts, users can still just do a group by first then
> >> > > anyways.
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>> I could only think of one alternative, but I'm not
> >> sure if
> >> > > > it's
> >> > > > > > >>>>> better
> >> > > > > > >>>>>>> or
> >> > > > > > >>>>>>>> worse... If the first re-key only needs to preserve
> the
> >> > > > original
> >> > > > > > >> key,
> >> > > > > > >>>>>>> as I
> >> > > > > > >>>>>>>> proposed in #6, then we could store a vector of keys
> in
> >> > the
> >> > > > > value:
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Left table:
> >> > > > > > >>>>>>>> 1: A,...
> >> > > > > > >>>>>>>> 2: B,...
> >> > > > > > >>>>>>>> 3: A,...
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Gets re-keyed:
> >> > > > > > >>>>>>>> A: [1, 3]
> >> > > > > > >>>>>>>> B: [2]
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Then, the rhs part of the join would only need a
> >> regular
> >> > > > > > single-key
> >> > > > > > >>>>>>> lookup.
> >> > > > > > >>>>>>>> Of course we have to deal with the problem of large
> >> > values,
> >> > > as
> >> > > > > > >>>>> there's
> >> > > > > > >>>>>>> no
> >> > > > > > >>>>>>>> bound on the number of lhs records that can reference
> >> rhs
> >> > > > > records.
> >> > > > > > >>>>>>> Offhand,
> >> > > > > > >>>>>>>> I'd say we could page the values, so when one row is
> >> past
> >> > > the
> >> > > > > > >>>>>>> threshold, we
> >> > > > > > >>>>>>>> append the key for the next page. Then in most cases,
> >> it
> >> > > would
> >> > > > > be
> >> > > > > > a
> >> > > > > > >>>>>>> single
> >> > > > > > >>>>>>>> key lookup, but for large fan-out updates, it would
> be
> >> one
> >> > > per
> >> > > > > > (max
> >> > > > > > >>>>>>> value
> >> > > > > > >>>>>>>> size)/(avg lhs key size).
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> This seems more complex, though... Plus, I think
> >> there's
> >> > > some
> >> > > > > > extra
> >> > > > > > >>>>>>>> tracking we'd need to do to know when to emit a
> >> > retraction.
> >> > > > For
> >> > > > > > >>>>> example,
> >> > > > > > >>>>>>>> when record 1 is deleted, the re-key table would just
> >> have
> >> > > (A:
> >> > > > > > [3]).
> >> > > > > > >>>>>>> Some
> >> > > > > > >>>>>>>> kind of tombstone is needed so that the join result
> >> for 1
> >> > > can
> >> > > > > also
> >> > > > > > >> be
> >> > > > > > >>>>>>>> retracted.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> That's all!
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Thanks so much to both Adam and Jan for the
> thoughtful
> >> > KIP.
> >> > > > > Sorry
> >> > > > > > >> the
> >> > > > > > >>>>>>>> discussion has been slow.
> >> > > > > > >>>>>>>> -John
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <
> >> > > > > > >>>>> jan.filip...@trivago.com>
> >> > > > > > >>>>>>>> wrote:
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>>> I'd say you can just call the vote.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> that happens all the time, and if something comes
> up,
> >> it
> >> > > just
> >> > > > > > goes
> >> > > > > > >>>>> back
> >> > > > > > >>>>>>>>> to discuss.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> would not expect too much attention with
> >> another
> >> > > email
> >> > > > > in
> >> > > > > > >>>>> this
> >> > > > > > >>>>>>>>> thread.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> best Jan
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
> >> > > > > > >>>>>>>>>> Hello Contributors
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> I know that 2.1 is about to be released, but I do
> >> need
> >> > to
> >> > > > bump
> >> > > > > > >>>>> this to
> >> > > > > > >>>>>>>>> keep
> >> > > > > > >>>>>>>>>> visibility up. I am still intending to push this
> >> through
> >> > > > once
> >> > > > > > >>>>>>> contributor
> >> > > > > > >>>>>>>>>> feedback is given.
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> Main points that need addressing:
> >> > > > > > >>>>>>>>>> 1) Any way (or benefit) in structuring the current
> >> > > singular
> >> > > > > > graph
> >> > > > > > >>>>> node
> >> > > > > > >>>>>>>>> into
> >> > > > > > >>>>>>>>>> multiple nodes? It has a whopping 25 parameters
> right
> >> > > now. I
> >> > > > > am
> >> > > > > > a
> >> > > > > > >>>>> bit
> >> > > > > > >>>>>>>>> fuzzy
> >> > > > > > >>>>>>>>>> on how the optimizations are supposed to work, so I
> >> > would
> >> > > > > > >>>>> appreciate
> >> > > > > > >>>>>>> any
> >> > > > > > >>>>>>>>>> help on this aspect.
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> 2) Overall strategy for joining + resolving. This
> >> thread
> >> > > has
> >> > > > > > much
> >> > > > > > >>>>>>>>> discourse
> >> > > > > > >>>>>>>>>> between Jan and me about the current highwater
> mark
> >> > > > proposal
> >> > > > > > and
> >> > > > > > >> a
> >> > > > > > >>>>>>>>> groupBy
> >> > > > > > >>>>>>>>>> + reduce proposal. I am of the opinion that we need
> >> to
> >> > > > > strictly
> >> > > > > > >>>>> handle
> >> > > > > > >>>>>>>>> any
> >> > > > > > >>>>>>>>>> chance of out-of-order data and leave none of it up
> >> to
> >> > the
> >> > > > > > >>>>> consumer.
> >> > > > > > >>>>>>> Any
> >> > > > > > >>>>>>>>>> comments or suggestions here would also help.
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> 3) Anything else that you see that would prevent
> this
> >> > from
> >> > > > > > moving
> >> > > > > > >>>>> to a
> >> > > > > > >>>>>>>>> vote?
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> Thanks
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> Adam
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
> >> > > > > > >>>>>>>>> adam.bellem...@gmail.com>
> >> > > > > > >>>>>>>>>> wrote:
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>> Hi Jan
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> With the Stores.windowStoreBuilder and
> >> > > > > > >>>>> Stores.persistentWindowStore,
> >> > > > > > >>>>>>> you
> >> > > > > > >>>>>>>>>>> actually only need to specify the number of
> segments
> >> > you
> >> > > > want
> >> > > > > > and
> >> > > > > > >>>>> how
> >> > > > > > >>>>>>>>> large
> >> > > > > > >>>>>>>>>>> they are. To the best of my understanding, what
> >> happens
> >> > > is
> >> > > > > that
> >> > > > > > >>>>> the
> >> > > > > > >>>>>>>>>>> segments are automatically rolled over as new data
> >> with
> >> > > new
> >> > > > > > >>>>>>> timestamps
> >> > > > > > >>>>>>>>> are
> >> > > > > > >>>>>>>>>>> created. We use this exact functionality in some
> of
> >> the
> >> > > > work
> >> > > > > > done
> >> > > > > > >>>>>>>>>>> internally at my company. For reference, this is
> the
> >> > > > hopping
> >> > > > > > >>>>> windowed
> >> > > > > > >>>>>>>>> store.
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>
> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> In the code that I have provided, there are going
> >> to be
> >> > > two
> >> > > > > 24h
> >> > > > > > >>>>>>>>> segments.
> >> > > > > > >>>>>>>>>>> When a record is put into the windowStore, it will
> >> be
> >> > > > > inserted
> >> > > > > > at
> >> > > > > > >>>>>>> time
> >> > > > > > >>>>>>>>> T in
> >> > > > > > >>>>>>>>>>> both segments. The two segments will always
> overlap
> >> by
> >> > > 12h.
> >> > > > > As
> >> > > > > > >>>>> time
> >> > > > > > >>>>>>>>> goes on
> >> > > > > > >>>>>>>>>>> and new records are added (say at time T+12h+),
> the
> >> > > oldest
> >> > > > > > >> segment
> >> > > > > > >>>>>>> will
> >> > > > > > >>>>>>>>> be
> >> > > > > > >>>>>>>>>>> automatically deleted and a new segment created.
> The
> >> > > > records
> >> > > > > > are
> >> > > > > > >>>>> by
> >> > > > > > >>>>>>>>> default
> >> > > > > > >>>>>>>>>>> inserted with the context.timestamp(), such that
> it
> >> is
> >> > > the
> >> > > > > > record
> >> > > > > > >>>>>>> time,
> >> > > > > > >>>>>>>>> not
> >> > > > > > >>>>>>>>>>> the clock time, which is used.
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> To the best of my understanding, the timestamps
> are
> >> > > > retained
> >> > > > > > when
> >> > > > > > >>>>>>>>>>> restoring from the changelog.
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> Basically, this is a heavy-handed way to deal with
> TTL
> >> > at a
> >> > > > > > >>>>>>> segment-level,
> >> > > > > > >>>>>>>>>>> instead of at an individual record level.
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
> >> > > > > > >>>>>>> jan.filip...@trivago.com>
> >> > > > > > >>>>>>>>>>> wrote:
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> Will that work? I expected it to blow up with
> >> > > > > > ClassCastException
> >> > > > > > >>>>> or
> >> > > > > > >>>>>>>>>>>> similar.
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> You either would have to specify the window you
> >> > > fetch/put
> >> > > > or
> >> > > > > > >>>>> iterate
> >> > > > > > >>>>>>>>>>>> across all windows the key was found in right?
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> I just hope the window-store doesn't check
> >> stream-time
> >> > > > under
> >> > > > > > the
> >> > > > > > >>>>>>> hood;
> >> > > > > > >>>>>>>>>>>> that would be a questionable interface.
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> If it does: did you see my comment on checking
> all
> >> the
> >> > > > > windows
> >> > > > > > >>>>>>> earlier?
> >> > > > > > >>>>>>>>>>>> that would be needed to actually give reasonable
> >> time
> >> > > > > > guarantees.
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> Best
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
> >> > > > > > >>>>>>>>>>>>> Hi Jan
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Check for  " highwaterMat " in the PR. I only
> >> changed
> >> > > the
> >> > > > > > state
> >> > > > > > >>>>>>> store,
> >> > > > > > >>>>>>>>>>>> not
> >> > > > > > >>>>>>>>>>>>> the ProcessorSupplier.
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Thanks,
> >> > > > > > >>>>>>>>>>>>> Adam
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> wrote:
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> @Guozhang
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> Thanks for the information. This is indeed
> >> > something
> >> > > > that
> >> > > > > > >>>>> will be
> >> > > > > > >>>>>>>>>>>>>>> extremely
> >> > > > > > >>>>>>>>>>>>>>> useful for this KIP.
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> @Jan
> >> > > > > > >>>>>>>>>>>>>>> Thanks for your explanations. That being
> said, I
> >> > will
> >> > > > not
> >> > > > > > be
> >> > > > > > >>>>>>> moving
> >> > > > > > >>>>>>>>>>>> ahead
> >> > > > > > >>>>>>>>>>>>>>> with an implementation using reshuffle/groupBy
> >> > > solution
> >> > > > > as
> >> > > > > > >> you
> >> > > > > > >>>>>>>>>>>> propose.
> >> > > > > > >>>>>>>>>>>>>>> That being said, if you wish to implement it
> >> > yourself
> >> > > > off
> >> > > > > > of
> >> > > > > > >>>>> my
> >> > > > > > >>>>>>>>>>>> current PR
> >> > > > > > >>>>>>>>>>>>>>> and submit it as a competitive alternative, I
> >> would
> >> > > be
> >> > > > > more
> >> > > > > > >>>>> than
> >> > > > > > >>>>>>>>>>>> happy to
> >> > > > > > >>>>>>>>>>>>>>> help vet that as an alternate solution. As it
> >> > stands
> >> > > > > right
> >> > > > > > >>>>> now,
> >> > > > > > >>>>>>> I do
> >> > > > > > >>>>>>>>>>>> not
> >> > > > > > >>>>>>>>>>>>>>> really have more time to invest into
> >> alternatives
> >> > > > without
> >> > > > > > >>>>> there
> >> > > > > > >>>>>>>>> being
> >> > > > > > >>>>>>>>>>>> a
> >> > > > > > >>>>>>>>>>>>>>> strong indication from the binding voters
> which
> >> > they
> >> > > > > would
> >> > > > > > >>>>>>> prefer.
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> Hey, total no worries. I think I personally
> gave
> >> up
> >> > on
> >> > > > the
> >> > > > > > >>>>> streams
> >> > > > > > >>>>>>>>> DSL
> >> > > > > > >>>>>>>>>>>> for
> >> > > > > > >>>>>>>>>>>>>> some time already, otherwise I would have
> pulled
> >> > this
> >> > > > KIP
> >> > > > > > >>>>> through
> >> > > > > > >>>>>>>>>>>> already.
> >> > > > > > >>>>>>>>>>>>>> I am currently reimplementing my own DSL based
> on
> >> > > PAPI.
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> I will look at finishing up my PR with the
> >> windowed
> >> > > > state
> >> > > > > > >>>>> store
> >> > > > > > >>>>>>> in
> >> > > > > > >>>>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>> next
> >> > > > > > >>>>>>>>>>>>>>> week or so, exercising it via tests, and then
> I
> >> > will
> >> > > > come
> >> > > > > > >> back
> >> > > > > > >>>>>>> for
> >> > > > > > >>>>>>>>>>>> final
> >> > > > > > >>>>>>>>>>>>>>> discussions. In the meantime, I hope that any
> of
> >> > the
> >> > > > > > binding
> >> > > > > > >>>>>>> voters
> >> > > > > > >>>>>>>>>>>> could
> >> > > > > > >>>>>>>>>>>>>>> take a look at the KIP in the wiki. I have
> >> updated
> >> > it
> >> > > > > > >>>>> according
> >> > > > > > >>>>>>> to
> >> > > > > > >>>>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>> latest plan:
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
> >> > > > > > >>>>>>>>>>>>>>> Support+non-key+joining+in+KTable
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> I have also updated the KIP PR to use a
> windowed
> >> > > store.
> >> > > > > > This
> >> > > > > > >>>>>>> could
> >> > > > > > >>>>>>>>> be
> >> > > > > > >>>>>>>>>>>>>>> replaced by the results of KIP-258 whenever
> they
> >> > are
> >> > > > > > >>>>> completed.
> >> > > > > > >>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> Thanks,
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> Adam
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier
> >> > already
> >> > > > > > updated
> >> > > > > > >>>>> in
> >> > > > > > >>>>>>> the
> >> > > > > > >>>>>>>>>>>> PR?
> >> > > > > > >>>>>>>>>>>>>> I expected it to change to Windowed<K>, Long.
> Missing
> >> > > > > something?
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang
> Wang <
> >> > > > > > >>>>>>> wangg...@gmail.com>
> >> > > > > > >>>>>>>>>>>>>>> wrote:
> >> > > > > > >>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533 is
> >> the
> >> > > > wrong
> >> > > > > > >> link,
> >> > > > > > >>>>>>> as it
> >> > > > > > >>>>>>>>>>>> is
> >> > > > > > >>>>>>>>>>>>>>>> for
> >> > > > > > >>>>>>>>>>>>>>>> corresponding changelog mechanisms. But as
> >> part of
> >> > > > > KIP-258
> >> > > > > > >>>>> we do
> >> > > > > > >>>>>>>>>>>> want to
> >> > > > > > >>>>>>>>>>>>>>>> have "handling out-of-order data for source
> >> > KTable"
> >> > > > such
> >> > > > > > >> that
> >> > > > > > >>>>>>>>>>>> instead of
> >> > > > > > >>>>>>>>>>>>>>>> blindly applying the updates to the materialized
> >> > store,
> >> > > > > i.e.
> >> > > > > > >>>>>>> following
> >> > > > > > >>>>>>>>>>>>>>>> offset
> >> > > > > > >>>>>>>>>>>>>>>> ordering, we will reject updates that are
> older
> >> > than
> >> > > > the
> >> > > > > > >>>>> current
> >> > > > > > >>>>>>>>>>>> key's
> >> > > > > > >>>>>>>>>>>>>>>> timestamps, i.e. following timestamp
> ordering.
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>> Guozhang
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang
> >> Wang <
> >> > > > > > >>>>>>>>> wangg...@gmail.com>
> >> > > > > > >>>>>>>>>>>>>>>> wrote:
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>> Hello Adam,
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the
> >> final
> >> > > step
> >> > > > > > (i.e.
> >> > > > > > >>>>> the
> >> > > > > > >>>>>>>>> high
> >> > > > > > >>>>>>>>>>>>>>>>> watermark store, now altered to be replaced
> >> with
> >> > a
> >> > > > > window
> >> > > > > > >>>>>>> store),
> >> > > > > > >>>>>>>>> I
> >> > > > > > >>>>>>>>>>>>>>>>> think
> >> > > > > > >>>>>>>>>>>>>>>>> another currently ongoing KIP may actually
> >> help:
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> This is for adding the timestamp into a
> >> key-value
> >> > > > store
> >> > > > > > >>>>> (i.e.
> >> > > > > > >>>>>>> only
> >> > > > > > >>>>>>>>>>>> for
> >> > > > > > >>>>>>>>>>>>>>>>> non-windowed KTable), and then one of its
> >> usages,
> >> > as
> >> > > > > > >>>>> described
> >> > > > > > >>>>>>> in
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> https://issues.apache.org/jira/browse/KAFKA-5533
> >> > ,
> >> > > is
> >> > > > > > that
> >> > > > > > >>>>> we
> >> > > > > > >>>>>>> can
> >> > > > > > >>>>>>>>>>>> then
> >> > > > > > >>>>>>>>>>>>>>>>> "reject" updates from the source topics if
> its
> >> > > > > timestamp
> >> > > > > > is
> >> > > > > > >>>>>>>>> smaller
> >> > > > > > >>>>>>>>>>>> than
> >> > > > > > >>>>>>>>>>>>>>>>> the current key's latest update timestamp. I
> >> > think
> >> > > it
> >> > > > > is
> >> > > > > > >>>>> very
> >> > > > > > >>>>>>>>>>>> similar to
> >> > > > > > >>>>>>>>>>>>>>>>> what you have in mind for high watermark
> based
> >> > > > > filtering,
> >> > > > > > >>>>> while
> >> > > > > > >>>>>>>>> you
> >> > > > > > >>>>>>>>>>>> only
> >> > > > > > >>>>>>>>>>>>>>>>> need to make sure that the timestamps of the
> >> > > joining
> >> > > > > > >> records
> >> > > > > > >>>>>>> are
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>> correctly
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> inherited through the whole topology to the
> >> final
> >> > > > stage.
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> Note that this KIP is for key-value store
> and
> >> > hence
> >> > > > > > >>>>>>> non-windowed
> >> > > > > > >>>>>>>>>>>> KTables
> >> > > > > > >>>>>>>>>>>>>>>>> only, but for windowed KTables we do not
> >> really
> >> > > have
> >> > > > a
> >> > > > > > good
> >> > > > > > >>>>>>>>> support
> >> > > > > > >>>>>>>>>>>> for
> >> > > > > > >>>>>>>>>>>>>>>>> their joins anyways (
> >> > > > > > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7107
> )
> >> > > > > > >>>>>>>>>>>>>>>>> I
> >> > > > > > >>>>>>>>>>>>>>>>> think we can just consider non-windowed
> >> > > KTable-KTable
> >> > > > > > >>>>> non-key
> >> > > > > > >>>>>>>>> joins
> >> > > > > > >>>>>>>>>>>> for
> >> > > > > > >>>>>>>>>>>>>>>>> now. In which case, KIP-258 should help.
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> Guozhang
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan
> Filipiak
> >> <
> >> > > > > > >>>>>>>>>>>> jan.filip...@trivago.com
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> Hi Guozhang
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>> Current highwater mark implementation
> would
> >> > grow
> >> > > > > > >> endlessly
> >> > > > > > >>>>>>> based
> >> > > > > > >>>>>>>>>>>> on
> >> > > > > > >>>>>>>>>>>>>>>>>>> primary key of original event. It is a
> pair
> >> of
> >> > > > (<this
> >> > > > > > >>>>> table
> >> > > > > > >>>>>>>>>>>> primary
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> key>,
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> <highest offset seen for that key>). This is
> >> used
> >> > > to
> >> > > > > > >>>>>>> differentiate
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> between
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> late arrivals and new updates. My newest
> >> proposal
> >> > > > would
> >> > > > > > be
> >> > > > > > >>>>> to
> >> > > > > > >>>>>>>>>>>> replace
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> it
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> with a Windowed state store of Duration N.
> >> This
> >> > > would
> >> > > > > > allow
> >> > > > > > >>>>> the
> >> > > > > > >>>>>>>>> same
> >> > > > > > >>>>>>>>>>>>>>>>>>> behaviour, but cap the size based on time.
> >> This
> >> > > > > should
> >> > > > > > >>>>> allow
> >> > > > > > >>>>>>> for
> >> > > > > > >>>>>>>>>>>> all
> >> > > > > > >>>>>>>>>>>>>>>>>>> late-arriving events to be processed, and
> >> > should
> >> > > be
> >> > > > > > >>>>>>> customizable
> >> > > > > > >>>>>>>>>>>> by
> >> > > > > > >>>>>>>>>>>>>>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>>>>>> user to tailor to their own needs (ie:
> >> perhaps
> >> > > just
> >> > > > > 10
> >> > > > > > >>>>>>> minutes
> >> > > > > > >>>>>>>>> of
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> window,
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> or perhaps 7 days...).
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>> Hi Adam, using time based retention can do
> >> the
> >> > > > trick
> >> > > > > > >> here.
> >> > > > > > >>>>>>> Even
> >> > > > > > >>>>>>>>>>>> if I
> >> > > > > > >>>>>>>>>>>>>>>>>> would still like to see the automatic
> >> > > repartitioning
> >> > > > > > >>>>> optional
> >> > > > > > >>>>>>>>>>>> since I
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> would
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> just reshuffle again. With windowed store I
> >> am a
> >> > > > little
> >> > > > > > bit
> >> > > > > > >>>>>>>>>>>> sceptical
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> about
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> how to determine the window. So essentially
> one
> >> > > could
> >> > > > > run
> >> > > > > > >>>>> into
> >> > > > > > >>>>>>>>>>>> problems
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> when
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> the rapid change happens near a window
> >> border. I
> >> > > will
> >> > > > > > check
> >> > > > > > >>>>> your
> >> > > > > > >>>>>>>>>>>>>>>>>> implementation in detail; if it's
> >> problematic, we
> >> > > > could
> >> > > > > > >>>>> still
> >> > > > > > >>>>>>>>> check
> >> > > > > > >>>>>>>>>>>>>>>>>> _all_
> >> > > > > > >>>>>>>>>>>>>>>>>> windows on read with not too bad performance
> >> > > impact I
> >> > > > > > >> guess.
> >> > > > > > >>>>>>> Will
> >> > > > > > >>>>>>>>>>>> let
> >> > > > > > >>>>>>>>>>>>>>>>>> you
> >> > > > > > >>>>>>>>>>>>>>>>>> know if the implementation would be correct
> >> as
> >> > > is. I
> >> > > > > > >>>>> would
> >> > > > > > >>>>>>> not
> >> > > > > > >>>>>>>>>>>> like
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> to
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> assume that: offset(A) < offset(B) =>
> >> > > timestamp(A)  <
> >> > > > > > >>>>>>>>> timestamp(B).
> >> > > > > > >>>>>>>>>>>> I
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> think
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> we can't expect that.
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>> @Jan
> >> > > > > > >>>>>>>>>>>>>>>>>>> I believe I understand what you mean now -
> >> > thanks
> >> > > > for
> >> > > > > > the
> >> > > > > > >>>>>>>>>>>> diagram, it
> >> > > > > > >>>>>>>>>>>>>>>>>>> did really help. You are correct that I do
> >> not
> >> > > have
> >> > > > > the
> >> > > > > > >>>>>>> original
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> primary
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> key available, and I can see that if it was
> >> > > available
> >> > > > > > then
> >> > > > > > >>>>> you
> >> > > > > > >>>>>>>>>>>> would be
> >> > > > > > >>>>>>>>>>>>>>>>>>> able to add and remove events from the
> Map.
> >> > That
> >> > > > > being
> >> > > > > > >>>>> said,
> >> > > > > > >>>>>>> I
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> encourage
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> you to finish your diagrams / charts just
> for
> >> > > clarity
> >> > > > > for
> >> > > > > > >>>>>>> everyone
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> else.
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>> Yeah 100%, this Gliffy thing is just really
> >> hard
> >> > > > work.
> >> > > > > > But
> >> > > > > > >>>>> I
> >> > > > > > >>>>>>>>>>>> understand
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> the benefits for the rest. Sorry about the
> >> > > original
> >> > > > > > >> primary
> >> > > > > > >>>>>>> key,
> >> > > > > > >>>>>>>>> We
> >> > > > > > >>>>>>>>>>>>>>>>>> have
> >> > > > > > >>>>>>>>>>>>>>>>>> join and Group by implemented our own in
> PAPI
> >> > and
> >> > > > > > >> basically
> >> > > > > > >>>>>>> not
> >> > > > > > >>>>>>>>>>>> using
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> any
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> DSL (Just the abstraction). Completely
> missed
> >> > that
> >> > > in
> >> > > > > > >>>>> original
> >> > > > > > >>>>>>> DSL
> >> > > > > > >>>>>>>>>>>> it's
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> not
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> there and just assumed it. total brain mess
> >> up on
> >> > > my
> >> > > > > end.
> >> > > > > > >>>>> Will
> >> > > > > > >>>>>>>>>>>> finish
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> chart as soon as I get a quiet evening this
> >> week.
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> My follow up question for you is, won't the
> >> Map
> >> > > stay
> >> > > > > > >> inside
> >> > > > > > >>>>>>> the
> >> > > > > > >>>>>>>>>>>> State
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>> Store indefinitely after all of the
> changes
> >> > have
> >> > > > > > >>>>> propagated?
> >> > > > > > >>>>>>>>> Isn't
> >> > > > > > >>>>>>>>>>>>>>>>>>> this
> >> > > > > > >>>>>>>>>>>>>>>>>>> effectively the same as a highwater mark
> >> state
> >> > > > store?
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>> Thing is that if the map is empty,
> >> subtractor
> >> > is
> >> > > > > gonna
> >> > > > > > >>>>>>> return
> >> > > > > > >>>>>>>>>>>> `null`
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> and
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> the key is removed from the keyspace. But
> >> there
> >> > is
> >> > > > > going
> >> > > > > > to
> >> > > > > > >>>>> be
> >> > > > > > >>>>>>> a
> >> > > > > > >>>>>>>>>>>> store
> >> > > > > > >>>>>>>>>>>>>>>>>> 100%, the good thing is that I can use this
> >> > store
> >> > > > > > directly
> >> > > > > > >>>>> for
> >> > > > > > >>>>>>>>>>>>>>>>>> materialize() / enableSendingOldValues()
> is a
> >> > > > regular
> >> > > > > > >>>>> store,
> >> > > > > > >>>>>>>>>>>> satisfying
> >> > > > > > >>>>>>>>>>>>>>>>>> all guarantees needed for further groupby /
> >> join.
> >> > > The
> >> > > > > > >>>>> Windowed
> >> > > > > > >>>>>>>>>>>> store is
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> not
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> keeping the values, so for the next
> stateful
> >> > > > operation
> >> > > > > > we
> >> > > > > > >>>>>>> would
> >> > > > > > >>>>>>>>>>>>>>>>>> need to instantiate an extra store. or we
> >> have
> >> > the
> >> > > > > > window
> >> > > > > > >>>>>>> store
> >> > > > > > >>>>>>>>>>>> also
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> have
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> the values then.
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> Long story short. if we can flip in a
> custom
> >> > group
> >> > > > by
> >> > > > > > >>>>> before
> >> > > > > > >>>>>>>>>>>>>>>>>> repartitioning to the original primary key
> i
> >> > think
> >> > > > it
> >> > > > > > >> would
> >> > > > > > >>>>>>> help
> >> > > > > > >>>>>>>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> users
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> big time in building efficient apps. Given
> the
> >> > > > original
> >> > > > > > >>>>> primary
> >> > > > > > >>>>>>>>> key
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> issue I
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> understand that we do not have a solid
> >> foundation
> >> > > to
> >> > > > > > build
> >> > > > > > >>>>> on.
> >> > > > > > >>>>>>>>>>>>>>>>>> Leaving primary key carry along to the
> user.
> >> > very
> >> > > > > > >>>>>>> unfortunate. I
> >> > > > > > >>>>>>>>>>>> could
> >> > > > > > >>>>>>>>>>>>>>>>>> understand the decision goes like that. I
> do
> >> not
> >> > > > think
> >> > > > > > it's
> >> > > > > > >>>>> a
> >> > > > > > >>>>>>> good
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>> decision.
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>> Thanks
> >> > > > > > >>>>>>>>>>>>>>>>>>> Adam
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta
> >> > > Dumbre <
> >> > > > > > >>>>>>>>>>>>>>>>>>> dumbreprajakta...@gmail.com <mailto:
> >> > > > > > >>>>>>> dumbreprajakta...@gmail.com
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> wrote:
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           please remove me from this group
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           On Tue, Sep 11, 2018 at 1:29 PM
> >> Jan
> >> > > > > Filipiak
> >> > > > > > >>>>>>>>>>>>>>>>>>>           <jan.filip...@trivago.com
> >> <mailto:
> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > Hi Adam,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > give me some time, will make
> >> such a
> >> > > > > chart.
> >> > > > > > >> last
> >> > > > > > >>>>>>> time I
> >> > > > > > >>>>>>>>>>>> didn't
> >> > > > > > >>>>>>>>>>>>>>>>>>>           get along
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > well with Gliffy and ruined all
> >> your
> >> > > > > charts.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > Hopefully I can get it done
> >> today
> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > On 08.09.2018 16:00, Adam
> >> Bellemare
> >> > > > > wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > Hi Jan
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > I have included a diagram of
> >> > what I
> >> > > > > > >> attempted
> >> > > > > > >>>>> on
> >> > > > > > >>>>>>> the
> >> > > > > > >>>>>>>>>>>> KIP.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>
> >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>
> >> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining
> >> > > > > > >>>>>>>>>>>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate
> >> > > > > > >>>>>>>>>>>>>>>>>>>           <
> >> > > > > > >>>>>>>>>>>>
> >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>
> >> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin
> >> > > > > > >>>>>>>>>>>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > I attempted this back at the
> >> > start
> >> > > of
> >> > > > > my
> >> > > > > > own
> >> > > > > > >>>>>>>>>>>> implementation
> >> > > > > > >>>>>>>>>>>>>>>>>>> of
> >> > > > > > >>>>>>>>>>>>>>>>>>>           this
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > solution, and since I could
> >> not
> >> > get
> >> > > > it
> >> > > > > to
> >> > > > > > >>>>> work I
> >> > > > > > >>>>>>> have
> >> > > > > > >>>>>>>>>>>> since
> >> > > > > > >>>>>>>>>>>>>>>>>>>           discarded the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > code. At this point in time,
> >> if
> >> > you
> >> > > > > wish
> >> > > > > > to
> >> > > > > > >>>>>>> continue
> >> > > > > > >>>>>>>>>>>> pursuing
> >> > > > > > >>>>>>>>>>>>>>>>>>>           for your
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > groupBy solution, I ask that
> >> you
> >> > > > please
> >> > > > > > >>>>> create a
> >> > > > > > >>>>>>>>>>>> diagram on
> >> > > > > > >>>>>>>>>>>>>>>>>>>           the KIP
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > carefully explaining your
> >> > solution.
> >> > > > > > Please
> >> > > > > > >>>>> feel
> >> > > > > > >>>>>>> free
> >> > > > > > >>>>>>>>> to
> >> > > > > > >>>>>>>>>>>> use
> >> > > > > > >>>>>>>>>>>>>>>>>>>           the image I
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > just posted as a starting
> >> point.
> >> > I
> >> > > am
> >> > > > > > having
> >> > > > > > >>>>>>> trouble
> >> > > > > > >>>>>>>>>>>>>>>>>>>           understanding your
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > explanations but I think
> that
> >> a
> >> > > > > carefully
> >> > > > > > >>>>>>> constructed
> >> > > > > > >>>>>>>>>>>> diagram
> >> > > > > > >>>>>>>>>>>>>>>>>>>           will clear
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > up
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > any misunderstandings.
> >> > Alternately,
> >> > > > > > please
> >> > > > > > >>>>> post a
> >> > > > > > >>>>>>>>>>>>>>>>>>>           comprehensive PR with
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > your solution. I can only
> >> guess
> >> > at
> >> > > > what
> >> > > > > > you
> >> > > > > > >>>>>>> mean, and
> >> > > > > > >>>>>>>>>>>> since I
> >> > > > > > >>>>>>>>>>>>>>>>>>>           value my
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > own
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > time as much as you value
> >> yours,
> >> > I
> >> > > > > > believe
> >> > > > > > >> it
> >> > > > > > >>>>> is
> >> > > > > > >>>>>>> your
> >> > > > > > >>>>>>>>>>>>>>>>>>>           responsibility to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > provide an implementation
> >> instead
> >> > > of
> >> > > > me
> >> > > > > > >>>>> trying to
> >> > > > > > >>>>>>>>> guess.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > Adam
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > On Sat, Sep 8, 2018 at 8:00
> >> AM,
> >> > Jan
> >> > > > > > Filipiak
> >> > > > > > >>>>>>>>>>>>>>>>>>>           <jan.filip...@trivago.com
> >> <mailto:
> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > > wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> Hi James,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> nice to see you being
> >> > interested.
> >> > > > > kafka
> >> > > > > > >>>>>>> streams at
> >> > > > > > >>>>>>>>>>>> this
> >> > > > > > >>>>>>>>>>>>>>>>>>>           point supports
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> all sorts of joins as long
> as
> >> > both
> >> > > > > > streams
> >> > > > > > >>>>> have
> >> > > > > > >>>>>>> the
> >> > > > > > >>>>>>>>>>>> same
> >> > > > > > >>>>>>>>>>>>>>>>>>> key.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> Adam is currently
> >> implementing a
> >> > > > join
> >> > > > > > >> where a
> >> > > > > > >>>>>>> KTable
> >> > > > > > >>>>>>>>>>>> and a
> >> > > > > > >>>>>>>>>>>>>>>>>>>           KTable can
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > have
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> a one-to-many relationship
> >> > (1:n).
> >> > > > We
> >> > > > > > >> exploit
> >> > > > > > >>>>>>> that
> >> > > > > > >>>>>>>>>>>> rocksdb
> >> > > > > > >>>>>>>>>>>>>>>>>>> is
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> a
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>           > >> datastore that keeps data
> >> sorted
> >> > (At
> >> > > > > least
> >> > > > > > >>>>>>> exposes an
> >> > > > > > >>>>>>>>>>>> API to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           access the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> stored data in a sorted
> >> > fashion).
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> I think the technical
> caveats
> >> > are
> >> > > > well
> >> > > > > > >>>>>>> understood
> >> > > > > > >>>>>>>>> now
> >> > > > > > >>>>>>>>>>>> and we
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> are
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>           > basically
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> down to philosophy and API
> >> > Design
> >> > > (
> >> > > > > when
> >> > > > > > >> Adam
> >> > > > > > >>>>>>> sees
> >> > > > > > >>>>>>>>> my
> >> > > > > > >>>>>>>>>>>> newest
> >> > > > > > >>>>>>>>>>>>>>>>>>>           message).
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> I have a lengthy track
> >> record of
> >> > > > > losing
> >> > > > > > >>>>> those
> >> > > > > > >>>>>>> kinda
> >> > > > > > >>>>>>>>>>>>>>>>>>>           arguments within
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> streams community and I
> have
> >> no
> >> > > clue
> >> > > > > > why.
> >> > > > > > >> So
> >> > > > > > >>>>> I
> >> > > > > > >>>>>>>>>>>> literally
> >> > > > > > >>>>>>>>>>>>>>>>>>>           can't wait for
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > you
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> to churn through this
> thread
> >> and
> >> > > > give
> >> > > > > > your
> >> > > > > > >>>>>>> opinion on
> >> > > > > > >>>>>>>>>>>> how we
> >> > > > > > >>>>>>>>>>>>>>>>>>>           should
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > design
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> the return type of the
> >> > > oneToManyJoin
> >> > > > > and
> >> > > > > > >> how
> >> > > > > > >>>>>>> much
> >> > > > > > >>>>>>>>>>>> power we
> >> > > > > > >>>>>>>>>>>>>>>>>>>           want to give
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> the user vs "simplicity"
> >> (where
> >> > > > > > simplicity
> >> > > > > > >>>>> isn't
> >> > > > > > >>>>>>>>>>>> really that
> >> > > > > > >>>>>>>>>>>>>>>>>>>           as users
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > still
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> need to understand it I
> >> argue)
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> waiting for you to join in
> on
> >> > the
> >> > > > > > >> discussion
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> Best Jan
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >> On 07.09.2018 15:49, James
> >> Kwan
> >> > > > wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>> I am new to this group
> and I
> >> > > found
> >> > > > > this
> >> > > > > > >>>>> subject
> >> > > > > > >>>>>>>>>>>>>>>>>>>           interesting.  Sounds
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > like
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>> you guys want to
> implement a
> >> > join
> >> > > > > > table of
> >> > > > > > >>>>> two
> >> > > > > > >>>>>>>>>>>> streams? Is
> >> > > > > > >>>>>>>>>>>>>>>>>>> there
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > somewhere
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>> I can see the original
> >> > > requirement
> >> > > > or
> >> > > > > > >>>>> proposal?
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>> On Sep 7, 2018, at 8:13
> AM,
> >> Jan
> >> > > > > > Filipiak
> >> > > > > > >>>>>>>>>>>>>>>>>>>           <jan.filip...@trivago.com
> >> <mailto:
> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> On 05.09.2018 22:17, Adam
> >> > > > Bellemare
> >> > > > > > >> wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> I'm currently testing
> >> using a
> >> > > > > > Windowed
> >> > > > > > >>>>> Store
> >> > > > > > >>>>>>> to
> >> > > > > > >>>>>>>>>>>> store the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           highwater
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> mark.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> By all indications this
> >> > should
> >> > > > work
> >> > > > > > >> fine,
> >> > > > > > >>>>>>> with
> >> > > > > > >>>>>>>>> the
> >> > > > > > >>>>>>>>>>>> caveat
> >> > > > > > >>>>>>>>>>>>>>>>>>>           being that
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > it
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> can
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> only resolve
> out-of-order
> >> > > arrival
> >> > > > > > for up
> >> > > > > > >>>>> to
> >> > > > > > >>>>>>> the
> >> > > > > > >>>>>>>>>>>> size of
> >> > > > > > >>>>>>>>>>>>>>>>>>>           the window
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > (ie:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> 24h, 72h, etc). This
> would
> >> > > remove
> >> > > > > the
> >> > > > > > >>>>>>> possibility
> >> > > > > > >>>>>>>>>>>> of it
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> being
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>           > unbounded
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> in
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> size.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> With regards to Jan's
> >> > > > suggestion, I
> >> > > > > > >>>>> believe
> >> > > > > > >>>>>>> this
> >> > > > > > >>>>>>>>> is
> >> > > > > > >>>>>>>>>>>> where
> >> > > > > > >>>>>>>>>>>>>>>>>>>           we will
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > have
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> remain in disagreement.
> >> > While I
> >> > > > do
> >> > > > > > not
> >> > > > > > >>>>>>> disagree
> >> > > > > > >>>>>>>>>>>> with your
> >> > > > > > >>>>>>>>>>>>>>>>>>>           statement
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> about
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> there likely to be
> >> additional
> >> > > > joins
> >> > > > > > done
> >> > > > > > >>>>> in a
> >> > > > > > >>>>>>>>>>>> real-world
> >> > > > > > >>>>>>>>>>>>>>>>>>>           workflow, I
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > do
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> not
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> see how you can
> >> conclusively
> >> > > deal
> >> > > > > > with
> >> > > > > > >>>>>>>>> out-of-order
> >> > > > > > >>>>>>>>>>>>>>>>>>> arrival
> >> > > > > > >>>>>>>>>>>>>>>>>>> of
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> foreign-key
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> changes and subsequent
> >> > joins. I
> >> > > > > have
> >> > > > > > >>>>>>> attempted
> >> > > > > > >>>>>>>>> what
> >> > > > > > >>>>>>>>>>>> I
> >> > > > > > >>>>>>>>>>>>>>>>>>>           think you have
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> proposed (without a
> >> > high-water,
> >> > > > > using
> >> > > > > > >>>>>>> groupBy and
> >> > > > > > >>>>>>>>>>>> reduce)
> >> > > > > > >>>>>>>>>>>>>>>>>>>           and found
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> that if
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> the foreign key changes
> >> too
> >> > > > > quickly,
> >> > > > > > or
> >> > > > > > >>>>> the
> >> > > > > > >>>>>>> load
> >> > > > > > >>>>>>>>> on
> >> > > > > > >>>>>>>>>>>> a
> >> > > > > > >>>>>>>>>>>>>>>>>>>           stream thread
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > is
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> too
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> high, the joined
> messages
> >> > will
> >> > > > > arrive
> >> > > > > > >>>>>>>>> out-of-order
> >> > > > > > >>>>>>>>>>>> and be
> >> > > > > > >>>>>>>>>>>>>>>>>>>           incorrectly
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> propagated, such that an
> >> > > > > intermediate
> >> > > > > > >>>>> event
> >> > > > > > >>>>>>> is
> >> > > > > > >>>>>>>>>>>>>>>>>>> represented
> >> > > > > > >>>>>>>>>>>>>>>>>>>           as the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > final
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> event.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> Can you shed some light
> on
> >> > your
> >> > > > > > groupBy
> >> > > > > > >>>>>>>>>>>> implementation?
> >> > > > > > >>>>>>>>>>>>>>>>>>>           There must be
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> some sort of flaw in it.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> I have a suspicion where
> it
> >> > is,
> >> > > I
> >> > > > > > would
> >> > > > > > >>>>> just
> >> > > > > > >>>>>>> like
> >> > > > > > >>>>>>>>> to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           confirm. The idea
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> is bullet proof and it
> >> must be
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> an implementation mess
> up.
> >> I
> >> > > would
> >> > > > > > like
> >> > > > > > >> to
> >> > > > > > >>>>>>> clarify
> >> > > > > > >>>>>>>>>>>> before
> >> > > > > > >>>>>>>>>>>>>>>>>>>           we draw a
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> conclusion.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>    Repartitioning the
> >> > scattered
> >> > > > > events
> >> > > > > > >>>>> back to
> >> > > > > > >>>>>>>>> their
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> original
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>           > >>>>> partitions is the only
> way I
> >> > know
> >> > > > how
> >> > > > > > to
> >> > > > > > >>>>>>>>> conclusively
> >> > > > > > >>>>>>>>>>>> deal
> >> > > > > > >>>>>>>>>>>>>>>>>>>           with
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> out-of-order events in a
> >> > given
> >> > > > time
> >> > > > > > >> frame,
> >> > > > > > >>>>>>> and to
> >> > > > > > >>>>>>>>>>>> ensure
> >> > > > > > >>>>>>>>>>>>>>>>>>>           that the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > data
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> is
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> eventually consistent
> with
> >> > the
> >> > > > > input
> >> > > > > > >>>>> events.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> If you have some code to
> >> > share
> >> > > > that
> >> > > > > > >>>>>>> illustrates
> >> > > > > > >>>>>>>>> your
> >> > > > > > >>>>>>>>>>>>>>>>>>>           approach, I
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > would
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> be
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> very grateful as it
> would
> >> > > remove
> >> > > > > any
> >> > > > > > >>>>>>>>>>>> misunderstandings
> >> > > > > > >>>>>>>>>>>>>>>>>>>           that I may
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > have.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> ah okay you were looking
> >> for
> >> > my
> >> > > > > code.
> >> > > > > > I
> >> > > > > > >>>>> don't
> >> > > > > > >>>>>>> have
> >> > > > > > >>>>>>>>>>>>>>>>>>>           something easily
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> readable here as it's
> >> bloated
> >> > > with
> >> > > > > > >>>>> OO-patterns.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> it's anyhow trivial:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> @Override
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>      public T apply(K
> >> aggKey,
> >> > V
> >> > > > > > value, T
> >> > > > > > >>>>>>>>> aggregate)
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>      {
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          Map<U, V>
> >> > > > > currentStateAsMap =
> >> > > > > > >>>>>>>>>>>> asMap(aggregate);
> >> > > > > > >>>>>>>>>>>>>>>>>>> <<
> >> > > > > > >>>>>>>>>>>>>>>>>>>           imaginary
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          U toModifyKey =
> >> > > > > > >>>>> mapper.apply(value);
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << this is
> the
> >> > > place
> >> > > > > > where
> >> > > > > > >>>>> people
> >> > > > > > >>>>>>>>>>>> actually
> >> > > > > > >>>>>>>>>>>>>>>>>>>           gonna have
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > issues
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> and why you probably
> >> couldn't
> >> > do
> >> > > > it.
> >> > > > > > we
> >> > > > > > >>>>> would
> >> > > > > > >>>>>>> need
> >> > > > > > >>>>>>>>>>>> to find
> >> > > > > > >>>>>>>>>>>>>>>>>>>           a solution
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > here.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> I didn't realize that
> yet.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << we
> >> propagate
> >> > the
> >> > > > > > field in
> >> > > > > > >>>>> the
> >> > > > > > >>>>>>>>>>>> joiner, so
> >> > > > > > >>>>>>>>>>>>>>>>>>>           that we can
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > pick
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> it up in an aggregate.
> >> > Probably
> >> > > > you
> >> > > > > > have
> >> > > > > > >>>>> not
> >> > > > > > >>>>>>>>> thought
> >> > > > > > >>>>>>>>>>>> of
> >> > > > > > >>>>>>>>>>>>>>>>>>>           this in your
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> approach right?
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << I am very
> >> open
> >> > > to
> >> > > > > > find a
> >> > > > > > >>>>>>> generic
> >> > > > > > >>>>>>>>>>>> solution
> >> > > > > > >>>>>>>>>>>>>>>>>>>           here. In my
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> honest opinion this is
> >> broken
> >> > in
> >> > > > > > >>>>>>>>> KTableImpl.GroupBy
> >> > > > > > >>>>>>>>>>>> that
> >> > > > > > >>>>>>>>>>>>>>>>>>> it
> >> > > > > > >>>>>>>>>>>>>>>>>>>           loses
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > the keys
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> and only maintains the
> >> > aggregate
> >> > > > > key.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << I
> >> abstracted
> >> > it
> >> > > > away
> >> > > > > > back
> >> > > > > > >>>>>>> then way
> >> > > > > > >>>>>>>>>>>> before
> >> > > > > > >>>>>>>>>>>>>>>>>>> I
> >> > > > > > >>>>>>>>>>>>>>>>>>> was
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > thinking
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> of oneToMany join. That
> is
> >> > why I
> >> > > > > > didn't
> >> > > > > > >>>>>>> realize
> >> > > > > > >>>>>>>>> its
> >> > > > > > >>>>>>>>>>>>>>>>>>>           significance here.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              << Opinions?
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          for (V m :
> >> current)
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          {
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > currentStateAsMap.put(mapper.apply(m),
> >> > > > > > >> m);
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          }
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          if (isAdder)
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          {
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > currentStateAsMap.put(toModifyKey,
> >> > > > > > >> value);
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          }
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          else
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          {
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > currentStateAsMap.remove(toModifyKey);
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > if(currentStateAsMap.isEmpty()){
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>                  return
> >> null;
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>              }
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          }
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>          return
> >> > > > > > >>>>>>> asAggregateType(currentStateAsMap);
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>      }
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>> Thanks,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> Adam
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> On Wed, Sep 5, 2018 at
> >> 3:35
> >> > PM,
> >> > > > Jan
> >> > > > > > >>>>> Filipiak
> >> > > > > > >>>>>>> <
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > jan.filip...@trivago.com
> >> <mailto:
> >> > > > > > >>>>>>>>> jan.filip...@trivago.com
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>> Thanks Adam for bringing
> >> > > Matthias
> >> > > > > up to
> >> > > > > > >>>>> speed!
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> about the differences.
> I
> >> > think
> >> > > > > > >> re-keying
> >> > > > > > >>>>>>> back
> >> > > > > > >>>>>>>>>>>> should be
> >> > > > > > >>>>>>>>>>>>>>>>>>>           optional at
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> best.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> I would say we return a
> >> > > > > > KScatteredTable
> >> > > > > > >>>>> with
> >> > > > > > >>>>>>>>>>>> reshuffle()
> >> > > > > > >>>>>>>>>>>>>>>>>>>           returning
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> KTable<originalKey,Joined>
> >> > to
> >> > > > make
> >> > > > > > the
> >> > > > > > >>>>>>> backwards
> >> > > > > > >>>>>>>>>>>>>>>>>>>           repartitioning
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> optional.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> I am also in a big
> >> favour of
> >> > > > doing
> >> > > > > > the
> >> > > > > > >>>>> out
> >> > > > > > >>>>>>> of
> >> > > > > > >>>>>>>>> order
> >> > > > > > >>>>>>>>>>>>>>>>>>>           processing using
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> group
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> by instead of high water
> >> mark
> >> > > > > tracking.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> Just because unbounded
> >> > growth
> >> > > is
> >> > > > > > just
> >> > > > > > >>>>> scary
> >> > > > > > >>>>>>> + It
> >> > > > > > >>>>>>>>>>>> saves
> >> > > > > > >>>>>>>>>>>>>>>>>>> us
> >> > > > > > >>>>>>>>>>>>>>>>>>>           the header
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> stuff.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> I think the abstraction
> >> of
> >> > > > always
> >> > > > > > >>>>>>> repartitioning
> >> > > > > > >>>>>>>>>>>> back is
> >> > > > > > >>>>>>>>>>>>>>>>>>>           just not so
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> strong. Like the work
> has
> >> > been
> >> > > > > done
> >> > > > > > >>>>> before
> >> > > > > > >>>>>>> we
> >> > > > > > >>>>>>>>>>>> partition
> >> > > > > > >>>>>>>>>>>>>>>>>>>           back and
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> grouping
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> by something else
> >> afterwards
> >> > > is
> >> > > > > > really
> >> > > > > > >>>>>>> common.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> On 05.09.2018 13:49,
> Adam
> >> > > > > Bellemare
> >> > > > > > >>>>> wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>> Hi Matthias
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Thank you for your
> >> > feedback,
> >> > > I
> >> > > > do
> >> > > > > > >>>>>>> appreciate
> >> > > > > > >>>>>>>>> it!
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> While name spacing
> >> would be
> >> > > > > > possible,
> >> > > > > > >> it
> >> > > > > > >>>>>>> would
> >> > > > > > >>>>>>>>>>>> require
> >> > > > > > >>>>>>>>>>>>>>>>>>> to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > deserialize
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> user headers which
> >> implies
> >> > a
> >> > > > > > runtime
> >> > > > > > >>>>>>> overhead.
> >> > > > > > >>>>>>>>> I
> >> > > > > > >>>>>>>>>>>> would
> >> > > > > > >>>>>>>>>>>>>>>>>>>           suggest to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > not
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> namespace for now to
> >> avoid
> >> > > the
> >> > > > > > >>>>> overhead.
> >> > > > > > >>>>>>> If
> >> > > > > > >>>>>>>>> this
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> becomes a
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>           > problem in
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> the future, we can
> >> still
> >> > add
> >> > > > > name
> >> > > > > > >>>>> spacing
> >> > > > > > >>>>>>>>> later
> >> > > > > > >>>>>>>>>>>> on.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Agreed. I will go
> with
> >> > > using a
> >> > > > > > >> reserved
> >> > > > > > >>>>>>> string
> >> > > > > > >>>>>>>>>>>> and
> >> > > > > > >>>>>>>>>>>>>>>>>>>           document it.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> My main concern about
> >> the
> >> > > > design
> >> > > > > is
> >> > > > > > >> the
> >> > > > > > >>>>>>> type of
> >> > > > > > >>>>>>>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           result KTable:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > If
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> I
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> understood the
> proposal
> >> > > > > correctly,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> In your example, you
> >> have
> >> > > > table1
> >> > > > > > and
> >> > > > > > >>>>> table2
> >> > > > > > >>>>>>>>>>>> swapped.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           Here is how it
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> works
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> currently:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 1) table1 has the
> >> records
> >> > > that
> >> > > > > > contain
> >> > > > > > >>>>> the
> >> > > > > > >>>>>>>>>>>> foreign key
> >> > > > > > >>>>>>>>>>>>>>>>>>>           within their
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> value.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 input stream:
> >> > > > > > <a,(fk=A,bar=1)>,
> >> > > > > > >>>>>>>>>>>>>>>>>>> <b,(fk=A,bar=2)>,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> <c,(fk=B,bar=3)>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table2 input stream:
> >> <A,X>,
> >> > > > <B,Y>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 2) A Value mapper is
> >> > required
> >> > > > to
> >> > > > > > >> extract
> >> > > > > > >>>>>>> the
> >> > > > > > >>>>>>>>>>>> foreign
> >> > > > > > >>>>>>>>>>>>>>>>>>> key.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 foreign key
> >> mapper:
> >> > (
> >> > > > > value
> >> > > > > > =>
> >> > > > > > >>>>>>> value.fk
> >> > > > > > >>>>>>>>>>>>>>>>>>>           )
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> The mapper is applied
> to
> >> > each
> >> > > > > > element
> >> > > > > > >> in
> >> > > > > > >>>>>>>>> table1,
> >> > > > > > >>>>>>>>>>>> and a
> >> > > > > > >>>>>>>>>>>>>>>>>>>           new combined
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> key is
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> made:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> table1 mapped: <A-a,
> >> > > > > (fk=A,bar=1)>,
> >> > > > > > >>>>> <A-b,
> >> > > > > > >>>>>>>>>>>>>>>>>>> (fk=A,bar=2)>,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           <B-c,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> (fk=B,bar=3)>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 3) The rekeyed events
> >> are
> >> > > > > > >> copartitioned
> >> > > > > > >>>>>>> with
> >> > > > > > >>>>>>>>>>>> table2:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> a) Stream Thread with
> >> > > Partition
> >> > > > > 0:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1:
> >> <A-a,
> >> > > > > > >>>>> (fk=A,bar=1)>,
> >> > > > > > >>>>>>> <A-b,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           (fk=A,bar=2)>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <A,X>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> b) Stream Thread with
> >> > > Partition
> >> > > > > 1:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> RepartitionedTable1:
> >> <B-c,
> >> > > > > > >> (fk=B,bar=3)>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Table2: <B,Y>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> 4) From here, they can
> >> be
> >> > > > joined
> >> > > > > > >>>>> together
> >> > > > > > >>>>>>>>> locally
> >> > > > > > >>>>>>>>>>>> by
> >> > > > > > >>>>>>>>>>>>>>>>>>>           applying the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> joiner
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> function.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> At this point, Jan's
> >> design
> >> > > and
> >> > > > > my
> >> > > > > > >>>>> design
> >> > > > > > >>>>>>>>>>>> deviate. My
> >> > > > > > >>>>>>>>>>>>>>>>>>>           design goes
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > on
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> repartition the data
> >> > > post-join
> >> > > > > and
> >> > > > > > >>>>> resolve
> >> > > > > > >>>>>>>>>>>> out-of-order
> >> > > > > > >>>>>>>>>>>>>>>>>>>           arrival of
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> records,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> finally returning the
> >> data
> >> > > > keyed
> >> > > > > > by just
> >> > > > > > >>>>> the
> >> > > > > > >>>>>>>>>>>> original key.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           I do not
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> expose
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> CombinedKey or any of
> >> the
> >> > > > > internals
> >> > > > > > >>>>>>> outside of
> >> > > > > > >>>>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           joinOnForeignKey
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> function. This does
> make
> >> > for
> >> > > > > a larger
> >> > > > > > >>>>>>> footprint,
> >> > > > > > >>>>>>>>>>>> but it
> >> > > > > > >>>>>>>>>>>>>>>>>>>           removes all
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> agency
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> for resolving
> >> out-of-order
> >> > > > > arrivals
> >> > > > > > >> and
> >> > > > > > >>>>>>>>> handling
> >> > > > > > >>>>>>>>>>>>>>>>>>>           CombinedKeys from
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> user. I believe that
> >> this
> >> > > makes
> >> > > > > the
> >> > > > > > >>>>>>> function
> >> > > > > > >>>>>>>>> much
> >> > > > > > >>>>>>>>>>>>>>>>>>> easier
> >> > > > > > >>>>>>>>>>>>>>>>>>>           to use.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Let me know if this
> >> helps
> >> > > > resolve
> >> > > > > > your
> >> > > > > > >>>>>>>>> questions,
> >> > > > > > >>>>>>>>>>>> and
> >> > > > > > >>>>>>>>>>>>>>>>>>>           please feel
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> free to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> add anything else on
> >> your
> >> > > mind.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Thanks again,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Adam
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> On Tue, Sep 4, 2018 at
> >> 8:36
> >> > > PM,
> >> > > > > > >>>>> Matthias J.
> >> > > > > > >>>>>>>>> Sax <
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> matth...@confluent.io
> >> > > <mailto:
> >> > > > > > >>>>>>>>>>>> matth...@confluent.io>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>> Hi,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> I am just catching up
> >> on
> >> > > this
> >> > > > > > >> thread. I
> >> > > > > > >>>>>>> did
> >> > > > > > >>>>>>>>> not
> >> > > > > > >>>>>>>>>>>> read
> >> > > > > > >>>>>>>>>>>>>>>>>>>           everything so
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> far,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> but want to share
> >> a couple
> >> > of
> >> > > > > > initial
> >> > > > > > >>>>>>> thoughts:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Headers: I think
> there
> >> is
> >> > a
> >> > > > > > >> fundamental
> >> > > > > > >>>>>>>>>>>> difference
> >> > > > > > >>>>>>>>>>>>>>>>>>>           between header
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> usage
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> in this KIP and
> KIP-258.
> >> > For
> >> > > > 258,
> >> > > > > > we
> >> > > > > > >> add
> >> > > > > > >>>>>>>>> headers
> >> > > > > > >>>>>>>>>>>> to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           changelog topic
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> that
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> are owned by Kafka
> >> Streams
> >> > > and
> >> > > > > > nobody
> >> > > > > > >>>>>>> else is
> >> > > > > > >>>>>>>>>>>> supposed
> >> > > > > > >>>>>>>>>>>>>>>>>>>           to write
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > into
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> them. In fact, no
> user
> >> > > headers
> >> > > > > are
> >> > > > > > >>>>> written
> >> > > > > > >>>>>>> into
> >> > > > > > >>>>>>>>>>>> the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           changelog topic
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> and
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> thus, there are no
> >> > > conflicts.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Nevertheless, I don't
> >> see
> >> > a
> >> > > > big
> >> > > > > > issue
> >> > > > > > >>>>> with
> >> > > > > > >>>>>>>>> using
> >> > > > > > >>>>>>>>>>>>>>>>>>>           headers within
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Streams.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> As long as we
> document
> >> it,
> >> > > we
> >> > > > > can
> >> > > > > > >> have
> >> > > > > > >>>>>>> some
> >> > > > > > >>>>>>>>>>>> "reserved"
> >> > > > > > >>>>>>>>>>>>>>>>>>>           header keys
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> and
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> users are not allowed
> >> to
> >> > use
> >> > > > > when
> >> > > > > > >>>>>>> processing
> >> > > > > > >>>>>>>>>>>> data with
> >> > > > > > >>>>>>>>>>>>>>>>>>>           Kafka
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > Streams.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> IMHO, this should be
> >> ok.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> I think there is a
> safe
> >> > way
> >> > > to
> >> > > > > > avoid
> >> > > > > > >>>>>>>>> conflicts,
> >> > > > > > >>>>>>>>>>>> since
> >> > > > > > >>>>>>>>>>>>>>>>>>> these
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > headers
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> are
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> only needed in
> >> internal
> >> > > > topics
> >> > > > > (I
> >> > > > > > >>>>> think):
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> For internal and
> >> > changelog
> >> > > > > > topics,
> >> > > > > > >> we
> >> > > > > > >>>>> can
> >> > > > > > >>>>>>>>>>>> namespace
> >> > > > > > >>>>>>>>>>>>>>>>>>>           all headers:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> * user-defined
> headers
> >> > are
> >> > > > > > >> namespaced
> >> > > > > > >>>>> as
> >> > > > > > >>>>>>>>>>>> "external."
> >> > > > > > >>>>>>>>>>>>>>>>>>> +
> >> > > > > > >>>>>>>>>>>>>>>>>>>           headerKey
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> * internal headers
> are
> >> > > > > > namespaced as
> >> > > > > > >>>>>>>>>>>> "internal." +
> >> > > > > > >>>>>>>>>>>>>>>>>>>           headerKey
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> While name spacing
> >> would
> >> > be
> >> > > > > > >> possible,
> >> > > > > > >>>>> it
> >> > > > > > >>>>>>>>> would
> >> > > > > > >>>>>>>>>>>>>>>>>>> require
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> to
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>           > >>>>>>>> deserialize
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> user headers which
> >> implies
> >> > a
> >> > > > > > runtime
> >> > > > > > >>>>>>> overhead.
> >> > > > > > >>>>>>>>> I
> >> > > > > > >>>>>>>>>>>> would
> >> > > > > > >>>>>>>>>>>>>>>>>>>           suggest to
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > not
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> namespace for now to
> >> avoid
> >> > > the
> >> > > > > > >>>>> overhead.
> >> > > > > > >>>>>>> If
> >> > > > > > >>>>>>>>> this
> >> > > > > > >>>>>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>> becomes a
> >> > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>           > problem in
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> the future, we can
> >> still
> >> > add
> >> > > > > name
> >> > > > > > >>>>> spacing
> >> > > > > > >>>>>>>>> later
> >> > > > > > >>>>>>>>>>>> on.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> My main concern about the design is the type of the result KTable.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> If I understood the proposal correctly,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K1,V1> table1 = ...
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K2,V2> table2 = ...
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> implies that the `joinedTable` has the same key as the left input
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table. IMHO, this does not work because if table2 contains multiple
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> rows that join with a record in table1 (which is the main purpose of
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> a foreign key join), the result table would only contain a single
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> join result, but not multiple.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Example:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table1 input stream: <A,X>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> We use the table2 value as a foreign key to the table1 key (ie, "A"
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> joins). If the result key is the same as the key of table1, this
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> implies that the result can either be <A, join(X,1)> or
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> <A, join(X,2)> but not both. Because they share the same key,
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> whatever result record we emit later overwrites the previous result.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> This is the reason why Jan originally proposed to use a combination
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> of both primary keys of the input tables as the key of the output
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> table. This makes the keys of the output table unique and we can
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> store both in the output table:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> Thoughts?
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> -Matthias
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
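To make the key-collision argument in Matthias's example concrete, here is a minimal sketch in plain Java. It is not Kafka Streams code: each table is modelled as a Map holding the latest value per key, and the class, method, and field names (ForeignKeyJoinKeys, join, Right) are assumptions made for illustration only. The "A-a" style composite key simply mirrors the notation used in the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: each table is modelled as a Map holding the latest value per key.
// Class and method names are hypothetical and not part of Kafka Streams.
class ForeignKeyJoinKeys {

    // A table2 value: the foreign key into table1 plus a payload.
    static final class Right {
        final String foreignKey;
        final int payload;

        Right(String foreignKey, int payload) {
            this.foreignKey = foreignKey;
            this.payload = payload;
        }
    }

    // Joins every table2 row to its table1 row. When compositeKey is false the
    // result is keyed by the table1 key alone, so later matches overwrite earlier ones.
    static Map<String, String> join(Map<String, String> table1,
                                    Map<String, Right> table2,
                                    boolean compositeKey) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Right> row : table2.entrySet()) {
            Right right = row.getValue();
            String left = table1.get(right.foreignKey);
            if (left == null) {
                continue; // no matching table1 row, nothing to emit
            }
            String key = compositeKey
                    ? right.foreignKey + "-" + row.getKey()
                    : right.foreignKey;
            result.put(key, "join(" + left + "," + right.payload + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = new LinkedHashMap<>();
        table1.put("A", "X");

        Map<String, Right> table2 = new LinkedHashMap<>();
        table2.put("a", new Right("A", 1));
        table2.put("b", new Right("A", 2));

        System.out.println(join(table1, table2, false)); // {A=join(X,2)}
        System.out.println(join(table1, table2, true));  // {A-a=join(X,1), A-b=join(X,2)}
    }
}
```

With the table1 key alone, only the last emitted result survives; with the combined key, both join results are retained.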
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> Just one remark here.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> The high-watermark could be disregarded. The decision about the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> forward depends on the size of the aggregated map.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> Only 1 element long maps would be unpacked and forwarded. 0 element
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> maps would be published as delete. Any other count of map entries
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> is in "waiting for correct deletes to arrive"-state.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
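The forwarding rule Jan describes can be sketched as a three-way decision on the size of the aggregated map. This is only an illustration under assumptions: the names MapSizeForwarding, Action, and decide are hypothetical, and the map's key and value types are left abstract because the thread does not spell them out here.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: names are hypothetical and the map contents are left abstract.
class MapSizeForwarding {

    enum Action { FORWARD, DELETE, WAIT }

    // Decide what to do with an aggregated map based purely on its size.
    static Action decide(Map<String, String> aggregated) {
        if (aggregated.isEmpty()) {
            return Action.DELETE;   // 0 elements: publish a delete
        }
        if (aggregated.size() == 1) {
            return Action.FORWARD;  // exactly 1 element: unpack and forward it
        }
        return Action.WAIT;         // anything else: wait for the deletes to arrive
    }

    public static void main(String[] args) {
        Map<String, String> aggregated = new HashMap<>();
        System.out.println(decide(aggregated)); // DELETE
        aggregated.put("v1", "old");
        System.out.println(decide(aggregated)); // FORWARD
        aggregated.put("v2", "new");
        System.out.println(decide(aggregated)); // WAIT
    }
}
```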
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> It does look like I could replace the second repartition store and
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> highwater store with a groupBy and reduce. However, it looks like I
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> would still need to store the highwater value within the
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> materialized store, to compare the arrival of out-of-order records
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> (assuming my understanding of THIS is correct...). This in effect is
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> the same as the design I have now, just with the two tables merged
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>> together.
> >> > > > > > >>>>>>>>>>>>>>>>>>>           > >>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
> >> > > > > > >>>>>>>>>>>>>>>>>>>           >
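As a rough sketch of why the highwater value would still have to live in the materialized store, the reducer below keeps it next to the value and uses it to discard records that arrive out of order. It assumes the highwater value is a monotonically increasing number (such as a source offset) carried with each record; that assumption, and the names HighwaterReduce, Versioned, and reduce, are illustrative and not taken from the KIP or the PR.

```java
// Illustrative only: assumes each record carries a monotonically increasing
// highwater value (for example a source offset). Names are hypothetical.
class HighwaterReduce {

    // What the materialized store would hold per key: the value plus its highwater mark.
    static final class Versioned {
        final long highwater;
        final String value;

        Versioned(long highwater, String value) {
            this.highwater = highwater;
            this.value = value;
        }
    }

    // Reducer: accept the incoming record only if it is not older than the stored one.
    static Versioned reduce(Versioned stored, Versioned incoming) {
        if (stored == null || incoming.highwater >= stored.highwater) {
            return incoming;
        }
        return stored; // out-of-order arrival: keep what is already stored
    }

    public static void main(String[] args) {
        Versioned stored = null;
        stored = reduce(stored, new Versioned(5, "newer"));
        stored = reduce(stored, new Versioned(3, "older, arrived late"));
        System.out.println(stored.highwater + " " + stored.value); // 5 newer
    }
}
```

Without the stored highwater value the reducer would have nothing to compare against, which is the point of merging the two tables rather than dropping one.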
>
