Hi Jan

Check for "highwaterMat" in the PR. I only changed the state store, not
the ProcessorSupplier.

Thanks,
Adam

On Mon, Sep 24, 2018 at 2:47 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

>
>
> On 24.09.2018 16:26, Adam Bellemare wrote:
>
>> @Guozhang
>>
>> Thanks for the information. This is indeed something that will be
>> extremely
>> useful for this KIP.
>>
>> @Jan
>> Thanks for your explanations. That being said, I will not be moving ahead
>> with an implementation using the reshuffle/groupBy solution you propose.
>> If you wish to implement it yourself off of my current PR
>> and submit it as a competitive alternative, I would be more than happy to
>> help vet that as an alternate solution. As it stands right now, I do not
>> really have more time to invest in alternatives without a
>> strong indication from the binding voters of which they would prefer.
>>
>>
> Hey, no worries at all. I think I personally gave up on the Streams DSL
> some time ago; otherwise I would have pulled this KIP through already.
> I am currently reimplementing my own DSL based on PAPI.
>
>
>> I will look at finishing up my PR with the windowed state store in the
>> next week or so, exercising it via tests, and then I will come back for final
>> discussions. In the meantime, I hope that any of the binding voters could
>> take a look at the KIP in the wiki. I have updated it according to the
>> latest plan:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>
>> I have also updated the KIP PR to use a windowed store. This could be
>> replaced by the results of KIP-258 whenever they are completed.
>> https://github.com/apache/kafka/pull/5527
>>
>> Thanks,
>>
>> Adam
>>
>
> Is the HighWatermarkResolverProccessorsupplier already updated in the PR?
> I expected it to change to Windowed<K>,Long. Am I missing something?
>
>
>
>>
>>
>> On Fri, Sep 14, 2018 at 2:24 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>>> Correction on my previous email: KAFKA-5533 is the wrong link, as it is for
>>> corresponding changelog mechanisms. But as part of KIP-258 we do want to
>>> have "handling out-of-order data for source KTable" such that, instead of
>>> blindly applying the updates to the materialized store (i.e. following offset
>>> ordering), we will reject updates that are older than the current key's
>>> timestamp (i.e. following timestamp ordering).
>>>
>>>
>>> Guozhang
>>>
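A minimal sketch of the timestamp-ordering rule described in the correction above, assuming a plain in-memory map in place of a real state store. The class and method names (TimestampOrderedStore, put, get) are hypothetical and are not taken from KIP-258 or from Kafka Streams. Treating equal timestamps as acceptable is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a timestamped key-value store.
// Not the KIP-258 implementation; names are illustrative only.
final class TimestampOrderedStore<K, V> {

    // A stored value together with the timestamp of the record that wrote it.
    private static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Stamped<V>> store = new HashMap<>();

    // Applies the update only if it is not older than the key's current
    // timestamp. Returns true if applied, false if rejected.
    boolean put(K key, V value, long timestamp) {
        Stamped<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // older than what we already hold: reject
        }
        store.put(key, new Stamped<>(value, timestamp));
        return true;
    }

    // Returns the latest accepted value for the key, or null if none.
    V get(K key) {
        Stamped<V> current = store.get(key);
        return current == null ? null : current.value;
    }
}
```

With offset ordering the second of two writes always wins; here a write with a smaller timestamp is dropped even if it arrives later.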
>>> On Fri, Sep 14, 2018 at 11:21 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>> Hello Adam,
>>>>
>>>> Thanks for the explanation. Regarding the final step (i.e. the high
>>>> watermark store, now altered to be replaced with a window store), I think
>>>> another current on-going KIP may actually help:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>
>>>>
>>>> This is for adding the timestamp into a key-value store (i.e. only for
>>>> non-windowed KTables), and one of its usages, as described in
>>>> https://issues.apache.org/jira/browse/KAFKA-5533, is that we can then
>>>> "reject" updates from the source topics if their timestamp is smaller than
>>>> the current key's latest update timestamp. I think it is very similar to
>>>> what you have in mind for high watermark based filtering; you only
>>>> need to make sure that the timestamps of the joining records are correctly
>>>> inherited through the whole topology to the final stage.
>>>>
>>>> Note that this KIP is for key-value stores and hence non-windowed KTables
>>>> only, but for windowed KTables we do not really have good support for
>>>> their joins anyway (https://issues.apache.org/jira/browse/KAFKA-7107). I
>>>> think we can just consider non-windowed KTable-KTable non-key joins for
>>>> now, in which case KIP-258 should help.
>>>>
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>>
>>>> On Wed, Sep 12, 2018 at 9:20 PM, Jan Filipiak <jan.filip...@trivago.com>
>>>> wrote:
>>>>
>>>>
>>>>> On 11.09.2018 18:00, Adam Bellemare wrote:
>>>>>
>>>>> Hi Guozhang
>>>>>>
>>>>>> Current highwater mark implementation would grow endlessly based on
>>>>>> primary key of original event. It is a pair of (<this table primary key>,
>>>>>> <highest offset seen for that key>). This is used to differentiate between
>>>>>> late arrivals and new updates. My newest proposal would be to replace it
>>>>>> with a Windowed state store of Duration N. This would allow the same
>>>>>> behaviour, but cap the size based on time. This should allow for all
>>>>>> late-arriving events to be processed, and should be customizable by the
>>>>>> user to tailor to their own needs (ie: perhaps just 10 minutes of window,
>>>>>> or perhaps 7 days...).
>>>>>>
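A rough sketch of the highwater mark idea described above: a (key -> highest offset seen) map, with entries dropped after a retention period so the map cannot grow without bound. This only illustrates the concept with an in-memory map. It is not the code in the PR, and the names (HighWaterMarkFilter, accept, retentionMs) are hypothetical. Rejecting only strictly lower offsets is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a time-capped highwater mark filter.
final class HighWaterMarkFilter<K> {

    // Highest offset seen for a key, plus when that offset was recorded.
    private static final class Mark {
        final long offset;
        final long seenAtMs;

        Mark(long offset, long seenAtMs) {
            this.offset = offset;
            this.seenAtMs = seenAtMs;
        }
    }

    private final Map<K, Mark> marks = new HashMap<>();
    private final long retentionMs;

    HighWaterMarkFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true for a new update, false for a late arrival, i.e. an
    // event whose offset is lower than the highest one seen for its key.
    boolean accept(K key, long offset, long nowMs) {
        expire(nowMs);
        Mark current = marks.get(key);
        if (current != null && offset < current.offset) {
            return false;
        }
        marks.put(key, new Mark(offset, nowMs));
        return true;
    }

    // Drops marks older than the retention period. This is what caps the size.
    private void expire(long nowMs) {
        marks.values().removeIf(m -> nowMs - m.seenAtMs > retentionMs);
    }

    int size() {
        return marks.size();
    }
}
```

The trade-off is visible in the expiry step: once a mark has been dropped, a late event for that key can no longer be recognized as late, so out-of-order arrival is only resolved within the retention period.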
>>>>> Hi Adam, using time-based retention can do the trick here, even if I
>>>>> would still like to see the automatic repartitioning made optional, since I
>>>>> would just reshuffle again. With a windowed store I am a little bit sceptical
>>>>> about how to determine the window. Essentially, one could run into problems
>>>>> when a rapid change happens near a window border. I will check your
>>>>> implementation in detail; if it's problematic, we could still check _all_
>>>>> windows on read without too bad a performance impact, I guess. I will let
>>>>> you know if the implementation would be correct as is. I would not like to
>>>>> assume that offset(A) < offset(B) => timestamp(A) < timestamp(B). I think
>>>>> we can't expect that.
>>>>>
>>>>>
>>>>>>
>>>>>> @Jan
>>>>>> I believe I understand what you mean now - thanks for the diagram, it
>>>>>> did really help. You are correct that I do not have the original primary
>>>>>> key available, and I can see that if it was available then you would be
>>>>>> able to add and remove events from the Map. That being said, I encourage
>>>>>> you to finish your diagrams / charts just for clarity for everyone else.
>>>>>>
>>>>> Yeah, 100%, this giphy thing is just really hard work. But I understand
>>>>> the benefits for the rest. Sorry about the original primary key. We have
>>>>> implemented join and group-by on our own in PAPI and are basically not using
>>>>> any DSL (just the abstraction). I completely missed that it is not there in
>>>>> the original DSL and just assumed it. Total brain mess-up on my end. I will
>>>>> finish the chart as soon as I get a quiet evening this week.
>>>>>
>>>>>> My follow up question for you is, won't the Map stay inside the State
>>>>>> Store indefinitely after all of the changes have propagated? Isn't this
>>>>>> effectively the same as a highwater mark state store?
>>>>>>
>>>>> The thing is that if the map is empty, the subtractor is going to return
>>>>> `null` and the key is removed from the keyspace. But there is going to be a
>>>>> store, 100%. The good thing is that I can use this store directly for
>>>>> materialize() / enableSendingOldValues(); it is a regular store, satisfying
>>>>> all guarantees needed for further groupBy / join. The windowed store is not
>>>>> keeping the values, so for the next stateful operation we would need to
>>>>> instantiate an extra store, or have the window store also hold the values.
>>>>>
>>>>> Long story short: if we can flip in a custom group-by before
>>>>> repartitioning to the original primary key, I think it would help users
>>>>> big time in building efficient apps. Given the original primary key issue, I
>>>>> understand that we do not have a solid foundation to build on. Leaving
>>>>> the primary key carry-along to the user is very unfortunate. I could
>>>>> understand if the decision goes like that, but I do not think it's a good
>>>>> decision.
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Adam
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 10:07 AM, Prajakta Dumbre
>>>>>> <dumbreprajakta...@gmail.com> wrote:
>>>>
>>>>>>      please remove me from this group
>>>>>>
>>>>>>      On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak
>>>>>>      <jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
>>>>>>
>>>>>>      wrote:
>>>>>>
>>>>>>      > Hi Adam,
>>>>>>      >
>>>>>>      > give me some time, will make such a chart. last time i didn't
>>>>>>      get along
>>>>>>      > well with giphy and ruined all your charts.
>>>>>>      > Hopefully i can get it done today
>>>>>>      >
>>>>>>      > On 08.09.2018 16:00, Adam Bellemare wrote:
>>>>>>      > > Hi Jan
>>>>>>      > >
>>>>>>      > > I have included a diagram of what I attempted on the KIP.
>>>>>>      > >
>>>>>>      >
>>>>>>      > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
>>>>>>      > >
>>>>>>      > > I attempted this back at the start of my own implementation of
>>>>>>      > > this solution, and since I could not get it to work I have since
>>>>>>      > > discarded the code. At this point in time, if you wish to
>>>>>>      > > continue pursuing your groupBy solution, I ask that you please
>>>>>>      > > create a diagram on the KIP carefully explaining your solution.
>>>>>>      > > Please feel free to use the image I just posted as a starting
>>>>>>      > > point. I am having trouble understanding your explanations but I
>>>>>>      > > think that a carefully constructed diagram will clear up any
>>>>>>      > > misunderstandings. Alternately, please post a comprehensive PR
>>>>>>      > > with your solution. I can only guess at what you mean, and since
>>>>>>      > > I value my own time as much as you value yours, I believe it is
>>>>>>      > > your responsibility to provide an implementation instead of me
>>>>>>      > > trying to guess.
>>>>>>      > >
>>>>>>      > > Adam
>>>>>>      > >
>>>>>>      > >
>>>>>>      > >
>>>>>>      > >
>>>>>>      > >
>>>>>>      > >
>>>>>>      > >
>>>>>>      > >
>>>>>>      > >
>>>>>>      > > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak
>>>>>>      <jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
>>>>>>
>>>>>>      > > wrote:
>>>>>>      > >
>>>>>>      > >> Hi James,
>>>>>>      > >>
>>>>>>      > >> Nice to see you being interested. Kafka Streams at this point
>>>>>>      > >> supports all sorts of joins as long as both streams have the
>>>>>>      > >> same key. Adam is currently implementing a join where a KTable
>>>>>>      > >> and a KTable can have a one-to-many relationship (1:n). We
>>>>>>      > >> exploit that RocksDB is a datastore that keeps data sorted (or
>>>>>>      > >> at least exposes an API to access the stored data in a sorted
>>>>>>      > >> fashion).
>>>>>>      > >>
>>>>>>      > >> I think the technical caveats are well understood now and we
>>>>>>      > >> are basically down to philosophy and API design (once Adam sees
>>>>>>      > >> my newest message). I have a lengthy track record of losing
>>>>>>      > >> those kinds of arguments within the Streams community and I
>>>>>>      > >> have no clue why. So I literally can't wait for you to churn
>>>>>>      > >> through this thread and give your opinion on how we should
>>>>>>      > >> design the return type of the oneToManyJoin and how much power
>>>>>>      > >> we want to give to the user vs "simplicity" (where simplicity
>>>>>>      > >> isn't really that, I argue, as users still need to understand
>>>>>>      > >> it).
>>>>>>      > >>
>>>>>>      > >> waiting for you to join in on the discussion
>>>>>>      > >>
>>>>>>      > >> Best Jan
>>>>>>      > >>
>>>>>>      > >>
>>>>>>      > >>
>>>>>>      > >> On 07.09.2018 15:49, James Kwan wrote:
>>>>>>      > >>
>>>>>>      > >>> I am new to this group and I found this subject interesting.
>>>>>>      > >>> Sounds like you guys want to implement a join table of two
>>>>>>      > >>> streams? Is there somewhere I can see the original requirement
>>>>>>      > >>> or proposal?
>>>>>>      > >>>
>>>>>>      > >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak
>>>>>>      <jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
>>>>>>
>>>>>>      > >>>> wrote:
>>>>>>      > >>>>
>>>>>>      > >>>>
>>>>>>      > >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>>>>      > >>>>
>>>>>>      > >>>>> I'm currently testing using a Windowed Store to store the
>>>>>>      > >>>>> highwater mark. By all indications this should work fine, with
>>>>>>      > >>>>> the caveat being that it can only resolve out-of-order arrival
>>>>>>      > >>>>> for up to the size of the window (ie: 24h, 72h, etc). This
>>>>>>      > >>>>> would remove the possibility of it being unbounded in size.
>>>>>>      > >>>>>
>>>>>>      > >>>>> With regards to Jan's suggestion, I believe this is where we
>>>>>>      > >>>>> will have to remain in disagreement. While I do not disagree
>>>>>>      > >>>>> with your statement about there likely being additional joins
>>>>>>      > >>>>> done in a real-world workflow, I do not see how you can
>>>>>>      > >>>>> conclusively deal with out-of-order arrival of foreign-key
>>>>>>      > >>>>> changes and subsequent joins. I have attempted what I think
>>>>>>      > >>>>> you have proposed (without a high-water, using groupBy and
>>>>>>      > >>>>> reduce) and found that if the foreign key changes too quickly,
>>>>>>      > >>>>> or the load on a stream thread is too high, the joined
>>>>>>      > >>>>> messages will arrive out-of-order and be incorrectly
>>>>>>      > >>>>> propagated, such that an intermediate event is represented as
>>>>>>      > >>>>> the final event.
>>>>>>      > >>>>>
>>>>>>      > >>>> Can you shed some light on your groupBy implementation? There
>>>>>>      > >>>> must be some sort of flaw in it. I have a suspicion where it is;
>>>>>>      > >>>> I would just like to confirm. The idea is bulletproof and it
>>>>>>      > >>>> must be an implementation mess-up. I would like to clarify
>>>>>>      > >>>> before we draw a conclusion.
>>>>>>      > >>>>
>>>>>>      > >>>>> Repartitioning the scattered events back to their original
>>>>>>      > >>>>> partitions is the only way I know how to conclusively deal
>>>>>>      > >>>>> with out-of-order events in a given time frame, and to ensure
>>>>>>      > >>>>> that the data is eventually consistent with the input events.
>>>>>>      > >>>>>
>>>>>>      > >>>>> If you have some code to share that illustrates your approach,
>>>>>>      > >>>>> I would be very grateful as it would remove any
>>>>>>      > >>>>> misunderstandings that I may have.
>>>>>>      > >>>>>
>>>>>>      > >>>> Ah okay, you were looking for my code. I don't have something
>>>>>>      > >>>> easily readable here as it's bloated with OO patterns.
>>>>>>      > >>>>
>>>>>>      > >>>> It's trivial anyhow:
>>>>>>      > >>>>
>>>>>>      > >>>> @Override
>>>>>>      > >>>> public T apply(K aggKey, V value, T aggregate)
>>>>>>      > >>>> {
>>>>>>      > >>>>     // asMap(...) is imaginary
>>>>>>      > >>>>     Map<U, V> currentStateAsMap = asMap(aggregate);
>>>>>>      > >>>>     U toModifyKey = mapper.apply(value);
>>>>>>      > >>>>     // This is the place where people are actually going to have
>>>>>>      > >>>>     // issues, and why you probably couldn't do it. We would need
>>>>>>      > >>>>     // to find a solution here. I didn't realize that yet.
>>>>>>      > >>>>     // We propagate the field in the joiner, so that we can pick
>>>>>>      > >>>>     // it up in an aggregate. Probably you have not thought of
>>>>>>      > >>>>     // this in your approach, right?
>>>>>>      > >>>>     // I am very open to finding a generic solution here. In my
>>>>>>      > >>>>     // honest opinion it is broken that KTableImpl.GroupBy loses
>>>>>>      > >>>>     // the keys and only maintains the aggregate key.
>>>>>>      > >>>>     // I abstracted it away back then, way before I was thinking
>>>>>>      > >>>>     // of oneToMany join. That is why I didn't realize its
>>>>>>      > >>>>     // significance here.
>>>>>>      > >>>>     // Opinions?
>>>>>>      > >>>>
>>>>>>      > >>>>     for (V m : current)
>>>>>>      > >>>>     {
>>>>>>      > >>>>         currentStateAsMap.put(mapper.apply(m), m);
>>>>>>      > >>>>     }
>>>>>>      > >>>>     if (isAdder)
>>>>>>      > >>>>     {
>>>>>>      > >>>>         currentStateAsMap.put(toModifyKey, value);
>>>>>>      > >>>>     }
>>>>>>      > >>>>     else
>>>>>>      > >>>>     {
>>>>>>      > >>>>         currentStateAsMap.remove(toModifyKey);
>>>>>>      > >>>>         if (currentStateAsMap.isEmpty()) {
>>>>>>      > >>>>             return null;
>>>>>>      > >>>>         }
>>>>>>      > >>>>     }
>>>>>>      > >>>>     return asAggregateType(currentStateAsMap);
>>>>>>      > >>>> }
>>>>>>      > >>>>
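A self-contained sketch of the adder/subtractor idea in the snippet above, assuming the aggregate is simply a Map from the original primary key to the value. The class name MapAggregator, the two-argument apply, and the use of a plain java.util.function.Function as the key mapper are assumptions made for illustration. The real Kafka Streams Aggregator interface also receives the aggregate key, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-alone version of the map-based adder/subtractor.
final class MapAggregator<U, V> {

    // Extracts the original primary key from a value (assumed to be
    // carried inside the value, as discussed in the thread).
    private final Function<V, U> mapper;
    private final boolean isAdder;

    MapAggregator(Function<V, U> mapper, boolean isAdder) {
        this.mapper = mapper;
        this.isAdder = isAdder;
    }

    // Returns the updated aggregate, or null when a subtraction leaves
    // the map empty, so that the key can be removed from the keyspace.
    Map<U, V> apply(V value, Map<U, V> aggregate) {
        Map<U, V> state = aggregate == null
                ? new HashMap<>()
                : new HashMap<>(aggregate); // copy; the input is not mutated
        U toModifyKey = mapper.apply(value);
        if (isAdder) {
            state.put(toModifyKey, value);
        } else {
            state.remove(toModifyKey);
            if (state.isEmpty()) {
                return null;
            }
        }
        return state;
    }
}
```

The sketch only works if the mapper can recover the original primary key from the value, which is exactly the difficulty raised in the comments of the snippet.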
>>>>>>      > >>>>
>>>>>>      > >>>>
>>>>>>      > >>>>
>>>>>>      > >>>>
>>>>>>      > >>>> Thanks,
>>>>>>      > >>>>> Adam
>>>>>>      > >>>>>
>>>>>>      > >>>>>
>>>>>>      > >>>>>
>>>>>>      > >>>>>
>>>>>>      > >>>>>
>>>>>>      > >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <
>>>>>>      > jan.filip...@trivago.com <mailto:jan.filip...@trivago.com>>
>>>>>>
>>>>>>      > >>>>> wrote:
>>>>>>      > >>>>>
>>>>>>      > >>>>>> Thanks Adam for bringing Matthias up to speed!
>>>>>>      > >>>>>> About the differences: I think re-keying back should be
>>>>>>      > >>>>>> optional at best. I would say we return a KScatteredTable
>>>>>>      > >>>>>> with reshuffle() returning KTable<originalKey,Joined> to make
>>>>>>      > >>>>>> the backwards repartitioning optional. I am also very much in
>>>>>>      > >>>>>> favour of doing the out-of-order processing using group by
>>>>>>      > >>>>>> instead of high water mark tracking, just because unbounded
>>>>>>      > >>>>>> growth is scary, and it saves us the header stuff.
>>>>>>      > >>>>>>
>>>>>>      > >>>>>> I think the abstraction of always repartitioning back is just
>>>>>>      > >>>>>> not so strong. The work has been done before we partition
>>>>>>      > >>>>>> back, and grouping by something else afterwards is really
>>>>>>      > >>>>>> common.
>>>>>>      > >>>>>>
>>>>>>      > >>>>>>
>>>>>>      > >>>>>>
>>>>>>      > >>>>>>
>>>>>>      > >>>>>>
>>>>>>      > >>>>>>
>>>>>>      > >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>>>>      > >>>>>>
>>>>>>      > >>>>>> Hi Matthias
>>>>>>      > >>>>>>> Thank you for your feedback, I do appreciate it!
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>>> While name spacing would be possible, it would require
>>>>>>      > >>>>>>>> deserializing user headers, which implies a runtime
>>>>>>      > >>>>>>>> overhead. I would suggest not to namespace for now to avoid
>>>>>>      > >>>>>>>> the overhead. If this becomes a problem in the future, we
>>>>>>      > >>>>>>>> can still add name spacing later on.
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> Agreed. I will go with using a reserved string and document
>>>>>>      > >>>>>>> it.
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>>> My main concern about the design is the type of the result
>>>>>>      > >>>>>>>> KTable: If I understood the proposal correctly,
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> In your example, you have table1 and table2 swapped. Here
>>>>>>      > >>>>>>> is how it works currently:
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> 1) table1 has the records that contain the foreign key
>>>>>>      > >>>>>>> within their value.
>>>>>>      > >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
>>>>>>      > >>>>>>> <c,(fk=B,bar=3)>
>>>>>>      > >>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> 2) A Value mapper is required to extract the foreign key.
>>>>>>      > >>>>>>> table1 foreign key mapper: ( value => value.fk )
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> The mapper is applied to each element in table1, and a new
>>>>>>      > >>>>>>> combined key is made:
>>>>>>      > >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>,
>>>>>>      > >>>>>>> <B-c, (fk=B,bar=3)>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> 3) The rekeyed events are copartitioned with table2:
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> a) Stream Thread with Partition 0:
>>>>>>      > >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>>>>      > >>>>>>> Table2: <A,X>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> b) Stream Thread with Partition 1:
>>>>>>      > >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>>      > >>>>>>> Table2: <B,Y>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> 4) From here, they can be joined together locally by
>>>>>>      > >>>>>>> applying the joiner function.
>>>>>>      > >>>>>>>
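Steps 1, 2 and 4 above can be sketched in plain Java with in-memory maps, using the same example data. This is only an illustration under stated assumptions: the names (ForeignKeyJoinSketch, Row, rekey, join), the "fk-pk" string form of the combined key, and the string-building joiner are hypothetical, and step 3 (co-partitioning across stream threads) is left out because everything runs in one process.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical single-process illustration of the rekey-then-join steps.
final class ForeignKeyJoinSketch {

    // A table1 value: the foreign key plus a payload field.
    static final class Row {
        final String fk;
        final int bar;

        Row(String fk, int bar) {
            this.fk = fk;
            this.bar = bar;
        }
    }

    // Step 2: extract the foreign key from each table1 value and build a
    // combined key of the form "<fk>-<pk>".
    static Map<String, Row> rekey(Map<String, Row> table1) {
        Map<String, Row> out = new LinkedHashMap<>();
        for (Map.Entry<String, Row> e : table1.entrySet()) {
            out.put(e.getValue().fk + "-" + e.getKey(), e.getValue());
        }
        return out;
    }

    // Step 4: join each rekeyed entry with the table2 entry for its foreign
    // key. The joiner here just formats "(bar,table2Value)".
    static Map<String, String> join(Map<String, Row> rekeyed,
                                    Map<String, String> table2) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Row> e : rekeyed.entrySet()) {
            String right = table2.get(e.getValue().fk);
            if (right != null) {
                out.put(e.getKey(), "(" + e.getValue().bar + "," + right + ")");
            }
        }
        return out;
    }
}
```

For the example input, rekey yields the keys A-a, A-b and B-c, and join pairs the first two with X and the last with Y.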
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> At this point, Jan's design and my design deviate. My
>>>>>>      > >>>>>>> design goes on to repartition the data post-join and
>>>>>>      > >>>>>>> resolve out-of-order arrival of records, finally returning
>>>>>>      > >>>>>>> the data keyed by just the original key. I do not expose the
>>>>>>      > >>>>>>> CombinedKey or any of the internals outside of the
>>>>>>      > >>>>>>> joinOnForeignKey function. This does make for a larger
>>>>>>      > >>>>>>> footprint, but it removes all agency for resolving
>>>>>>      > >>>>>>> out-of-order arrivals and handling CombinedKeys from the
>>>>>>      > >>>>>>> user. I believe that this makes the function much easier to
>>>>>>      > >>>>>>> use.
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> Let me know if this helps resolve your questions, and
>>>>>>      please feel
>>>>>>      > >>>>>>> free to
>>>>>>      > >>>>>>> add anything else on your mind.
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> Thanks again,
>>>>>>      > >>>>>>> Adam
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <
>>>>>>      > >>>>>>> matth...@confluent.io <mailto:matth...@confluent.io>>
>>>>>>
>>>>>>      > >>>>>>> wrote:
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>> Hi,
>>>>>>      > >>>>>>>
>>>>>>      > >>>>>>>> I am just catching up on this thread. I did not read
>>>>>>      > >>>>>>>> everything so far, but want to share a couple of initial
>>>>>>      > >>>>>>>> thoughts:
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> Headers: I think there is a fundamental difference between
>>>>>>      > >>>>>>>> header usage in this KIP and KIP-258. For 258, we add
>>>>>>      > >>>>>>>> headers to changelog topics that are owned by Kafka Streams
>>>>>>      > >>>>>>>> and nobody else is supposed to write into them. In fact, no
>>>>>>      > >>>>>>>> user headers are written into the changelog topic and thus
>>>>>>      > >>>>>>>> there are no conflicts.
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> Nevertheless, I don't see a big issue with using headers
>>>>>>      > >>>>>>>> within Streams. As long as we document it, we can have some
>>>>>>      > >>>>>>>> "reserved" header keys that users are not allowed to use
>>>>>>      > >>>>>>>> when processing data with Kafka Streams. IMHO, this should
>>>>>>      > >>>>>>>> be ok.
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>> I think there is a safe way to avoid conflicts, since
>>>>>>      > >>>>>>>>> these headers are only needed in internal topics (I
>>>>>>      > >>>>>>>>> think):
>>>>>>      > >>>>>>>>> For internal and changelog topics, we can namespace all
>>>>>>      > >>>>>>>>> headers:
>>>>>>      > >>>>>>>>> * user-defined headers are namespaced as "external." +
>>>>>>      > >>>>>>>>>   headerKey
>>>>>>      > >>>>>>>>> * internal headers are namespaced as "internal." +
>>>>>>      > >>>>>>>>>   headerKey
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> While name spacing would be possible, it would require
>>>>>>      > >>>>>>>> deserializing user headers, which implies a runtime
>>>>>>      > >>>>>>>> overhead. I would suggest not to namespace for now to avoid
>>>>>>      > >>>>>>>> the overhead. If this becomes a problem in the future, we
>>>>>>      > >>>>>>>> can still add name spacing later on.
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> My main concern about the design is the type of the result
>>>>>>      > >>>>>>>> KTable: If I understood the proposal correctly,
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> KTable<K1,V1> table1 = ...
>>>>>>      > >>>>>>>> KTable<K2,V2> table2 = ...
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> implies that the `joinedTable` has the same key as the left
>>>>>>      > >>>>>>>> input table. IMHO, this does not work because if table2
>>>>>>      > >>>>>>>> contains multiple rows that join with a record in table1
>>>>>>      > >>>>>>>> (which is the main purpose of a foreign key join), the
>>>>>>      > >>>>>>>> result table would only contain a single join result, not
>>>>>>      > >>>>>>>> multiple.
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> Example:
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> table1 input stream: <A,X>
>>>>>>      > >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> We use the table2 value as a foreign key to the table1 key
>>>>>>      > >>>>>>>> (i.e., "A" joins). If the result key is the same as the key
>>>>>>      > >>>>>>>> of table1, this implies that the result can be either
>>>>>>      > >>>>>>>> <A, join(X,1)> or <A, join(X,2)> but not both. Because they
>>>>>>      > >>>>>>>> share the same key, whatever result record we emit later
>>>>>>      > >>>>>>>> overwrites the previous result.
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> This is the reason why Jan originally proposed to use a
>>>>>>      > >>>>>>>> combination of both primary keys of the input tables as the
>>>>>>      > >>>>>>>> key of the output table. This makes the keys of the output
>>>>>>      > >>>>>>>> table unique and we can store both in the output table:
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> Thoughts?
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> -Matthias
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>>>      > >>>>>>>>
>>>>>>      > >>>>>>>>> Just one remark here.
>>>>>>      > >>>>>>>>> The high-watermark could be disregarded. The decision
>>>>>>      > >>>>>>>>> about the forward depends on the size of the aggregated
>>>>>>      > >>>>>>>>> map. Only 1-element-long maps would be unpacked and
>>>>>>      > >>>>>>>>> forwarded. 0-element maps would be published as delete.
>>>>>>      > >>>>>>>>> Any other count of map entries is in the "waiting for
>>>>>>      > >>>>>>>>> correct deletes to arrive" state.
>>>>>>      > >>>>>>>>>
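The forwarding rule in the remark above can be written down as a small decision function. This is only a sketch of the rule as stated: the names ForwardDecision, Action and decide are hypothetical, and treating a null aggregate the same as an empty map is an assumption.

```java
import java.util.Map;

// Hypothetical encoding of the "decide by map size" rule.
final class ForwardDecision {

    enum Action { FORWARD, DELETE, WAIT }

    // 0 entries: publish a delete. Exactly 1 entry: unpack and forward it.
    // Anything else: still waiting for the correct deletes to arrive.
    static Action decide(Map<?, ?> aggregated) {
        if (aggregated == null || aggregated.isEmpty()) {
            return Action.DELETE;
        }
        return aggregated.size() == 1 ? Action.FORWARD : Action.WAIT;
    }
}
```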
>>>>>>      > >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>>      > >>>>>>>>>
>>>>>>      > >>>>>>>>>> It does look like I could replace the second repartition
>>>>>>      > >>>>>>>>>> store and highwater store with a groupBy and reduce.
>>>>>>      > >>>>>>>>>> However, it looks like I would still need to store the
>>>>>>      > >>>>>>>>>> highwater value within the materialized store, to compare
>>>>>>      > >>>>>>>>>> the arrival of out-of-order records (assuming my
>>>>>>      > >>>>>>>>>> understanding of THIS is correct...). This in effect is
>>>>>>      > >>>>>>>>>> the same as the design I have now, just with the two
>>>>>>      > >>>>>>>>>> tables merged together.
>>>>>>      > >>>>>>>>>
>>>>>>      >
>>>>>>      >
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>>
>>
>
