Hi Jan

If you do not use headers or other metadata, how do you ensure that changes
to the foreign-key value are not resolved out of order?
i.e., if an event has FK = A and you change it to FK = B, you need to
propagate both a delete (FK=A -> null) and an addition (FK=B). In my
solution, without maintaining any metadata, the two results can be emitted
in either order, so the final output may be either the correctly updated
joined value or the null from the delete.

(key, null)
(key, <joined on FK=B>)

or

(key, <joined on FK=B>)
(key, null)
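
A minimal sketch of this race, in plain Python rather than Kafka Streams
code. It assumes, purely for illustration, that each event carries a per-key
sequence number (`seq`) and that the consumer keeps a high-water mark per
key. `resolve`, `seq`, and the event tuples are hypothetical names, and this
shows a metadata-based resolution, not a metadata-free one:

```python
from itertools import permutations


def resolve(events, use_high_water_mark):
    """Apply (key, value, seq) events in arrival order; return the final table."""
    table = {}
    high_water = {}  # key -> highest seq applied so far (assumed metadata)
    for key, value, seq in events:
        if use_high_water_mark and seq <= high_water.get(key, -1):
            continue  # stale event: a newer one was already applied
        high_water[key] = max(seq, high_water.get(key, -1))
        if value is None:
            table.pop(key, None)  # a null value removes the joined row
        else:
            table[key] = value
    return table


# An FK change A -> B on one row yields two events that travel through
# different partitions, so they may arrive in either order.
delete_old = ("key", None, 1)            # retract the join on FK=A
add_new = ("key", "joined on FK=B", 2)   # emit the join on FK=B

for arrival in permutations([delete_old, add_new]):
    print(resolve(arrival, False), resolve(arrival, True))
```

Without the high-water mark, the arrival order (add, delete) leaves the table
empty; with it, the late delete is dropped and both orders end with the join
on FK=B.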

I looked back through your code and through the discussion threads, and
didn't see any information on how you resolved this. I have a version of my
code working for 2.0; I am just adding more integration tests and will
update the KIP accordingly. Any insight you could provide on resolving
out-of-order semantics without metadata would be helpful.

Thanks
Adam


On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hi,
>
> Happy to see that you want to make an effort here.
>
> Regarding the ProcessSuppliers I couldn't find a way to not rewrite the
> joiners + the merger.
> The re-partitioners can be reused in theory. I don't know if repartition
> is optimized in 2.0 now.
>
> I made this
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
> back then and we are running KIP-213 with KIP-241 in combination.
>
> For us it is vital as it minimized the size we had in our repartition
> topics plus it removed the factor of 2 in events on every message.
> I know about this new "delete once consumer has read it". I don't think
> 241 is vital for all use cases, but for ours it is. I wanted
> to use 213 to sneak in the foundations for 241 as well.
>
> I don't quite understand what a PropagationWrapper is, but I am certain
> that you do not need RecordHeaders for 213 and I would try to leave them
> out. They belong either to the DSL or to the user; mixed use is to be
> avoided. We run the join with the 0.8 log format and I don't think one
> needs more.
>
> This KIP will be very valuable for the streams project! I could never
> convince myself to invest in the 1.0+ DSL
> as I used almost all my energy to fight against it. Maybe this can also
> help me see the good sides a little bit more.
>
> If there is anything unclear with all the text that has been written, feel
> free to just directly cc me so I don't miss it on
> the mailing list.
>
> Best Jan
>
>
>
>
>
> On 08.08.2018 15:26, Adam Bellemare wrote:
>
>> More followup, and +dev as Guozhang replied to me directly previously.
>>
>> I am currently porting the code over to trunk. One of the major changes
>> since 1.0 is the usage of GraphNodes. I have a question about this:
>>
>> For a foreignKey joiner, should it have its own dedicated node type? Or
>> would it be advisable to construct it from existing GraphNode components?
>> For instance, I believe I could construct it from several
>> OptimizableRepartitionNode, some SinkNode, some SourceNode, and several
>> StatefulProcessorNode. That being said, there is some underlying complexity
>> to each approach.
>>
>> I will be switching the KIP-213 to use the RecordHeaders in Kafka Streams
>> instead of the PropagationWrapper, but conceptually it should be the same.
>>
>> Again, any feedback is welcomed...
>>
>>
>> On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com>
>> wrote:
>>
>>> Hi Guozhang et al
>>>
>>> I was just reading the 2.0 release notes and noticed a section on Record
>>> Headers.
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API
>>>
>>> I am not yet sure if the contents of a RecordHeader are propagated all the
>>> way through the Sinks and Sources, but if they are, and if they remain
>>> attached to the record (including null records), I may be able to ditch the
>>> propagationWrapper for an implementation using RecordHeader. I am not yet
>>> sure if this is doable, so if anyone understands the RecordHeader impl
>>> better than I, I would be happy to hear from you.
>>>
>>> In the meantime, let me know of any questions. I believe this PR has a lot
>>> of potential to solve problems for other people, as I have encountered a
>>> number of other companies in the wild all home-brewing their own solutions
>>> to come up with a method of handling relational data in streams.
>>>
>>> Adam
>>>
>>>
>>> On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>>> Hello Adam,
>>>>
>>>> Thanks for rebooting the discussion of this KIP! Let me finish my pass
>>>> on the wiki and get back to you soon. Sorry for the delays.
>>>>
>>>> Guozhang
>>>>
>>>> On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>>> wrote:
>>>>
>>>>> Let me kick this off with a few starting points that I would like to
>>>>> generate some discussion on.
>>>>>
>>>>> 1) It seems to me that I will need to repartition the data twice: once on
>>>>> the foreign key, and once back to the primary key. Is there anything I am
>>>>> missing here?
>>>>>
>>>>> 2) I believe I will also need to materialize 3 state stores: the
>>>>> prefixScan SS, the highwater mark SS (for out-of-order resolution) and the
>>>>> final state store, due to the workflow I have laid out. I have not thought
>>>>> of a better way yet, but would appreciate any input on this matter. I have
>>>>> gone back through the mailing list for the previous discussions on this
>>>>> KIP, and I did not see anything relating to resolving out-of-order compute.
>>>>> I cannot see a way around the current three-SS structure that I have.
>>>>>
>>>>> 3) Caching is disabled on the prefixScan SS, as I do not know how to
>>>>> resolve the iterator obtained from RocksDB with that of the cache. In
>>>>> addition, I must ensure everything is flushed before scanning. Since the
>>>>> materialized prefixScan SS is under "control" of the function, I do not
>>>>> anticipate this to be a problem. Performance throughput will need to be
>>>>> tested, but as Jan observed in his initial overview of this issue, it is
>>>>> generally a surge of output events that affects performance more so than
>>>>> the flush or prefixScan itself.
>>>>>
>>>>> Thoughts on any of these are greatly appreciated, since these elements are
>>>>> really the cornerstone of the whole design. I can put up the code I have
>>>>> written against 1.0.2 if we so desire, but first I was hoping to just
>>>>> tackle some of the fundamental design proposals.
>>>>>
>>>>> Thanks,
>>>>> Adam
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare <adam.bellem...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Here is the new discussion thread for KIP-213. I picked back up on the KIP
>>>>>> as this is something that we too at Flipp are now running in production.
>>>>>> Jan started this last year, and I know that Trivago is also using something
>>>>>> similar in production, at least in terms of APIs and functionality.
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>
>>>>>> I do have an implementation of the code for Kafka 1.0.2 (our local
>>>>>> production version) but I won't post it yet as I would like to focus on the
>>>>>> workflow and design first. That being said, I also need to add some clearer
>>>>>> integration tests (I did a lot of testing using a non-Kafka Streams
>>>>>> framework) and clean up the code a bit more before putting it in a PR
>>>>>> against trunk (I can do so later this week likely).
>>>>>>
>>>>>> Please take a look,
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Adam Bellemare
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>>
>>>
>
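
The double repartition and prefix scan described in points 1 and 3 of the
quoted July 24 message can be sketched roughly as follows, in plain Python.
This is only an illustration under assumptions: `PrefixScanStore`, the
`(fk, pk)` composite-key layout, and the sample `orders`/`customers` data are
hypothetical stand-ins, not the KIP-213 implementation or any Kafka Streams
API.

```python
import bisect


class PrefixScanStore:
    """Sorted in-memory stand-in for a persistent prefix-scan store."""

    def __init__(self):
        self._keys = []    # sorted list of (fk, pk) composite keys
        self._values = {}  # (fk, pk) -> left-side value

    def put(self, fk, pk, value):
        key = (fk, pk)
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def prefix_scan(self, fk):
        """Yield (pk, value) for every entry whose composite key starts with fk."""
        # (fk,) sorts before every (fk, pk), so this finds the first match.
        i = bisect.bisect_left(self._keys, (fk,))
        while i < len(self._keys) and self._keys[i][0] == fk:
            key = self._keys[i]
            yield key[1], self._values[key]
            i += 1


def join_on_foreign_key(left_rows, right_rows):
    store = PrefixScanStore()
    # Repartition 1: re-key each left row by its foreign key.
    for pk, (fk, value) in left_rows.items():
        store.put(fk, pk, value)
    # For each right row, scan the matching left rows, then
    # repartition 2: re-key the joined result by the primary key.
    joined = {}
    for fk, right_value in right_rows.items():
        for pk, left_value in store.prefix_scan(fk):
            joined[pk] = (left_value, right_value)
    return joined


orders = {"o1": ("cust-A", "book"), "o2": ("cust-B", "pen"), "o3": ("cust-A", "ink")}
customers = {"cust-A": "Alice", "cust-B": "Bob"}
print(join_on_foreign_key(orders, customers))
```

A single change to one right-side row fans out to every left-side row under
that prefix, which is the surge of output events mentioned in point 3.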
