I was completely brain-dead in my mail before.
I completely missed that you already repartition back for the user and only apply the high water mark filtering after the second repartition source.
I missed the sink / source bounce while poking, sorry for the confusion.

Yes, a group-by can give the same semantics without the ever-growing highwater mark store. I made it optional so that users who want to group by something different after the join still can, and it will shuffle one time less.

I felt that was (and is) useful, and we exploit this fact in some of our joins; in others we do aggregate back to
the "original key" as well.

I have a hard time estimating the implications of the ever-growing highwater mark store. Off the top of my head, there is no current use case where it would be a concern for us. That might be different for other users, though.

Sorry for the confusion!

Best Jan

PS: I am planning to put the comments regarding the Java code on the PR.




On 04.09.2018 21:29, Adam Bellemare wrote:
Yep, I definitely misunderstood some of the groupBy and groupByKey
functionality. I would say disregard what I said in my previous email
w.r.t. my assumptions about record size. I was looking into the code more
today and I did not understand it correctly the first time I read it.

It does look like I could replace the second repartition store and
highwater store with a groupBy and reduce.  However, it looks like I would
still need to store the highwater value within the materialized store, to
compare the arrival of out-of-order records (assuming my understanding of
THIS is correct...). This in effect is the same as the design I have now,
just with the two tables merged together.
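
To make the out-of-order comparison concrete, here is a minimal plain-Java sketch of a highwater filter. It is not the code from the PR: the class name HighWaterMarkFilter, the in-memory HashMap standing in for the state store, and the use of a long offset as the ordering token are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: a HashMap stands in for the highwater state store.
final class HighWaterMarkFilter {
    // Highest offset seen so far per key. Entries are never removed here,
    // which mirrors the unbounded growth discussed in this thread.
    private final Map<String, Long> highWater = new HashMap<>();

    // Returns true if the record should be forwarded, false if it is stale.
    boolean accept(String key, long offset) {
        Long seen = highWater.get(key);
        if (seen != null && offset < seen) {
            // A result with a higher offset was already forwarded for this key.
            return false;
        }
        highWater.put(key, offset);
        return true;
    }

    // Number of keys tracked; only ever grows in this sketch.
    int size() {
        return highWater.size();
    }
}
```

Merging this into a single materialized store would mean keeping the offset next to the reduced value, which is the "two tables merged together" shape described above.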

I will keep looking at this but I am not seeing a great simplification.
Advice and comments are welcomed as always.

On Tue, Sep 4, 2018 at 9:38 AM, Adam Bellemare <adam.bellem...@gmail.com>
wrote:

As I was looking more into RocksDB TTL, I see that we currently do not
support it in Kafka Streams due to a number of technical reasons. As I
don't think that I will be tackling that JIRA at the moment, the current
implementation is indeed unbounded in the highwater table growth.

An alternate option may be to replace the highwater mark table with a
groupBy and then perform a reduce/aggregate. My main concern here is that
technically we could have an unbounded amount of data to group together by
key, and the grouped size could exceed the Kafka maximum record size. When
I built the highwater mark table my intent was to work around this
possibility, as each record is evaluated independently and record sizing
issues do not come into play. If I am incorrect in this assumption, please
correct me, because I am a bit fuzzy on exactly how the groupBy currently
works.

Any thoughts on this are appreciated. I will revisit it again when I have
a bit more time.

Thanks



On Mon, Sep 3, 2018 at 4:55 PM, Adam Bellemare <adam.bellem...@gmail.com>
wrote:

Hi Jan

Thank you for taking the time to look into my PR. I have updated it
accordingly along with the suggestions from John. Please note that I am by
no means an expert on Java, so I really do appreciate any Java-specific
feedback you may have. Do not worry about being overly verbose on it.

You are correct with regards to the highwater mark growing unbounded. One
option would be to implement the rocksDB TTL to expire records. I am open
to other options as well.

I have tried to detail the reasoning behind it in the KIP - I have added
additional comments and I hope that it is clearer now.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable#KIP-213Supportnon-keyjoininginKTable-MultipleRapidForeign-KeyValueChanges-Whyahighwatertableisrequired

Please keep in mind that there may be something about ordering guarantees
that I am not aware of. As far as I know, once you begin to operate on
events in parallel across different nodes within the processor API, there
are no ordering guarantees and everything is simply first-come,
first-served (processed). If this is not the case then I am unaware of that
fact.



Thanks

Adam





On Mon, Sep 3, 2018 at 8:38 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

I finished my deeper scan of your approach.
Most of the comments I put on the PR are minor code-style things.
One forward call seems to be a bug, though; it would be great if you could
double-check it.

The one problem I see is that the high watermark store grows unbounded.
A key being deleted from the source table does not lead to a deletion in
the watermark store.

I also don't quite grasp why it is needed. I think the whole
offset part can go away?
It seems to deal with node failures of some kind, but everything should
turn out okay without it?

Best Jan


On 01.09.2018 20:44, Guozhang Wang wrote:

Yes Adam, that makes sense.

I think it may be better to have a working PR to review before we complete
the VOTE thread. In my previous experience, a large feature like this is
most definitely going to miss some devils in the details during the design
and wiki discussion phases.

That would unfortunately mean that your implementation may need to be
modified / updated along with the review and further KIP discussion. I can
understand this can be painful, but that may be the best option we have
to avoid wasting work as far as possible.


Guozhang


On Wed, Aug 29, 2018 at 10:06 AM, Adam Bellemare <adam.bellem...@gmail.com>
wrote:

Hi Guozhang
By workflow I mean just the overall process of how the KIP is implemented.
Any ideas on the ways to reduce the topic count, materializations, if there
is a better way to resolve out-of-order than a highwater mark table, if the
design philosophy of “keep everything encapsulated within the join
function” is appropriate, etc. I can implement the changes that John
suggested, but if my overall workflow is not acceptable I would rather
address that before making minor changes.

If this requires a full candidate PR ready to go to prod then I can make
those changes. Hope that clears things up.

Thanks

Adam

On Aug 29, 2018, at 12:42 PM, Guozhang Wang <wangg...@gmail.com>
wrote:

Hi Adam,

What do you mean by "additional comments on the workflow"? Do you mean to
let others review your PR https://github.com/apache/kafka/pull/5527? Is it
ready for reviews?


Guozhang

On Tue, Aug 28, 2018 at 5:00 AM, Adam Bellemare <adam.bellem...@gmail.com>
wrote:

Okay, I will implement John's suggestion of namespacing the external
headers prior to processing, and then removing the namespacing prior to
emitting. A potential future KIP could be to provide this namespacing
automatically.

I would also appreciate any other additional comments on the workflow. My
goal is to suss out agreement prior to moving to a vote.
On Mon, Aug 27, 2018 at 3:19 PM, Guozhang Wang <wangg...@gmail.com>
wrote:
I like John's idea as well: for this KIP specifically, as we do not expect
any other consumers to read the repartition topics externally, we can
slightly prefix the header to be safe, while keeping the additional cost
(note the header field is per-record, so any additional byte is per-record
as well) low.


Guozhang

On Tue, Aug 21, 2018 at 11:58 AM, Adam Bellemare <adam.bellem...@gmail.com>
wrote:

Hi John
That is an excellent idea. The header usage I propose would be limited
entirely to internal topics, and this could very well be the solution to
potential conflicts. If we do not officially reserve a prefix "__" then I
think this would be the safest idea, as it would entirely avoid any
accidents (perhaps if a company is using its own "__" prefix for other
reasons).

Thanks

Adam


On Tue, Aug 21, 2018 at 2:24 PM, John Roesler <j...@confluent.io>
wrote:
Just a quick thought regarding headers:
I think there is no absolutely safe way to avoid conflicts, but we can still
consider using some name patterns to reduce the likelihood as much as
possible, e.g. consider sth. like the internal topics naming:
"__internal_[name]"?

I think there is a safe way to avoid conflicts, since these headers are
only needed in internal topics (I think):
For internal and changelog topics, we can namespace all headers:
* user-defined headers are namespaced as "external." + headerKey
* internal headers are namespaced as "internal." + headerKey

This is a lot of characters, so we could use a sigil instead (e.g., "_" for
internal, "~" for external)

We simply apply the namespacing when we read user headers from external
topics into the topology and then de-namespace them before we emit them to
an external topic (via "to" or "through").
Now, it is not possible to collide with user-defined headers.
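
The namespacing steps above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: HeaderNamespacing and its method names are made up, header values are modelled as String rather than the byte arrays real Kafka headers carry, and dropping the internal headers on the way out is my assumption (the proposal only says user headers are de-namespaced before emitting).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams.
final class HeaderNamespacing {
    static final String EXTERNAL = "external.";
    static final String INTERNAL = "internal.";

    // Applied when a record enters the topology from an external topic.
    static Map<String, String> namespaceUserHeaders(Map<String, String> userHeaders) {
        Map<String, String> out = new LinkedHashMap<>();
        userHeaders.forEach((k, v) -> out.put(EXTERNAL + k, v));
        return out;
    }

    // Adds a header owned by the join itself, under the internal namespace.
    static void putInternal(Map<String, String> headers, String key, String value) {
        headers.put(INTERNAL + key, value);
    }

    // Applied before emitting to an external topic: strips the "external."
    // prefix and (assumption) drops everything that is not a user header.
    static Map<String, String> denamespace(Map<String, String> headers) {
        Map<String, String> out = new LinkedHashMap<>();
        headers.forEach((k, v) -> {
            if (k.startsWith(EXTERNAL)) {
                out.put(k.substring(EXTERNAL.length()), v);
            }
        });
        return out;
    }
}
```

With this shape, a user header named "foreignKeyOffset" and an internal header of the same name live under different keys while inside the topology, so neither overwrites the other.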

That said, I'd also be fine with just reserving "__" as a header prefix and
not worrying about collisions.

Thanks for the KIP,
-John

On Tue, Aug 21, 2018 at 9:48 AM Jan Filipiak <jan.filip...@trivago.com>
wrote:
Still haven't completely grasped it.
Sorry, will read more.

On 17.08.2018 21:23, Jan Filipiak wrote:
Cool stuff.

I made some random remarks. Did not touch the core of the algorithm yet.
Will do Monday 100%
I don't see Interactive Queries :) like that!




On 17.08.2018 20:28, Adam Bellemare wrote:
I have submitted a PR with my code against trunk:
https://github.com/apache/kafka/pull/5527

Do I continue on this thread or do we begin a new one for discussion?

On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:
Even before message headers, the option always existed for me to just wrap
the messages into my own custom envelope.
So I of course thought this through. One sentence in your last email
triggered again all the thought process I put in back then to design it in
what I think is the "kafka-way". It ended up ranting a little about what
happened in the past.
I see plenty of colleagues of mine falling into traps in the API that I
did warn about in the 1.0 DSL rewrite. I have the same feeling again. So I
hope it gives you some insights into my thought process. I am aware that
since I never ported 213 to a higher streams version, I don't really have a
stake here, and initially I didn't feel like actually sending it. But maybe
you can pull something good from it.

   Best jan



On 15.08.2018 04:44, Adam Bellemare wrote:
@Jan
Thanks Jan. I take it you mean "key-widening" somehow includes information
about which record is processed first? I understand about a CombinedKey
with both the Foreign and Primary key, but I don't see how you track
ordering metadata in there unless you actually included a metadata field in
the key type as well.

@Guozhang
As Jan mentioned earlier, are Record Headers meant to be used strictly in
just the user-space? It seems possible that a collision on the (key,value)
tuple I wish to add to it could occur. For instance, if I wanted to add a
("foreignKeyOffset",10) to the Headers but the user already specified their
own header with the same key name, then it appears there would be a
collision. (This is one of the issues I brought up in the KIP).
--------------------------------

I will be posting a prototype PR against trunk within the next day or two.
One thing I need to point out is that my design very strictly wraps the
entire foreignKeyJoin process entirely within the DSL function. There is no
exposure of CombinedKeys or widened keys, nothing to resolve with regards
to out-of-order processing and no need for the DSL user to even know what's
going on inside of the function. The code simply returns the results of the
join, keyed by the original key. Currently my API mirrors identically the
format of the data returned by the regular join function, and I believe
that this is very useful to many users of the DSL. It is my understanding
that one of the main design goals of the DSL is to provide higher level
functionality without requiring the users to know exactly what's going on
under the hood. With this in mind, I thought it best to solve ordering and
partitioning problems within the function and eliminate the requirement for
users to do additional work after the fact to resolve the results of their
join. Basically, I am assuming that most users of the DSL just "want it to
work" and want it to be easy. I did this operating under the assumption
that if a user truly wants to optimize their own workflow down to the
finest details then they will break from strictly using the DSL and move
down to the processors API.

I think the abstraction is not powerful enough to keep Kafka specifics
from leaking up. The leak I currently think this has is that you cannot
reliably prevent the delete coming out first, before you emit the correct
new record, as it is an abstraction entirely around Kafka.
I can only recommend not to. Honesty and simplicity should always be first
priority; trying to hide this just makes it more complex, less
understandable, and will lead to mistakes in usage.

Exactly why I am also in big disfavour of GraphNodes and later
optimization stages.
Can someone give me an example of an optimisation that really can't be
handled by the user constructing his topology differently?
Having reusable Processor API components accessible by the DSL and
composable as one likes is exactly where the DSL should max out and KSQL
should do the next step.
I find it very unprofessional from a software engineering standpoint to run
software where you cannot at least sensibly reason about the inner workings
of the libraries used.
Given that people have to read and understand it anyway, why try to hide
it?

I really miss the beauty of the 0.10 version DSL.
Apparently not a thing I can influence, but just warn about.

@Guozhang
you can't imagine how many extra IQ-Statestores I constantly prune from
stream apps because people just keep passing Materialized's into all the
operations.
:D :'-(
I regret that I couldn't convince you guys back then. Plus this whole
entire topology as a floating interface chain, never seen it anywhere
:-/ :'(

I don't know. I guess this is just me regretting to only have 24h/day.
I updated the KIP today with some points worth talking about, should anyone
be so inclined to check it out. Currently we are running this code in
production to handle relational joins from our Kafka Connect topics, as per
the original motivation of the KIP.










I believe the foreignKeyJoin should be responsible for. In my



On Tue, Aug 14, 2018 at 5:22 PM, Guozhang Wang <wangg...@gmail.com>
wrote:
Hello Adam,

As for your question regarding GraphNodes, it is for extending the Streams
optimization framework. You can find more details on
https://issues.apache.org/jira/browse/KAFKA-6761.

The main idea is that instead of directly building up the "physical
topology" (represented as Topology in the public package, and internally
built as the ProcessorTopology class) while users are specifying the
transformation operators, we first keep it as a "logical topology"
(represented as GraphNode inside InternalStreamsBuilder). And then only
execute the optimization and the construction of the "physical" Topology
when StreamsBuilder.build() is called.

Back to your question, I think it makes more sense to add a new type of
StreamsGraphNode (maybe you can consider inheriting from the
BaseJoinProcessorNode). Note that although in the Topology we will have
multiple connected ProcessorNodes to represent a (foreign-key) join, we
still want to keep it as a single StreamsGraphNode, or just a couple of
them in the logical representation so that in the future we can construct
the physical topology differently (e.g. having another way than the current
distributed hash-join).
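
As a rough illustration of the logical-versus-physical split, the following plain-Java sketch records one logical node per DSL call and only expands it into several physical steps when build() is called. Nothing here is a Kafka Streams class: LogicalPlanSketch, the operator strings, and the three-step expansion of the foreign-key join are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a logical plan; not the Kafka Streams internals.
final class LogicalPlanSketch {
    private final List<String> logicalNodes = new ArrayList<>();

    // A DSL call only records a logical node; nothing physical is built yet.
    LogicalPlanSketch add(String operator) {
        logicalNodes.add(operator);
        return this;
    }

    // Physical expansion is deferred until build(), so an optimizer could
    // rewrite the logical list first, or expand a node differently later.
    List<String> build() {
        List<String> physical = new ArrayList<>();
        for (String node : logicalNodes) {
            if (node.equals("foreign-key-join")) {
                // One logical node becomes several connected processors
                // (assumed step names, for illustration only).
                physical.add("repartition-by-foreign-key");
                physical.add("join-with-other-table");
                physical.add("repartition-by-original-key");
            } else {
                physical.add(node);
            }
        }
        return physical;
    }
}
```

Keeping the join as a single logical entry means the expansion inside build() is the only place that would change if a different physical join strategy were introduced.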

-------------------------------------------------------

Back to your questions on KIP-213, I think Jan has summarized it pretty
well. Note that back then we did not have headers support, so we had to use
such a "key-widening" approach to ensure ordering.


Guozhang



On Mon, Aug 13, 2018 at 11:39 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Adam,

I love how you are on to this already! I resolve this by "key-widening": I
treat the results of FK A and FK B differently.
As you can see, the output of my join has a Combined Key and therefore I
can resolve the "race condition" in a group-by if I so desire.

I think this reflects more what happens under the hood and makes it more
clear to the user what is going on. The idea of hiding this behind metadata
and handling it in the DSL is, from my POV, not ideal.

To write into your example:

(key + A, null)
(key + B, <joined On FK =B>)

is what my output would look like.
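
A small plain-Java sketch of why the widened key removes the race: the delete for (key + A) and the result for (key + B) address different entries, so a later group-by sees the same state whichever arrives first. CombinedKeyResolver, its nested CombinedKey class, and the nested-map "aggregate" are hypothetical names for illustration, not Jan's implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of resolving widened-key results in a group-by.
final class CombinedKeyResolver {
    // Widened key: the primary key plus the foreign key the result belongs to.
    static final class CombinedKey {
        final String primaryKey;
        final String foreignKey;

        CombinedKey(String primaryKey, String foreignKey) {
            this.primaryKey = primaryKey;
            this.foreignKey = foreignKey;
        }
    }

    // primaryKey -> (foreignKey -> joined value); stands in for the aggregate.
    private final Map<String, Map<String, String>> byPrimary = new HashMap<>();

    // A null value deletes only the entry for that (primary, foreign) pair.
    void apply(CombinedKey key, String joinedValue) {
        Map<String, String> perFk =
                byPrimary.computeIfAbsent(key.primaryKey, k -> new HashMap<>());
        if (joinedValue == null) {
            perFk.remove(key.foreignKey);
        } else {
            perFk.put(key.foreignKey, joinedValue);
        }
    }

    // Live join results for one primary key, indexed by foreign key.
    Map<String, String> current(String primaryKey) {
        return byPrimary.getOrDefault(primaryKey, Collections.emptyMap());
    }
}
```

Applying the two records from the example in either order leaves exactly one live entry, the one joined on FK = B.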


Hope that makes sense :D

Best Jan





On 13.08.2018 18:16, Adam Bellemare wrote:

Hi Jan

If you do not use headers or other metadata, how do you ensure that changes
to the foreign-key value are not resolved out-of-order?
ie: If an event has FK = A, but you change it to FK = B, you need to
propagate both a delete (FK=A -> null) and an addition (FK=B). In my
solution, without maintaining any metadata, it is possible for the final
output to be in either order - the correctly updated joined value, or the
null for the delete.

(key, null)
(key, <joined On FK =B>)

or

(key, <joined On FK =B>)
(key, null)
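
The two orderings above can be replayed with a few lines of plain Java to show why they matter. This is only a toy model with assumed names (OutOfOrderRace, applyInOrder): results keyed by the original key are applied to a map in arrival order, and a null value removes the key.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the race: no metadata, results applied strictly in arrival order.
final class OutOfOrderRace {
    // Each row is {key, value}; a null value is the delete.
    static Map<String, String> applyInOrder(String[][] results) {
        Map<String, String> table = new HashMap<>();
        for (String[] r : results) {
            if (r[1] == null) {
                table.remove(r[0]);
            } else {
                table.put(r[0], r[1]);
            }
        }
        return table;
    }
}
```

If the delete arrives first, the table ends with the joined value for FK = B; if it arrives last, the key is gone even though the FK = B join is the current truth.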

I looked back through your code and through the discussion threads, and
didn't see any information on how you resolved this. I have a version of my
code working for 2.0, I am just adding more integration tests and will
update the KIP accordingly. Any insight you could provide on resolving
out-of-order semantics without metadata would be helpful.

Thanks
Adam


On Mon, Aug 13, 2018 at 3:34 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi,

Happy to see that you want to make an effort here.
Regarding the ProcessSuppliers I couldn't find a way to not rewrite the
joiners + the merger.
The re-partitioners can be reused in theory. I don't know if repartition
is optimized in 2.0 now.

I made this
https://cwiki.apache.org/confluence/display/KAFKA/KIP-241+KTable+repartition+with+compacted+Topics
back then and we are running KIP-213 with KIP-241 in combination.
For us it is vital as it minimized the size we had in our repartition
topics plus it removed the factor of 2 in events on every message.
I know about this new "delete once consumer has read it". I don't think
241 is vital for all use cases; for ours it is. I wanted to use 213 to
sneak in the foundations for 241 as well.
I don't quite understand what a PropagationWrapper is, but I am certain
that you do not need RecordHeaders for 213 and I would try to leave them
out. They either belong to the DSL or to the user; having a mixed use is to
be avoided. We run the join with 0.8 logformat and I don't think one needs
more.

This KIP will be very valuable for the streams project! I could never
convince myself to invest into the 1.0+ DSL as I used almost all my energy
to fight against it. Maybe this can also help me see the good sides a
little bit more.

If there is anything unclear with all the text that has been written, feel
free to just directly cc me so I don't miss it on the mailing list.

Best Jan





On 08.08.2018 15:26, Adam Bellemare wrote:

More followup, and +dev as Guozhang replied to me directly previously.

I am currently porting the code over to trunk. One of the major changes
since 1.0 is the usage of GraphNodes. I have a question about this:
For a foreignKey joiner, should it have its own dedicated node type? Or
would it be advisable to construct it from existing GraphNode components?


