Adam,

I finally had the time to review the KIP and catch up on the mailing
list discussion. Thanks a lot for putting this together! Great work! Of
course also big thanks to Jan who started the KIP initially.

This is a long email, because I re-read multiple months of discussion
and reply to many things... I don't think there is a need to reply to
every point I mention. I just want to add my 2 cents to a couple of
points that were discussed.


(0) Overall the design makes sense to me. The API is intuitive and clean
now. The API in the original proposal leaked a lot of implementation
details, which was a major concern to me. I also believe that it's
important to partition the data of the result KTable correctly (the
KScatteredTable does violate this; ie, the "key is useless" as Jan
phrased it), thus the last step seems mandatory to me. Also, adding a
KScatteredTable would add a lot of new public API that basically just
duplicates existing APIs (filter, mapValues, groupBy, toStream, join).
Lastly, I am happy that we don't need the "watermark/header" stuff to
fix the ordering race condition.
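
To make the contrast concrete, here is a rough sketch of what the
encapsulated call could look like (method name, signature, and the
Order/Product types are made up for illustration, they are not from the
KIP):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();
    // LHS table; the FK lives inside the value
    KTable<String, Order> orders = builder.table("orders");
    // RHS table; keyed by the FK
    KTable<String, Product> products = builder.table("products");

    // one encapsulated call; the result KTable is keyed by the LHS key
    KTable<String, EnrichedOrder> enriched = orders.join(
        products,
        order -> order.productId,                 // FK extractor
        (order, product) -> new EnrichedOrder(order, product));

With a KScatteredTable in between, each of the duplicated operators
would first need to be re-implemented on the new type before users
could get back to a correctly partitioned KTable.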

(1) About the optimization potential for multiple consecutive joins: I
think we could tackle this with the optimization framework we have in
place now.

(2) I was also thinking about left/outer join, and I believe that we
could easily add a left-join as follow-up work (even if I think it's
not a big addition to the current design); cf. the sketch below.
However, an outer-join does not make much sense, because we don't have
a key in the result KTable for "right hand side" records that don't
join (ie, the right-outer-join part cannot be done).
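
For example, a left-join variant could mirror the same shape (again a
hypothetical signature, reusing the sketch from point (0)):

    // LHS records whose FK finds no RHS record still produce a result;
    // the joiner is called with a `null` RHS value
    KTable<String, EnrichedOrder> enriched = orders.leftJoin(
        products,
        order -> order.productId,
        (order, product) -> new EnrichedOrder(order, product));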

(3) About the "emit on change" vs "emit on update" discussion: I think
this is orthogonal to this KIP, and I would stick with "emit on update"
because this is the current behavior of all existing operators. If we
want to change it, we should consider doing so for all operators. I
also believe that, even if it does not change the API, it should be
backed by a KIP, because it is a semantics (ie, major) change.
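
To illustrate the difference with a made-up input sequence (all
updates for the same key A):

    input updates:   A|1   A|1   A|2
    emit on update:  A|1   A|1   A|2   (every update is forwarded)
    emit on change:  A|1    --   A|2   (the no-op update is suppressed)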



@Jan:

> I have a lengthy track record of loosing those kinda arguments within the 
> streams community and I have no clue why

Because you are a power user who has different goals in mind. We tend
to optimize the API so that it's easy to use for non-power users, who
are the majority of people. The KScatteredTable is a hard-to-grok
concept...

> where simplicity isn't really that as users still need to understand it I 
> argue

I disagree here. If we do a good job designing the APIs, users don't
need to understand the nitty-gritty details, and it "just works".


For the performance discussion, ie, which side is "larger": this does
not really matter (the number of keys is irrelevant) IMHO. The question
is which side is _updated_ more often, and what "n" is (the factor "n"
would not be relevant for Jan's proposal though). For every left hand
side update, we send 2 messages to the right hand side and get 2
messages back. For every right hand side update, we send n messages to
the left hand side.
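
As a made-up example: if one RHS row is referenced by n = 1,000 LHS
rows, a single LHS update costs a constant 4 messages (2 out, 2 back),
while a single RHS update triggers 1,000 messages to the LHS. Even if
the RHS is updated 100x less frequently than the LHS, it would still
dominate the wire traffic (amortized 10 vs 4 messages per LHS update).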

I agree with Jan that we can't know this in advance though (neither the
irrelevant "size" of each side, nor the "n", nor the update rates).





Finally, a couple of questions/comments on the KIP (please reply to
this part :)):

 - For the materialized combined-key store, why do we need to disable
caching? And why do we need to flush the store?

 - About resolving the ordering issues:

(a) for each LHS update, we need to send two records to the RHS (one to
"unsubscribe" the old FK, and one to "subscribe" to the new FK). The
KIP further proposes to send two records back: `null` for the
unsubscribe, and a new join "result" for the new FK. This can result in
ordering issues that we want to resolve with the FK lookup in the final
step.

> The thing is that in that last join, we have the opportunity to compare the
> current FK in the left table with the incoming PK of the right table. If
> they don't match, we just drop the event, since it must be outdated.
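
In code, that final check could look roughly like this (a sketch only;
store, accessor, and type names are made up):

    // LHS side: a join response keyed by our PK arrives from the RHS
    void processResponse(String primaryKey, String responseFk, Product rhsValue) {
        Order current = leftStore.get(primaryKey);
        if (current == null || !current.productId.equals(responseFk)) {
            return; // response belongs to an outdated FK -- drop it
        }
        context.forward(primaryKey, joiner.apply(current, rhsValue));
    }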

Jan criticized this as "swallowing" updates if they arrive out-of-order
and the delete is not reflected in the result KTable (if I understood
him correctly). I disagree with Jan, and actually think we should
always avoid the delete on the result KTable to begin with:

Even if both records arrive back on the LHS in the correct order, we
would still produce two output messages downstream. This is
counter-intuitive, because we _know_ that a single update to the LHS
table should result in _one_ update to the result KTable. And we also
know that the update is based on (ie, is correct for) the new FK.

Thus, I am wondering why we would need to send the `null` message back
(from RHS to LHS) in the first place?

Instead, we could encode whether the RHS should send something back or
not. This way, an "unsubscribe" message will only update the store for
the CombinedKey (ie, delete the corresponding entry), and only the new
FK will trigger a join lookup in the RHS table to compute a "result"
that is sent back. If the LHS input is a tombstone, we send an
"unsubscribe" as always, plus a `null` "subscribe" message: this
ensures that we still get a join-result tombstone back to update (ie,
delete the entry from) the result KTable.
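
Encoding this could be as simple as a flag on each subscription message
(names hypothetical, matching the -, +, and * markers in the examples
below):

    // hypothetical instruction carried by each subscription message
    enum Instruction {
        UNSUBSCRIBE,         // "-": delete the CombinedKey entry; send nothing back
        SUBSCRIBE,           // "+": upsert the entry; send a join result back
        SUBSCRIBE_TOMBSTONE  // "*": delete the entry; send `null` back (LHS tombstone)
    }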

Example (we start with two empty tables):

1- RHS is updated to Y|foo
2- RHS is updated to Z|bar
3- LHS is updated to A|Y
   -> sends Y|A+ subscribe message to RHS (no unsubscribe necessary)
4- RHS processes Y|A+ subscribe message
   -> sends A|Y,foo message back
5- LHS processes A|Y,foo and produces result record A|Y,foo
6- LHS is updated to A|Z
   -> sends Y|A- unsubscribe message to RHS
   -> sends Z|A+ subscribe message to RHS
7- RHS processes Y|A- unsubscribe message (update store only)
8- RHS processes Z|A+ subscribe message
   -> sends A|Z,bar message back
9- LHS processes A|Z,bar and produces result record A|Z,bar


delete case (cont. example):

10- LHS is updated to A|null (tombstone)
    -> sends Z|A* subscribe message to RHS
    (* indicates tombstone; we still need to encode A to be able to
delete on the RHS)
11- RHS processes Z|A* subscribe message (update store, ie, delete)
    -> sends A|null message back
12- LHS processes A|null and produces result record A|null


Maybe we could even shortcut this further, by sending only the old-FK
"unsubscribe" message and emitting a tombstone to the result KTable
directly. If there are "stuck" join results for the same LHS record on
the RHS that arrive later, we can detect this case (because there is no
LHS record anymore) and drop those records. However, I am not 100% sure
if this would be correct (cf. point (c) below).

delete case (optimized):

10- LHS is updated to A|null (tombstone)
    -> sends Z|A- unsubscribe message to RHS
    -> produces result record A|null directly
11- RHS processes Z|A- unsubscribe message (update store only)


Also note that we still need the logic to resolve out-of-order
processing, because there might also be out-of-order processing between
the subscribe messages of consecutive LHS updates to the same record.
My proposal above only gets rid of the race condition within a single
LHS update (ie, the race between unsubscribe and subscribe).


(b) About using "offsets" to resolve the ordering issue: I don't think
this would work. The join input table could, for example, be created as

    stream.flatMapValues().groupByKey().aggregate()

In this case, multiple KTable updates originate from the same input
record and thus carry the same offset. Hence, there is no guarantee
that offsets are unique, and thus we cannot use them to resolve update
conflicts.
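
Concretely:

    input record (offset 42):       K -> [v1, v2]
    after flatMapValues():          (K, v1) and (K, v2), both at offset 42
    after groupByKey().aggregate(): two KTable updates sharing offset 42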


(c) Using the current method to avoid races may not be correct though
(or maybe the scenario below is a case of eventual consistency and not
a correctness issue -- I am not sure how to judge it).

We start with two empty tables:

1- RHS is updated to Y|bar
2- LHS is updated to A|Y,1
   -> sends Y|A+ subscribe message to RHS
3- LHS is updated to A|Y,2
   -> sends Y|A- unsubscribe message to RHS
   -> sends Y|A+ subscribe message to RHS
4- RHS processes first Y|A+ subscribe message
   -> sends A|Y,bar back
5- LHS processes A|Y,bar and produces result record A|Y,2,bar
6- RHS processes Y|A- unsubscribe message (update store only)
7- RHS processes second Y|A+ subscribe message
   -> sends A|Y,bar back
8- LHS processes A|Y,bar and produces result record A|Y,2,bar

Thus, the first result record, which should have been `A|Y,1,bar`, is
now `A|Y,2,bar`. Furthermore, we update the result table from
`A|Y,2,bar` to `A|Y,2,bar` (ie, a no-op update).

It's unclear to me if this should be considered an incorrect
(intermediate) result or not. The point being, the result is
"eventually correct too early", because we now join the second LHS
record twice (instead of joining each LHS record once).

From a user point of view, the LHS is updated with A|Y,1 and A|Y,2; the
first update does not produce any output, while the second update
produces a "duplicate" result.

Because all this happens on the same LHS key, I am wondering if this
violates correctness (even if we end up with the correct final result).


Same "issue" for delete optimization as mentioned above:

If we shortcut the round trip and only send one unsubscribe message and
emit a tombstone directly on the LHS, there might be in-flight updates
to the same LHS record "stuck" on the right hand side. If we get a new
update (for the same key) to the LHS after the LHS delete, and
afterwards process the "stuck" right hand side updates, we would not be
able to drop those records (because the LHS table is not empty any
longer). Again, we end up with the correct final result, however, I am
not sure if those intermediate results should be consider "incorrect" or
only "wrong in an eventual consistent way".


(I hope I got all examples right... *urgs*)


If you made it this far, I am very proud of you!!


-Matthias






On 1/11/19 12:29 PM, John Roesler wrote:
> Hi Jan,
> 
> Thanks for the reply.
> 
> It sounds like your larger point is that if we provide a building block
> instead of the whole operation, then it's not too hard for users to
> implement the whole operation, and maybe the building block is
> independently useful.
> 
> This is a very fair point. In fact, it's not exclusive with the current
> plan,
> in that we can always add the "building block" version in addition to,
> rather than instead of, the full operation. It very well might be a mistake,
> but I still prefer to begin by introducing the fully encapsulated operation
> and subsequently consider adding the "building block" version if it turns
> out that the encapsulated version is insufficient.
> 
> IMHO, one of Streams's strengths over other processing frameworks
> is a simple API, so simplicity as a design goal seems to suggest that:
>> a.tomanyJoin(B)
> is preferable to
>> a.map(retain(key and FK)).tomanyJoin(B).groupBy(a.key()).join(A)
> at least to start with.
> 
> To answer your question about my latter potential optimization,
> no I don't have any code to look at. But, yes, the implementation
> would bring B into A's tasks and keep them in a state store for joining.
> Thanks for that reference, it does indeed sound similar to what
> MapJoin does in Hive.
> 
> Thanks again,
> -John
> 
> On Mon, Jan 7, 2019 at 5:06 PM Jan Filipiak <jan.filip...@trivago.com>
> wrote:
> 
>>
>>
>> On 02.01.2019 23:44, John Roesler wrote:
>>> However, you seem to have a strong intuition that the scatter/gather
>>> approach is better.
>>> Is this informed by your actual applications at work? Perhaps you can
>>> provide an example
>>> data set and sequence of operations so we can all do the math and agree
>>> with you.
>>> It seems like we should have a convincing efficiency argument before
>>> choosing a more
>>> complicated API over a simpler one.
>>
>> The way I see this is simple. If we only provide the basic
>> implementation of 1:n join (repartition by FK, Range scan on Foreign
>> table update). Then this is such a fundamental building block.
>>
>> I do A join B.
>>
>> a.map(retain(key and FK)).tomanyJoin(B).groupBy(a.key()).join(A). This
>> pretty much performs all your "wire saving optimisations". I don't know!
>> to be honest if someone did put this ContextAwareMapper() that was
>> discussed at some point. Then I could actually do the high watermark
>> thing. a.contextMap(reatain(key, fk and offset).
>> omanyJoin(B).aggregate(a.key(), oldest offset wins).join(A).
>> I don't find the KIP though. I guess it didn't make it.
>>
>> After the repartition and the range read the abstraction just becomes to
>> weak. I just showed that your implementation is my implementation with
>> stuff around it.
>>
>> I don't know if your scatter gather thing is in code somewhere. If the
>> join will only be applied after the gather phase I really wonder where
>> we get the other record from? do you also persist the foreign table on
>> the original side? If that is put into code somewhere already?
>>
>> This would essentially bring B to each of the A's tasks. Factors for
>> this in my case a rather easy and dramatic. Nevertheless an approach I
>> would appreciate. In Hive this could be something closely be related to
>> the concept of a MapJoin. Something I whish we had in streams. I often
>> stated that at some point we need unbounded ammount off offsets per
>> topicpartition and group :D Sooooo good.
>>
>> Long story short. I hope you can follow my line of thought. I hope you
>> can clarify my missunderstanding how the join is performed on A side
>> without materializing B there.
>>
>> I would love if streams would get it right. The basic rule I always say
>> is do what Hive does. done.
>>
>>
>>>
>>> Last thought:
>>>> Regarding what will be observed. I consider it a plus that all events
>>>> that are in the inputs have an respective output. Whereas your solution
>>>> might "swallow" events.
>>>
>>> I didn't follow this. Following Adam's example, we have two join
>> results: a
>>> "dead" one and
>>> a "live" one. If we get the dead one first, both solutions emit it,
>>> followed by the live result.
>>
>> there might be multiple dead once in flight right? But it doesn't really
>> matter, I never did something with the extra benefit i mentioned.
>>
> 
