@Guozhang

Thanks for the information. This is indeed something that will be extremely
useful for this KIP.

@Jan
Thanks for your explanations. That said, I will not be moving ahead with an
implementation using the reshuffle/groupBy solution you propose. If you wish
to implement it yourself off of my current PR and submit it as a competing
alternative, I would be more than happy to help vet it. As it stands right
now, I do not have more time to invest in alternatives without a strong
indication from the binding voters of which approach they would prefer.


I will look at finishing up my PR with the windowed state store in the next
week or so, exercising it via tests, and then I will come back for final
discussions. In the meantime, I hope that the binding voters can take a look
at the KIP in the wiki. I have updated it according to the latest plan:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

I have also updated the KIP PR to use a windowed store. This could be
replaced by the results of KIP-258 whenever they are completed.
https://github.com/apache/kafka/pull/5527
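
For anyone skimming the thread, here is a minimal sketch of the filtering idea under discussion: remember the highest timestamp seen per key, drop updates that are older than it, and expire entries after a retention period so the state stays bounded by time. This is plain Java with a HashMap standing in for the state store. The names HighWaterMarkFilter, shouldForward and expire are hypothetical and are not taken from the PR, from KIP-258, or from the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; not code from the PR or from Kafka Streams. */
final class HighWaterMarkFilter<K> {
    // Highest record timestamp seen so far per key (stand-in for a state store).
    private final Map<K, Long> highestSeen = new HashMap<>();
    // How long an entry is kept before it may be expired (assumed, user-configurable).
    private final long retentionMs;

    HighWaterMarkFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Returns true if the update is at least as new as the latest one seen for the key. */
    boolean shouldForward(K key, long timestampMs) {
        Long previous = highestSeen.get(key);
        if (previous != null && timestampMs < previous) {
            return false; // late arrival: a newer update was already forwarded
        }
        highestSeen.put(key, timestampMs);
        return true;
    }

    /** Drops entries older than the retention period, capping the state by time. */
    void expire(long nowMs) {
        highestSeen.values().removeIf(ts -> nowMs - ts > retentionMs);
    }

    int size() {
        return highestSeen.size();
    }
}
```

Note the trade-off this makes visible: once an entry has been expired, a late update for that key is forwarded again, so out-of-order arrival can only be resolved for up to the retention period.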

Thanks,

Adam



On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Correction on my previous email: KAFKA-5533 is the wrong link, as it is for
> corresponding changelog mechanisms. But as part of KIP-258 we do want to
> have "handling out-of-order data for source KTable" such that instead of
> blindly applying the updates to the materialized store, i.e. following offset
> ordering, we will reject updates that are older than the current key's
> timestamp, i.e. following timestamp ordering.
>
>
> Guozhang
>
> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Hello Adam,
> >
> > Thanks for the explanation. Regarding the final step (i.e. the high
> > watermark store, now altered to be replaced with a window store), I think
> > another ongoing KIP may actually help:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >
> >
> > This is for adding the timestamp into a key-value store (i.e. only for
> > non-windowed KTables), and one of its usages, as described in
> > https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then
> > "reject" updates from the source topics if their timestamp is smaller than
> > the current key's latest update timestamp. I think it is very similar to
> > what you have in mind for high watermark based filtering; you only need
> > to make sure that the timestamps of the joining records are correctly
> > inherited through the whole topology to the final stage.
> >
> > Note that this KIP is for key-value stores and hence non-windowed KTables
> > only, but for windowed KTables we do not really have good support for
> > their joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107). I
> > think we can just consider non-windowed KTable-KTable non-key joins for
> > now, in which case KIP-258 should help.
> >
> >
> >
> > Guozhang
> >
> >
> >
> > On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com>
> > wrote:
> >
> >>
> >> On 11.09.2018 18:00, Adam Bellemare wrote:
> >>
> >>> Hi Guozhang
> >>>
> >>> Current highwater mark implementation would grow endlessly based on
> >>> primary key of original event. It is a pair of (<this table primary key>,
> >>> <highest offset seen for that key>). This is used to differentiate between
> >>> late arrivals and new updates. My newest proposal would be to replace it
> >>> with a Windowed state store of Duration N. This would allow the same
> >>> behaviour, but cap the size based on time. This should allow for all
> >>> late-arriving events to be processed, and should be customizable by the
> >>> user to tailor to their own needs (ie: perhaps just 10 minutes of window,
> >>> or perhaps 7 days...).
> >>>
> >> Hi Adam, using time-based retention can do the trick here, even if I
> >> would still like to see the automatic repartitioning made optional, since
> >> I would just reshuffle again. With a windowed store I am a little bit
> >> sceptical about how to determine the window. Essentially, one could run
> >> into problems when a rapid change happens near a window border. I will
> >> check your implementation in detail; if it is problematic, we could still
> >> check _all_ windows on read with not too bad a performance impact, I
> >> guess. I will let you know if the implementation would be correct as is.
> >> I would not like to assume that
> >> offset(A) < offset(B) => timestamp(A) < timestamp(B). I think we can't
> >> expect that.
> >>
> >>>
> >>>
> >>> @Jan
> >>> I believe I understand what you mean now - thanks for the diagram, it
> >>> did really help. You are correct that I do not have the original primary
> >>> key available, and I can see that if it was available then you would be
> >>> able to add and remove events from the Map. That being said, I encourage
> >>> you to finish your diagrams / charts just for clarity for everyone else.
> >>>
> >> Yeah 100%, this giphy thing is just really hard work. But I understand
> >> the benefits for the rest. Sorry about the original primary key. We have
> >> implemented our own join and group-by in the PAPI and are basically not
> >> using the DSL (just the abstraction). I completely missed that it is not
> >> there in the original DSL and just assumed it. Total brain mess-up on my
> >> end. I will finish the chart as soon as I get a quiet evening this week.
> >>
> >>> My follow up question for you is, won't the Map stay inside the State
> >>> Store indefinitely after all of the changes have propagated? Isn't this
> >>> effectively the same as a highwater mark state store?
> >>>
> >> The thing is that if the map is empty, the subtractor is going to return
> >> `null` and the key is removed from the keyspace. But there is going to be
> >> a store, 100%. The good thing is that I can use this store directly for
> >> materialize() / enableSendingOldValues(); it is a regular store,
> >> satisfying all guarantees needed for a further groupBy / join. The
> >> windowed store is not keeping the values, so for the next stateful
> >> operation we would need to instantiate an extra store, or have the window
> >> store also hold the values.
> >>
> >> Long story short: if we can flip in a custom groupBy before
> >> repartitioning to the original primary key, I think it would help users
> >> big time in building efficient apps. Given the original primary key issue,
> >> I understand that we do not have a solid foundation to build on. Leaving
> >> the primary key for the user to carry along is very unfortunate. I could
> >> understand if the decision goes that way, but I do not think it is a good
> >> decision.
> >>
> >>
> >>>
> >>>
> >>>
> >>> Thanks
> >>> Adam
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre <
> >>> dumbreprajakta...@gmail.com <mailto:dumbreprajakta...@gmail.com>>
> wrote:
> >>>
> >>>     please remove me from this group
> >>>
> >>>     On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
> >>>     <jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
> >>>
> >>>     wrote:
> >>>
> >>>     > Hi Adam,
> >>>     >
> >>>     > give me some time, will make such a chart. last time i didn't get along
> >>>     > well with giphy and ruined all your charts.
> >>>     > Hopefully i can get it done today
> >>>     >
> >>>     > On 08.09.2018 16:00, Adam Bellemare wrote:
> >>>     > > Hi Jan
> >>>     > >
> >>>     > > I have included a diagram of what I attempted on the KIP.
> >>>     > >
> >>>     >
> >>>     > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> >>>     > >
> >>>     > > I attempted this back at the start of my own implementation of this
> >>>     > > solution, and since I could not get it to work I have since discarded the
> >>>     > > code. At this point in time, if you wish to continue pursuing your
> >>>     > > groupBy solution, I ask that you please create a diagram on the KIP
> >>>     > > carefully explaining your solution. Please feel free to use the image I
> >>>     > > just posted as a starting point. I am having trouble understanding your
> >>>     > > explanations but I think that a carefully constructed diagram will clear up
> >>>     > > any misunderstandings. Alternately, please post a comprehensive PR with
> >>>     > > your solution. I can only guess at what you mean, and since I value my own
> >>>     > > time as much as you value yours, I believe it is your responsibility to
> >>>     > > provide an implementation instead of me trying to guess.
> >>>     > >
> >>>     > > Adam
> >>>     > >
> >>>     > >
> >>>     > >
> >>>     > >
> >>>     > >
> >>>     > >
> >>>     > >
> >>>     > >
> >>>     > >
> >>>     > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
> >>>     <jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
> >>>
> >>>     > > wrote:
> >>>     > >
> >>>     > >> Hi James,
> >>>     > >>
> >>>     > >> nice to see you being interested. Kafka Streams at this point supports
> >>>     > >> all sorts of joins as long as both streams have the same key.
> >>>     > >> Adam is currently implementing a join where a KTable and a KTable can have
> >>>     > >> a one-to-many relationship (1:n). We exploit that RocksDB is a
> >>>     > >> datastore that keeps data sorted (or at least exposes an API to access the
> >>>     > >> stored data in a sorted fashion).
> >>>     > >>
> >>>     > >> I think the technical caveats are well understood now and we are basically
> >>>     > >> down to philosophy and API design (when Adam sees my newest message).
> >>>     > >> I have a lengthy track record of losing those kinds of arguments within the
> >>>     > >> streams community and I have no clue why. So I literally can't wait for you
> >>>     > >> to churn through this thread and give your opinion on how we should design
> >>>     > >> the return type of the oneToManyJoin and how much power we want to give to
> >>>     > >> the user vs "simplicity" (where simplicity isn't really that, as users still
> >>>     > >> need to understand it, I argue).
> >>>     > >>
> >>>     > >> waiting for you to join in on the discussion
> >>>     > >>
> >>>     > >> Best Jan
> >>>     > >>
> >>>     > >>
> >>>     > >>
> >>>     > >> On 07.09.2018 15:49, James Kwan wrote:
> >>>     > >>
> >>>     > >>> I am new to this group and I found this subject interesting.  Sounds like
> >>>     > >>> you guys want to implement a join table of two streams? Is there somewhere
> >>>     > >>> I can see the original requirement or proposal?
> >>>     > >>>
> >>>     > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
> >>>     <jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
> >>>
> >>>     > >>>> wrote:
> >>>     > >>>>
> >>>     > >>>>
> >>>     > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
> >>>     > >>>>
> >>>     > >>>>> I'm currently testing using a Windowed Store to store the highwater
> >>>     > >>>>> mark. By all indications this should work fine, with the caveat being
> >>>     > >>>>> that it can only resolve out-of-order arrival for up to the size of the
> >>>     > >>>>> window (ie: 24h, 72h, etc). This would remove the possibility of it
> >>>     > >>>>> being unbounded in size.
> >>>     > >>>>>
> >>>     > >>>>> With regards to Jan's suggestion, I believe this is where we will have
> >>>     > >>>>> to remain in disagreement. While I do not disagree with your statement
> >>>     > >>>>> about there likely to be additional joins done in a real-world workflow,
> >>>     > >>>>> I do not see how you can conclusively deal with out-of-order arrival of
> >>>     > >>>>> foreign-key changes and subsequent joins. I have attempted what I think
> >>>     > >>>>> you have proposed (without a high-water, using groupBy and reduce) and
> >>>     > >>>>> found that if the foreign key changes too quickly, or the load on a
> >>>     > >>>>> stream thread is too high, the joined messages will arrive out-of-order
> >>>     > >>>>> and be incorrectly propagated, such that an intermediate event is
> >>>     > >>>>> represented as the final event.
> >>>     > >>>>>
> >>>     > >>>> Can you shed some light on your groupBy implementation? There must be
> >>>     > >>>> some sort of flaw in it.
> >>>     > >>>> I have a suspicion where it is, I would just like to confirm. The idea
> >>>     > >>>> is bulletproof and it must be an implementation mess-up. I would like
> >>>     > >>>> to clarify before we draw a conclusion.
> >>>     > >>>>
> >>>     > >>>>> Repartitioning the scattered events back to their original
> >>>     > >>>>> partitions is the only way I know how to conclusively deal with
> >>>     > >>>>> out-of-order events in a given time frame, and to ensure that the data
> >>>     > >>>>> is eventually consistent with the input events.
> >>>     > >>>>>
> >>>     > >>>>> If you have some code to share that illustrates your approach, I would
> >>>     > >>>>> be very grateful as it would remove any misunderstandings that I may
> >>>     > >>>>> have.
> >>>     > >>>>>
> >>>     > >>>> ah okay you were looking for my code. I don't have something easily
> >>>     > >>>> readable here as it's bloated with OO-patterns.
> >>>     > >>>>
> >>>     > >>>> it's anyhow trivial:
> >>>     > >>>>
> >>>     > >>>> @Override
> >>>     > >>>> public T apply(K aggKey, V value, T aggregate)
> >>>     > >>>> {
> >>>     > >>>>     Map<U, V> currentStateAsMap = asMap(aggregate); // << imaginary
> >>>     > >>>>     U toModifyKey = mapper.apply(value);
> >>>     > >>>>     // << this is the place where people are actually going to have issues
> >>>     > >>>>     //    and why you probably couldn't do it. We would need to find a
> >>>     > >>>>     //    solution here. I didn't realize that yet.
> >>>     > >>>>     // << we propagate the field in the joiner, so that we can pick it up
> >>>     > >>>>     //    in an aggregate. Probably you have not thought of this in your
> >>>     > >>>>     //    approach, right?
> >>>     > >>>>     // << I am very open to finding a generic solution here. In my honest
> >>>     > >>>>     //    opinion it is broken that KTableImpl.GroupBy loses the keys and
> >>>     > >>>>     //    only maintains the aggregate key.
> >>>     > >>>>     // << I abstracted it away back then, way before I was thinking of
> >>>     > >>>>     //    the oneToMany join. That is why I didn't realize its
> >>>     > >>>>     //    significance here.
> >>>     > >>>>     // << Opinions?
> >>>     > >>>>
> >>>     > >>>>     for (V m : current)
> >>>     > >>>>     {
> >>>     > >>>>         currentStateAsMap.put(mapper.apply(m), m);
> >>>     > >>>>     }
> >>>     > >>>>     if (isAdder)
> >>>     > >>>>     {
> >>>     > >>>>         currentStateAsMap.put(toModifyKey, value);
> >>>     > >>>>     }
> >>>     > >>>>     else
> >>>     > >>>>     {
> >>>     > >>>>         currentStateAsMap.remove(toModifyKey);
> >>>     > >>>>         if (currentStateAsMap.isEmpty())
> >>>     > >>>>         {
> >>>     > >>>>             return null;
> >>>     > >>>>         }
> >>>     > >>>>     }
> >>>     > >>>>     return asAggregateType(currentStateAsMap);
> >>>     > >>>> }
> >>>     > >>>>
> >>>     > >>>>
> >>>     > >>>>
> >>>     > >>>>
> >>>     > >>>>
> >>>     > >>>> Thanks,
> >>>     > >>>>> Adam
> >>>     > >>>>>
> >>>     > >>>>>
> >>>     > >>>>>
> >>>     > >>>>>
> >>>     > >>>>>
> >>>     > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <
> >>>     > jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
> >>>
> >>>     > >>>>> wrote:
> >>>     > >>>>>
> >>>     > >>>>>> Thanks Adam for bringing Matthias up to speed!
> >>>     > >>>>>> About the differences: I think re-keying back should be optional at
> >>>     > >>>>>> best. I would say we return a KScatteredTable with reshuffle() returning
> >>>     > >>>>>> KTable<originalKey,Joined> to make the backwards repartitioning
> >>>     > >>>>>> optional. I am also very much in favour of doing the out-of-order
> >>>     > >>>>>> processing using group by instead of high water mark tracking, just
> >>>     > >>>>>> because unbounded growth is scary, and it saves us the header stuff.
> >>>     > >>>>>>
> >>>     > >>>>>> I think the abstraction of always repartitioning back is just not so
> >>>     > >>>>>> strong. The work has been done before we partition back, and grouping
> >>>     > >>>>>> by something else afterwards is really common.
> >>>     > >>>>>>
> >>>     > >>>>>>
> >>>     > >>>>>>
> >>>     > >>>>>>
> >>>     > >>>>>>
> >>>     > >>>>>>
> >>>     > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
> >>>     > >>>>>>
> >>>     > >>>>>>> Hi Matthias
> >>>     > >>>>>>> Thank you for your feedback, I do appreciate it!
> >>>     > >>>>>>>
> >>>     > >>>>>>>> While name spacing would be possible, it would require to deserialize
> >>>     > >>>>>>>> user headers what implies a runtime overhead. I would suggest to no
> >>>     > >>>>>>>> namespace for now to avoid the overhead. If this becomes a problem in
> >>>     > >>>>>>>> the future, we can still add name spacing later on.
> >>>     > >>>>>>>
> >>>     > >>>>>>> Agreed. I will go with using a reserved string and document it.
> >>>     > >>>>>>>
> >>>     > >>>>>>>> My main concern about the design it the type of the result KTable: If I
> >>>     > >>>>>>>> understood the proposal correctly,
> >>>     > >>>>>>>
> >>>     > >>>>>>> In your example, you have table1 and table2 swapped. Here is how it
> >>>     > >>>>>>> works currently:
> >>>     > >>>>>>>
> >>>     > >>>>>>> 1) table1 has the records that contain the foreign key within their value.
> >>>     > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
> >>>     > >>>>>>> table2 input stream: <A,X>, <B,Y>
> >>>     > >>>>>>>
> >>>     > >>>>>>> 2) A Value mapper is required to extract the foreign key.
> >>>     > >>>>>>> table1 foreign key mapper: ( value => value.fk )
> >>>     > >>>>>>>
> >>>     > >>>>>>> The mapper is applied to each element in table1, and a new combined key is
> >>>     > >>>>>>> made:
> >>>     > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
> >>>     > >>>>>>>
> >>>     > >>>>>>> 3) The rekeyed events are copartitioned with table2:
> >>>     > >>>>>>>
> >>>     > >>>>>>> a) Stream Thread with Partition 0:
> >>>     > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> >>>     > >>>>>>> Table2: <A,X>
> >>>     > >>>>>>>
> >>>     > >>>>>>> b) Stream Thread with Partition 1:
> >>>     > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> >>>     > >>>>>>> Table2: <B,Y>
> >>>     > >>>>>>>
> >>>     > >>>>>>> 4) From here, they can be joined together locally by applying the joiner
> >>>     > >>>>>>> function.
> >>>     > >>>>>>>
> >>>     > >>>>>>>
> >>>     > >>>>>>>
> >>>     > >>>>>>> At this point, Jan's design and my design deviate. My design goes on to
> >>>     > >>>>>>> repartition the data post-join and resolve out-of-order arrival of records,
> >>>     > >>>>>>> finally returning the data keyed by just the original key. I do not expose
> >>>     > >>>>>>> the CombinedKey or any of the internals outside of the joinOnForeignKey
> >>>     > >>>>>>> function. This does make for a larger footprint, but it removes all agency
> >>>     > >>>>>>> for resolving out-of-order arrivals and handling CombinedKeys from the
> >>>     > >>>>>>> user. I believe that this makes the function much easier to use.
> >>>     > >>>>>>>
> >>>     > >>>>>>> Let me know if this helps resolve your questions, and please feel free to
> >>>     > >>>>>>> add anything else on your mind.
> >>>     > >>>>>>>
> >>>     > >>>>>>> Thanks again,
> >>>     > >>>>>>> Adam
> >>>     > >>>>>>>
> >>>     > >>>>>>>
> >>>     > >>>>>>>
> >>>     > >>>>>>>
> >>>     > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <
> >>>     > >>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>>
> >>>
> >>>     > >>>>>>> wrote:
> >>>     > >>>>>>>
> >>>     > >>>>>>> Hi,
> >>>     > >>>>>>>
> >>>     > >>>>>>>> I am just catching up on this thread. I did not read everything so far,
> >>>     > >>>>>>>> but want to share a couple of initial thoughts:
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> Headers: I think there is a fundamental difference between header usage
> >>>     > >>>>>>>> in this KIP and KIP-258. For 258, we add headers to changelog topics that
> >>>     > >>>>>>>> are owned by Kafka Streams and nobody else is supposed to write into
> >>>     > >>>>>>>> them. In fact, no user headers are written into the changelog topics and
> >>>     > >>>>>>>> thus, there are no conflicts.
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> Nevertheless, I don't see a big issue with using headers within Streams.
> >>>     > >>>>>>>> As long as we document it, we can have some "reserved" header keys that
> >>>     > >>>>>>>> users are not allowed to use when processing data with Kafka Streams.
> >>>     > >>>>>>>> IMHO, this should be ok.
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>> I think there is a safe way to avoid conflicts, since these headers are
> >>>     > >>>>>>>>> only needed in internal topics (I think):
> >>>     > >>>>>>>>> For internal and changelog topics, we can namespace all headers:
> >>>     > >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
> >>>     > >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> While name spacing would be possible, it would require deserializing
> >>>     > >>>>>>>> user headers, which implies a runtime overhead. I would suggest not to
> >>>     > >>>>>>>> namespace for now to avoid the overhead. If this becomes a problem in
> >>>     > >>>>>>>> the future, we can still add name spacing later on.
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> My main concern about the design is the type of the result KTable: If I
> >>>     > >>>>>>>> understood the proposal correctly,
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> KTable<K1,V1> table1 = ...
> >>>     > >>>>>>>> KTable<K2,V2> table2 = ...
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> implies that the `joinedTable` has the same key as the left input table.
> >>>     > >>>>>>>> IMHO, this does not work because if table2 contains multiple rows that
> >>>     > >>>>>>>> join with a record in table1 (which is the main purpose of a foreign key
> >>>     > >>>>>>>> join), the result table would only contain a single join result, but not
> >>>     > >>>>>>>> multiple.
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> Example:
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> table1 input stream: <A,X>
> >>>     > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> We use table2's value as a foreign key to table1's key (ie, "A" joins). If
> >>>     > >>>>>>>> the result key is the same as the key of table1, this implies that the
> >>>     > >>>>>>>> result can either be <A, join(X,1)> or <A, join(X,2)> but not both.
> >>>     > >>>>>>>> Because they share the same key, whatever result record we emit later
> >>>     > >>>>>>>> overwrites the previous result.
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> This is the reason why Jan originally proposed to use a combination of
> >>>     > >>>>>>>> both primary keys of the input tables as the key of the output table. This
> >>>     > >>>>>>>> makes the keys of the output table unique and we can store both in the
> >>>     > >>>>>>>> output table:
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> Thoughts?
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> -Matthias
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>
> >>>     > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >>>     > >>>>>>>>
> >>>     > >>>>>>>>> Just one remark here.
> >>>     > >>>>>>>>> The high-watermark could be disregarded. The decision about the forward
> >>>     > >>>>>>>>> depends on the size of the aggregated map.
> >>>     > >>>>>>>>> Only 1 element long maps would be unpacked and forwarded. 0 element maps
> >>>     > >>>>>>>>> would be published as delete. Any other count
> >>>     > >>>>>>>>> of map entries is in "waiting for correct deletes to arrive"-state.
> >>>     > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> >>>     > >>>>>>>>>
> >>>     > >>>>>>>>>> It does look like I could replace the second repartition store and
> >>>     > >>>>>>>>>> highwater store with a groupBy and reduce.  However, it looks like I
> >>>     > >>>>>>>>>> would still need to store the highwater value within the materialized
> >>>     > >>>>>>>>>> store, to compare the arrival of out-of-order records (assuming my
> >>>     > >>>>>>>>>> understanding of THIS is correct...). This in effect is the same as the
> >>>     > >>>>>>>>>> design I have now, just with the two tables merged together.
> >>>     > >>>>>>>>>
> >>>     >
> >>>     >
> >>>
> >>>
> >>>
> >>
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>
