Hi Matthias

Thank you for your feedback, I do appreciate it!

> While name spacing would be possible, it would require to deserialize
> user headers what implies a runtime overhead. I would suggest to no
> namespace for now to avoid the overhead. If this becomes a problem in
> the future, we can still add name spacing later on.

Agreed. I will go with using a reserved string and document it.



> My main concern about the design it the type of the result KTable: If I
understood the proposal correctly,


In your example, you have table1 and table2 swapped. Here is how it works
currently:

1) table1 has the records that contain the foreign key within their value.
table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
table2 input stream: <A,X>, <B,Y>

2) A Value mapper is required to extract the foreign key.
table1 foreign key mapper: ( value => value.fk )

The mapper is applied to each element in table1, and a new combined key is
made:
table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>

3) The rekeyed events are copartitioned with table2:

a) Stream Thread with Partition 0:
RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
Table2: <A,X>

b) Stream Thread with Partition 1:
RepartitionedTable1: <B-c, (fk=B,bar=3)>
Table2: <B,Y>

4) From here, they can be joined together locally by applying the joiner
function.



At this point, Jan's design and my design deviate. My design goes on to
repartition the data post-join and resolve out-of-order arrival of records,
finally returning the data keyed just the original key. I do not expose the
CombinedKey or any of the internals outside of the joinOnForeignKey
function. This does make for larger footprint, but it removes all agency
for resolving out-of-order arrivals and handling CombinedKeys from the
user. I believe that this makes the function much easier to use.

Let me know if this helps resolve your questions, and please feel free to
add anything else on your mind.

Thanks again,
Adam




On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi,
>
> I am just catching up on this thread. I did not read everything so far,
> but want to share couple of initial thoughts:
>
>
>
> Headers: I think there is a fundamental difference between header usage
> in this KIP and KP-258. For 258, we add headers to changelog topic that
> are owned by Kafka Streams and nobody else is supposed to write into
> them. In fact, no user header are written into the changelog topic and
> thus, there are not conflicts.
>
> Nevertheless, I don't see a big issue with using headers within Streams.
> As long as we document it, we can have some "reserved" header keys and
> users are not allowed to use when processing data with Kafka Streams.
> IMHO, this should be ok.
>
> > I think there is a safe way to avoid conflicts, since these headers are
> > only needed in internal topics (I think):
> > For internal and changelog topics, we can namespace all headers:
> > * user-defined headers are namespaced as "external." + headerKey
> > * internal headers are namespaced as "internal." + headerKey
>
> While name spacing would be possible, it would require to deserialize
> user headers what implies a runtime overhead. I would suggest to no
> namespace for now to avoid the overhead. If this becomes a problem in
> the future, we can still add name spacing later on.
>
>
>
> My main concern about the design it the type of the result KTable: If I
> understood the proposal correctly,
>
> KTable<K1,V1> table1 = ...
> KTable<K2,V2> table2 = ...
>
> KTable<K1,V3> joinedTable = table1.join(table2,...);
>
> implies that the `joinedTable` has the same key as the left input table.
> IMHO, this does not work because if table2 contains multiple rows that
> join with a record in table1 (what is the main purpose of a foreign key
> join), the result table would only contain a single join result, but not
> multiple.
>
> Example:
>
> table1 input stream: <A,X>
> table2 input stream: <a,(A,1)>, <b,(A,2)>
>
> We use table2 value a foreign key to table1 key (ie, "A" joins). If the
> result key is the same key as key of table1, this implies that the
> result can either be <A, join(X,1)> or <A, join(X,2)> but not both.
> Because the share the same key, whatever result record we emit later,
> overwrite the previous result.
>
> This is the reason why Jan originally proposed to use a combination of
> both primary keys of the input tables as key of the output table. This
> makes the keys of the output table unique and we can store both in the
> output table:
>
> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>
>
> Thoughts?
>
>
> -Matthias
>
>
>
>
> On 9/4/18 1:36 PM, Jan Filipiak wrote:
> > Just on remark here.
> > The high-watermark could be disregarded. The decision about the forward
> > depends on the size of the aggregated map.
> > Only 1 element long maps would be unpacked and forwarded. 0 element maps
> > would be published as delete. Any other count
> > of map entries is in "waiting for correct deletes to arrive"-state.
> >
> > On 04.09.2018 21:29, Adam Bellemare wrote:
> >> It does look like I could replace the second repartition store and
> >> highwater store with a groupBy and reduce.  However, it looks like I
> >> would
> >> still need to store the highwater value within the materialized store,
> to
> >> compare the arrival of out-of-order records (assuming my understanding
> of
> >> THIS is correct...). This in effect is the same as the design I have
> now,
> >> just with the two tables merged together.
> >
>
>

Reply via email to