Hi Adam,

I love how you are on to this already! I resolve this by "key-widening" I treat the result of FKA,and FKB differently. As you can see the output of my join has a Combined Key and therefore I can resolve the "race condition" in a group by
if I so desire.

I think this reflects more what happens under the hood and makes it more clear to the user what is going on. The Idea of hiding this behind metadata and handle it in the DSL is from my POV unideal.

To write into your example:

key + A, null)
(key +B, <joined On FK =B>)

is what my output would look like.


Hope that makes sense :D

Best Jan




On 13.08.2018 18:16, Adam Bellemare wrote:
Hi Jan

If you do not use headers or other metadata, how do you ensure that changes
to the foreign-key value are not resolved out-of-order?
ie: If an event has FK = A, but you change it to FK = B, you need to
propagate both a delete (FK=A -> null) and an addition (FK=B). In my
solution, without maintaining any metadata, it is possible for the final
output to be in either order - the correctly updated joined value, or the
null for the delete.

(key, null)
(key, <joined On FK =B>)

or

(key, <joined On FK =B>)
(key, null)

I looked back through your code and through the discussion threads, and
didn't see any information on how you resolved this. I have a version of my
code working for 2.0, I am just adding more integration tests and will
update the KIP accordingly. Any insight you could provide on resolving
out-of-order semantics without metadata would be helpful.

Thanks
Adam


On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi,

Happy to see that you want to make an effort here.

Regarding the ProcessSuppliers I couldn't find a way to not rewrite the
joiners + the merger.
The re-partitioners can be reused in theory. I don't know if repartition
is optimized in 2.0 now.

I made this
https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+
KTable+repartition+with+compacted+Topics
back then and we are running KIP-213 with KIP-241 in combination.

For us it is vital as it minimized the size we had in our repartition
topics plus it removed the factor of 2 in events on every message.
I know about this new  "delete once consumer has read it".  I don't think
241 is vital for all usecases, for ours it is. I wanted
to use 213 to sneak in the foundations for 241 aswell.

I don't quite understand what a PropagationWrapper is, but I am certain
that you do not need RecordHeaders
for 213 and I would try to leave them out. They either belong to the DSL
or to the user, having a mixed use is
to be avoided. We run the join with 0.8 logformat and I don't think one
needs more.

This KIP will be very valuable for the streams project! I couldn't never
convince myself to invest into the 1.0+ DSL
as I used almost all my energy to fight against it. Maybe this can also
help me see the good sides a little bit more.

If there is anything unclear with all the text that has been written, feel
free to just directly cc me so I don't miss it on
the mailing list.

Best Jan





On 08.08.2018 15:26, Adam Bellemare wrote:

More followup, and +dev as Guozhang replied to me directly previously.

I am currently porting the code over to trunk. One of the major changes
since 1.0 is the usage of GraphNodes. I have a question about this:

For a foreignKey joiner, should it have its own dedicated node type? Or
would it be advisable to construct it from existing GraphNode components?
For instance, I believe I could construct it from several
OptimizableRepartitionNode, some SinkNode, some SourceNode, and several
StatefulProcessorNode. That being said, there is some underlying
complexity
to each approach.

I will be switching the KIP-213 to use the RecordHeaders in Kafka Streams
instead of the PropagationWrapper, but conceptually it should be the same.

Again, any feedback is welcomed...


On Mon, Jul 30, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com
wrote:

Hi Guozhang et al
I was just reading the 2.0 release notes and noticed a section on Record
Headers.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-
244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API

I am not yet sure if the contents of a RecordHeader is propagated all the
way through the Sinks and Sources, but if it is, and if it remains
attached
to the record (including null records) I may be able to ditch the
propagationWrapper for an implementation using RecordHeader. I am not yet
sure if this is doable, so if anyone understands RecordHeader impl better
than I, I would be happy to hear from you.

In the meantime, let me know of any questions. I believe this PR has a
lot
of potential to solve problems for other people, as I have encountered a
number of other companies in the wild all home-brewing their own
solutions
to come up with a method of handling relational data in streams.

Adam


On Fri, Jul 27, 2018 at 1:45 AM, Guozhang Wang <wangg...@gmail.com>
wrote:

Hello Adam,
Thanks for rebooting the discussion of this KIP ! Let me finish my pass
on the wiki and get back to you soon. Sorry for the delays..

Guozhang

On Tue, Jul 24, 2018 at 6:08 AM, Adam Bellemare <
adam.bellem...@gmail.com

wrote:
Let me kick this off with a few starting points that I would like to
generate some discussion on.

1) It seems to me that I will need to repartition the data twice - once
on
the foreign key, and once back to the primary key. Is there anything I
am
missing here?

2) I believe I will also need to materialize 3 state stores: the
prefixScan
SS, the highwater mark SS (for out-of-order resolution) and the final
state
store, due to the workflow I have laid out. I have not thought of a
better
way yet, but would appreciate any input on this matter. I have gone
back
through the mailing list for the previous discussions on this KIP, and
I
did not see anything relating to resolving out-of-order compute. I
cannot
see a way around the current three-SS structure that I have.

3) Caching is disabled on the prefixScan SS, as I do not know how to
resolve the iterator obtained from rocksDB with that of the cache. In
addition, I must ensure everything is flushed before scanning. Since
the
materialized prefixScan SS is under "control" of the function, I do not
anticipate this to be a problem. Performance throughput will need to be
tested, but as Jan observed in his initial overview of this issue, it
is
generally a surge of output events which affect performance moreso than
the
flush or prefixScan itself.

Thoughts on any of these are greatly appreciated, since these elements
are
really the cornerstone of the whole design. I can put up the code I
have
written against 1.0.2 if we so desire, but first I was hoping to just
tackle some of the fundamental design proposals.

Thanks,
Adam



On Mon, Jul 23, 2018 at 10:05 AM, Adam Bellemare <
adam.bellem...@gmail.com>
wrote:

Here is the new discussion thread for KIP-213. I picked back up on the
KIP

as this is something that we too at Flipp are now running in

production.

Jan started this last year, and I know that Trivago is also using

something

similar in production, at least in terms of APIs and functionality.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-
213+Support+non-key+joining+in+KTable

I do have an implementation of the code for Kafka 1.0.2 (our local
production version) but I won't post it yet as I would like to focus

on the

workflow and design first. That being said, I also need to add some

clearer

integration tests (I did a lot of testing using a non-Kafka Streams
framework) and clean up the code a bit more before putting it in a PR
against trunk (I can do so later this week likely).

Please take a look,

Thanks

Adam Bellemare



--
-- Guozhang



Reply via email to