I'm currently testing using a Windowed Store to store the highwater mark.
By all indications this should work fine, with the caveat being that it can
only resolve out-of-order arrival for up to the size of the window (ie:
24h, 72h, etc). This would remove the possibility of it being unbounded in
size.

With regards to Jan's suggestion, I believe this is where we will have to
remain in disagreement. While I do not disagree with your statement about
there likely to be additional joins done in a real-world workflow, I do not
see how you can conclusively deal with out-of-order arrival of foreign-key
changes and subsequent joins. I have attempted what I think you have
proposed (without a high-water, using groupBy and reduce) and found that if
the foreign key changes too quickly, or the load on a stream thread is too
high, the joined messages will arrive out-of-order and be incorrectly
propagated, such that an intermediate event is represented as the final
event. Repartitioning the scattered events back to their original
partitions is the only way I know how to conclusively deal with
out-of-order events in a given time frame, and to ensure that the data is
eventually consistent with the input events.

If you have some code to share that illustrates your approach, I would be
very grateful as it would remove any misunderstandings that I may have.

Thanks,

Adam





On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Thanks Adam for bringing Matthias to speed!
>
> about the differences. I think re-keying back should be optional at best.
> I would say we return a KScatteredTable with reshuffle() returning
> KTable<originalKey,Joined> to make the backwards repartitioning optional.
> I am also in a big favour of doing the out of order processing using group
> by instead high water mark tracking.
> Just because unbounded growth is just scary + It saves us the header stuff.
>
> I think the abstraction of always repartitioning back is just not so
> strong. Like the work has been done before we partition back and grouping
> by something else afterwards is really common.
>
>
>
>
>
>
> On 05.09.2018 13:49, Adam Bellemare wrote:
>
>> Hi Matthias
>>
>> Thank you for your feedback, I do appreciate it!
>>
>> While name spacing would be possible, it would require to deserialize
>>> user headers what implies a runtime overhead. I would suggest to no
>>> namespace for now to avoid the overhead. If this becomes a problem in
>>> the future, we can still add name spacing later on.
>>>
>> Agreed. I will go with using a reserved string and document it.
>>
>>
>>
>> My main concern about the design it the type of the result KTable: If I
>>>
>> understood the proposal correctly,
>>
>>
>> In your example, you have table1 and table2 swapped. Here is how it works
>> currently:
>>
>> 1) table1 has the records that contain the foreign key within their value.
>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>> table2 input stream: <A,X>, <B,Y>
>>
>> 2) A Value mapper is required to extract the foreign key.
>> table1 foreign key mapper: ( value => value.fk )
>>
>> The mapper is applied to each element in table1, and a new combined key is
>> made:
>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c,
>> (fk=B,bar=3)>
>>
>> 3) The rekeyed events are copartitioned with table2:
>>
>> a) Stream Thread with Partition 0:
>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>> Table2: <A,X>
>>
>> b) Stream Thread with Partition 1:
>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>> Table2: <B,Y>
>>
>> 4) From here, they can be joined together locally by applying the joiner
>> function.
>>
>>
>>
>> At this point, Jan's design and my design deviate. My design goes on to
>> repartition the data post-join and resolve out-of-order arrival of
>> records,
>> finally returning the data keyed just the original key. I do not expose
>> the
>> CombinedKey or any of the internals outside of the joinOnForeignKey
>> function. This does make for larger footprint, but it removes all agency
>> for resolving out-of-order arrivals and handling CombinedKeys from the
>> user. I believe that this makes the function much easier to use.
>>
>> Let me know if this helps resolve your questions, and please feel free to
>> add anything else on your mind.
>>
>> Thanks again,
>> Adam
>>
>>
>>
>>
>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>> Hi,
>>>
>>> I am just catching up on this thread. I did not read everything so far,
>>> but want to share couple of initial thoughts:
>>>
>>>
>>>
>>> Headers: I think there is a fundamental difference between header usage
>>> in this KIP and KP-258. For 258, we add headers to changelog topic that
>>> are owned by Kafka Streams and nobody else is supposed to write into
>>> them. In fact, no user header are written into the changelog topic and
>>> thus, there are not conflicts.
>>>
>>> Nevertheless, I don't see a big issue with using headers within Streams.
>>> As long as we document it, we can have some "reserved" header keys and
>>> users are not allowed to use when processing data with Kafka Streams.
>>> IMHO, this should be ok.
>>>
>>> I think there is a safe way to avoid conflicts, since these headers are
>>>> only needed in internal topics (I think):
>>>> For internal and changelog topics, we can namespace all headers:
>>>> * user-defined headers are namespaced as "external." + headerKey
>>>> * internal headers are namespaced as "internal." + headerKey
>>>>
>>> While name spacing would be possible, it would require to deserialize
>>> user headers what implies a runtime overhead. I would suggest to no
>>> namespace for now to avoid the overhead. If this becomes a problem in
>>> the future, we can still add name spacing later on.
>>>
>>>
>>>
>>> My main concern about the design it the type of the result KTable: If I
>>> understood the proposal correctly,
>>>
>>> KTable<K1,V1> table1 = ...
>>> KTable<K2,V2> table2 = ...
>>>
>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>
>>> implies that the `joinedTable` has the same key as the left input table.
>>> IMHO, this does not work because if table2 contains multiple rows that
>>> join with a record in table1 (what is the main purpose of a foreign key
>>> join), the result table would only contain a single join result, but not
>>> multiple.
>>>
>>> Example:
>>>
>>> table1 input stream: <A,X>
>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>
>>> We use table2 value a foreign key to table1 key (ie, "A" joins). If the
>>> result key is the same key as key of table1, this implies that the
>>> result can either be <A, join(X,1)> or <A, join(X,2)> but not both.
>>> Because the share the same key, whatever result record we emit later,
>>> overwrite the previous result.
>>>
>>> This is the reason why Jan originally proposed to use a combination of
>>> both primary keys of the input tables as key of the output table. This
>>> makes the keys of the output table unique and we can store both in the
>>> output table:
>>>
>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>>
>>>
>>> Thoughts?
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>>
>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>
>>>> Just on remark here.
>>>> The high-watermark could be disregarded. The decision about the forward
>>>> depends on the size of the aggregated map.
>>>> Only 1 element long maps would be unpacked and forwarded. 0 element maps
>>>> would be published as delete. Any other count
>>>> of map entries is in "waiting for correct deletes to arrive"-state.
>>>>
>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>
>>>>> It does look like I could replace the second repartition store and
>>>>> highwater store with a groupBy and reduce.  However, it looks like I
>>>>> would
>>>>> still need to store the highwater value within the materialized store,
>>>>>
>>>> to
>>>
>>>> compare the arrival of out-of-order records (assuming my understanding
>>>>>
>>>> of
>>>
>>>> THIS is correct...). This in effect is the same as the design I have
>>>>>
>>>> now,
>>>
>>>> just with the two tables merged together.
>>>>>
>>>>
>>>
>

Reply via email to