Jan

I have been thinking a lot about the history of the discussion and your
original proposal, and why you believe it is a better solution. The biggest
problem with your original proposed design is that it seems to me to be
non-deterministic. It is subject to race conditions that are dependent
entirely on the data, and without resolution of these races you can end up
with different results each time. If I am mistaken and this is indeed
deterministic, then please let me know and provide an explanation, ideally
with an example.

The way I see it is that you will get very different answers to your
non-race-condition-resolved join topology, especially if you are nesting it
with additional joins as you have indicated you are doing. Consider
rebuilding an application state from the beginning of two topics. If the
left/this side has multiple foreign-key changes in a row, spaced out every
ten minutes, you may see something like this:

(foo, foreignKey=red) t=0
(foo, foreignKey=blue) t=0+10m
(foo, foreignKey=green) t=0+20m
(foo, foreignKey=purple) t=0+30m
(foo, foreignKey=blue) t=0+40m
(foo, foreignKey=white) t=0+50m

During realtime processing, all of the updates may have correctly
propagated because it took less than 10 minutes to resolve each join. Upon
rebuilding from the start, however, all of these events would be processed
in quick succession. The presence or absence of data will affect the
results of your join, and the results can vary with each run depending on
the data. Because of this, I cannot support any kind of solution that would
allow the exposure of an unresolved intermediate state. I can understand if
you don't support this, but this is why, as you said, you have the freedom
to use the Processor API.


