please remove me from this group

On Tue, Sep 11, 2018 at 1:29 PM Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hi Adam,
>
> give me some time, will make such a chart. last time i didn't get along
> well with giphy and ruined all your charts.
> Hopefully i can get it done today
>
> On 08.09.2018 16:00, Adam Bellemare wrote:
> > Hi Jan
> >
> > I have included a diagram of what I attempted on the KIP.
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate
> >
> > I attempted this back at the start of my own implementation of this
> > solution, and since I could not get it to work I have since discarded the
> > code. At this point in time, if you wish to continue pursuing for your
> > groupBy solution, I ask that you please create a diagram on the KIP
> > carefully explaining your solution. Please feel free to use the image I
> > just posted as a starting point. I am having trouble understanding your
> > explanations but I think that a carefully constructed diagram will clear
> up
> > any misunderstandings. Alternately, please post a comprehensive PR with
> > your solution. I can only guess at what you mean, and since I value my
> own
> > time as much as you value yours, I believe it is your responsibility to
> > provide an implementation instead of me trying to guess.
> >
> > Adam
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com>
> > wrote:
> >
> >> Hi James,
> >>
> >> nice to see you beeing interested. kafka streams at this point supports
> >> all sorts of joins as long as both streams have the same key.
> >> Adam is currently implementing a join where a KTable and a KTable can
> have
> >> a one to many relation ship (1:n). We exploit that rocksdb is a
> >> datastore that keeps data sorted (At least exposes an API to access the
> >> stored data in a sorted fashion).
> >>
> >> I think the technical caveats are well understood now and we are
> basically
> >> down to philosophy and API Design ( when Adam sees my newest message).
> >> I have a lengthy track record of loosing those kinda arguments within
> the
> >> streams community and I have no clue why. So I literally can't wait for
> you
> >> to churn through this thread and give you opinion on how we should
> design
> >> the return type of the oneToManyJoin and how many power we want to give
> to
> >> the user vs "simplicity" (where simplicity isn't really that as users
> still
> >> need to understand it I argue)
> >>
> >> waiting for you to join in on the discussion
> >>
> >> Best Jan
> >>
> >>
> >>
> >> On 07.09.2018 15:49, James Kwan wrote:
> >>
> >>> I am new to this group and I found this subject interesting.  Sounds
> like
> >>> you guys want to implement a join table of two streams? Is there
> somewhere
> >>> I can see the original requirement or proposal?
> >>>
> >>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com>
> >>>> wrote:
> >>>>
> >>>>
> >>>> On 05.09.2018 22:17, Adam Bellemare wrote:
> >>>>
> >>>>> I'm currently testing using a Windowed Store to store the highwater
> >>>>> mark.
> >>>>> By all indications this should work fine, with the caveat being that
> it
> >>>>> can
> >>>>> only resolve out-of-order arrival for up to the size of the window
> (ie:
> >>>>> 24h, 72h, etc). This would remove the possibility of it being
> unbounded
> >>>>> in
> >>>>> size.
> >>>>>
> >>>>> With regards to Jan's suggestion, I believe this is where we will
> have
> >>>>> to
> >>>>> remain in disagreement. While I do not disagree with your statement
> >>>>> about
> >>>>> there likely to be additional joins done in a real-world workflow, I
> do
> >>>>> not
> >>>>> see how you can conclusively deal with out-of-order arrival of
> >>>>> foreign-key
> >>>>> changes and subsequent joins. I have attempted what I think you have
> >>>>> proposed (without a high-water, using groupBy and reduce) and found
> >>>>> that if
> >>>>> the foreign key changes too quickly, or the load on a stream thread
> is
> >>>>> too
> >>>>> high, the joined messages will arrive out-of-order and be incorrectly
> >>>>> propagated, such that an intermediate event is represented as the
> final
> >>>>> event.
> >>>>>
> >>>> Can you shed some light on your groupBy implementation. There must be
> >>>> some sort of flaw in it.
> >>>> I have a suspicion where it is, I would just like to confirm. The idea
> >>>> is bullet proof and it must be
> >>>> an implementation mess up. I would like to clarify before we draw a
> >>>> conclusion.
> >>>>
> >>>>    Repartitioning the scattered events back to their original
> >>>>> partitions is the only way I know how to conclusively deal with
> >>>>> out-of-order events in a given time frame, and to ensure that the
> data
> >>>>> is
> >>>>> eventually consistent with the input events.
> >>>>>
> >>>>> If you have some code to share that illustrates your approach, I
> would
> >>>>> be
> >>>>> very grateful as it would remove any misunderstandings that I may
> have.
> >>>>>
> >>>> ah okay you were looking for my code. I don't have something easily
> >>>> readable here as its bloated with OO-patterns.
> >>>>
> >>>> its anyhow trivial:
> >>>>
> >>>> @Override
> >>>>      public T apply(K aggKey, V value, T aggregate)
> >>>>      {
> >>>>          Map<U, V> currentStateAsMap = asMap(aggregate); << imaginary
> >>>>          U toModifyKey = mapper.apply(value);
> >>>>              << this is the place where people actually gonna have
> issues
> >>>> and why you probably couldn't do it. we would need to find a solution
> here.
> >>>> I didn't realize that yet.
> >>>>              << we propagate the field in the joiner, so that we can
> pick
> >>>> it up in an aggregate. Probably you have not thought of this in your
> >>>> approach right?
> >>>>              << I am very open to find a generic solution here. In my
> >>>> honest opinion this is broken in KTableImpl.GroupBy that it looses
> the keys
> >>>> and only maintains the aggregate key.
> >>>>              << I abstracted it away back then way before i was
> thinking
> >>>> of oneToMany join. That is why I didn't realize its significance here.
> >>>>              << Opinions?
> >>>>
> >>>>          for (V m : current)
> >>>>          {
> >>>>              currentStateAsMap.put(mapper.apply(m), m);
> >>>>          }
> >>>>          if (isAdder)
> >>>>          {
> >>>>              currentStateAsMap.put(toModifyKey, value);
> >>>>          }
> >>>>          else
> >>>>          {
> >>>>              currentStateAsMap.remove(toModifyKey);
> >>>>              if(currentStateAsMap.isEmpty()){
> >>>>                  return null;
> >>>>              }
> >>>>          }
> >>>>          retrun asAggregateType(currentStateAsMap)
> >>>>      }
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> Thanks,
> >>>>> Adam
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <
> jan.filip...@trivago.com>
> >>>>> wrote:
> >>>>>
> >>>>> Thanks Adam for bringing Matthias to speed!
> >>>>>> about the differences. I think re-keying back should be optional at
> >>>>>> best.
> >>>>>> I would say we return a KScatteredTable with reshuffle() returning
> >>>>>> KTable<originalKey,Joined> to make the backwards repartitioning
> >>>>>> optional.
> >>>>>> I am also in a big favour of doing the out of order processing using
> >>>>>> group
> >>>>>> by instead high water mark tracking.
> >>>>>> Just because unbounded growth is just scary + It saves us the header
> >>>>>> stuff.
> >>>>>>
> >>>>>> I think the abstraction of always repartitioning back is just not so
> >>>>>> strong. Like the work has been done before we partition back and
> >>>>>> grouping
> >>>>>> by something else afterwards is really common.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
> >>>>>>
> >>>>>> Hi Matthias
> >>>>>>> Thank you for your feedback, I do appreciate it!
> >>>>>>>
> >>>>>>> While name spacing would be possible, it would require to
> deserialize
> >>>>>>>
> >>>>>>>> user headers what implies a runtime overhead. I would suggest to
> no
> >>>>>>>> namespace for now to avoid the overhead. If this becomes a
> problem in
> >>>>>>>> the future, we can still add name spacing later on.
> >>>>>>>>
> >>>>>>>> Agreed. I will go with using a reserved string and document it.
> >>>>>>>
> >>>>>>>
> >>>>>>> My main concern about the design it the type of the result KTable:
> If
> >>>>>>> I
> >>>>>>> understood the proposal correctly,
> >>>>>>>
> >>>>>>>
> >>>>>>> In your example, you have table1 and table2 swapped. Here is how it
> >>>>>>> works
> >>>>>>> currently:
> >>>>>>>
> >>>>>>> 1) table1 has the records that contain the foreign key within their
> >>>>>>> value.
> >>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>,
> >>>>>>> <c,(fk=B,bar=3)>
> >>>>>>> table2 input stream: <A,X>, <B,Y>
> >>>>>>>
> >>>>>>> 2) A Value mapper is required to extract the foreign key.
> >>>>>>> table1 foreign key mapper: ( value => value.fk )
> >>>>>>>
> >>>>>>> The mapper is applied to each element in table1, and a new combined
> >>>>>>> key is
> >>>>>>> made:
> >>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c,
> >>>>>>> (fk=B,bar=3)>
> >>>>>>>
> >>>>>>> 3) The rekeyed events are copartitioned with table2:
> >>>>>>>
> >>>>>>> a) Stream Thread with Partition 0:
> >>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
> >>>>>>> Table2: <A,X>
> >>>>>>>
> >>>>>>> b) Stream Thread with Partition 1:
> >>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
> >>>>>>> Table2: <B,Y>
> >>>>>>>
> >>>>>>> 4) From here, they can be joined together locally by applying the
> >>>>>>> joiner
> >>>>>>> function.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> At this point, Jan's design and my design deviate. My design goes
> on
> >>>>>>> to
> >>>>>>> repartition the data post-join and resolve out-of-order arrival of
> >>>>>>> records,
> >>>>>>> finally returning the data keyed just the original key. I do not
> >>>>>>> expose
> >>>>>>> the
> >>>>>>> CombinedKey or any of the internals outside of the joinOnForeignKey
> >>>>>>> function. This does make for larger footprint, but it removes all
> >>>>>>> agency
> >>>>>>> for resolving out-of-order arrivals and handling CombinedKeys from
> the
> >>>>>>> user. I believe that this makes the function much easier to use.
> >>>>>>>
> >>>>>>> Let me know if this helps resolve your questions, and please feel
> >>>>>>> free to
> >>>>>>> add anything else on your mind.
> >>>>>>>
> >>>>>>> Thanks again,
> >>>>>>> Adam
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <
> >>>>>>> matth...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>>> I am just catching up on this thread. I did not read everything so
> >>>>>>>> far,
> >>>>>>>> but want to share couple of initial thoughts:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Headers: I think there is a fundamental difference between header
> >>>>>>>> usage
> >>>>>>>> in this KIP and KP-258. For 258, we add headers to changelog topic
> >>>>>>>> that
> >>>>>>>> are owned by Kafka Streams and nobody else is supposed to write
> into
> >>>>>>>> them. In fact, no user header are written into the changelog topic
> >>>>>>>> and
> >>>>>>>> thus, there are not conflicts.
> >>>>>>>>
> >>>>>>>> Nevertheless, I don't see a big issue with using headers within
> >>>>>>>> Streams.
> >>>>>>>> As long as we document it, we can have some "reserved" header keys
> >>>>>>>> and
> >>>>>>>> users are not allowed to use when processing data with Kafka
> Streams.
> >>>>>>>> IMHO, this should be ok.
> >>>>>>>>
> >>>>>>>> I think there is a safe way to avoid conflicts, since these
> headers
> >>>>>>>> are
> >>>>>>>>
> >>>>>>>>> only needed in internal topics (I think):
> >>>>>>>>> For internal and changelog topics, we can namespace all headers:
> >>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
> >>>>>>>>> * internal headers are namespaced as "internal." + headerKey
> >>>>>>>>>
> >>>>>>>>> While name spacing would be possible, it would require to
> >>>>>>>> deserialize
> >>>>>>>> user headers what implies a runtime overhead. I would suggest to
> no
> >>>>>>>> namespace for now to avoid the overhead. If this becomes a
> problem in
> >>>>>>>> the future, we can still add name spacing later on.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> My main concern about the design it the type of the result KTable:
> >>>>>>>> If I
> >>>>>>>> understood the proposal correctly,
> >>>>>>>>
> >>>>>>>> KTable<K1,V1> table1 = ...
> >>>>>>>> KTable<K2,V2> table2 = ...
> >>>>>>>>
> >>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
> >>>>>>>>
> >>>>>>>> implies that the `joinedTable` has the same key as the left input
> >>>>>>>> table.
> >>>>>>>> IMHO, this does not work because if table2 contains multiple rows
> >>>>>>>> that
> >>>>>>>> join with a record in table1 (what is the main purpose of a
> foreign
> >>>>>>>> key
> >>>>>>>> join), the result table would only contain a single join result,
> but
> >>>>>>>> not
> >>>>>>>> multiple.
> >>>>>>>>
> >>>>>>>> Example:
> >>>>>>>>
> >>>>>>>> table1 input stream: <A,X>
> >>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
> >>>>>>>>
> >>>>>>>> We use table2 value a foreign key to table1 key (ie, "A" joins).
> If
> >>>>>>>> the
> >>>>>>>> result key is the same key as key of table1, this implies that the
> >>>>>>>> result can either be <A, join(X,1)> or <A, join(X,2)> but not
> both.
> >>>>>>>> Because the share the same key, whatever result record we emit
> later,
> >>>>>>>> overwrite the previous result.
> >>>>>>>>
> >>>>>>>> This is the reason why Jan originally proposed to use a
> combination
> >>>>>>>> of
> >>>>>>>> both primary keys of the input tables as key of the output table.
> >>>>>>>> This
> >>>>>>>> makes the keys of the output table unique and we can store both in
> >>>>>>>> the
> >>>>>>>> output table:
> >>>>>>>>
> >>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thoughts?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> >>>>>>>>
> >>>>>>>> Just on remark here.
> >>>>>>>>> The high-watermark could be disregarded. The decision about the
> >>>>>>>>> forward
> >>>>>>>>> depends on the size of the aggregated map.
> >>>>>>>>> Only 1 element long maps would be unpacked and forwarded. 0
> element
> >>>>>>>>> maps
> >>>>>>>>> would be published as delete. Any other count
> >>>>>>>>> of map entries is in "waiting for correct deletes to
> arrive"-state.
> >>>>>>>>>
> >>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
> >>>>>>>>>
> >>>>>>>>> It does look like I could replace the second repartition store
> and
> >>>>>>>>>> highwater store with a groupBy and reduce.  However, it looks
> like
> >>>>>>>>>> I
> >>>>>>>>>> would
> >>>>>>>>>> still need to store the highwater value within the materialized
> >>>>>>>>>> store,
> >>>>>>>>>>
> >>>>>>>>>> to
> >>>>>>>>> compare the arrival of out-of-order records (assuming my
> >>>>>>>>> understanding
> >>>>>>>>> of
> >>>>>>>>> THIS is correct...). This in effect is the same as the design I
> have
> >>>>>>>>> now,
> >>>>>>>>> just with the two tables merged together.
> >>>>>>>>>
>
>

Reply via email to