Hi Jan

I have included a diagram of what I attempted on the KIP.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-GroupBy+Reduce/Aggregate

I attempted this back at the start of my own implementation of this
solution, and since I could not get it to work I have since discarded the
code. At this point in time, if you wish to continue pursuing your
groupBy solution, I ask that you please create a diagram on the KIP
carefully explaining your solution. Please feel free to use the image I
just posted as a starting point. I am having trouble understanding your
explanations but I think that a carefully constructed diagram will clear up
any misunderstandings. Alternatively, please post a comprehensive PR with
your solution. I can only guess at what you mean, and since I value my own
time as much as you value yours, I believe it is your responsibility to
provide an implementation instead of me trying to guess.

Adam









On Sat, Sep 8, 2018 at 8:00 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hi James,
>
> nice to see you being interested. Kafka Streams at this point supports
> all sorts of joins as long as both streams have the same key.
> Adam is currently implementing a join where a KTable and a KTable can have
> a one-to-many relationship (1:n). We exploit the fact that RocksDB is a
> datastore that keeps data sorted (or at least exposes an API to access the
> stored data in a sorted fashion).
>
> I think the technical caveats are well understood now and we are basically
> down to philosophy and API design (once Adam sees my newest message).
> I have a lengthy track record of losing those kinds of arguments within the
> streams community and I have no clue why. So I literally can't wait for you
> to churn through this thread and give your opinion on how we should design
> the return type of the oneToManyJoin and how much power we want to give to
> the user vs "simplicity" (where I argue simplicity isn't really that, as
> users still need to understand it).
>
> waiting for you to join in on the discussion
>
> Best Jan
>
>
>
> On 07.09.2018 15:49, James Kwan wrote:
>
>> I am new to this group and I found this subject interesting.  Sounds like
>> you guys want to implement a join table of two streams? Is there somewhere
>> I can see the original requirement or proposal?
>>
>> On Sep 7, 2018, at 8:13 AM, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>>
>>> On 05.09.2018 22:17, Adam Bellemare wrote:
>>>
>>>> I'm currently testing using a Windowed Store to store the highwater mark.
>>>> By all indications this should work fine, with the caveat being that it
>>>> can only resolve out-of-order arrival for up to the size of the window
>>>> (ie: 24h, 72h, etc). This would remove the possibility of it being
>>>> unbounded in size.
>>>>
>>>> With regards to Jan's suggestion, I believe this is where we will have to
>>>> remain in disagreement. While I do not disagree with your statement that
>>>> there are likely to be additional joins done in a real-world workflow, I
>>>> do not see how you can conclusively deal with out-of-order arrival of
>>>> foreign-key changes and subsequent joins. I have attempted what I think
>>>> you have proposed (without a high-water, using groupBy and reduce) and
>>>> found that if the foreign key changes too quickly, or the load on a
>>>> stream thread is too high, the joined messages will arrive out-of-order
>>>> and be incorrectly propagated, such that an intermediate event is
>>>> represented as the final event.
>>>>
>>> Can you shed some light on your groupBy implementation? There must be
>>> some sort of flaw in it. I have a suspicion where it is, I would just
>>> like to confirm. The idea is bulletproof, so it must be an implementation
>>> mistake. I would like to clarify before we draw a conclusion.
>>>
>>>> Repartitioning the scattered events back to their original partitions is
>>>> the only way I know how to conclusively deal with out-of-order events in
>>>> a given time frame, and to ensure that the data is eventually consistent
>>>> with the input events.
>>>>
>>>> If you have some code to share that illustrates your approach, I would be
>>>> very grateful as it would remove any misunderstandings that I may have.
>>>>
>>> Ah okay, you were looking for my code. I don't have something easily
>>> readable here as it's bloated with OO patterns.
>>>
>>> It's trivial anyhow:
>>>
>>> @Override
>>>     public T apply(K aggKey, V value, T aggregate)
>>>     {
>>>         Map<U, V> currentStateAsMap = asMap(aggregate); // asMap is imaginary
>>>         U toModifyKey = mapper.apply(value);
>>>         // This is the place where people are actually going to have issues,
>>>         // and probably why you couldn't do it. We would need to find a
>>>         // solution here; I didn't realize that yet.
>>>         // We propagate the field in the joiner so that we can pick it up in
>>>         // an aggregate. You probably have not thought of this in your
>>>         // approach, right?
>>>         // I am very open to finding a generic solution here. In my honest
>>>         // opinion it is broken that KTableImpl.GroupBy loses the keys and
>>>         // only maintains the aggregate key.
>>>         // I abstracted it away back then, well before I was thinking of the
>>>         // oneToMany join. That is why I didn't realize its significance here.
>>>         // Opinions?
>>>
>>>         for (V m : current)
>>>         {
>>>             currentStateAsMap.put(mapper.apply(m), m);
>>>         }
>>>         if (isAdder)
>>>         {
>>>             currentStateAsMap.put(toModifyKey, value);
>>>         }
>>>         else
>>>         {
>>>             currentStateAsMap.remove(toModifyKey);
>>>             if (currentStateAsMap.isEmpty()) {
>>>                 return null;
>>>             }
>>>         }
>>>         return asAggregateType(currentStateAsMap);
>>>     }
>>>
>>>
>>>
>>>
>>>
>>> Thanks,
>>>>
>>>> Adam
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
>>>> wrote:
>>>>
>>>>> Thanks Adam for bringing Matthias up to speed!
>>>>>
>>>>> About the differences: I think re-keying back should be optional at
>>>>> best. I would say we return a KScatteredTable with reshuffle() returning
>>>>> KTable<originalKey,Joined> to make the backwards repartitioning
>>>>> optional. I am also very much in favour of doing the out-of-order
>>>>> processing using group by instead of high-water mark tracking, simply
>>>>> because unbounded growth is scary, and it saves us the header stuff.
>>>>>
>>>>> I think the abstraction of always repartitioning back is just not so
>>>>> strong. The work has already been done before we partition back, and
>>>>> grouping by something else afterwards is really common.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 05.09.2018 13:49, Adam Bellemare wrote:
>>>>>
>>>>> Hi Matthias
>>>>>>
>>>>>> Thank you for your feedback, I do appreciate it!
>>>>>>
>>>>>>> While name spacing would be possible, it would require deserializing
>>>>>>> user headers, which implies a runtime overhead. I would suggest not
>>>>>>> namespacing for now to avoid the overhead. If this becomes a problem
>>>>>>> in the future, we can still add name spacing later on.
>>>>>>>
>>>>>> Agreed. I will go with using a reserved string and document it.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> My main concern about the design is the type of the result KTable:
>>>>>>> If I understood the proposal correctly,
>>>>>>>
>>>>>> In your example, you have table1 and table2 swapped. Here is how it
>>>>>> works currently:
>>>>>>
>>>>>> 1) table1 has the records that contain the foreign key within their
>>>>>> value.
>>>>>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>>>>>> table2 input stream: <A,X>, <B,Y>
>>>>>>
>>>>>> 2) A Value mapper is required to extract the foreign key.
>>>>>> table1 foreign key mapper: ( value => value.fk )
>>>>>>
>>>>>> The mapper is applied to each element in table1, and a new combined
>>>>>> key is made:
>>>>>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>>>>>
>>>>>> 3) The rekeyed events are copartitioned with table2:
>>>>>>
>>>>>> a) Stream Thread with Partition 0:
>>>>>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>>>>>> Table2: <A,X>
>>>>>>
>>>>>> b) Stream Thread with Partition 1:
>>>>>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>>>>>> Table2: <B,Y>
>>>>>>
>>>>>> 4) From here, they can be joined together locally by applying the
>>>>>> joiner function.
>>>>>>
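A minimal sketch of steps 1-4 above in plain Java, using in-memory maps instead of Kafka Streams APIs. The class, the `Left` type, the `"-"` separator and the hash-based partitioner are all hypothetical illustrations, not the KIP's implementation; the point is only that partitioning on the foreign-key part of the combined key puts the rekeyed table1 rows next to the matching table2 row.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ForeignKeyRekeySketch {
    /** Value of a table1 record: a foreign key into table2 plus a payload (hypothetical type). */
    static final class Left {
        final String fk;
        final int bar;
        Left(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    /** Step 2: build the combined key "<foreignKey>-<primaryKey>". */
    static String combinedKey(String foreignKey, String primaryKey) {
        return foreignKey + "-" + primaryKey;
    }

    /** Step 3: partition on the foreign key only, so rekeyed rows land with table2's row. */
    static int partitionFor(String foreignKey, int numPartitions) {
        return Math.floorMod(foreignKey.hashCode(), numPartitions);
    }

    /** Steps 2-4: rekey each table1 row, look up its table2 row, apply a toy joiner. */
    static Map<String, String> join(Map<String, Left> table1, Map<String, String> table2) {
        Map<String, String> joined = new LinkedHashMap<>();
        for (Map.Entry<String, Left> e : table1.entrySet()) {
            Left left = e.getValue();
            String right = table2.get(left.fk);
            if (right != null) { // inner join: rows without a match are skipped
                joined.put(combinedKey(left.fk, e.getKey()),
                           "join(" + right + "," + left.bar + ")");
            }
        }
        return joined;
    }
}
```

With the table1 and table2 inputs from step 1, the keys of the returned map are A-a, A-b and B-c, matching the "table1 mapped" line in step 2.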
>>>>>>
>>>>>>
>>>>>> At this point, Jan's design and my design deviate. My design goes on to
>>>>>> repartition the data post-join and resolve out-of-order arrival of
>>>>>> records, finally returning the data keyed by just the original key. I
>>>>>> do not expose the CombinedKey or any of the internals outside of the
>>>>>> joinOnForeignKey function. This does make for a larger footprint, but
>>>>>> it relieves the user of resolving out-of-order arrivals and handling
>>>>>> CombinedKeys. I believe that this makes the function much easier to
>>>>>> use.
>>>>>>
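A rough sketch of the out-of-order resolution described above, in plain Java. It assumes each joined record arrives back on its original key together with some monotonically increasing marker from its source (for example an offset); the `marker` parameter, the class and the method names are hypothetical and not taken from the KIP.

```java
import java.util.HashMap;
import java.util.Map;

class HighWaterMarkSketch {
    // Highest marker seen so far for each original key.
    private final Map<String, Long> highWater = new HashMap<>();
    // Latest accepted join result for each original key.
    private final Map<String, String> latest = new HashMap<>();

    /** Returns true if the record was accepted, false if it was dropped as stale. */
    boolean accept(String originalKey, long marker, String joinedValue) {
        Long seen = highWater.get(originalKey);
        if (seen != null && marker < seen) {
            return false; // an older event arrived late; keep the newer result
        }
        highWater.put(originalKey, marker);
        latest.put(originalKey, joinedValue);
        return true;
    }

    /** Current join result for the key, or null if none was accepted yet. */
    String current(String originalKey) {
        return latest.get(originalKey);
    }
}
```

In this sketch an intermediate event that arrives after a newer one is simply discarded, so it can no longer be mistaken for the final event. The map of markers is what grows without bound unless it is windowed.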
>>>>>> Let me know if this helps resolve your questions, and please feel
>>>>>> free to
>>>>>> add anything else on your mind.
>>>>>>
>>>>>> Thanks again,
>>>>>> Adam
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <
>>>>>> matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am just catching up on this thread. I did not read everything so
>>>>>>> far, but want to share a couple of initial thoughts:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Headers: I think there is a fundamental difference between header
>>>>>>> usage in this KIP and KIP-258. For 258, we add headers to changelog
>>>>>>> topics that are owned by Kafka Streams and nobody else is supposed to
>>>>>>> write into them. In fact, no user headers are written into the
>>>>>>> changelog topic and thus, there are no conflicts.
>>>>>>>
>>>>>>> Nevertheless, I don't see a big issue with using headers within
>>>>>>> Streams. As long as we document it, we can have some "reserved" header
>>>>>>> keys that users are not allowed to use when processing data with Kafka
>>>>>>> Streams. IMHO, this should be ok.
>>>>>>>
>>>>>>>> I think there is a safe way to avoid conflicts, since these headers
>>>>>>>> are only needed in internal topics (I think):
>>>>>>>> For internal and changelog topics, we can namespace all headers:
>>>>>>>> * user-defined headers are namespaced as "external." + headerKey
>>>>>>>> * internal headers are namespaced as "internal." + headerKey
>>>>>>>>
>>>>>>> While name spacing would be possible, it would require deserializing
>>>>>>> user headers, which implies a runtime overhead. I would suggest not
>>>>>>> namespacing for now to avoid the overhead. If this becomes a problem
>>>>>>> in the future, we can still add name spacing later on.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> My main concern about the design is the type of the result KTable:
>>>>>>> If I understood the proposal correctly,
>>>>>>>
>>>>>>> KTable<K1,V1> table1 = ...
>>>>>>> KTable<K2,V2> table2 = ...
>>>>>>>
>>>>>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>>>>>
>>>>>>> implies that the `joinedTable` has the same key as the left input
>>>>>>> table. IMHO, this does not work because if table2 contains multiple
>>>>>>> rows that join with a record in table1 (which is the main purpose of a
>>>>>>> foreign key join), the result table would only contain a single join
>>>>>>> result, but not multiple.
>>>>>>>
>>>>>>> Example:
>>>>>>>
>>>>>>> table1 input stream: <A,X>
>>>>>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>>>>>
>>>>>>> We use the table2 value as a foreign key to the table1 key (ie, "A"
>>>>>>> joins). If the result key is the same as the key of table1, this
>>>>>>> implies that the result can either be <A, join(X,1)> or
>>>>>>> <A, join(X,2)> but not both. Because they share the same key, whatever
>>>>>>> result record we emit later overwrites the previous result.
>>>>>>>
>>>>>>> This is the reason why Jan originally proposed to use a combination of
>>>>>>> both primary keys of the input tables as key of the output table. This
>>>>>>> makes the keys of the output table unique and we can store both in the
>>>>>>> output table:
>>>>>>>
>>>>>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>>>>>>
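The overwrite problem in the example above can be reproduced with two ordinary maps standing in for the result table. This is only an illustrative sketch in plain Java; the class and method names are made up, and the join results are written as strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ResultKeySketch {
    /** Result keyed only by the table1 key: the later result replaces the earlier one. */
    static Map<String, String> keyedByTable1Key() {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("A", "join(X,1)"); // from <a,(A,1)>
        result.put("A", "join(X,2)"); // from <b,(A,2)>, overwrites the entry above
        return result;
    }

    /** Result keyed by the combination of both primary keys: both results survive. */
    static Map<String, String> keyedByCombinedKey() {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("A-a", "join(X,1)");
        result.put("A-b", "join(X,2)");
        return result;
    }
}
```

The first map ends up with a single entry for A, while the second keeps one entry per matching table2 row.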
>>>>>>>
>>>>>>> Thoughts?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>>>>>
>>>>>>>> Just one remark here.
>>>>>>>> The high-watermark could be disregarded. The decision about the
>>>>>>>> forward depends on the size of the aggregated map.
>>>>>>>> Only 1 element long maps would be unpacked and forwarded. 0 element
>>>>>>>> maps would be published as delete. Any other count of map entries is
>>>>>>>> in "waiting for correct deletes to arrive"-state.
>>>>>>>>
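The forwarding rule in the remark above can be written down as a small decision function. This is a sketch in plain Java under the assumption that the aggregate is available as a `Map` with non-null values; the enum and method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

class MapSizeForwardSketch {
    enum Action { FORWARD, DELETE, WAIT }

    /** Decide what to do downstream based only on the size of the aggregated map. */
    static <K, V> Action decide(Map<K, V> aggregated) {
        switch (aggregated.size()) {
            case 0:
                return Action.DELETE;  // empty map: publish a delete
            case 1:
                return Action.FORWARD; // exactly one entry: unpack and forward it
            default:
                return Action.WAIT;    // still waiting for the correct deletes to arrive
        }
    }

    /** The single value to forward, present only when the map has exactly one entry. */
    static <K, V> Optional<V> valueToForward(Map<K, V> aggregated) {
        if (decide(aggregated) != Action.FORWARD) {
            return Optional.empty();
        }
        return Optional.of(aggregated.values().iterator().next());
    }
}
```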
>>>>>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>>>>>
>>>>>>>>> It does look like I could replace the second repartition store and
>>>>>>>>> highwater store with a groupBy and reduce.  However, it looks like I
>>>>>>>>> would still need to store the highwater value within the
>>>>>>>>> materialized store, to compare the arrival of out-of-order records
>>>>>>>>> (assuming my understanding of THIS is correct...). This in effect is
>>>>>>>>> the same as the design I have now, just with the two tables merged
>>>>>>>>> together.
>>>>>>>>
>>>>>>>
>