With that being said, either the solution that I originally proposed
(join's ocurring on the foreign node) or John + Guozhang's solution
(registering with the foreign node for notifications) is fine with me -
both have the same API and we can evaluate it further during implementation.


Thanks

Adam

On Thu, Dec 27, 2018 at 2:38 PM Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hi,
>
> just want to let you guys know that this thing is spiralling out of
> control if you ask me.
>
> First you take away the possibility for the user to optimize. Now you
> pile up complexity to perform some afterwards optimisation, that from my
> POV completely misses the point. As if the actual call to the joiner
> really gonna be an expensive part. It wont. Truth is, you don't have a
> clue which side is gonna be smaller. might be the key you shuffle around
> is >>> than the value on the other side already.
>
> You know my opinion on this. For me its dead, I just leave you the
> message here as an opportunity to reconsider the choices that were made.
>
> Whish y'll a happy new year :)
>
>
>
>
>
>
> On 27.12.2018 17:22, Adam Bellemare wrote:
> > Hi All
> >
> > Sorry for the delay - holidays and all. I have since updated the KIP with
> > John's original suggestion and have pruned a number of the no longer
> > relevant diagrams. Any more comments would be welcomed, otherwise I will
> > look to kick off the vote again shortly.
> >
> > Thanks
> > Adam
> >
> > On Mon, Dec 17, 2018 at 7:06 PM Adam Bellemare <adam.bellem...@gmail.com
> >
> > wrote:
> >
> >> Hi John and Guozhang
> >>
> >> Ah yes, I lost that in the mix! Thanks for the convergent solutions - I
> do
> >> think that the attachment that John included makes for a better design.
> It
> >> should also help with overall performance as very high-cardinality
> foreign
> >> keyed data (say millions of events with the same entity) will be able to
> >> leverage the multiple nodes for join functionality instead of having it
> all
> >> performed in one node. There is still a bottleneck in the right table
> >> having to propagate all those events, but with slimmer structures, less
> IO
> >> and no need to perform the join I think the throughput will be much
> higher
> >> in those scenarios.
> >>
> >> Okay, I am convinced. I will update the KIP accordingly to a Gliffy
> >> version of John's diagram and ensure that the example flow matches
> >> correctly. Then I can go back to working on the PR to match the diagram.
> >>
> >> Thanks both of you for all the help - very much appreciated.
> >>
> >> Adam
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Dec 17, 2018 at 6:39 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >>> Hi John,
> >>>
> >>> Just made a pass on your diagram (nice hand-drawing btw!), and
> obviously
> >>> we
> >>> are thinking about the same thing :) A neat difference that I like, is
> >>> that
> >>> in the pre-join repartition topic we can still send message in the
> format
> >>> of `K=k, V=(i=2)` while using "i" as the partition key in
> >>> StreamsPartition,
> >>> this way we do not need to even augment the key for the repartition
> topic,
> >>> but just do a projection on the foreign key part but trim all other
> >>> fields:
> >>> as long as we still materialize the store as `A-2` co-located with the
> >>> right KTable, that is fine.
> >>>
> >>> As I mentioned in my previous email, I also think this has a few
> >>> advantages
> >>> on saving over-the-wire bytes as well as disk bytes.
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Mon, Dec 17, 2018 at 3:17 PM John Roesler <j...@confluent.io>
> wrote:
> >>>
> >>>> Hi Guozhang,
> >>>>
> >>>> Thanks for taking a look! I think Adam's already addressed your
> >>> questions
> >>>> as well as I could have.
> >>>>
> >>>> Hi Adam,
> >>>>
> >>>> Thanks for updating the KIP. It looks great, especially how all the
> >>>> need-to-know information is right at the top, followed by the details.
> >>>>
> >>>> Also, thanks for that high-level diagram. Actually, now that I'm
> looking
> >>>> at it, I think part of my proposal got lost in translation, although I
> >>> do
> >>>> think that what you have there is also correct.
> >>>>
> >>>> I sketched up a crude diagram based on yours and attached it to the
> KIP
> >>>> (I'm not sure if attached or inline images work on the mailing list):
> >>>>
> >>>
> https://cwiki.apache.org/confluence/download/attachments/74684836/suggestion.png
> >>>> . It's also attached to this email for convenience.
> >>>>
> >>>> Hopefully, you can see how it's intended to line up, and which parts
> are
> >>>> modified.
> >>>> At a high level, instead of performing the join on the right-hand
> side,
> >>>> we're essentially just registering interest, like "LHS key A wishes to
> >>>> receive updates for RHS key 2". Then, when there is a new "interest"
> or
> >>> any
> >>>> updates to the RHS records, it "broadcasts" its state back to the LHS
> >>>> records who are interested in it.
> >>>>
> >>>> Thus, instead of sending the LHS values to the RHS joiner workers and
> >>> then
> >>>> sending the join results back to the LHS worke be co-partitioned and
> >>>> validated, we instead only send the LHS *keys* to the RHS workers and
> >>> then
> >>>> only the RHS k/v back to be joined by the LHS worker.
> >>>>
> >>>> I've been considering both your diagram and mine, and I *think* what
> I'm
> >>>> proposing has a few advantages.
> >>>>
> >>>> Here are some points of interest as you look at the diagram:
> >>>> * When we extract the foreign key and send it to the Pre-Join
> >>> Repartition
> >>>> Topic, we can send only the FK/PK pair. There's no need to worry about
> >>>> custom partitioner logic, since we can just use the foreign key
> plainly
> >>> as
> >>>> the repartition record key. Also, we save on transmitting the LHS
> value,
> >>>> since we only send its key in this step.
> >>>> * We also only need to store the RHSKey:LHSKey mapping in the
> >>>> MaterializedSubscriptionStore, saving on disk. We can use the same
> rocks
> >>>> key format you proposed and the same algorithm involving range scans
> >>> when
> >>>> the RHS records get updated.
> >>>> * Instead of joining on the right side, all we do is compose a
> >>>> re-repartition record so we can broadcast the RHS k/v pair back to the
> >>>> original LHS partition. (this is what the "rekey" node is doing)
> >>>> * Then, there is a special kind of Joiner that's co-resident in the
> same
> >>>> StreamTask as the LHS table, subscribed to the Post-Join Repartition
> >>> Topic.
> >>>> ** This Joiner is *not* triggered directly by any changes in the LHS
> >>>> KTable. Instead, LHS events indirectly trigger the join via the whole
> >>>> lifecycle.
> >>>> ** For each event arriving from the Post-Join Repartition Topic, the
> >>>> Joiner looks up the corresponding record in the LHS KTable. It
> validates
> >>>> the FK as you noted, discarding any inconsistent events. Otherwise, it
> >>>> unpacks the RHS K/V pair and invokes the ValueJoiner to obtain the
> join
> >>>> result
> >>>> ** Note that the Joiner itself is stateless, so materializing the join
> >>>> result is optional, just as with the 1:1 joins.
> >>>>
> >>>> So in summary:
> >>>> * instead of transmitting the LHS keys and values to the right and the
> >>>> JoinResult back to the left, we only transmit the LHS keys to the
> right
> >>> and
> >>>> the RHS values to the left. Assuming the average RHS value is on
> smaller
> >>>> than or equal to the average join result size, it's a clear win on
> >>> broker
> >>>> traffic. I think this is actually a reasonable assumption, which we
> can
> >>>> discuss more if you're suspicious.
> >>>> * we only need one copy of the data (the left and right tables need to
> >>> be
> >>>> materialized) and one extra copy of the PK:FK pairs in the
> Materialized
> >>>> Subscription Store. Materializing the join result is optional, just as
> >>> with
> >>>> the existing 1:1 joins.
> >>>> * we still need the fancy range-scan algorithm on the right to locate
> >>> all
> >>>> interested LHS keys when a RHS value is updated, but we don't need a
> >>> custom
> >>>> partitioner for either repartition topic (this is of course a
> >>> modification
> >>>> we could make to your version as well)
> >>>>
> >>>> How does this sound to you? (And did I miss anything?)
> >>>> -John
> >>>>
> >>>> On Mon, Dec 17, 2018 at 9:00 AM Adam Bellemare <
> >>> adam.bellem...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi John & Guozhang
> >>>>>
> >>>>> @John & @Guozhang Wang <wangg...@gmail.com> - I have cleaned up the
> >>> KIP,
> >>>>> pruned much of what I wrote and put a simplified diagram near the top
> >>> to
> >>>>> illustrate the workflow. I encapsulated Jan's content at the bottom
> of
> >>> the
> >>>>> document. I believe it is simpler to read by far now.
> >>>>>
> >>>>> @Guozhang Wang <wangg...@gmail.com>:
> >>>>>> #1: rekey left table
> >>>>>>    -> source from the left upstream, send to rekey-processor to
> >>> generate
> >>>>> combined key, and then sink to copartition topic.
> >>>>> Correct.
> >>>>>
> >>>>>> #2: first-join with right table
> >>>>>>    -> source from the right table upstream, materialize the right
> >>> table.
> >>>>>>    -> source from the co-partition topic, materialize the rekeyed
> left
> >>>>> table, join with the right table, rekey back, and then sink to the
> >>>>> rekeyed-back topic.
> >>>>> Almost - I cleared up the KIP. We do not rekey back yet, as I need
> the
> >>>>> Foreign-Key value generated in #1 above to compare in the resolution
> >>>>> stage.
> >>>>>
> >>>>>> #3: second join
> >>>>>>     -> source from the rekeyed-back topic, materialize the rekeyed
> >>> back
> >>>>> table.
> >>>>>>    -> source from the left upstream, materialize the left table,
> join
> >>>>> with
> >>>>> the rekeyed back table.
> >>>>> Almost - As each event comes in, we just run it through a stateful
> >>>>> processor that checks the original ("This") KTable for the key. The
> >>> value
> >>>>> payload then has the foreignKeyExtractor applied again as in Part #1
> >>>>> above,
> >>>>> and gets the current foreign key. Then we compare it to the joined
> >>> event
> >>>>> that we are currently resolving. If they have the same foreign-key,
> >>>>> propagate the result out. If they don't, throw the event away.
> >>>>>
> >>>>> The end result is that we do need to materialize 2 additional tables
> >>>>> (left/this-combinedkey table, and the final Joined table) as I've
> >>>>> illustrated in the updated KIP. I hope the diagram clears it up a lot
> >>>>> better. Please let me know.
> >>>>>
> >>>>> Thanks again
> >>>>> Adam
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sun, Dec 16, 2018 at 3:18 PM Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>>>
> >>>>>> John,
> >>>>>>
> >>>>>> Thanks a lot for the suggestions on refactoring the wiki, I agree
> >>> with
> >>>>> you
> >>>>>> that we should consider the KIP proposal to be easily understood by
> >>>>> anyone
> >>>>>> in the future to read, and hence should provide a good summary on
> the
> >>>>>> user-facing interfaces, as well as rejected alternatives to
> represent
> >>>>>> briefly "how we came a long way to this conclusion, and what we have
> >>>>>> argued, disagreed, and agreed about, etc" so that readers do not
> >>> need to
> >>>>>> dig into the DISCUSS thread to get all the details. We can, of
> >>> course,
> >>>>> keep
> >>>>>> the implementation details like "workflows" on the wiki page as a
> >>>>> addendum
> >>>>>> section since it also has correlations.
> >>>>>>
> >>>>>> Regarding your proposal on comment 6): that's a very interesting
> >>> idea!
> >>>>> Just
> >>>>>> to clarify that I understands it fully correctly: the proposal's
> >>>>> resulted
> >>>>>> topology is still the same as the current proposal, where we will
> >>> have 3
> >>>>>> sub-topologies for this operator:
> >>>>>>
> >>>>>> #1: rekey left table
> >>>>>>     -> source from the left upstream, send to rekey-processor to
> >>> generate
> >>>>>> combined key, and then sink to copartition topic.
> >>>>>>
> >>>>>> #2: first-join with right table
> >>>>>>     -> source from the right table upstream, materialize the right
> >>> table.
> >>>>>>     -> source from the co-partition topic, materialize the rekeyed
> >>> left
> >>>>>> table, join with the right table, rekey back, and then sink to the
> >>>>>> rekeyed-back topic.
> >>>>>>
> >>>>>> #3: second join
> >>>>>>     -> source from the rekeyed-back topic, materialize the rekeyed
> >>> back
> >>>>>> table.
> >>>>>>     -> source from the left upstream, materialize the left table,
> join
> >>>>> with
> >>>>>> the rekeyed back table.
> >>>>>>
> >>>>>> Sub-topology #1 and #3 may be merged to a single sub-topology since
> >>>>> both of
> >>>>>> them read from the left table source stream. In this workflow, we
> >>> need
> >>>>> to
> >>>>>> materialize 4 tables (left table in #3, right table in #2, rekeyed
> >>> left
> >>>>>> table in #2, rekeyed-back table in #3), and 2 repartition topics
> >>>>>> (copartition topic, rekeyed-back topic).
> >>>>>>
> >>>>>> Compared with Adam's current proposal in the workflow overview, it
> >>> has
> >>>>> the
> >>>>>> same num.materialize tables (left table, rekeyed left table, right
> >>>>> table,
> >>>>>> out-of-ordering resolver table), and same num.internal topics (two).
> >>> The
> >>>>>> advantage is that on the copartition topic, we can save bandwidth by
> >>> not
> >>>>>> sending value, and in #2 the rekeyed left table is smaller since we
> >>> do
> >>>>> not
> >>>>>> have any values to materialize. Is that right?
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Dec 12, 2018 at 1:22 PM John Roesler <j...@confluent.io>
> >>> wrote:
> >>>>>>
> >>>>>>> Hi Adam,
> >>>>>>>
> >>>>>>> Given that the committers are all pretty busy right now, I think
> >>> that
> >>>>> it
> >>>>>>> would help if you were to refactor the KIP a little to reduce the
> >>>>>> workload
> >>>>>>> for reviewers.
> >>>>>>>
> >>>>>>> I'd recommend the following changes:
> >>>>>>> * relocate all internal details to a section at the end called
> >>>>> something
> >>>>>>> like "Implementation Notes" or something like that.
> >>>>>>> * rewrite the rest of the KIP to be a succinct as possible and
> >>> mention
> >>>>>> only
> >>>>>>> publicly-facing API changes.
> >>>>>>> ** for example, the interface that you've already listed there, as
> >>>>> well
> >>>>>> as
> >>>>>>> a textual description of the guarantees we'll be providing (join
> >>>>> result
> >>>>>> is
> >>>>>>> copartitioned with the LHS, and the join result is guaranteed
> >>> correct)
> >>>>>>>
> >>>>>>> A good target would be that the whole main body of the KIP,
> >>> including
> >>>>>>> Status, Motivation, Proposal, Justification, and Rejected
> >>> Alternatives
> >>>>>> all
> >>>>>>> fit "above the fold" (i.e., all fit on the screen at a comfortable
> >>>>> zoom
> >>>>>>> level).
> >>>>>>> I think the only real Rejected Alternative that bears mention at
> >>> this
> >>>>>> point
> >>>>>>> is KScatteredTable, which you could just include the executive
> >>>>> summary on
> >>>>>>> (no implementation details), and link to extra details in the
> >>>>>>> Implementation Notes section.
> >>>>>>>
> >>>>>>> Taking a look at the wiki page, ~90% of the text there is internal
> >>>>>> detail,
> >>>>>>> which is useful for the dubious, but doesn't need to be ratified
> >>> in a
> >>>>>> vote
> >>>>>>> (and would be subject to change without notice in the future
> >>> anyway).
> >>>>>>> There's also a lot of conflicting discussion, as you've very
> >>>>> respectfully
> >>>>>>> tried to preserve the original proposal from Jan while adding your
> >>>>> own.
> >>>>>>> Isolating all this information in a dedicated section at the bottom
> >>>>> frees
> >>>>>>> the voters up to focus on the public API part of the proposal,
> >>> which
> >>>>> is
> >>>>>>> really all they need to consider.
> >>>>>>>
> >>>>>>> Plus, it'll be clear to future readers which parts of the document
> >>> are
> >>>>>>> enduring, and which parts are a snapshot of our implementation
> >>>>> thinking
> >>>>>> at
> >>>>>>> the time.
> >>>>>>>
> >>>>>>> I'm suggesting this because I suspect that the others haven't made
> >>>>> time
> >>>>>> to
> >>>>>>> review it partly because it seems daunting. If it seems like it
> >>> would
> >>>>> be
> >>>>>> a
> >>>>>>> huge time investment to review, people will just keep putting it
> >>> off.
> >>>>> But
> >>>>>>> if the KIP is a single page, then they'll be more inclined to give
> >>> it
> >>>>> a
> >>>>>>> read.
> >>>>>>>
> >>>>>>> Honestly, I don't think the KIP itself is that controversial (apart
> >>>>> from
> >>>>>>> the scattered table thing (sorry, Jan) ). Most of the discussion
> >>> has
> >>>>> been
> >>>>>>> around the implementation, which we can continue more effectively
> >>> in
> >>>>> a PR
> >>>>>>> once the KIP has passed.
> >>>>>>>
> >>>>>>> How does that sound?
> >>>>>>> -John
> >>>>>>>
> >>>>>>> On Mon, Dec 10, 2018 at 3:54 PM Adam Bellemare <
> >>>>> adam.bellem...@gmail.com
> >>>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> 1) I believe that the resolution mechanism John has proposed is
> >>>>>>> sufficient
> >>>>>>>> - it is clean and easy and doesn't require additional RocksDB
> >>>>> stores,
> >>>>>>> which
> >>>>>>>> reduces the footprint greatly. I don't think we need to resolve
> >>>>> based
> >>>>>> on
> >>>>>>>> timestamp or offset anymore, but if we decide to do to that
> >>> would be
> >>>>>>> within
> >>>>>>>> the bounds of the existing API.
> >>>>>>>>
> >>>>>>>> 2) Is the current API sufficient, or does it need to be altered
> >>> to
> >>>>> go
> >>>>>>> back
> >>>>>>>> to vote?
> >>>>>>>>
> >>>>>>>> 3) KScatteredTable implementation can always be added in a future
> >>>>>>> revision.
> >>>>>>>> This API does not rule it out. This implementation of this
> >>> function
> >>>>>> would
> >>>>>>>> simply be replaced with `KScatteredTable.resolve()` while still
> >>>>>>> maintaining
> >>>>>>>> the existing API, thereby giving both features as Jan outlined
> >>>>> earlier.
> >>>>>>>> Would this work?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks Guozhang, John and Jan
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Dec 10, 2018 at 10:39 AM John Roesler <j...@confluent.io
> >>>>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi, all,
> >>>>>>>>>
> >>>>>>>>>>> In fact, we
> >>>>>>>>>>> can just keep a single final-result store with timestamps
> >>> and
> >>>>>> reject
> >>>>>>>>> values
> >>>>>>>>>>> that have a smaller timestamp, is that right?
> >>>>>>>>>
> >>>>>>>>>> Which is the correct output should at least be decided on the
> >>>>>> offset
> >>>>>>> of
> >>>>>>>>>> the original message.
> >>>>>>>>>
> >>>>>>>>> Thanks for this point, Jan.
> >>>>>>>>>
> >>>>>>>>> KIP-258 is merely to allow embedding the record timestamp  in
> >>> the
> >>>>> k/v
> >>>>>>>>> store,
> >>>>>>>>> as well as providing a storage-format upgrade path.
> >>>>>>>>>
> >>>>>>>>> I might have missed it, but I think we have yet to discuss
> >>> whether
> >>>>>> it's
> >>>>>>>>> safe
> >>>>>>>>> or desirable just to swap topic-ordering our for
> >>>>> timestamp-ordering.
> >>>>>>> This
> >>>>>>>>> is
> >>>>>>>>> a very deep topic, and I think it would only pollute the
> >>> current
> >>>>>>>>> discussion.
> >>>>>>>>>
> >>>>>>>>> What Adam has proposed is safe, given the *current* ordering
> >>>>>> semantics
> >>>>>>>>> of the system. If we can agree on his proposal, I think we can
> >>>>> merge
> >>>>>>> the
> >>>>>>>>> feature well before the conversation about timestamp ordering
> >>> even
> >>>>>>> takes
> >>>>>>>>> place, much less reaches a conclusion. In the mean time, it
> >>> would
> >>>>>> seem
> >>>>>>> to
> >>>>>>>>> be unfortunate to have one join operator with different
> >>> ordering
> >>>>>>>> semantics
> >>>>>>>>> from every other KTable operator.
> >>>>>>>>>
> >>>>>>>>> If and when that timestamp discussion takes place, many (all?)
> >>>>> KTable
> >>>>>>>>> operations
> >>>>>>>>> will need to be updated, rendering the many:one join a small
> >>>>> marginal
> >>>>>>>> cost.
> >>>>>>>>>
> >>>>>>>>> And, just to plug it again, I proposed an algorithm above that
> >>> I
> >>>>>>> believe
> >>>>>>>>> provides
> >>>>>>>>> correct ordering without any additional metadata, and
> >>> regardless
> >>>>> of
> >>>>>> the
> >>>>>>>>> ordering semantics. I didn't bring it up further, because I
> >>> felt
> >>>>> the
> >>>>>>> KIP
> >>>>>>>>> only needs
> >>>>>>>>> to agree on the public API, and we can discuss the
> >>> implementation
> >>>>> at
> >>>>>>>>> leisure in
> >>>>>>>>> a PR...
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> -John
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Dec 10, 2018 at 2:28 AM Jan Filipiak <
> >>>>>> jan.filip...@trivago.com
> >>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 10.12.2018 07:42, Guozhang Wang wrote:
> >>>>>>>>>>> Hello Adam / Jan / John,
> >>>>>>>>>>>
> >>>>>>>>>>> Sorry for being late on this thread! I've finally got some
> >>>>> time
> >>>>>>> this
> >>>>>>>>>>> weekend to cleanup a load of tasks on my queue (actually
> >>> I've
> >>>>>> also
> >>>>>>>>>> realized
> >>>>>>>>>>> there are a bunch of other things I need to enqueue while
> >>>>>> cleaning
> >>>>>>>> them
> >>>>>>>>>> up
> >>>>>>>>>>> --- sth I need to improve on my side). So here are my
> >>>>> thoughts:
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding the APIs: I like the current written API in the
> >>> KIP.
> >>>>>> More
> >>>>>>>>>>> generally I'd prefer to keep the 1) one-to-many join
> >>>>>>> functionalities
> >>>>>>>> as
> >>>>>>>>>>> well as 2) other join types than inner as separate KIPs
> >>> since
> >>>>> 1)
> >>>>>>> may
> >>>>>>>>>> worth
> >>>>>>>>>>> a general API refactoring that can benefit not only
> >>> foreignkey
> >>>>>>> joins
> >>>>>>>>> but
> >>>>>>>>>>> collocate joins as well (e.g. an extended proposal of
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >>>>>>>>>> ),
> >>>>>>>>>>> and I'm not sure if other join types would actually be
> >>> needed
> >>>>>>> (maybe
> >>>>>>>>> left
> >>>>>>>>>>> join still makes sense), so it's better to
> >>>>>>>>> wait-for-people-to-ask-and-add
> >>>>>>>>>>> than add-sth-that-no-one-uses.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding whether we enforce step 3) / 4) v.s. introducing
> >>> a
> >>>>>>>>>>> KScatteredTable for users to inject their own optimization:
> >>>>> I'd
> >>>>>>>> prefer
> >>>>>>>>> to
> >>>>>>>>>>> do the current option as-is, and my main rationale is for
> >>>>>>>> optimization
> >>>>>>>>>>> rooms inside the Streams internals and the API
> >>> succinctness.
> >>>>> For
> >>>>>>>>> advanced
> >>>>>>>>>>> users who may indeed prefer KScatteredTable and do their
> >>> own
> >>>>>>>>>> optimization,
> >>>>>>>>>>> while it is too much of the work to use Processor API
> >>>>> directly, I
> >>>>>>>> think
> >>>>>>>>>> we
> >>>>>>>>>>> can still extend the current API to support it in the
> >>> future
> >>>>> if
> >>>>>> it
> >>>>>>>>>> becomes
> >>>>>>>>>>> necessary.
> >>>>>>>>>>
> >>>>>>>>>> no internal optimization potential. it's a myth
> >>>>>>>>>>
> >>>>>>>>>> ¯\_(ツ)_/¯
> >>>>>>>>>>
> >>>>>>>>>> :-)
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Another note about step 4) resolving out-of-ordering data,
> >>> as
> >>>>> I
> >>>>>>>>> mentioned
> >>>>>>>>>>> before I think with KIP-258 (embedded timestamp with
> >>> key-value
> >>>>>>> store)
> >>>>>>>>> we
> >>>>>>>>>>> can actually make this step simpler than the current
> >>>>> proposal. In
> >>>>>>>> fact,
> >>>>>>>>>> we
> >>>>>>>>>>> can just keep a single final-result store with timestamps
> >>> and
> >>>>>>> reject
> >>>>>>>>>> values
> >>>>>>>>>>> that have a smaller timestamp, is that right?
> >>>>>>>>>>
> >>>>>>>>>> Which is the correct output should at least be decided on the
> >>>>>> offset
> >>>>>>> of
> >>>>>>>>>> the original message.
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> That's all I have in mind now. Again, great appreciation to
> >>>>> Adam
> >>>>>> to
> >>>>>>>>> make
> >>>>>>>>>>> such HUGE progress on this KIP!
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Dec 5, 2018 at 2:40 PM Jan Filipiak <
> >>>>>>>> jan.filip...@trivago.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> If they don't find the time:
> >>>>>>>>>>>> They usually take the opposite path from me :D
> >>>>>>>>>>>> so the answer would be clear.
> >>>>>>>>>>>>
> >>>>>>>>>>>> hence my suggestion to vote.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 04.12.2018 21:06, Adam Bellemare wrote:
> >>>>>>>>>>>>> Hi Guozhang and Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I know both of you are quite busy, but we've gotten this
> >>> KIP
> >>>>>> to a
> >>>>>>>>> point
> >>>>>>>>>>>>> where we need more guidance on the API (perhaps a bit of
> >>> a
> >>>>>>>>> tie-breaker,
> >>>>>>>>>>>> if
> >>>>>>>>>>>>> you will). If you have anyone else you may think should
> >>>>> look at
> >>>>>>>> this,
> >>>>>>>>>>>>> please tag them accordingly.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The scenario is as such:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Current Option:
> >>>>>>>>>>>>> API:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-PublicInterfaces
> >>>>>>>>>>>>> 1) Rekey the data to CombinedKey, and shuffles it to the
> >>>>>>> partition
> >>>>>>>>> with
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> foreignKey (repartition 1)
> >>>>>>>>>>>>> 2) Join the data
> >>>>>>>>>>>>> 3) Shuffle the data back to the original node
> >>> (repartition
> >>>>> 2)
> >>>>>>>>>>>>> 4) Resolve out-of-order arrival / race condition due to
> >>>>>>> foreign-key
> >>>>>>>>>>>> changes.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Alternate Option:
> >>>>>>>>>>>>> Perform #1 and #2 above, and return a KScatteredTable.
> >>>>>>>>>>>>> - It would be keyed on a wrapped key function:
> >>>>> <CombinedKey<KO,
> >>>>>>> K>,
> >>>>>>>>> VR>
> >>>>>>>>>>>> (KO
> >>>>>>>>>>>>> = Other Table Key, K = This Table Key, VR = Joined
> >>> Result)
> >>>>>>>>>>>>> - KScatteredTable.resolve() would perform #3 and #4 but
> >>>>>>> otherwise a
> >>>>>>>>>> user
> >>>>>>>>>>>>> would be able to perform additional functions directly
> >>> from
> >>>>> the
> >>>>>>>>>>>>> KScatteredTable (TBD - currently out of scope).
> >>>>>>>>>>>>> - John's analysis 2-emails up is accurate as to the
> >>>>> tradeoffs.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Current Option is coded as-is. Alternate option is
> >>> possible,
> >>>>>> but
> >>>>>>>> will
> >>>>>>>>>>>>> require for implementation details to be made in the API
> >>> and
> >>>>>> some
> >>>>>>>>>>>> exposure
> >>>>>>>>>>>>> of new data structures into the API (ie: CombinedKey).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I appreciate any insight into this.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:59 PM Adam Bellemare <
> >>>>>>>>>> adam.bellem...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi John
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for your feedback and assistance. I think your
> >>>>> summary
> >>>>>> is
> >>>>>>>>>>>> accurate
> >>>>>>>>>>>>>> from my perspective. Additionally, I would like to add
> >>> that
> >>>>>>> there
> >>>>>>>>> is a
> >>>>>>>>>>>> risk
> >>>>>>>>>>>>>> of inconsistent final states without performing the
> >>>>>> resolution.
> >>>>>>>> This
> >>>>>>>>>> is
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>> major concern for me as most of the data I have dealt
> >>> with
> >>>>> is
> >>>>>>>>> produced
> >>>>>>>>>>>> by
> >>>>>>>>>>>>>> relational databases. We have seen a number of cases
> >>> where
> >>>>> a
> >>>>>>> user
> >>>>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>>>>> Rails UI has modified the field (foreign key), realized
> >>>>> they
> >>>>>>> made
> >>>>>>>> a
> >>>>>>>>>>>>>> mistake, and then updated the field again with a new
> >>> key.
> >>>>> The
> >>>>>>>> events
> >>>>>>>>>> are
> >>>>>>>>>>>>>> propagated out as they are produced, and as such we have
> >>>>> had
> >>>>>>>>>> real-world
> >>>>>>>>>>>>>> cases where these inconsistencies were propagated
> >>>>> downstream
> >>>>>> as
> >>>>>>>> the
> >>>>>>>>>>>> final
> >>>>>>>>>>>>>> values due to the race conditions in the fanout of the
> >>>>> data.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This solution that I propose values correctness of the
> >>>>> final
> >>>>>>>> result
> >>>>>>>>>> over
> >>>>>>>>>>>>>> other factors.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We could always move this function over to using a
> >>>>>>> KScatteredTable
> >>>>>>>>>>>>>> implementation in the future, and simply deprecate it
> >>> this
> >>>>>> join
> >>>>>>>> API
> >>>>>>>>> in
> >>>>>>>>>>>>>> time. I think I would like to hear more from some of the
> >>>>> other
> >>>>>>>> major
> >>>>>>>>>>>>>> committers on which course of action they would think is
> >>>>> best
> >>>>>>>> before
> >>>>>>>>>> any
> >>>>>>>>>>>>>> more coding is done.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks again
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 8:24 PM John Roesler <
> >>>>>> j...@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Jan and Adam,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Wow, thanks for doing that test, Adam. Those results
> >>> are
> >>>>>>>>> encouraging.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for your performance experience as well, Jan. I
> >>>>> agree
> >>>>>>> that
> >>>>>>>>>>>> avoiding
> >>>>>>>>>>>>>>> unnecessary join outputs is especially important when
> >>> the
> >>>>>>> fan-out
> >>>>>>>>> is
> >>>>>>>>>> so
> >>>>>>>>>>>>>>> high. I suppose this could also be built into the
> >>>>>>> implementation
> >>>>>>>>>> we're
> >>>>>>>>>>>>>>> discussing, but it wouldn't have to be specified in the
> >>>>> KIP
> >>>>>>>> (since
> >>>>>>>>>>>> it's an
> >>>>>>>>>>>>>>> API-transparent optimization).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As far as whether or not to re-repartition the data, I
> >>>>> didn't
> >>>>>>>> bring
> >>>>>>>>>> it
> >>>>>>>>>>>> up
> >>>>>>>>>>>>>>> because it sounded like the two of you agreed to leave
> >>> the
> >>>>>> KIP
> >>>>>>>>> as-is,
> >>>>>>>>>>>>>>> despite the disagreement.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If you want my opinion, I feel like both approaches are
> >>>>>>>> reasonable.
> >>>>>>>>>>>>>>> It sounds like Jan values more the potential for
> >>>>> developers
> >>>>>> to
> >>>>>>>>>> optimize
> >>>>>>>>>>>>>>> their topologies to re-use the intermediate nodes,
> >>> whereas
> >>>>>> Adam
> >>>>>>>>>> places
> >>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>> value on having a single operator that people can use
> >>>>> without
> >>>>>>>> extra
> >>>>>>>>>>>> steps
> >>>>>>>>>>>>>>> at the end.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Personally, although I do find it exceptionally
> >>> annoying
> >>>>>> when a
> >>>>>>>>>>>> framework
> >>>>>>>>>>>>>>> gets in my way when I'm trying to optimize something,
> >>> it
> >>>>>> seems
> >>>>>>>>> better
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> go
> >>>>>>>>>>>>>>> for a single operation.
> >>>>>>>>>>>>>>> * Encapsulating the internal transitions gives us
> >>>>> significant
> >>>>>>>>>> latitude
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>> the implementation (for example, joining only at the
> >>> end,
> >>>>> not
> >>>>>>> in
> >>>>>>>>> the
> >>>>>>>>>>>>>>> middle
> >>>>>>>>>>>>>>> to avoid extra data copying and out-of-order
> >>> resolution;
> >>>>> how
> >>>>>> we
> >>>>>>>>>>>> represent
> >>>>>>>>>>>>>>> the first repartition keys (combined keys vs. value
> >>>>> vectors),
> >>>>>>>>> etc.).
> >>>>>>>>>>>> If we
> >>>>>>>>>>>>>>> publish something like a KScatteredTable with the
> >>>>>>>> right-partitioned
> >>>>>>>>>>>> joined
> >>>>>>>>>>>>>>> data, then the API pretty much locks in the
> >>>>> implementation as
> >>>>>>>> well.
> >>>>>>>>>>>>>>> * The API seems simpler to understand and use. I do
> >>> mean
> >>>>>>> "seems";
> >>>>>>>>> if
> >>>>>>>>>>>>>>> anyone
> >>>>>>>>>>>>>>> wants to make the case that KScatteredTable is actually
> >>>>>>> simpler,
> >>>>>>>> I
> >>>>>>>>>>>> think
> >>>>>>>>>>>>>>> hypothetical usage code would help. From a relational
> >>>>> algebra
> >>>>>>>>>>>> perspective,
> >>>>>>>>>>>>>>> it seems like KTable.join(KTable) should produce a new
> >>>>> KTable
> >>>>>>> in
> >>>>>>>>> all
> >>>>>>>>>>>>>>> cases.
> >>>>>>>>>>>>>>> * That said, there might still be room in the API for a
> >>>>>>> different
> >>>>>>>>>>>>>>> operation
> >>>>>>>>>>>>>>> like what Jan has proposed to scatter a KTable, and
> >>> then
> >>>>> do
> >>>>>>>> things
> >>>>>>>>>> like
> >>>>>>>>>>>>>>> join, re-group, etc from there... I'm not sure; I
> >>> haven't
> >>>>>>> thought
> >>>>>>>>>>>> through
> >>>>>>>>>>>>>>> all the consequences yet.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This is all just my opinion after thinking over the
> >>>>>> discussion
> >>>>>>> so
> >>>>>>>>>>>> far...
> >>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 2:56 PM Adam Bellemare <
> >>>>>>>>>>>> adam.bellem...@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Updated the PR to take into account John's feedback.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I did some preliminary testing for the performance of
> >>> the
> >>>>>>>>>> prefixScan.
> >>>>>>>>>>>> I
> >>>>>>>>>>>>>>>> have attached the file, but I will also include the
> >>> text
> >>>>> in
> >>>>>>> the
> >>>>>>>>> body
> >>>>>>>>>>>>>>> here
> >>>>>>>>>>>>>>>> for archival purposes (I am not sure what happens to
> >>>>>> attached
> >>>>>>>>>> files).
> >>>>>>>>>>>> I
> >>>>>>>>>>>>>>>> also updated the PR and the KIP accordingly.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Summary: It scales exceptionally well for scanning
> >>> large
> >>>>>>> values
> >>>>>>>> of
> >>>>>>>>>>>>>>>> records. As Jan mentioned previously, the real issue
> >>>>> would
> >>>>>> be
> >>>>>>>> more
> >>>>>>>>>>>>>>> around
> >>>>>>>>>>>>>>>> processing the resulting records after obtaining them.
> >>>>> For
> >>>>>>>>> instance,
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> takes approximately ~80-120 mS to flush the buffer
> >>> and a
> >>>>>>> further
> >>>>>>>>>>>>>>> ~35-85mS
> >>>>>>>>>>>>>>>> to scan 27.5M records, obtaining matches for 2.5M of
> >>>>> them.
> >>>>>>>>> Iterating
> >>>>>>>>>>>>>>>> through the records just to generate a simple count
> >>>>> takes ~
> >>>>>> 40
> >>>>>>>>> times
> >>>>>>>>>>>>>>> longer
> >>>>>>>>>>>>>>>> than the flush + scan combined.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> ============================================================================================
> >>>>>>>>>>>>>>>> Setup:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> ============================================================================================
> >>>>>>>>>>>>>>>> Java 9 with default settings aside from a 512 MB heap
> >>>>>>> (Xmx512m,
> >>>>>>>>>>>> Xms512m)
> >>>>>>>>>>>>>>>> CPU: i7 2.2 Ghz.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Note: I am using a slightly-modified,
> >>> directly-accessible
> >>>>>>> Kafka
> >>>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>> RocksDB
> >>>>>>>>>>>>>>>> implementation (RocksDB.java, basically just avoiding
> >>> the
> >>>>>>>>>>>>>>>> ProcessorContext).
> >>>>>>>>>>>>>>>> There are no modifications to the default RocksDB
> >>> values
> >>>>>>>> provided
> >>>>>>>>> in
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> 2.1/trunk release.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> keysize = 128 bytes
> >>>>>>>>>>>>>>>> valsize = 512 bytes
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Step 1:
> >>>>>>>>>>>>>>>> Write X positive matching events: (key = prefix +
> >>>>>> left-padded
> >>>>>>>>>>>>>>>> auto-incrementing integer)
> >>>>>>>>>>>>>>>> Step 2:
> >>>>>>>>>>>>>>>> Write 10X negative matching events (key = left-padded
> >>>>>>>>>>>> auto-incrementing
> >>>>>>>>>>>>>>>> integer)
> >>>>>>>>>>>>>>>> Step 3:
> >>>>>>>>>>>>>>>> Perform flush
> >>>>>>>>>>>>>>>> Step 4:
> >>>>>>>>>>>>>>>> Perform prefixScan
> >>>>>>>>>>>>>>>> Step 5:
> >>>>>>>>>>>>>>>> Iterate through return Iterator and validate the
> >>> count of
> >>>>>>>> expected
> >>>>>>>>>>>>>>> events.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> ============================================================================================
> >>>>>>>>>>>>>>>> Results:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> ============================================================================================
> >>>>>>>>>>>>>>>> X = 1k (11k events total)
> >>>>>>>>>>>>>>>> Flush Time = 39 mS
> >>>>>>>>>>>>>>>> Scan Time = 7 mS
> >>>>>>>>>>>>>>>> 6.9 MB disk
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>> X = 10k (110k events total)
> >>>>>>>>>>>>>>>> Flush Time = 45 mS
> >>>>>>>>>>>>>>>> Scan Time = 8 mS
> >>>>>>>>>>>>>>>> 127 MB
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>> X = 100k (1.1M events total)
> >>>>>>>>>>>>>>>> Test1:
> >>>>>>>>>>>>>>>> Flush Time = 60 mS
> >>>>>>>>>>>>>>>> Scan Time = 12 mS
> >>>>>>>>>>>>>>>> 678 MB
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Test2:
> >>>>>>>>>>>>>>>> Flush Time = 45 mS
> >>>>>>>>>>>>>>>> Scan Time = 7 mS
> >>>>>>>>>>>>>>>> 576 MB
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>> X = 1MB (11M events total)
> >>>>>>>>>>>>>>>> Test1:
> >>>>>>>>>>>>>>>> Flush Time = 52 mS
> >>>>>>>>>>>>>>>> Scan Time = 19 mS
> >>>>>>>>>>>>>>>> 7.2 GB
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Test2:
> >>>>>>>>>>>>>>>> Flush Time = 84 mS
> >>>>>>>>>>>>>>>> Scan Time = 34 mS
> >>>>>>>>>>>>>>>> 9.1 GB
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>> X = 2.5M (27.5M events total)
> >>>>>>>>>>>>>>>> Test1:
> >>>>>>>>>>>>>>>> Flush Time = 82 mS
> >>>>>>>>>>>>>>>> Scan Time = 63 mS
> >>>>>>>>>>>>>>>> 17GB - 276 sst files
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Test2:
> >>>>>>>>>>>>>>>> Flush Time = 116 mS
> >>>>>>>>>>>>>>>> Scan Time = 35 mS
> >>>>>>>>>>>>>>>> 23GB - 361 sst files
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Test3:
> >>>>>>>>>>>>>>>> Flush Time = 103 mS
> >>>>>>>>>>>>>>>> Scan Time = 82 mS
> >>>>>>>>>>>>>>>> 19 GB - 300 sst files
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> --------------------------------------------------------------------------------------------
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I had to limit my testing on my laptop to X = 2.5M
> >>>>> events. I
> >>>>>>>> tried
> >>>>>>>>>> to
> >>>>>>>>>>>> go
> >>>>>>>>>>>>>>>> to X = 10M (110M events) but RocksDB was going into
> >>> the
> >>>>>> 100GB+
> >>>>>>>>> range
> >>>>>>>>>>>>>>> and my
> >>>>>>>>>>>>>>>> laptop ran out of disk. More extensive testing could
> >>> be
> >>>>> done
> >>>>>>>> but I
> >>>>>>>>>>>>>>> suspect
> >>>>>>>>>>>>>>>> that it would be in line with what we're seeing in the
> >>>>>> results
> >>>>>>>>>> above.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> At this point in time, I think the only major
> >>> discussion
> >>>>>> point
> >>>>>>>> is
> >>>>>>>>>>>> really
> >>>>>>>>>>>>>>>> around what Jan and I have disagreed on:
> >>> repartitioning
> >>>>>> back +
> >>>>>>>>>>>> resolving
> >>>>>>>>>>>>>>>> potential out of order issues or leaving that up to
> >>> the
> >>>>>> client
> >>>>>>>> to
> >>>>>>>>>>>>>>> handle.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks folks,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:34 AM Jan Filipiak <
> >>>>>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 29.11.2018 15:14, John Roesler wrote:
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Sorry that this discussion petered out... I think
> >>> the
> >>>>> 2.1
> >>>>>>>>> release
> >>>>>>>>>>>>>>>>> caused an
> >>>>>>>>>>>>>>>>>> extended distraction that pushed it off everyone's
> >>>>> radar
> >>>>>>>> (which
> >>>>>>>>>> was
> >>>>>>>>>>>>>>>>>> precisely Adam's concern). Personally, I've also had
> >>>>> some
> >>>>>>>> extend
> >>>>>>>>>>>>>>>>>> distractions of my own that kept (and continue to
> >>>>> keep) me
> >>>>>>>>>>>>>>> preoccupied.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> However, calling for a vote did wake me up, so I
> >>> guess
> >>>>> Jan
> >>>>>>> was
> >>>>>>>>> on
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> right
> >>>>>>>>>>>>>>>>>> track!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I've gone back and reviewed the whole KIP document
> >>> and
> >>>>> the
> >>>>>>>> prior
> >>>>>>>>>>>>>>>>>> discussion, and I'd like to offer a few thoughts:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> API Thoughts:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. If I read the KIP right, you are proposing a
> >>>>>> many-to-one
> >>>>>>>>> join.
> >>>>>>>>>>>>>>> Could
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> consider naming it manyToOneJoin? Or, if you prefer,
> >>>>> flip
> >>>>>>> the
> >>>>>>>>>> design
> >>>>>>>>>>>>>>>>> around
> >>>>>>>>>>>>>>>>>> and make it a oneToManyJoin?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The proposed name "joinOnForeignKey" disguises the
> >>> join
> >>>>>>> type,
> >>>>>>>>> and
> >>>>>>>>>> it
> >>>>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>> like it might trick some people into using it for a
> >>>>>>> one-to-one
> >>>>>>>>>> join.
> >>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>> would work, of course, but it would be super
> >>>>> inefficient
> >>>>>>>>> compared
> >>>>>>>>>> to
> >>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> simple rekey-and-join.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2. I might have missed it, but I don't think it's
> >>>>>> specified
> >>>>>>>>>> whether
> >>>>>>>>>>>>>>>>> it's an
> >>>>>>>>>>>>>>>>>> inner, outer, or left join. I'm guessing an outer
> >>>>> join, as
> >>>>>>>>>>>>>>> (neglecting
> >>>>>>>>>>>>>>>>> IQ),
> >>>>>>>>>>>>>>>>>> the rest can be achieved by filtering or by handling
> >>>>> it in
> >>>>>>> the
> >>>>>>>>>>>>>>>>> ValueJoiner.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3. The arg list to joinOnForeignKey doesn't look
> >>> quite
> >>>>>>> right.
> >>>>>>>>>>>>>>>>>> 3a. Regarding Serialized: There are a few different
> >>>>>>> paradigms
> >>>>>>>> in
> >>>>>>>>>>>>>>> play in
> >>>>>>>>>>>>>>>>>> the Streams API, so it's confusing, but instead of
> >>>>> three
> >>>>>>>>>> Serialized
> >>>>>>>>>>>>>>>>> args, I
> >>>>>>>>>>>>>>>>>> think it would be better to have one that allows
> >>>>>>> (optionally)
> >>>>>>>>>>>> setting
> >>>>>>>>>>>>>>>>> the 4
> >>>>>>>>>>>>>>>>>> incoming serdes. The result serde is defined by the
> >>>>>>>>> Materialized.
> >>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>> incoming serdes can be optional because they might
> >>>>> already
> >>>>>>> be
> >>>>>>>>>>>>>>> available
> >>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>> the source KTables, or the default serdes from the
> >>>>> config
> >>>>>>>> might
> >>>>>>>>> be
> >>>>>>>>>>>>>>>>>> applicable.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3b. Is the StreamPartitioner necessary? The other
> >>> joins
> >>>>>>> don't
> >>>>>>>>>> allow
> >>>>>>>>>>>>>>>>> setting
> >>>>>>>>>>>>>>>>>> one, and it seems like it might actually be harmful,
> >>>>> since
> >>>>>>> the
> >>>>>>>>>> rekey
> >>>>>>>>>>>>>>>>>> operation needs to produce results that are
> >>>>> co-partitioned
> >>>>>>>> with
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> "other"
> >>>>>>>>>>>>>>>>>> KTable.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 4. I'm fine with the "reserved word" header, but I
> >>>>> didn't
> >>>>>>>>> actually
> >>>>>>>>>>>>>>>>> follow
> >>>>>>>>>>>>>>>>>> what Matthias meant about namespacing requiring
> >>>>>>>> "deserializing"
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>> header. The headers are already Strings, so I don't
> >>>>> think
> >>>>>>> that
> >>>>>>>>>>>>>>>>>> deserialization is required. If we applied the
> >>>>> namespace
> >>>>>> at
> >>>>>>>>> source
> >>>>>>>>>>>>>>> nodes
> >>>>>>>>>>>>>>>>>> and stripped it at sink nodes, this would be
> >>>>> practically
> >>>>>> no
> >>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>> advantage of the namespace idea is that no public
> >>> API
> >>>>>> change
> >>>>>>>> wrt
> >>>>>>>>>>>>>>> headers
> >>>>>>>>>>>>>>>>>> needs to happen, and no restrictions need to be
> >>> placed
> >>>>> on
> >>>>>>>> users'
> >>>>>>>>>>>>>>>>> headers.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> (Although I'm wondering if we can get away without
> >>> the
> >>>>>>> header
> >>>>>>>> at
> >>>>>>>>>>>>>>> all...
> >>>>>>>>>>>>>>>>>> stay tuned)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 5. I also didn't follow the discussion about the HWM
> >>>>> table
> >>>>>>>>> growing
> >>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>> bound. As I read it, the HWM table is effectively
> >>>>>>> implementing
> >>>>>>>>> OCC
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> resolve the problem you noted with disordering when
> >>> the
> >>>>>>> rekey
> >>>>>>>> is
> >>>>>>>>>>>>>>>>>> reversed... particularly notable when the FK
> >>> changes.
> >>>>> As
> >>>>>>> such,
> >>>>>>>>> it
> >>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>> needs to track the most recent "version" (the
> >>> offset in
> >>>>>> the
> >>>>>>>>> source
> >>>>>>>>>>>>>>>>>> partition) of each key. Therefore, it should have
> >>> the
> >>>>> same
> >>>>>>>>> number
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>>> keys
> >>>>>>>>>>>>>>>>>> as the source table at all times.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I see that you are aware of KIP-258, which I think
> >>>>> might
> >>>>>> be
> >>>>>>>>>> relevant
> >>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> couple of ways. One: it's just about storing the
> >>>>> timestamp
> >>>>>>> in
> >>>>>>>>> the
> >>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>> store, but the ultimate idea is to effectively use
> >>> the
> >>>>>>>> timestamp
> >>>>>>>>>> as
> >>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>> OCC
> >>>>>>>>>>>>>>>>>> "version" to drop disordered updates. You wouldn't
> >>>>> want to
> >>>>>>> use
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>> timestamp for this operation, but if you were to
> >>> use a
> >>>>>>> similar
> >>>>>>>>>>>>>>>>> mechanism to
> >>>>>>>>>>>>>>>>>> store the source offset in the store alongside the
> >>>>>> re-keyed
> >>>>>>>>>> values,
> >>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>> you could avoid a separate table.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 6. You and Jan have been thinking about this for a
> >>> long
> >>>>>>> time,
> >>>>>>>> so
> >>>>>>>>>>>> I've
> >>>>>>>>>>>>>>>>>> probably missed something here, but I'm wondering
> >>> if we
> >>>>>> can
> >>>>>>>>> avoid
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> HWM
> >>>>>>>>>>>>>>>>>> tracking at all and resolve out-of-order during a
> >>> final
> >>>>>> join
> >>>>>>>>>>>>>>> instead...
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Let's say we're joining a left table (Integer K:
> >>> Letter
> >>>>>> FK,
> >>>>>>>>> (other
> >>>>>>>>>>>>>>>>> data))
> >>>>>>>>>>>>>>>>>> to a right table (Letter K: (some data)).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Left table:
> >>>>>>>>>>>>>>>>>> 1: (A, xyz)
> >>>>>>>>>>>>>>>>>> 2: (B, asd)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Right table:
> >>>>>>>>>>>>>>>>>> A: EntityA
> >>>>>>>>>>>>>>>>>> B: EntityB
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> We could do a rekey as you proposed with a combined
> >>>>> key,
> >>>>>> but
> >>>>>>>> not
> >>>>>>>>>>>>>>>>>> propagating the value at all..
> >>>>>>>>>>>>>>>>>> Rekey table:
> >>>>>>>>>>>>>>>>>> A-1: (dummy value)
> >>>>>>>>>>>>>>>>>> B-2: (dummy value)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Which we then join with the right table to produce:
> >>>>>>>>>>>>>>>>>> A-1: EntityA
> >>>>>>>>>>>>>>>>>> B-2: EntityB
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Which gets rekeyed back:
> >>>>>>>>>>>>>>>>>> 1: A, EntityA
> >>>>>>>>>>>>>>>>>> 2: B, EntityB
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> And finally we do the actual join:
> >>>>>>>>>>>>>>>>>> Result table:
> >>>>>>>>>>>>>>>>>> 1: ((A, xyz), EntityA)
> >>>>>>>>>>>>>>>>>> 2: ((B, asd), EntityB)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The thing is that in that last join, we have the
> >>>>>> opportunity
> >>>>>>>> to
> >>>>>>>>>>>>>>> compare
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> current FK in the left table with the incoming PK of
> >>>>> the
> >>>>>>> right
> >>>>>>>>>>>>>>> table. If
> >>>>>>>>>>>>>>>>>> they don't match, we just drop the event, since it
> >>>>> must be
> >>>>>>>>>> outdated.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> In your KIP, you gave an example in which (1: A,
> >>> xyz)
> >>>>> gets
> >>>>>>>>> updated
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> (1:
> >>>>>>>>>>>>>>>>>> B, xyz), ultimately yielding a conundrum about
> >>> whether
> >>>>> the
> >>>>>>>> final
> >>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>> should be (1: null) or (1: joined-on-B). With the
> >>>>>> algorithm
> >>>>>>>>> above,
> >>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> would be considering (1: (B, xyz), (A, null)) vs (1:
> >>>>> (B,
> >>>>>>> xyz),
> >>>>>>>>> (B,
> >>>>>>>>>>>>>>>>>> EntityB)). It seems like this does give you enough
> >>>>>>> information
> >>>>>>>>> to
> >>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> right choice, regardless of disordering.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Will check Adams patch, but this should work. As
> >>>>> mentioned
> >>>>>>>> often
> >>>>>>>>> I
> >>>>>>>>>> am
> >>>>>>>>>>>>>>>>> not convinced on partitioning back for the user
> >>>>>>> automatically.
> >>>>>>>> I
> >>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>> this is the real performance eater ;)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 7. Last thought... I'm a little concerned about the
> >>>>>>>> performance
> >>>>>>>>> of
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> range scans when records change in the right table.
> >>>>> You've
> >>>>>>>> said
> >>>>>>>>>> that
> >>>>>>>>>>>>>>>>> you've
> >>>>>>>>>>>>>>>>>> been using the algorithm you presented in production
> >>>>> for a
> >>>>>>>>> while.
> >>>>>>>>>>>> Can
> >>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> give us a sense of the performance characteristics
> >>>>> you've
> >>>>>>>>>> observed?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Make it work, make it fast, make it beautiful. The
> >>>>> topmost
> >>>>>>>> thing
> >>>>>>>>>> here
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> / was correctness. In practice I do not measure the
> >>>>>>> performance
> >>>>>>>>> of
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> range scan. Usual cases I run this with is emitting
> >>>>> 500k -
> >>>>>>> 1kk
> >>>>>>>>> rows
> >>>>>>>>>>>>>>>>> on a left hand side change. The range scan is just
> >>> the
> >>>>> work
> >>>>>>> you
> >>>>>>>>>> gotta
> >>>>>>>>>>>>>>>>> do, also when you pack your data into different
> >>> formats,
> >>>>>>>> usually
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> rocks performance is very tight to the size of the
> >>> data
> >>>>> and
> >>>>>>> we
> >>>>>>>>>> can't
> >>>>>>>>>>>>>>>>> really change that. It is more important for users to
> >>>>>> prevent
> >>>>>>>>>> useless
> >>>>>>>>>>>>>>>>> updates to begin with. My left hand side is guarded
> >>> to
> >>>>> drop
> >>>>>>>>> changes
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> are not going to change my join output.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> usually it's:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> drop unused fields and then don't forward if
> >>>>>> old.equals(new)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> regarding to the performance of creating an iterator
> >>> for
> >>>>>>>> smaller
> >>>>>>>>>>>>>>>>> fanouts, users can still just do a group by first
> >>> then
> >>>>>>> anyways.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I could only think of one alternative, but I'm not
> >>>>> sure if
> >>>>>>>> it's
> >>>>>>>>>>>>>>> better
> >>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>> worse... If the first re-key only needs to preserve
> >>> the
> >>>>>>>> original
> >>>>>>>>>>>> key,
> >>>>>>>>>>>>>>>>> as I
> >>>>>>>>>>>>>>>>>> proposed in #6, then we could store a vector of
> >>> keys in
> >>>>>> the
> >>>>>>>>> value:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Left table:
> >>>>>>>>>>>>>>>>>> 1: A,...
> >>>>>>>>>>>>>>>>>> 2: B,...
> >>>>>>>>>>>>>>>>>> 3: A,...
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Gets re-keyed:
> >>>>>>>>>>>>>>>>>> A: [1, 3]
> >>>>>>>>>>>>>>>>>> B: [2]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Then, the rhs part of the join would only need a
> >>>>> regular
> >>>>>>>>>> single-key
> >>>>>>>>>>>>>>>>> lookup.
> >>>>>>>>>>>>>>>>>> Of course we have to deal with the problem of large
> >>>>>> values,
> >>>>>>> as
> >>>>>>>>>>>>>>> there's
> >>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>> bound on the number of lhs records that can
> >>> reference
> >>>>> rhs
> >>>>>>>>> records.
> >>>>>>>>>>>>>>>>> Offhand,
> >>>>>>>>>>>>>>>>>> I'd say we could page the values, so when one row is
> >>>>> past
> >>>>>>> the
> >>>>>>>>>>>>>>>>> threshold, we
> >>>>>>>>>>>>>>>>>> append the key for the next page. Then in most
> >>> cases,
> >>>>> it
> >>>>>>> would
> >>>>>>>>> be
> >>>>>>>>>> a
> >>>>>>>>>>>>>>>>> single
> >>>>>>>>>>>>>>>>>> key lookup, but for large fan-out updates, it would
> >>> be
> >>>>> one
> >>>>>>> per
> >>>>>>>>>> (max
> >>>>>>>>>>>>>>>>> value
> >>>>>>>>>>>>>>>>>> size)/(avg lhs key size).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This seems more complex, though... Plus, I think
> >>>>> there's
> >>>>>>> some
> >>>>>>>>>> extra
> >>>>>>>>>>>>>>>>>> tracking we'd need to do to know when to emit a
> >>>>>> retraction.
> >>>>>>>> For
> >>>>>>>>>>>>>>> example,
> >>>>>>>>>>>>>>>>>> when record 1 is deleted, the re-key table would
> >>> just
> >>>>> have
> >>>>>>> (A:
> >>>>>>>>>> [3]).
> >>>>>>>>>>>>>>>>> Some
> >>>>>>>>>>>>>>>>>> kind of tombstone is needed so that the join result
> >>>>> for 1
> >>>>>>> can
> >>>>>>>>> also
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> retracted.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> That's all!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks so much to both Adam and Jan for the
> >>> thoughtful
> >>>>>> KIP.
> >>>>>>>>> Sorry
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> discussion has been slow.
> >>>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Fri, Oct 12, 2018 at 2:20 AM Jan Filipiak <
> >>>>>>>>>>>>>>> jan.filip...@trivago.com>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Id say you can just call the vote.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> that happens all the time, and if something comes
> >>> up,
> >>>>> it
> >>>>>>> just
> >>>>>>>>>> goes
> >>>>>>>>>>>>>>> back
> >>>>>>>>>>>>>>>>>>> to discuss.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> would not expect to much attention with another
> >>>>> another
> >>>>>>> email
> >>>>>>>>> in
> >>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> thread.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> best Jan
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 09.10.2018 13:56, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>> Hello Contributors
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I know that 2.1 is about to be released, but I do
> >>>>> need
> >>>>>> to
> >>>>>>>> bump
> >>>>>>>>>>>>>>> this to
> >>>>>>>>>>>>>>>>>>> keep
> >>>>>>>>>>>>>>>>>>>> visibility up. I am still intending to push this
> >>>>> through
> >>>>>>>> once
> >>>>>>>>>>>>>>>>> contributor
> >>>>>>>>>>>>>>>>>>>> feedback is given.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Main points that need addressing:
> >>>>>>>>>>>>>>>>>>>> 1) Any way (or benefit) in structuring the current
> >>>>>>> singular
> >>>>>>>>>> graph
> >>>>>>>>>>>>>>> node
> >>>>>>>>>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>>>> multiple nodes? It has a whopping 25 parameters
> >>> right
> >>>>>>> now. I
> >>>>>>>>> am
> >>>>>>>>>> a
> >>>>>>>>>>>>>>> bit
> >>>>>>>>>>>>>>>>>>> fuzzy
> >>>>>>>>>>>>>>>>>>>> on how the optimizations are supposed to work, so
> >>> I
> >>>>>> would
> >>>>>>>>>>>>>>> appreciate
> >>>>>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>> help on this aspect.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2) Overall strategy for joining + resolving. This
> >>>>> thread
> >>>>>>> has
> >>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>> discourse
> >>>>>>>>>>>>>>>>>>>> between Jan and I between the current highwater
> >>> mark
> >>>>>>>> proposal
> >>>>>>>>>> and
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> groupBy
> >>>>>>>>>>>>>>>>>>>> + reduce proposal. I am of the opinion that we
> >>> need
> >>>>> to
> >>>>>>>>> strictly
> >>>>>>>>>>>>>>> handle
> >>>>>>>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>> chance of out-of-order data and leave none of it
> >>> up
> >>>>> to
> >>>>>> the
> >>>>>>>>>>>>>>> consumer.
> >>>>>>>>>>>>>>>>> Any
> >>>>>>>>>>>>>>>>>>>> comments or suggestions here would also help.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3) Anything else that you see that would prevent
> >>> this
> >>>>>> from
> >>>>>>>>>> moving
> >>>>>>>>>>>>>>> to a
> >>>>>>>>>>>>>>>>>>> vote?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Sun, Sep 30, 2018 at 10:23 AM Adam Bellemare <
> >>>>>>>>>>>>>>>>>>> adam.bellem...@gmail.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Jan
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> With the Stores.windowStoreBuilder and
> >>>>>>>>>>>>>>> Stores.persistentWindowStore,
> >>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>> actually only need to specify the amount of
> >>> segments
> >>>>>> you
> >>>>>>>> want
> >>>>>>>>>> and
> >>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>> large
> >>>>>>>>>>>>>>>>>>>>> they are. To the best of my understanding, what
> >>>>> happens
> >>>>>>> is
> >>>>>>>>> that
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> segments are automatically rolled over as new
> >>> data
> >>>>> with
> >>>>>>> new
> >>>>>>>>>>>>>>>>> timestamps
> >>>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>> created. We use this exact functionality in some
> >>> of
> >>>>> the
> >>>>>>>> work
> >>>>>>>>>> done
> >>>>>>>>>>>>>>>>>>>>> internally at my company. For reference, this is
> >>> the
> >>>>>>>> hopping
> >>>>>>>>>>>>>>> windowed
> >>>>>>>>>>>>>>>>>>> store.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#id21
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> In the code that I have provided, there are going
> >>>>> to be
> >>>>>>> two
> >>>>>>>>> 24h
> >>>>>>>>>>>>>>>>>>> segments.
> >>>>>>>>>>>>>>>>>>>>> When a record is put into the windowStore, it
> >>> will
> >>>>> be
> >>>>>>>>> inserted
> >>>>>>>>>> at
> >>>>>>>>>>>>>>>>> time
> >>>>>>>>>>>>>>>>>>> T in
> >>>>>>>>>>>>>>>>>>>>> both segments. The two segments will always
> >>> overlap
> >>>>> by
> >>>>>>> 12h.
> >>>>>>>>> As
> >>>>>>>>>>>>>>> time
> >>>>>>>>>>>>>>>>>>> goes on
> >>>>>>>>>>>>>>>>>>>>> and new records are added (say at time T+12h+),
> >>> the
> >>>>>>> oldest
> >>>>>>>>>>>> segment
> >>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>> automatically deleted and a new segment created.
> >>> The
> >>>>>>>> records
> >>>>>>>>>> are
> >>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>> default
> >>>>>>>>>>>>>>>>>>>>> inserted with the context.timestamp(), such that
> >>> it
> >>>>> is
> >>>>>>> the
> >>>>>>>>>> record
> >>>>>>>>>>>>>>>>> time,
> >>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>> the clock time, which is used.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> To the best of my understanding, the timestamps
> >>> are
> >>>>>>>> retained
> >>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>> restoring from the changelog.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Basically, this is heavy-handed way to deal with
> >>> TTL
> >>>>>> at a
> >>>>>>>>>>>>>>>>> segment-level,
> >>>>>>>>>>>>>>>>>>>>> instead of at an individual record level.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Tue, Sep 25, 2018 at 5:18 PM Jan Filipiak <
> >>>>>>>>>>>>>>>>> jan.filip...@trivago.com>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Will that work? I expected it to blow up with
> >>>>>>>>>> ClassCastException
> >>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>> similar.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> You either would have to specify the window you
> >>>>>>> fetch/put
> >>>>>>>> or
> >>>>>>>>>>>>>>> iterate
> >>>>>>>>>>>>>>>>>>>>>> across all windows the key was found in right?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I just hope the window-store doesn't check
> >>>>> stream-time
> >>>>>>>> under
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> hoods
> >>>>>>>>>>>>>>>>>>>>>> that would be a questionable interface.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> If it does: did you see my comment on checking
> >>> all
> >>>>> the
> >>>>>>>>> windows
> >>>>>>>>>>>>>>>>> earlier?
> >>>>>>>>>>>>>>>>>>>>>> that would be needed to actually give reasonable
> >>>>> time
> >>>>>>>>>> gurantees.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Best
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On 25.09.2018 13:18, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>>>>> Hi Jan
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Check for  " highwaterMat " in the PR. I only
> >>>>> changed
> >>>>>>> the
> >>>>>>>>>> state
> >>>>>>>>>>>>>>>>> store,
> >>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>> the ProcessorSupplier.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <
> >>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On 24.09.2018 16:26, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the information. This is indeed
> >>>>>> something
> >>>>>>>> that
> >>>>>>>>>>>>>>> will be
> >>>>>>>>>>>>>>>>>>>>>>>>> extremely
> >>>>>>>>>>>>>>>>>>>>>>>>> useful for this KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> @Jan
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your explanations. That being
> >>> said, I
> >>>>>> will
> >>>>>>>> not
> >>>>>>>>>> be
> >>>>>>>>>>>>>>>>> moving
> >>>>>>>>>>>>>>>>>>>>>> ahead
> >>>>>>>>>>>>>>>>>>>>>>>>> with an implementation using
> >>> reshuffle/groupBy
> >>>>>>> solution
> >>>>>>>>> as
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>> propose.
> >>>>>>>>>>>>>>>>>>>>>>>>> That being said, if you wish to implement it
> >>>>>> yourself
> >>>>>>>> off
> >>>>>>>>>> of
> >>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>> current PR
> >>>>>>>>>>>>>>>>>>>>>>>>> and submit it as a competitive alternative, I
> >>>>> would
> >>>>>>> be
> >>>>>>>>> more
> >>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>>>>> happy to
> >>>>>>>>>>>>>>>>>>>>>>>>> help vet that as an alternate solution. As it
> >>>>>> stands
> >>>>>>>>> right
> >>>>>>>>>>>>>>> now,
> >>>>>>>>>>>>>>>>> I do
> >>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>> really have more time to invest into
> >>>>> alternatives
> >>>>>>>> without
> >>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>> strong indication from the binding voters
> >>> which
> >>>>>> they
> >>>>>>>>> would
> >>>>>>>>>>>>>>>>> prefer.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hey, total no worries. I think I personally
> >>> gave
> >>>>> up
> >>>>>> on
> >>>>>>>> the
> >>>>>>>>>>>>>>> streams
> >>>>>>>>>>>>>>>>>>> DSL
> >>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> some time already, otherwise I would have
> >>> pulled
> >>>>>> this
> >>>>>>>> KIP
> >>>>>>>>>>>>>>> through
> >>>>>>>>>>>>>>>>>>>>>> already.
> >>>>>>>>>>>>>>>>>>>>>>>> I am currently reimplementing my own DSL
> >>> based on
> >>>>>>> PAPI.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I will look at finishing up my PR with the
> >>>>> windowed
> >>>>>>>> state
> >>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>>>>>>>>>>> week or so, exercising it via tests, and
> >>> then I
> >>>>>> will
> >>>>>>>> come
> >>>>>>>>>>>> back
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> final
> >>>>>>>>>>>>>>>>>>>>>>>>> discussions. In the meantime, I hope that
> >>> any of
> >>>>>> the
> >>>>>>>>>> binding
> >>>>>>>>>>>>>>>>> voters
> >>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>>>> take a look at the KIP in the wiki. I have
> >>>>> updated
> >>>>>> it
> >>>>>>>>>>>>>>> according
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> latest plan:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
> >>>>>>>>>>>>>>>>>>>>>>>>> Support+non-key+joining+in+KTable
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I have also updated the KIP PR to use a
> >>> windowed
> >>>>>>> store.
> >>>>>>>>>> This
> >>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>> replaced by the results of KIP-258 whenever
> >>> they
> >>>>>> are
> >>>>>>>>>>>>>>> completed.
> >>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/5527
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Is the HighWatermarkResolverProccessorsupplier
> >>>>>> already
> >>>>>>>>>> updated
> >>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> PR?
> >>>>>>>>>>>>>>>>>>>>>>>> expected it to change to Windowed<K>,Long
> >>> Missing
> >>>>>>>>> something?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang
> >>> Wang <
> >>>>>>>>>>>>>>>>> wangg...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Correction on my previous email: KAFKA-5533
> >>> is
> >>>>> the
> >>>>>>>> wrong
> >>>>>>>>>>>> link,
> >>>>>>>>>>>>>>>>> as it
> >>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>> corresponding changelog mechanisms. But as
> >>>>> part of
> >>>>>>>>> KIP-258
> >>>>>>>>>>>>>>> we do
> >>>>>>>>>>>>>>>>>>>>>> want to
> >>>>>>>>>>>>>>>>>>>>>>>>>> have "handling out-of-order data for source
> >>>>>> KTable"
> >>>>>>>> such
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> instead of
> >>>>>>>>>>>>>>>>>>>>>>>>>> blindly apply the updates to the
> >>> materialized
> >>>>>> store,
> >>>>>>>>> i.e.
> >>>>>>>>>>>>>>>>> following
> >>>>>>>>>>>>>>>>>>>>>>>>>> offset
> >>>>>>>>>>>>>>>>>>>>>>>>>> ordering, we will reject updates that are
> >>> older
> >>>>>> than
> >>>>>>>> the
> >>>>>>>>>>>>>>> current
> >>>>>>>>>>>>>>>>>>>>>> key's
> >>>>>>>>>>>>>>>>>>>>>>>>>> timestamps, i.e. following timestamp
> >>> ordering.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang
> >>>>> Wang <
> >>>>>>>>>>>>>>>>>>> wangg...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hello Adam,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the explanation. Regarding the
> >>>>> final
> >>>>>>> step
> >>>>>>>>>> (i.e.
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> high
> >>>>>>>>>>>>>>>>>>>>>>>>>>> watermark store, now altered to be replaced
> >>>>> with
> >>>>>> a
> >>>>>>>>> window
> >>>>>>>>>>>>>>>>> store),
> >>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>>>>> another current on-going KIP may actually
> >>>>> help:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> This is for adding the timestamp into a
> >>>>> key-value
> >>>>>>>> store
> >>>>>>>>>>>>>>> (i.e.
> >>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>> non-windowed KTable), and then one of its
> >>>>> usage,
> >>>>>> as
> >>>>>>>>>>>>>>> described
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>> https://issues.apache.org/jira/browse/KAFKA-5533
> >>>>>> ,
> >>>>>>> is
> >>>>>>>>>> that
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>>>>>>> "reject" updates from the source topics if
> >>> its
> >>>>>>>>> timestamp
> >>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> smaller
> >>>>>>>>>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the current key's latest update timestamp.
> >>> I
> >>>>>> think
> >>>>>>> it
> >>>>>>>>> is
> >>>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>>>> similar to
> >>>>>>>>>>>>>>>>>>>>>>>>>>> what you have in mind for high watermark
> >>> based
> >>>>>>>>> filtering,
> >>>>>>>>>>>>>>> while
> >>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>>>>>>>> need to make sure that the timestamps of
> >>> the
> >>>>>>> joining
> >>>>>>>>>>>> records
> >>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> correctly
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> inherited though the whole topology to the
> >>>>> final
> >>>>>>>> stage.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Note that this KIP is for key-value store
> >>> and
> >>>>>> hence
> >>>>>>>>>>>>>>>>> non-windowed
> >>>>>>>>>>>>>>>>>>>>>> KTables
> >>>>>>>>>>>>>>>>>>>>>>>>>>> only, but for windowed KTables we do not
> >>>>> really
> >>>>>>> have
> >>>>>>>> a
> >>>>>>>>>> good
> >>>>>>>>>>>>>>>>>>> support
> >>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>> their joins anyways (
> >>>>>>>>>>>>>>>>>>>>>>
> >>> https://issues.apache.org/jira/browse/KAFKA-7107)
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>> think we can just consider non-windowed
> >>>>>>> KTable-KTable
> >>>>>>>>>>>>>>> non-key
> >>>>>>>>>>>>>>>>>>> joins
> >>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>> now. In which case, KIP-258 should help.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan
> >>> Filipiak
> >>>>> <
> >>>>>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Current highwater mark implementation
> >>> would
> >>>>>> grow
> >>>>>>>>>>>> endlessly
> >>>>>>>>>>>>>>>>> based
> >>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> primary key of original event. It is a
> >>> pair
> >>>>> of
> >>>>>>>> (<this
> >>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>>>> primary
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> key>,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> <highest offset seen for that key>). This
> >>> is
> >>>>> used
> >>>>>>> to
> >>>>>>>>>>>>>>>>> differentiate
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> late arrivals and new updates. My newest
> >>>>> proposal
> >>>>>>>> would
> >>>>>>>>>> be
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> replace
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> with a Windowed state store of Duration N.
> >>>>> This
> >>>>>>> would
> >>>>>>>>>> allow
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> behaviour, but cap the size based on
> >>> time.
> >>>>> This
> >>>>>>>>> should
> >>>>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> late-arriving events to be processed, and
> >>>>>> should
> >>>>>>> be
> >>>>>>>>>>>>>>>>> customizable
> >>>>>>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> user to tailor to their own needs (ie:
> >>>>> perhaps
> >>>>>>> just
> >>>>>>>>> 10
> >>>>>>>>>>>>>>>>> minutes
> >>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> window,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> or perhaps 7 days...).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Adam, using time based retention can
> >>> do
> >>>>> the
> >>>>>>>> trick
> >>>>>>>>>>>> here.
> >>>>>>>>>>>>>>>>> Even
> >>>>>>>>>>>>>>>>>>>>>> if I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> would still like to see the automatic
> >>>>>>> repartitioning
> >>>>>>>>>>>>>>> optional
> >>>>>>>>>>>>>>>>>>>>>> since I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> just reshuffle again. With windowed store I
> >>>>> am a
> >>>>>>>> little
> >>>>>>>>>> bit
> >>>>>>>>>>>>>>>>>>>>>> sceptical
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> how to determine the window. So esentially
> >>> one
> >>>>>>> could
> >>>>>>>>> run
> >>>>>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>>>>>> problems
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the rapid change happens near a window
> >>>>> border. I
> >>>>>>> will
> >>>>>>>>>> check
> >>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation in detail, if its
> >>>>> problematic, we
> >>>>>>>> could
> >>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>>>>> check
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> _all_
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> windows on read with not to bad
> >>> performance
> >>>>>>> impact I
> >>>>>>>>>>>> guess.
> >>>>>>>>>>>>>>>>> Will
> >>>>>>>>>>>>>>>>>>>>>> let
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> know if the implementation would be
> >>> correct
> >>>>> as
> >>>>>>> is. I
> >>>>>>>>>>>>>>> wouldn't
> >>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> assume that: offset(A) < offset(B) =>
> >>>>>>> timestamp(A)  <
> >>>>>>>>>>>>>>>>>>> timestamp(B).
> >>>>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> we can't expect that.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Jan
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I believe I understand what you mean now
> >>> -
> >>>>>> thanks
> >>>>>>>> for
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> diagram, it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> did really help. You are correct that I
> >>> do
> >>>>> not
> >>>>>>> have
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> primary
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> key available, and I can see that if it was
> >>>>>>> available
> >>>>>>>>>> then
> >>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>> would be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> able to add and remove events from the
> >>> Map.
> >>>>>> That
> >>>>>>>>> being
> >>>>>>>>>>>>>>> said,
> >>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> encourage
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> you to finish your diagrams / charts just
> >>> for
> >>>>>>> clarity
> >>>>>>>>> for
> >>>>>>>>>>>>>>>>> everyone
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> else.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yeah 100%, this giphy thing is just
> >>> really
> >>>>> hard
> >>>>>>>> work.
> >>>>>>>>>> But
> >>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>> understand
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> the benefits for the rest. Sorry about the
> >>>>>>> original
> >>>>>>>>>>>> primary
> >>>>>>>>>>>>>>>>> key,
> >>>>>>>>>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> join and Group by implemented our own in
> >>> PAPI
> >>>>>> and
> >>>>>>>>>>>> basically
> >>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> DSL (Just the abstraction). Completely
> >>> missed
> >>>>>> that
> >>>>>>> in
> >>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>> DSL
> >>>>>>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> there and just assumed it. total brain mess
> >>>>> up on
> >>>>>>> my
> >>>>>>>>> end.
> >>>>>>>>>>>>>>> Will
> >>>>>>>>>>>>>>>>>>>>>> finish
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> chart as soon as i get a quite evening this
> >>>>> week.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> My follow up question for you is, won't
> >>> the
> >>>>> Map
> >>>>>>> stay
> >>>>>>>>>>>> inside
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> State
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Store indefinitely after all of the
> >>> changes
> >>>>>> have
> >>>>>>>>>>>>>>> propagated?
> >>>>>>>>>>>>>>>>>>> Isn't
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> effectively the same as a highwater mark
> >>>>> state
> >>>>>>>> store?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thing is that if the map is empty,
> >>>>> substractor
> >>>>>> is
> >>>>>>>>> gonna
> >>>>>>>>>>>>>>>>> return
> >>>>>>>>>>>>>>>>>>>>>> `null`
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the key is removed from the keyspace. But
> >>>>> there
> >>>>>> is
> >>>>>>>>> going
> >>>>>>>>>> to
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> 100%, the good thing is that I can use
> >>> this
> >>>>>> store
> >>>>>>>>>> directly
> >>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> materialize() / enableSendingOldValues()
> >>> is a
> >>>>>>>> regular
> >>>>>>>>>>>>>>> store,
> >>>>>>>>>>>>>>>>>>>>>> satisfying
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> all gurantees needed for further groupby /
> >>>>> join.
> >>>>>>> The
> >>>>>>>>>>>>>>> Windowed
> >>>>>>>>>>>>>>>>>>>>>> store is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> keeping the values, so for the next
> >>> statefull
> >>>>>>>> operation
> >>>>>>>>>> we
> >>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> need to instantiate an extra store. or we
> >>>>> have
> >>>>>> the
> >>>>>>>>>> window
> >>>>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the values then.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Long story short. if we can flip in a
> >>> custom
> >>>>>> group
> >>>>>>>> by
> >>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> repartitioning to the original primary
> >>> key i
> >>>>>> think
> >>>>>>>> it
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> help
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> big time in building efficient apps. Given
> >>> the
> >>>>>>>> original
> >>>>>>>>>>>>>>> primary
> >>>>>>>>>>>>>>>>>>> key
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> issue I
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> understand that we do not have a solid
> >>>>> foundation
> >>>>>>> to
> >>>>>>>>>> build
> >>>>>>>>>>>>>>> on.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Leaving primary key carry along to the
> >>> user.
> >>>>>> very
> >>>>>>>>>>>>>>>>> unfortunate. I
> >>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> understand the decision goes like that. I
> >>> do
> >>>>> not
> >>>>>>>> think
> >>>>>>>>>> its
> >>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> good
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> decision.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Sep 11, 2018 at 10:07 AM,
> >>> Prajakta
> >>>>>>> Dumbre <
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> dumbreprajakta...@gmail.com <mailto:
> >>>>>>>>>>>>>>>>> dumbreprajakta...@gmail.com
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            please remove me from this
> >>> group
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            On Tue, Sep 11, 2018 at 1:29 PM
> >>>>> Jan
> >>>>>>>>> Filipiak
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            <jan.filip...@trivago.com
> >>>>> <mailto:
> >>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > Hi Adam,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > give me some time, will make
> >>>>> such a
> >>>>>>>>> chart.
> >>>>>>>>>>>> last
> >>>>>>>>>>>>>>>>> time i
> >>>>>>>>>>>>>>>>>>>>>> didn't
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            get along
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > well with giphy and ruined
> >>> all
> >>>>> your
> >>>>>>>>> charts.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > Hopefully i can get it done
> >>>>> today
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > On 08.09.2018 16:00, Adam
> >>>>> Bellemare
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > Hi Jan
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > I have included a diagram
> >>> of
> >>>>>> what I
> >>>>>>>>>>>> attempted
> >>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Su
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>> pport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoining
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> inKTable-GroupBy+Reduce/Aggregate
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            <
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+S
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>> upport+non-key+joining+in+KTable#KIP-213Supportnon-keyjoinin
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ginKTable-GroupBy+Reduce/Aggregate>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > I attempted this back at
> >>> the
> >>>>>> start
> >>>>>>> of
> >>>>>>>>> my
> >>>>>>>>>> own
> >>>>>>>>>>>>>>>>>>>>>> implementation
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > solution, and since I could
> >>>>> not
> >>>>>> get
> >>>>>>>> it
> >>>>>>>>> to
> >>>>>>>>>>>>>>> work I
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>> since
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            discarded the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > code. At this point in
> >>> time,
> >>>>> if
> >>>>>> you
> >>>>>>>>> wish
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> continue
> >>>>>>>>>>>>>>>>>>>>>> pursuing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            for your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > groupBy solution, I ask
> >>> that
> >>>>> you
> >>>>>>>> please
> >>>>>>>>>>>>>>> create a
> >>>>>>>>>>>>>>>>>>>>>> diagram on
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            the KIP
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > carefully explaining your
> >>>>>> solution.
> >>>>>>>>>> Please
> >>>>>>>>>>>>>>> feel
> >>>>>>>>>>>>>>>>> free
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            the image I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > just posted as a starting
> >>>>> point.
> >>>>>> I
> >>>>>>> am
> >>>>>>>>>> having
> >>>>>>>>>>>>>>>>> trouble
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            understanding your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > explanations but I think
> >>> that
> >>>>> a
> >>>>>>>>> carefully
> >>>>>>>>>>>>>>>>> constructed
> >>>>>>>>>>>>>>>>>>>>>> diagram
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            will clear
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > up
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > any misunderstandings.
> >>>>>> Alternately,
> >>>>>>>>>> please
> >>>>>>>>>>>>>>> post a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            comprehensive PR with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > your solution. I can only
> >>>>> guess
> >>>>>> at
> >>>>>>>> what
> >>>>>>>>>> you
> >>>>>>>>>>>>>>>>> mean, and
> >>>>>>>>>>>>>>>>>>>>>> since I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            value my
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > own
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > time as much as you value
> >>>>> yours,
> >>>>>> I
> >>>>>>>>>> believe
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            responsibility to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > provide an implementation
> >>>>> instead
> >>>>>>> of
> >>>>>>>> me
> >>>>>>>>>>>>>>> trying to
> >>>>>>>>>>>>>>>>>>> guess.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > On Sat, Sep 8, 2018 at 8:00
> >>>>> AM,
> >>>>>> Jan
> >>>>>>>>>> Filipiak
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            <jan.filip...@trivago.com
> >>>>> <mailto:
> >>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > > wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> Hi James,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> nice to see you beeing
> >>>>>> interested.
> >>>>>>>>> kafka
> >>>>>>>>>>>>>>>>> streams at
> >>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            point supports
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> all sorts of joins as
> >>> long as
> >>>>>> both
> >>>>>>>>>> streams
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> Adam is currently
> >>>>> implementing a
> >>>>>>>> join
> >>>>>>>>>>>> where a
> >>>>>>>>>>>>>>>>> KTable
> >>>>>>>>>>>>>>>>>>>>>> and a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            KTable can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> a one to many relation
> >>> ship
> >>>>>> (1:n).
> >>>>>>>> We
> >>>>>>>>>>>> exploit
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> rocksdb
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> datastore that keeps data
> >>>>> sorted
> >>>>>> (At
> >>>>>>>>> least
> >>>>>>>>>>>>>>>>> exposes an
> >>>>>>>>>>>>>>>>>>>>>> API to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            access the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> stored data in a sorted
> >>>>>> fashion).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> I think the technical
> >>> caveats
> >>>>>> are
> >>>>>>>> well
> >>>>>>>>>>>>>>>>> understood
> >>>>>>>>>>>>>>>>>>> now
> >>>>>>>>>>>>>>>>>>>>>> and we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>            > basically
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> down to philosophy and API
> >>>>>> Design
> >>>>>>> (
> >>>>>>>>> when
> >>>>>>>>>>>> Adam
> >>>>>>>>>>>>>>>>> sees
> >>>>>>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>> newest
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            message).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> I have a lengthy track
> >>>>> record of
> >>>>>>>>> loosing
> >>>>>>>>>>>>>>> those
> >>>>>>>>>>>>>>>>> kinda
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            arguments within
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> streams community and I
> >>> have
> >>>>> no
> >>>>>>> clue
> >>>>>>>>>> why.
> >>>>>>>>>>>> So
> >>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>> literally
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            can't wait for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > you
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> to churn through this
> >>> thread
> >>>>> and
> >>>>>>>> give
> >>>>>>>>>> you
> >>>>>>>>>>>>>>>>> opinion on
> >>>>>>>>>>>>>>>>>>>>>> how we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            should
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > design
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> the return type of the
> >>>>>>> oneToManyJoin
> >>>>>>>>> and
> >>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>> many
> >>>>>>>>>>>>>>>>>>>>>> power we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            want to give
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> the user vs "simplicity"
> >>>>> (where
> >>>>>>>>>> simplicity
> >>>>>>>>>>>>>>> isn't
> >>>>>>>>>>>>>>>>>>>>>> really that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            as users
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > still
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> need to understand it I
> >>>>> argue)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> waiting for you to join
> >>> in on
> >>>>>> the
> >>>>>>>>>>>> discussion
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> Best Jan
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >> On 07.09.2018 15:49, James
> >>>>> Kwan
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>> I am new to this group
> >>> and I
> >>>>>>> found
> >>>>>>>>> this
> >>>>>>>>>>>>>>> subject
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            interesting.  Sounds
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > like
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>> you guys want to
> >>> implement a
> >>>>>> join
> >>>>>>>>>> table of
> >>>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>>>>>>> streams? Is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > somewhere
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>> I can see the original
> >>>>>>> requirement
> >>>>>>>> or
> >>>>>>>>>>>>>>> proposal?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>> On Sep 7, 2018, at 8:13
> >>> AM,
> >>>>> Jan
> >>>>>>>>>> Filipiak
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            <jan.filip...@trivago.com
> >>>>> <mailto:
> >>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> On 05.09.2018 22:17,
> >>> Adam
> >>>>>>>> Bellemare
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> I'm currently testing
> >>>>> using a
> >>>>>>>>>> Windowed
> >>>>>>>>>>>>>>> Store
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> store the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            highwater
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> mark.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> By all indications this
> >>>>>> should
> >>>>>>>> work
> >>>>>>>>>>>> fine,
> >>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> caveat
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            being that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> only resolve
> >>> out-of-order
> >>>>>>> arrival
> >>>>>>>>>> for up
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> size of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            the window
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > (ie:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> 24h, 72h, etc). This
> >>> would
> >>>>>>> remove
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> possibility
> >>>>>>>>>>>>>>>>>>>>>> of it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>            > unbounded
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> size.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> With regards to Jan's
> >>>>>>>> suggestion, I
> >>>>>>>>>>>>>>> believe
> >>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            we will
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> remain in disagreement.
> >>>>>> While I
> >>>>>>>> do
> >>>>>>>>>> not
> >>>>>>>>>>>>>>>>> disagree
> >>>>>>>>>>>>>>>>>>>>>> with your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            statement
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> there likely to be
> >>>>> additional
> >>>>>>>> joins
> >>>>>>>>>> done
> >>>>>>>>>>>>>>> in a
> >>>>>>>>>>>>>>>>>>>>>> real-world
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            workflow, I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > do
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> see how you can
> >>>>> conclusively
> >>>>>>> deal
> >>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> arrival
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> foreign-key
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> changes and subsequent
> >>>>>> joins. I
> >>>>>>>>> have
> >>>>>>>>>>>>>>>>> attempted
> >>>>>>>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            think you have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> proposed (without a
> >>>>>> high-water,
> >>>>>>>>> using
> >>>>>>>>>>>>>>>>> groupBy and
> >>>>>>>>>>>>>>>>>>>>>> reduce)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            and found
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> that if
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> the foreign key changes
> >>>>> too
> >>>>>>>>> quickly,
> >>>>>>>>>> or
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> load
> >>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            stream thread
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> too
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> high, the joined
> >>> messages
> >>>>>> will
> >>>>>>>>> arrive
> >>>>>>>>>>>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>>>>>> and be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            incorrectly
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> propagated, such that
> >>> an
> >>>>>>>>> intermediate
> >>>>>>>>>>>>>>> event
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> represented
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            as the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > final
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> event.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> Can you shed some light
> >>> on
> >>>>>> your
> >>>>>>>>>> groupBy
> >>>>>>>>>>>>>>>>>>>>>> implementation.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            There must be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> some sort of flaw in it.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> I have a suspicion
> >>> where it
> >>>>>> is,
> >>>>>>> I
> >>>>>>>>>> would
> >>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            confirm. The idea
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> is bullet proof and it
> >>>>> must be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> an implementation mess
> >>> up.
> >>>>> I
> >>>>>>> would
> >>>>>>>>>> like
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> clarify
> >>>>>>>>>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            we draw a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> conclusion.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>    Repartitioning the
> >>>>>> scattered
> >>>>>>>>> events
> >>>>>>>>>>>>>>> back to
> >>>>>>>>>>>>>>>>>>> their
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> partitions is the only
> >>> way I
> >>>>>> know
> >>>>>>>> how
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> conclusively
> >>>>>>>>>>>>>>>>>>>>>> deal
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> out-of-order events in
> >>> a
> >>>>>> given
> >>>>>>>> time
> >>>>>>>>>>>> frame,
> >>>>>>>>>>>>>>>>> and to
> >>>>>>>>>>>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            that the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > data
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> eventually consistent
> >>> with
> >>>>>> the
> >>>>>>>>> input
> >>>>>>>>>>>>>>> events.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> If you have some code
> >>> to
> >>>>>> share
> >>>>>>>> that
> >>>>>>>>>>>>>>>>> illustrates
> >>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            approach, I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> very grateful as it
> >>> would
> >>>>>>> remove
> >>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>>>> misunderstandings
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            that I may
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > have.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> ah okay you were looking
> >>>>> for
> >>>>>> my
> >>>>>>>>> code.
> >>>>>>>>>> I
> >>>>>>>>>>>>>>> don't
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            something easily
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> readable here as its
> >>>>> bloated
> >>>>>>> with
> >>>>>>>>>>>>>>> OO-patterns.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> its anyhow trivial:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> @Override
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>      public T apply(K
> >>>>> aggKey,
> >>>>>> V
> >>>>>>>>>> value, T
> >>>>>>>>>>>>>>>>>>> aggregate)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>      {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          Map<U, V>
> >>>>>>>>> currentStateAsMap =
> >>>>>>>>>>>>>>>>>>>>>> asMap(aggregate);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <<
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            imaginary
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          U toModifyKey =
> >>>>>>>>>>>>>>> mapper.apply(value);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>              << this is
> >>> the
> >>>>>>> place
> >>>>>>>>>> where
> >>>>>>>>>>>>>>> people
> >>>>>>>>>>>>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            gonna have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > issues
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> and why you probably
> >>>>> couldn't
> >>>>>> do
> >>>>>>>> it.
> >>>>>>>>>> we
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>>>> to find
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            a solution
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > here.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> I didn't realize that
> >>> yet.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>              << we
> >>>>> propagate
> >>>>>> the
> >>>>>>>>>> field in
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> joiner, so
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            that we can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > pick
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> it up in an aggregate.
> >>>>>> Probably
> >>>>>>>> you
> >>>>>>>>>> have
> >>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>> thought
> >>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            this in your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> approach right?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>              << I am
> >>> very
> >>>>> open
> >>>>>>> to
> >>>>>>>>>> find a
> >>>>>>>>>>>>>>>>> generic
> >>>>>>>>>>>>>>>>>>>>>> solution
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            here. In my
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> honest opinion this is
> >>>>> broken
> >>>>>> in
> >>>>>>>>>>>>>>>>>>> KTableImpl.GroupBy
> >>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            looses
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > the keys
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> and only maintains the
> >>>>>> aggregate
> >>>>>>>>> key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>              << I
> >>>>> abstracted
> >>>>>> it
> >>>>>>>> away
> >>>>>>>>>> back
> >>>>>>>>>>>>>>>>> then way
> >>>>>>>>>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> i
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> was
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > thinking
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> of oneToMany join. That
> >>> is
> >>>>>> why I
> >>>>>>>>>> didn't
> >>>>>>>>>>>>>>>>> realize
> >>>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            significance here.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>              <<
> >>> Opinions?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          for (V m :
> >>>>> current)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>> currentStateAsMap.put(mapper.apply(m),
> >>>>>>>>>>>> m);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          if (isAdder)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>> currentStateAsMap.put(toModifyKey,
> >>>>>>>>>>>> value);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          else
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>> currentStateAsMap.remove(toModifyKey);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>> if(currentStateAsMap.isEmpty()){
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>                  return
> >>>>> null;
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>              }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>          retrun
> >>>>>>>>>>>>>>>>> asAggregateType(currentStateAsMap)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>      }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> On Wed, Sep 5, 2018 at
> >>>>> 3:35
> >>>>>> PM,
> >>>>>>>> Jan
> >>>>>>>>>>>>>>> Filipiak
> >>>>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > jan.filip...@trivago.com
> >>>>> <mailto:
> >>>>>>>>>>>>>>>>>>> jan.filip...@trivago.com
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>> Thanks Adam for
> >>> bringing
> >>>>>>> Matthias
> >>>>>>>>> to
> >>>>>>>>>>>>>>> speed!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> about the
> >>> differences. I
> >>>>>> think
> >>>>>>>>>>>> re-keying
> >>>>>>>>>>>>>>>>> back
> >>>>>>>>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            optional at
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> best.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> I would say we return
> >>> a
> >>>>>>>>>> KScatteredTable
> >>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>> reshuffle()
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            returning
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>> KTable<originalKey,Joined>
> >>>>>> to
> >>>>>>>> make
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> backwards
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            repartitioning
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> optional.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> I am also in a big
> >>>>> favour of
> >>>>>>>> doing
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>> order
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            processing using
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> group
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> by instead high water
> >>>>> mark
> >>>>>>>>> tracking.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> Just because unbounded
> >>>>>> growth
> >>>>>>> is
> >>>>>>>>>> just
> >>>>>>>>>>>>>>> scary
> >>>>>>>>>>>>>>>>> + It
> >>>>>>>>>>>>>>>>>>>>>> saves
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> us
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            the header
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> stuff.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> I think the
> >>> abstraction
> >>>>> of
> >>>>>>>> always
> >>>>>>>>>>>>>>>>> repartitioning
> >>>>>>>>>>>>>>>>>>>>>> back is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            just not so
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> strong. Like the work
> >>> has
> >>>>>> been
> >>>>>>>>> done
> >>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            back and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> grouping
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> by something else
> >>>>> afterwards
> >>>>>>> is
> >>>>>>>>>> really
> >>>>>>>>>>>>>>>>> common.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> On 05.09.2018 13:49,
> >>> Adam
> >>>>>>>>> Bellemare
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>> Hi Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> Thank you for your
> >>>>>> feedback,
> >>>>>>> I
> >>>>>>>> do
> >>>>>>>>>>>>>>>>> appreciate
> >>>>>>>>>>>>>>>>>>> it!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> While name spacing
> >>>>> would be
> >>>>>>>>>> possible,
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>> require
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > deserialize
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> user headers what
> >>>>> implies
> >>>>>> a
> >>>>>>>>>> runtime
> >>>>>>>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            suggest to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > no
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> namespace for now to
> >>>>> avoid
> >>>>>>> the
> >>>>>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> becomes a
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>            > problem in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> the future, we can
> >>>>> still
> >>>>>> add
> >>>>>>>>> name
> >>>>>>>>>>>>>>> spacing
> >>>>>>>>>>>>>>>>>>> later
> >>>>>>>>>>>>>>>>>>>>>> on.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Agreed. I will go
> >>> with
> >>>>>>> using a
> >>>>>>>>>>>> reserved
> >>>>>>>>>>>>>>>>> string
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            document it.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> My main concern about
> >>>>> the
> >>>>>>>> design
> >>>>>>>>> it
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> type of
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            result KTable:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > If
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> understood the
> >>> proposal
> >>>>>>>>> correctly,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> In your example, you
> >>>>> have
> >>>>>>>> table1
> >>>>>>>>>> and
> >>>>>>>>>>>>>>> table2
> >>>>>>>>>>>>>>>>>>>>>> swapped.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            Here is how it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> works
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> currently:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> 1) table1 has the
> >>>>> records
> >>>>>>> that
> >>>>>>>>>> contain
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> foreign key
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            within their
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> value.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> table1 input stream:
> >>>>>>>>>> <a,(fk=A,bar=1)>,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> <b,(fk=A,bar=2)>,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> <c,(fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> table2 input stream:
> >>>>> <A,X>,
> >>>>>>>> <B,Y>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> 2) A Value mapper is
> >>>>>> required
> >>>>>>>> to
> >>>>>>>>>>>> extract
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> foreign
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> table1 foreign key
> >>>>> mapper:
> >>>>>> (
> >>>>>>>>> value
> >>>>>>>>>> =>
> >>>>>>>>>>>>>>>>> value.fk
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            <http://value.fk> )
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> The mapper is
> >>> applied to
> >>>>>> each
> >>>>>>>>>> element
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> table1,
> >>>>>>>>>>>>>>>>>>>>>> and a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            new combined
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> key is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> made:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> table1 mapped: <A-a,
> >>>>>>>>> (fk=A,bar=1)>,
> >>>>>>>>>>>>>>> <A-b,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> (fk=A,bar=2)>,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            <B-c,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> (fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> 3) The rekeyed events
> >>>>> are
> >>>>>>>>>>>> copartitioned
> >>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>> table2:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> a) Stream Thread with
> >>>>>>> Partition
> >>>>>>>>> 0:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> RepartitionedTable1:
> >>>>> <A-a,
> >>>>>>>>>>>>>>> (fk=A,bar=1)>,
> >>>>>>>>>>>>>>>>> <A-b,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            (fk=A,bar=2)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> Table2: <A,X>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> b) Stream Thread with
> >>>>>>> Partition
> >>>>>>>>> 1:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> RepartitionedTable1:
> >>>>> <B-c,
> >>>>>>>>>>>> (fk=B,bar=3)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> Table2: <B,Y>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> 4) From here, they
> >>> can
> >>>>> be
> >>>>>>>> joined
> >>>>>>>>>>>>>>> together
> >>>>>>>>>>>>>>>>>>> locally
> >>>>>>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            applying the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> joiner
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> function.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> At this point, Jan's
> >>>>> design
> >>>>>>> and
> >>>>>>>>> my
> >>>>>>>>>>>>>>> design
> >>>>>>>>>>>>>>>>>>>>>> deviate. My
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            design goes
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > on
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> repartition the data
> >>>>>>> post-join
> >>>>>>>>> and
> >>>>>>>>>>>>>>> resolve
> >>>>>>>>>>>>>>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            arrival of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> records,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> finally returning the
> >>>>> data
> >>>>>>>> keyed
> >>>>>>>>>> just
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> original key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            I do not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> expose
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> CombinedKey or any of
> >>>>> the
> >>>>>>>>> internals
> >>>>>>>>>>>>>>>>> outside of
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            joinOnForeignKey
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> function. This does
> >>> make
> >>>>>> for
> >>>>>>>>> larger
> >>>>>>>>>>>>>>>>> footprint,
> >>>>>>>>>>>>>>>>>>>>>> but it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            removes all
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> agency
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> for resolving
> >>>>> out-of-order
> >>>>>>>>> arrivals
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> handling
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            CombinedKeys from
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> user. I believe that
> >>>>> this
> >>>>>>> makes
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> function
> >>>>>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> easier
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            to use.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> Let me know if this
> >>>>> helps
> >>>>>>>> resolve
> >>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>> questions,
> >>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            please feel
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> free to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> add anything else on
> >>>>> your
> >>>>>>> mind.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> Thanks again,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> Adam
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> On Tue, Sep 4, 2018
> >>> at
> >>>>> 8:36
> >>>>>>> PM,
> >>>>>>>>>>>>>>> Matthias J.
> >>>>>>>>>>>>>>>>>>> Sax <
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>> matth...@confluent.io
> >>>>>>> <mailto:
> >>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> I am just catching
> >>> up
> >>>>> on
> >>>>>>> this
> >>>>>>>>>>>> thread. I
> >>>>>>>>>>>>>>>>> did
> >>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>> read
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            everything so
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> far,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> but want to share
> >>>>> couple
> >>>>>> of
> >>>>>>>>>> initial
> >>>>>>>>>>>>>>>>> thoughts:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Headers: I think
> >>> there
> >>>>> is
> >>>>>> a
> >>>>>>>>>>>> fundamental
> >>>>>>>>>>>>>>>>>>>>>> difference
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            between header
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> usage
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> in this KIP and
> >>> KP-258.
> >>>>>> For
> >>>>>>>> 258,
> >>>>>>>>>> we
> >>>>>>>>>>>> add
> >>>>>>>>>>>>>>>>>>> headers
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            changelog topic
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> are owned by Kafka
> >>>>> Streams
> >>>>>>> and
> >>>>>>>>>> nobody
> >>>>>>>>>>>>>>>>> else is
> >>>>>>>>>>>>>>>>>>>>>> supposed
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            to write
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > into
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> them. In fact, no
> >>> user
> >>>>>>> header
> >>>>>>>>> are
> >>>>>>>>>>>>>>> written
> >>>>>>>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            changelog topic
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> thus, there are not
> >>>>>>> conflicts.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Nevertheless, I
> >>> don't
> >>>>> see
> >>>>>> a
> >>>>>>>> big
> >>>>>>>>>> issue
> >>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> using
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            headers within
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Streams.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> As long as we
> >>> document
> >>>>> it,
> >>>>>>> we
> >>>>>>>>> can
> >>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>> "reserved"
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            header keys
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> users are not
> >>> allowed
> >>>>> to
> >>>>>> use
> >>>>>>>>> when
> >>>>>>>>>>>>>>>>> processing
> >>>>>>>>>>>>>>>>>>>>>> data with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            Kafka
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > Streams.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> IMHO, this should be
> >>>>> ok.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> I think there is a
> >>> safe
> >>>>>> way
> >>>>>>> to
> >>>>>>>>>> avoid
> >>>>>>>>>>>>>>>>>>> conflicts,
> >>>>>>>>>>>>>>>>>>>>>> since
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > headers
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> only needed in
> >>>>> internal
> >>>>>>>> topics
> >>>>>>>>> (I
> >>>>>>>>>>>>>>> think):
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> For internal and
> >>>>>> changelog
> >>>>>>>>>> topics,
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>> namespace
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            all headers:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> * user-defined
> >>> headers
> >>>>>> are
> >>>>>>>>>>>> namespaced
> >>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>> "external."
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> +
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            headerKey
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> * internal headers
> >>> are
> >>>>>>>>>> namespaced as
> >>>>>>>>>>>>>>>>>>>>>> "internal." +
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            headerKey
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> While name spacing
> >>>>> would
> >>>>>> be
> >>>>>>>>>>>> possible,
> >>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> require
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> deserialize
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> user headers what
> >>>>> implies
> >>>>>> a
> >>>>>>>>>> runtime
> >>>>>>>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            suggest to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > no
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> namespace for now to
> >>>>> avoid
> >>>>>>> the
> >>>>>>>>>>>>>>> overhead.
> >>>>>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> becomes a
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>            > problem in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> the future, we can
> >>>>> still
> >>>>>> add
> >>>>>>>>> name
> >>>>>>>>>>>>>>> spacing
> >>>>>>>>>>>>>>>>>>> later
> >>>>>>>>>>>>>>>>>>>>>> on.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> My main concern
> >>> about
> >>>>> the
> >>>>>>>> design
> >>>>>>>>>> it
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> type
> >>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            result KTable:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> If I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> understood the
> >>> proposal
> >>>>>>>>> correctly,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> KTable<K1,V1>
> >>> table1 =
> >>>>> ...
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> KTable<K2,V2>
> >>> table2 =
> >>>>> ...
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> KTable<K1,V3>
> >>>>> joinedTable
> >>>>>> =
> >>>>>>>>>>>>>>>>>>>>>> table1.join(table2,...);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> implies that the
> >>>>>>> `joinedTable`
> >>>>>>>>> has
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>> key
> >>>>>>>>>>>>>>>>>>>>>> as the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            left input
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> table.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> IMHO, this does not
> >>>>> work
> >>>>>>>> because
> >>>>>>>>>> if
> >>>>>>>>>>>>>>> table2
> >>>>>>>>>>>>>>>>>>>>>> contains
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            multiple rows
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> join with a record
> >>> in
> >>>>>> table1
> >>>>>>>>>> (what is
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> main
> >>>>>>>>>>>>>>>>>>>>>> purpose
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > foreign
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> key
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> join), the result
> >>> table
> >>>>>>> would
> >>>>>>>>> only
> >>>>>>>>>>>>>>>>> contain a
> >>>>>>>>>>>>>>>>>>>>>> single
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            join result,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > but
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> multiple.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Example:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> table1 input stream:
> >>>>> <A,X>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> table2 input stream:
> >>>>>>>> <a,(A,1)>,
> >>>>>>>>>>>>>>> <b,(A,2)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> We use table2 value
> >>> a
> >>>>>>> foreign
> >>>>>>>>> key
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> table1
> >>>>>>>>>>>>>>>>>>> key
> >>>>>>>>>>>>>>>>>>>>>> (ie,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            "A" joins).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > If
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> result key is the
> >>> same
> >>>>> key
> >>>>>>> as
> >>>>>>>>> key
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>>> table1,
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            implies that the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> result can either be
> >>>>> <A,
> >>>>>>>>>> join(X,1)>
> >>>>>>>>>>>> or
> >>>>>>>>>>>>>>> <A,
> >>>>>>>>>>>>>>>>>>>>>> join(X,2)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            but not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > both.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Because the share
> >>> the
> >>>>> same
> >>>>>>>> key,
> >>>>>>>>>>>>>>> whatever
> >>>>>>>>>>>>>>>>>>> result
> >>>>>>>>>>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            we emit
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > later,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> overwrite the
> >>> previous
> >>>>>>> result.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> This is the reason
> >>> why
> >>>>> Jan
> >>>>>>>>>> originally
> >>>>>>>>>>>>>>>>> proposed
> >>>>>>>>>>>>>>>>>>>>>> to use
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > combination
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> both primary keys of
> >>>>> the
> >>>>>>> input
> >>>>>>>>>> tables
> >>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> key
> >>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            output table.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> makes the keys of
> >>> the
> >>>>>> output
> >>>>>>>>> table
> >>>>>>>>>>>>>>> unique
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            store both in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> output table:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Result would be
> >>> <A-a,
> >>>>>>>>> join(X,1)>,
> >>>>>>>>>>>> <A-b,
> >>>>>>>>>>>>>>>>>>>>>> join(X,2)>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Thoughts?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> On 9/4/18 1:36 PM,
> >>> Jan
> >>>>>>>> Filipiak
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>> Just on remark here.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> The high-watermark
> >>>>> could
> >>>>>> be
> >>>>>>>>>>>>>>> disregarded.
> >>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>>> decision
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            about the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> forward
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> depends on the
> >>> size of
> >>>>>> the
> >>>>>>>>>>>> aggregated
> >>>>>>>>>>>>>>>>> map.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> Only 1 element long
> >>>>> maps
> >>>>>>>> would
> >>>>>>>>> be
> >>>>>>>>>>>>>>>>> unpacked
> >>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            forwarded. 0
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > element
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> maps
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> would be published
> >>> as
> >>>>>>> delete.
> >>>>>>>>> Any
> >>>>>>>>>>>>>>> other
> >>>>>>>>>>>>>>>>> count
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> of map entries is
> >>> in
> >>>>>>> "waiting
> >>>>>>>>> for
> >>>>>>>>>>>>>>> correct
> >>>>>>>>>>>>>>>>>>>>>> deletes to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > arrive"-state.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> On 04.09.2018
> >>> 21:29,
> >>>>> Adam
> >>>>>>>>>> Bellemare
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> It does look like I
> >>>>> could
> >>>>>>>>> replace
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> second
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            repartition store
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>> highwater store
> >>> with
> >>>>> a
> >>>>>>>> groupBy
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>> reduce.
> >>>>>>>>>>>>>>>>>>>>>> However,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            it looks
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > like
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>> still need to
> >>> store
> >>>>> the
> >>>>>>>>>> highwater
> >>>>>>>>>>>>>>> value
> >>>>>>>>>>>>>>>>>>> within
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            materialized
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>> store,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> compare the
> >>> arrival of
> >>>>>>>>>> out-of-order
> >>>>>>>>>>>>>>>>> records
> >>>>>>>>>>>>>>>>>>>>>> (assuming
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> understanding
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> THIS is
> >>> correct...).
> >>>>> This
> >>>>>>> in
> >>>>>>>>>> effect
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            design I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> now,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>> just with the two
> >>>>> tables
> >>>>>>>> merged
> >>>>>>>>>>>>>>> together.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            > >>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>            >
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
>

Reply via email to