I'm currently testing using a Windowed Store to store the highwater mark. By all indications this should work fine, with the caveat that it can only resolve out-of-order arrival within the size of the window (e.g., 24h, 72h, etc.). This would prevent the store from growing without bound.
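
As a rough illustration of the trade-off described above, here is a minimal plain-Java sketch of a highwater mark with bounded retention. It is a simulation only and does not use the Kafka Streams window store API; the class name `WindowedHighWaterMark`, the use of an offset as the ordering value, and the caller-supplied clock are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowedHighWaterMark {

    /** Hypothetical entry: highest offset seen for a key, and when it was stored. */
    private static final class Entry {
        final long offset;
        final long storedAtMs;

        Entry(long offset, long storedAtMs) {
            this.offset = offset;
            this.storedAtMs = storedAtMs;
        }
    }

    private final long retentionMs;
    private final Map<String, Entry> marks = new HashMap<>();

    public WindowedHighWaterMark(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /**
     * Returns true if the event should be forwarded, false if it is older
     * than the highwater mark currently retained for the key.
     */
    public boolean accept(String key, long offset, long nowMs) {
        expire(nowMs);
        Entry current = marks.get(key);
        if (current != null && offset < current.offset) {
            return false; // out-of-order arrival inside the window: drop it
        }
        marks.put(key, new Entry(offset, nowMs));
        return true;
    }

    /** Drops marks older than the retention period, which keeps the state bounded. */
    private void expire(long nowMs) {
        marks.values().removeIf(e -> nowMs - e.storedAtMs > retentionMs);
    }

    public int size() {
        return marks.size();
    }

    public static void main(String[] args) {
        WindowedHighWaterMark hwm = new WindowedHighWaterMark(1000L);
        System.out.println(hwm.accept("a", 5L, 0L));    // first event for the key
        System.out.println(hwm.accept("a", 3L, 10L));   // stale event inside the window
        System.out.println(hwm.accept("a", 1L, 5000L)); // stale event after the mark expired
    }
}
```

The last call shows the caveat: once the mark has aged out of the window, a late event can no longer be recognised as stale.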
With regard to Jan's suggestion, I believe this is where we will have to remain in disagreement. While I do not disagree with your statement that a real-world workflow is likely to include additional joins, I do not see how you can conclusively deal with out-of-order arrival of foreign-key changes and subsequent joins. I have attempted what I think you have proposed (without a highwater mark, using groupBy and reduce) and found that if the foreign key changes too quickly, or the load on a stream thread is too high, the joined messages will arrive out-of-order and be incorrectly propagated, such that an intermediate event is represented as the final event. Repartitioning the scattered events back to their original partitions is the only way I know how to conclusively deal with out-of-order events in a given time frame, and to ensure that the data is eventually consistent with the input events.

If you have some code to share that illustrates your approach, I would be very grateful as it would remove any misunderstandings that I may have.

Thanks,
Adam

On Wed, Sep 5, 2018 at 3:35 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> Thanks Adam for bringing Matthias up to speed!
>
> About the differences: I think re-keying back should be optional at best.
> I would say we return a KScatteredTable with reshuffle() returning
> KTable<originalKey,Joined> to make the backwards repartitioning optional.
> I am also very much in favour of doing the out-of-order processing using
> group by instead of high water mark tracking.
> Just because unbounded growth is just scary + it saves us the header stuff.
>
> I think the abstraction of always repartitioning back is just not so
> strong. Like the work has been done before we partition back and grouping
> by something else afterwards is really common.
>
> On 05.09.2018 13:49, Adam Bellemare wrote:
>
>> Hi Matthias
>>
>> Thank you for your feedback, I do appreciate it!
>>
>>> While name spacing would be possible, it would require deserializing
>>> user headers, which implies a runtime overhead. I would suggest not
>>> namespacing for now to avoid the overhead. If this becomes a problem in
>>> the future, we can still add name spacing later on.
>>>
>> Agreed. I will go with using a reserved string and document it.
>>
>>> My main concern about the design is the type of the result KTable: If I
>>> understood the proposal correctly,
>>
>> In your example, you have table1 and table2 swapped. Here is how it works
>> currently:
>>
>> 1) table1 has the records that contain the foreign key within their value.
>> table1 input stream: <a,(fk=A,bar=1)>, <b,(fk=A,bar=2)>, <c,(fk=B,bar=3)>
>> table2 input stream: <A,X>, <B,Y>
>>
>> 2) A value mapper is required to extract the foreign key.
>> table1 foreign key mapper: ( value => value.fk )
>>
>> The mapper is applied to each element in table1, and a new combined key is
>> made:
>> table1 mapped: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>, <B-c, (fk=B,bar=3)>
>>
>> 3) The rekeyed events are copartitioned with table2:
>>
>> a) Stream Thread with Partition 0:
>> RepartitionedTable1: <A-a, (fk=A,bar=1)>, <A-b, (fk=A,bar=2)>
>> Table2: <A,X>
>>
>> b) Stream Thread with Partition 1:
>> RepartitionedTable1: <B-c, (fk=B,bar=3)>
>> Table2: <B,Y>
>>
>> 4) From here, they can be joined together locally by applying the joiner
>> function.
>>
>> At this point, Jan's design and my design deviate. My design goes on to
>> repartition the data post-join and resolve out-of-order arrival of records,
>> finally returning the data keyed by just the original key. I do not expose
>> the CombinedKey or any of the internals outside of the joinOnForeignKey
>> function. This does make for a larger footprint, but it removes all agency
>> for resolving out-of-order arrivals and handling CombinedKeys from the
>> user. I believe that this makes the function much easier to use.
>>
>> Let me know if this helps resolve your questions, and please feel free to
>> add anything else on your mind.
>>
>> Thanks again,
>> Adam
>>
>> On Tue, Sep 4, 2018 at 8:36 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Hi,
>>>
>>> I am just catching up on this thread. I did not read everything so far,
>>> but want to share a couple of initial thoughts:
>>>
>>> Headers: I think there is a fundamental difference between header usage
>>> in this KIP and KIP-258. For 258, we add headers to changelog topics that
>>> are owned by Kafka Streams and nobody else is supposed to write into
>>> them. In fact, no user headers are written into the changelog topic and
>>> thus, there are no conflicts.
>>>
>>> Nevertheless, I don't see a big issue with using headers within Streams.
>>> As long as we document it, we can have some "reserved" header keys that
>>> users are not allowed to use when processing data with Kafka Streams.
>>> IMHO, this should be ok.
>>>
>>>> I think there is a safe way to avoid conflicts, since these headers are
>>>> only needed in internal topics (I think):
>>>> For internal and changelog topics, we can namespace all headers:
>>>> * user-defined headers are namespaced as "external." + headerKey
>>>> * internal headers are namespaced as "internal." + headerKey
>>>>
>>> While name spacing would be possible, it would require deserializing
>>> user headers, which implies a runtime overhead. I would suggest not
>>> namespacing for now to avoid the overhead. If this becomes a problem in
>>> the future, we can still add name spacing later on.
>>>
>>> My main concern about the design is the type of the result KTable: If I
>>> understood the proposal correctly,
>>>
>>> KTable<K1,V1> table1 = ...
>>> KTable<K2,V2> table2 = ...
>>>
>>> KTable<K1,V3> joinedTable = table1.join(table2,...);
>>>
>>> implies that the `joinedTable` has the same key as the left input table.
>>> IMHO, this does not work because if table2 contains multiple rows that
>>> join with a record in table1 (which is the main purpose of a foreign key
>>> join), the result table would only contain a single join result, but not
>>> multiple.
>>>
>>> Example:
>>>
>>> table1 input stream: <A,X>
>>> table2 input stream: <a,(A,1)>, <b,(A,2)>
>>>
>>> We use the table2 value as a foreign key to the table1 key (i.e., "A"
>>> joins). If the result key is the same as the key of table1, this implies
>>> that the result can either be <A, join(X,1)> or <A, join(X,2)> but not
>>> both. Because they share the same key, whatever result record we emit
>>> later overwrites the previous result.
>>>
>>> This is the reason why Jan originally proposed to use a combination of
>>> both primary keys of the input tables as key of the output table. This
>>> makes the keys of the output table unique and we can store both in the
>>> output table:
>>>
>>> Result would be <A-a, join(X,1)>, <A-b, join(X,2)>
>>>
>>> Thoughts?
>>>
>>> -Matthias
>>>
>>> On 9/4/18 1:36 PM, Jan Filipiak wrote:
>>>
>>>> Just one remark here.
>>>> The high-watermark could be disregarded. The decision about the forward
>>>> depends on the size of the aggregated map.
>>>> Only 1 element long maps would be unpacked and forwarded. 0 element maps
>>>> would be published as delete. Any other count
>>>> of map entries is in "waiting for correct deletes to arrive"-state.
>>>>
>>>> On 04.09.2018 21:29, Adam Bellemare wrote:
>>>>
>>>>> It does look like I could replace the second repartition store and
>>>>> highwater store with a groupBy and reduce. However, it looks like I
>>>>> would still need to store the highwater value within the materialized
>>>>> store, to compare the arrival of out-of-order records (assuming my
>>>>> understanding of THIS is correct...). This in effect is the same as the
>>>>> design I have now, just with the two tables merged together.
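
Jan's remark quoted above (forwarding based on the size of the aggregated map) can be read in more than one way. The following plain-Java sketch shows one possible reading, not Jan's actual code: the state per original key is assumed to be a map from foreign key to joined value, a `null` joined value is assumed to mean a delete, and the names `MapSizeForwarding` and `Decision` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class MapSizeForwarding {

    public enum Decision { FORWARD, DELETE, WAIT }

    // Assumed state: original key -> (foreign key -> joined value)
    private final Map<String, Map<String, String>> aggregated = new HashMap<>();

    /**
     * Applies one join result (or a delete, when joinedValue is null) and
     * decides what to do based only on the size of the aggregated map.
     */
    public Decision update(String originalKey, String foreignKey, String joinedValue) {
        Map<String, String> entries =
                aggregated.computeIfAbsent(originalKey, k -> new HashMap<>());
        if (joinedValue == null) {
            entries.remove(foreignKey);
        } else {
            entries.put(foreignKey, joinedValue);
        }
        if (entries.isEmpty()) {
            aggregated.remove(originalKey);
            return Decision.DELETE;       // 0 elements: publish a delete
        }
        if (entries.size() == 1) {
            return Decision.FORWARD;      // 1 element: unpack and forward
        }
        return Decision.WAIT;             // otherwise: wait for deletes to arrive
    }

    /** Returns the single joined value for the key, or null if there is not exactly one. */
    public String current(String originalKey) {
        Map<String, String> entries = aggregated.get(originalKey);
        if (entries == null || entries.size() != 1) {
            return null;
        }
        return entries.values().iterator().next();
    }

    public static void main(String[] args) {
        MapSizeForwarding state = new MapSizeForwarding();
        System.out.println(state.update("a", "A", "joined-with-A")); // only entry for key a
        System.out.println(state.update("a", "B", "joined-with-B")); // fk changed, old entry still present
        System.out.println(state.update("a", "A", null));            // delete for the old fk arrives
        System.out.println(state.current("a"));
    }
}
```

Under this reading no highwater value is stored, but nothing is forwarded for a key while more than one entry is present.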
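
Steps 1) to 4) from Adam's quoted walkthrough (extract the foreign key, build a combined key, copartition with table2, join locally) can be sketched in plain Java using the same example records. This is a simulation under stated assumptions, not the KIP implementation: it does not use Kafka Streams, the `CombinedKey` class here is a stand-in for the one named in the thread, partitioning by `hashCode` is an assumption, the joined value format is made up, and the out-of-order resolution that the thread debates is left out entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ForeignKeyJoinSketch {

    /** table1 value: carries the foreign key inside the value. */
    public static final class LeftValue {
        final String fk;
        final int bar;
        public LeftValue(String fk, int bar) { this.fk = fk; this.bar = bar; }
    }

    /** Stand-in combined key: foreign key first, then the original primary key. */
    static final class CombinedKey {
        final String foreignKey;
        final String primaryKey;
        CombinedKey(String foreignKey, String primaryKey) {
            this.foreignKey = foreignKey;
            this.primaryKey = primaryKey;
        }
        @Override public String toString() { return foreignKey + "-" + primaryKey; }
    }

    static final class Rekeyed {
        final CombinedKey key;
        final LeftValue value;
        Rekeyed(CombinedKey key, LeftValue value) { this.key = key; this.value = value; }
    }

    /** Partition on the foreign key only, so rekeyed table1 rows land with their table2 row. */
    static int partitionFor(String foreignKey, int numPartitions) {
        return Math.floorMod(foreignKey.hashCode(), numPartitions);
    }

    public static Map<String, String> joinOnForeignKey(
            Map<String, LeftValue> table1, Map<String, String> table2, int numPartitions) {
        // Steps 2 and 3: extract the fk, build the combined key, repartition table1.
        Map<Integer, List<Rekeyed>> repartitionedTable1 = new TreeMap<>();
        for (Map.Entry<String, LeftValue> e : table1.entrySet()) {
            CombinedKey ck = new CombinedKey(e.getValue().fk, e.getKey());
            repartitionedTable1
                    .computeIfAbsent(partitionFor(ck.foreignKey, numPartitions), p -> new ArrayList<>())
                    .add(new Rekeyed(ck, e.getValue()));
        }
        // table2 is partitioned on its own key, which is the foreign key.
        Map<Integer, Map<String, String>> partitionedTable2 = new TreeMap<>();
        for (Map.Entry<String, String> e : table2.entrySet()) {
            partitionedTable2
                    .computeIfAbsent(partitionFor(e.getKey(), numPartitions), p -> new LinkedHashMap<>())
                    .put(e.getKey(), e.getValue());
        }
        // Step 4: join locally per partition, then key the result by the original key only.
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<Integer, List<Rekeyed>> partition : repartitionedTable1.entrySet()) {
            Map<String, String> right =
                    partitionedTable2.getOrDefault(partition.getKey(), new LinkedHashMap<>());
            for (Rekeyed r : partition.getValue()) {
                String rightValue = right.get(r.key.foreignKey);
                if (rightValue != null) {
                    result.put(r.key.primaryKey, r.value.bar + ":" + rightValue);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, LeftValue> table1 = new LinkedHashMap<>();
        table1.put("a", new LeftValue("A", 1));
        table1.put("b", new LeftValue("A", 2));
        table1.put("c", new LeftValue("B", 3));
        Map<String, String> table2 = new LinkedHashMap<>();
        table2.put("A", "X");
        table2.put("B", "Y");
        System.out.println(joinOnForeignKey(table1, table2, 2));
    }
}
```

Keying the result by the original primary key mirrors Adam's design; under Jan's proposal the combined key would stay visible in the output instead.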