One last question. What should happen for the following case:

KStream myStream = build.stream(...).map(...);
myStream.markAsPartiitoned().groupByKey().aggregate(...);
myStream.join(...)

The question is about the "fan-out" pattern. `myStream`, which is marked for partitioning, is fed into two downstream operations. Thus, it's clear that the aggregation won't trigger a rebalance. However, the fan-out happens before `markAsRepartiitoned` and thus I would assume that the join would trigger a repartitioning?

This question is important, because if we follow what I said above, `markAsRepartiitoned` returns a new KStream object, but does mutate the upstream KStream object, what is semantically two different things. It also has an impact on how we need to implement the feature. The KIP should explicitly explain this case.


-Matthias

On 7/26/23 4:58 PM, Shay Lin wrote:
Hi John,

Thanks for your reply. I updated the KIP to reflect the changes we
discussed in the thread today.
#1 is duly noted, I learned from the examples Sophie sent earlier! =)

In the new version, I also talked about why IQ and joins will not work with
the interface and talked about the mitigation. The proposal
now specifically states we are solving the unneeded partition problem when
IQ or join does not coexist in the kafka streams. In the concerns section,
the proposal talks about having a reverse mapping would make this new
interface compatible with IQ and join again but is subject to demand.

Let me know what you think. Thanks!
Shay



On Wed, Jul 26, 2023 at 2:35 PM John Roesler <vvcep...@apache.org> wrote:

Hello Shay,

Thanks for the KIP!

I just took a look in preparation to vote, and there are two small-ish
things that I'd like to fix first. Apologies if this stuff has already come
up in the discussion thread; I only skimmed it.

1. The KIP only mentions the name of the method instead of providing a
code snippet showing exactly what the method signature will be in the
interface. Normally, KIPs do the latter because it removes all ambiguity
from the proposal. It also gives you an opportunity to write down the
Javadoc you would add to the method instead of just mentioning the points
that you plan to document.

2. The KIP lists some concerns, but not what you will do to mitigate them.
For example, the concern about IQ not behaving correctly. Will you disable
the use of the implicit partitioner downstream of one of these
cancellations? Or provide a new interface to supply the "reverse mapping"
you mentioned? Or include documentation in the Javadoc for how to deal with
the situation? I think there are a range of options for each of those
concerns, and we should state up front what we plan to do.

Thanks again!
-John

On 2023/07/24 20:33:05 Sophie Blee-Goldman wrote:
Thanks Shay! You and Matthias have convinced me, I'm happy with the
current
proposal. I think once you make the minor
updates to the KIP document this will be ready for voting again.

Cheers,
Sophie

On Mon, Jul 24, 2023 at 8:26 AM Shay Lin <lqxs...@gmail.com> wrote:

Hi Sophie and Matthias, thanks for your comments and replies.

1. Scope of change: KStreams only or KStreams/KTable
I took some time to digest your points, looking through how KStreams
triggers repartitions today. I noticed that `repartitionRequired`is a
flag
in KStreamImpl etc and not in KTableImpl etc. When I look further, in
the
case of KTable, instead of passing in a boolean flag, a repartition
node `
TableRepartitionMapNode` is directly created. I went back and
referenced
the two issue tickets KAFKA-10844 and KAFKA-4835, both requests were
focused on KStreams, i.e. not to change the partition why the input
streams
are already correctly keyed. Is it possible that in the case of KTable,
users always intend to repartition (change key) when they call on
aggregate? -- (this was written before I saw Matthias's comment)

Overall, based on the tickets, I see the benefit of doing a contained
change focusing on KStreams, i.e. repartitionRequired, which would
solve
the pain points nicely. If we ran into similar complaints/optimization
requests for KTable down the line, we can address them on top of
this(let
me know if we have these requests already, I might just be negligent).

2. API: markAsPartitioned() vs config
If we go with the KStreams only scope, markAsPartition() is more
adequate, i.e. maps nicely to repartitionRequired. There is a list of
NamedOperations that may or may not trigger repartition based on its
context(KStreams or KTable) which would make the implementation more
confusing.

3. KIP documentation: Thanks for providing the links to previous KIPs.
I
will be adding the three use cases and javadoc. I will also document
the
risks when it relates to IQ and Join.

Best,
Shay

On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax <mj...@apache.org>
wrote:

I agree that it could easily be misused. There is a few Jira tickets
for
cases when people want to "cancel" a repartition step. I would hope
those tickets are linked to the KIP (if not, we should do this, and
maybe even c&p those cases as motivation into the KIP itself)?

It's always a tricky question to what extend we want to guide users,
and
to what extend we need to give levers for advances case (and how to
design those levers...) It's for sure a good idea to call out "use
with
case" in the JavaDocs for the new method.


-Matthias

On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
I guess I felt a bit uneasy about how this could be used/abused
while
reading the KIP, but if we truly believe this is an advanced
feature,
I'm
fine with the way things currently are. It doesn't feel like the
best
API,
but it does seem to be the best *possible* API given the way things
are.

W.r.t the KTable notes, that all makes sense to me. I just wanted
to
lay
out all the potential cases to make sure we had our bases covered.

I still think an example or two would help, but the only thing I
will
actually wait on before feeling comfortable enough to vote on this
would
be
a clear method signature (and maybe sample javadocs) in the "Public
Interfaces" section.

Thanks again for the KIP Shay! Hope I haven't dragged it out too
much

On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax <mj...@apache.org>
wrote:

Some thought about the API question.


A. kstream.groupBy(...).aggregate(...)

This can be re-writtten as

kstream.selectKey(...)
          .markAsRepartitioned()
          .groupByKey()
          .aggregate()

Given that `markAsRepartitoned` is an advanced feature, I think it
would
be ok?


B. ktable.groupBy(...).aggregate(...)

For KTable aggregation, not sure how useful it would be? In the
end,
an
table aggregation does only make sense if we pick something from
the
value, ie, we indeed change the key?


C. kstream.selectKey(...).join(ktable)

We can just insert a `markAsRepartitioned()` after `selectKey` to
avoid
repartitioning of the left input KStream.


KStream.selectKey(...).toTable().join(...)

Not sure if I understand what you try to say with this example?
In the
end, `selectKey(...).toTable()` would repartiton. If I know that
one
can
upsert directly, one inserts a `markAsRepartitioned()` in between.


In general, the use case seems to be that the key is not in the
right
"format", or there is no key, but data was partitioned by a
value-attribute upstream and we just want to extract this
value-attribute into the key. Both seems to be KStream cases?


-Matthias



On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
Hey Shay, while I don't have any specific concerns about the new
public
API
in this KIP, I'd like to better understand how this feature will
work
before I vote. We should document the behavior of this new
operator
clearly
in the KIP as well -- you don't necessarily need to write the
complete
javadocs up front, but it should be possible for a user to read
the
KIP
and
then understand how this feature will work and how they would
need to
apply
it.

To that end, I recommend framing this proposal with a few
examples to
help
clarify the semantics. When and where can you apply the
markAsPartitioned()
operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka
Streams
(selectKey, map, transform, etc.) now leads to automatic
repartition
before
an aggregation." We should change "aggregation" to "stateful
operation"
as
this is true for things like joins as well as aggregations
2. The callout on IQ makes me a bit uncomfortable -- basically it
says
this
should not be a concern "if we use markAsPartitioned correctly".
Does
this
mean if we, the devs implementing this, write the feature
correctly?
Or
is
it saying that this won't be a problem as long as "we", the
users of
this
feature, use it correctly"? Just wondering if you've put any
thought
into
how this would work yet (I personally have not)
3. The KIP should lay out the proposed API exactly, even if
there's
only
one new method. Check out this KIP
<



https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL

(or this KIP
<



https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808
)
for a good reference on what the Public Interfaces section should
include
4. Regarding the proposed API itself, I wonder if KStream is
really
the
most appropriate interface for the new operator. A repartition
can be
triggered on just a KTable. Here's where some examples would
help.
Perhaps
we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional
vital
examples, but at the very least, these are the three to consider:
either
a
KStream or KTable followed by a groupBy/aggregation, or a KStream
with
key-changing operator followed by a join. Note that you could
have
something like KStream.selectKey(...).toTable().join(...) as
well,
but
since there are no pure key-changing operators (like #selectKey)
on
KTables, only groupBy() which must always be followed by
aggregation,
this
4th case can be reduced to an example like C of a KStream with
key-changing
operation and downstream join -- ie there's no way to do this
without
#toTable which is more like syntactic sugar for the purposes of
this
repartitioning discussion.

I worry that making this a DSL operator on KStream is too
generic,
and
we
would also need to add it to KTable for example B, despite
KTables
not
having any true pure key-changing operators outside of #groupBy.
Would
we
throw an exception if you invoked #markAsPartitioned on a KTable
that
wasn't followed by a groupBy? If you have multiple key-changing
operators,
would you need to add markAsPartitioned after each one? If not,
what
are
the semantics of that?  These are the main questions that got me
thinking
here, and will definitely need to be clarified in the KIP if we
do go
with
the current proposal. But I wanted to throw out another idea for
an
API I
think would help with some of this awkwardness by having clearly
defined
semantics:

Fundamentally it seems to me that these issues are arising from
that
"being
partitioned" is conceptually a property of other operations
applied
to
a
KStream/KTable, rather than an operation itself. So rather than
making
this
a DSL operator itself, what if we added it to the Grouped and
various
Joined configuration classes? It would allow us to more
carefully hit
only
the relevant parts of the DSL, so there are no questions about
whether/when
to throw errors when the operator is incorrectly applied -- there
would
be
no way to apply it incorrectly. The main drawback I can think of
is
simply
that this touches on a larger surface area of the API. I
personally
don't
believe this is a good enough reason to make it a DSL operator
as one
could
make that argument for nearly any kind of KStream or KTable
operator
configuration going forward, and would explode the
KStream/KTable API
surface area instead. Perhaps this was discussed during the
previous
iteration of this KIP, or I'm missing something here, so I just
wanted
to
put this out there and see what people think

Either way, thanks for picking up this KIP. It's been a long time
coming
:)

-Sophie





On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com>
wrote:

Hi all,

It's been a few days so I went ahead with editing the KIP, the
main
change
is on the method name





https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
.
I will follow up with a VOTE separately.

Best,
Shay

On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <
mj...@apache.org>
wrote:

Shay,

thanks for picking up this KIP. It's a pity that the discussion
stalled
for such a long time.

As expressed previously, I am happy with the name
`markAsPartitioned()`
and also believe it's ok to just document the impact and leave
it
to
the
user to do the right thing.

If we really get a lot of users that ask about it, because
they did
not
do the right thing, we could still add something (eg, a
reverse-mapper
function) in a follow-up KIP. But we don't know if it's
necessary;
thus,
making a small incremental step sounds like a good approach to
me.

Let's see if others agree or not.


-Matthias

On 6/28/23 5:29 PM, Shay Lin wrote:
Hi all,

Great discussion thread. May I take this KIP up? If it’s
alright
my
plan
is
to update the KIP with the operator `markAsPartitioned()`.

As you have discussed and pointed out, there are implications
to
downstream
joins or aggregation operations. Still, the operator is
intended
for
advanced users so my two cents is it would be a valuable
addition
nonetheless. We could add this as a caution/consideration as
part
of
the
java doc.

Let me know, thanks.
Shay











Reply via email to