Some thoughts on the API question.

A. kstream.groupBy(...).aggregate(...)

This can be rewritten as

kstream.selectKey(...)
       .markAsPartitioned()
       .groupByKey()
       .aggregate()

Given that `markAsPartitioned()` is an advanced feature, I think that would be ok?
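
To make the equivalence in A concrete, here is a toy model in plain Java. It is not the Kafka Streams API: `ToyStream` and all of its methods are made up for illustration, and `markAsPartitioned()` is only the operator proposed in this KIP. The sketch assumes that a key-changing operator sets a "repartition required" flag, that a stateful operator inserts a repartition step only when the flag is set, and that the new operator would simply clear the flag.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical names): tracks a "repartition required" flag
// and records the steps of the resulting plan. Not the Kafka Streams API.
final class ToyStream {
    private final List<String> plan;
    private final boolean repartitionRequired;

    private ToyStream(List<String> plan, boolean repartitionRequired) {
        this.plan = plan;
        this.repartitionRequired = repartitionRequired;
    }

    static ToyStream source() {
        return new ToyStream(List.of("source"), false);
    }

    // Appends a step and returns a new stream carrying the given flag.
    private ToyStream then(String step, boolean flag) {
        List<String> next = new ArrayList<>(plan);
        next.add(step);
        return new ToyStream(next, flag);
    }

    // Key-changing operator: the key may no longer match the partitioning.
    ToyStream selectKey() { return then("selectKey", true); }

    // Proposed operator: the user asserts the data is still correctly partitioned.
    ToyStream markAsPartitioned() { return then("markAsPartitioned", false); }

    // Stateful operator: repartitions first, but only if the flag is set.
    ToyStream groupByKeyAndAggregate() {
        ToyStream input = repartitionRequired ? then("repartition", false) : this;
        return input.then("aggregate", false);
    }

    // groupBy(...) modeled as selectKey(...) followed by groupByKey().
    ToyStream groupByAndAggregate() { return selectKey().groupByKeyAndAggregate(); }

    List<String> plan() { return plan; }

    public static void main(String[] args) {
        // [source, selectKey, repartition, aggregate]
        System.out.println(source().groupByAndAggregate().plan());
        // [source, selectKey, markAsPartitioned, aggregate]
        System.out.println(source().selectKey().markAsPartitioned().groupByKeyAndAggregate().plan());
    }
}
```

In this model the only way to skip the repartition step is the explicit `selectKey` + `markAsPartitioned` + `groupByKey` chain, which is the rewrite suggested above.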


B. ktable.groupBy(...).aggregate(...)

For KTable aggregation, I am not sure how useful it would be. In the end, a table aggregation only makes sense if we pick something from the value, i.e., if we actually change the key?


C. kstream.selectKey(...).join(ktable)

We can just insert a `markAsPartitioned()` after `selectKey` to avoid repartitioning the left input KStream.


KStream.selectKey(...).toTable().join(...)

I am not sure I understand what you are trying to say with this example. In the end, `selectKey(...).toTable()` would repartition. If I know that one can upsert directly, one inserts a `markAsPartitioned()` in between.


In general, the use case seems to be that the key is not in the right "format", or there is no key, but the data was partitioned by a value-attribute upstream and we just want to extract this value-attribute into the key. Both seem to be KStream cases?
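
As a rough illustration of that use case, the plain-Java sketch below models records without a key that were routed upstream by a value attribute, and checks whether extracting that attribute into the key would move any record. Everything in it is hypothetical (`Order`, `customerId`, the helper names), and `partitionFor` is only a stand-in hash: the check is meaningful only under the assumption that upstream and downstream use the same partitioner, the same serialized form of the attribute, and the same partition count.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch (hypothetical names): is the data already partitioned by the
// attribute we want to extract into the key?
final class PartitionCheck {
    static final class Order {
        final String customerId;
        final int amount;
        Order(String customerId, int amount) {
            this.customerId = customerId;
            this.amount = amount;
        }
    }

    // Stand-in partitioner, assumed to be shared by upstream and downstream.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Upstream: records carry no key but are routed by the customerId attribute.
    static List<List<Order>> produceByCustomer(List<Order> orders, int numPartitions) {
        List<List<Order>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Order o : orders) {
            partitions.get(partitionFor(o.customerId, numPartitions)).add(o);
        }
        return partitions;
    }

    // Downstream: after extracting customerId as the key, would any record
    // belong in a different partition than the one it is in now?
    static boolean keyExtractionPreservesPartitioning(List<List<Order>> partitions) {
        for (int p = 0; p < partitions.size(); p++) {
            for (Order o : partitions.get(p)) {
                if (partitionFor(o.customerId, partitions.size()) != p) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order("alice", 10), new Order("bob", 5), new Order("alice", 7));
        // prints "true": no record would have to move
        System.out.println(keyExtractionPreservesPartitioning(produceByCustomer(orders, 4)));
    }
}
```

If the check fails for any record, skipping the repartition would send records with the same key to different tasks, so it is on the user to know that the upstream partitioning really matches.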


-Matthias



On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
Hey Shay, while I don't have any specific concerns about the new public API
in this KIP, I'd like to better understand how this feature will work
before I vote. We should document the behavior of this new operator clearly
in the KIP as well -- you don't necessarily need to write the complete
javadocs up front, but it should be possible for a user to read the KIP and
then understand how this feature will work and how they would need to apply
it.

To that end, I recommend framing this proposal with a few examples to help
clarify the semantics. When and where can you apply the markAsPartitioned()
operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka Streams
(selectKey, map, transform, etc.) now leads to automatic repartition before
an aggregation." We should change "aggregation" to "stateful operation" as
this is true for things like joins as well as aggregations.
2. The callout on IQ makes me a bit uncomfortable -- basically it says this
should not be a concern "if we use markAsPartitioned correctly". Does this
mean if we, the devs implementing this, write the feature correctly? Or is
it saying that this won't be a problem as long as "we", the users of this
feature, use it correctly? Just wondering if you've put any thought into
how this would work yet (I personally have not).
3. The KIP should lay out the proposed API exactly, even if there's only
one new method. Check out this KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
(or this KIP
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
for a good reference on what the Public Interfaces section should include
4. Regarding the proposed API itself, I wonder if KStream is really the
most appropriate interface for the new operator. A repartition can be
triggered on just a KTable. Here's where some examples would help. Perhaps
we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional vital
examples, but at the very least, these are the three to consider: either a
KStream or KTable followed by a groupBy/aggregation, or a KStream with
key-changing operator followed by a join. Note that you could have
something like KStream.selectKey(...).toTable().join(...) as well, but
since there are no pure key-changing operators (like #selectKey) on
KTables, only groupBy() which must always be followed by aggregation, this
4th case can be reduced to an example like C of a KStream with key-changing
operation and downstream join -- ie there's no way to do this without
#toTable which is more like syntactic sugar for the purposes of this
repartitioning discussion.

I worry that making this a DSL operator on KStream is too generic, and we
would also need to add it to KTable for example B, despite KTables not
having any true pure key-changing operators outside of #groupBy. Would we
throw an exception if you invoked #markAsPartitioned on a KTable that
wasn't followed by a groupBy? If you have multiple key-changing operators,
would you need to add markAsPartitioned after each one? If not, what are
the semantics of that?  These are the main questions that got me thinking
here, and will definitely need to be clarified in the KIP if we do go with
the current proposal. But I wanted to throw out another idea for an API I
think would help with some of this awkwardness by having clearly defined
semantics:

Fundamentally it seems to me that these issues arise from the fact that "being
partitioned" is conceptually a property of other operations applied to a
KStream/KTable, rather than an operation itself. So rather than making this
a DSL operator itself, what if we added it to the Grouped and various
Joined configuration classes? It would allow us to more carefully hit only
the relevant parts of the DSL, so there are no questions about whether/when
to throw errors when the operator is incorrectly applied -- there would be
no way to apply it incorrectly. The main drawback I can think of is simply
that this touches on a larger surface area of the API. I personally don't
believe this is a good enough reason to make it a DSL operator as one could
make that argument for nearly any kind of KStream or KTable operator
configuration going forward, and would explode the KStream/KTable API
surface area instead. Perhaps this was discussed during the previous
iteration of this KIP, or I'm missing something here, so I just wanted to
put this out there and see what people think
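
A rough sketch of the configuration-class idea, again as a toy model in plain Java rather than the Kafka Streams API. `ToyGrouped`, its `markAsPartitioned()` option, and `groupByAndAggregate` are all hypothetical names; the real `Grouped` and `Joined` classes have no such option today. The point is only that the flag travels with the configuration of the stateful operation, so there is no separate operator that could be placed somewhere it has no meaning.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical names): the "already partitioned" hint lives on
// the configuration object of the grouping step, not on the stream itself.
final class GroupedOptionSketch {
    static final class ToyGrouped {
        private final boolean skipRepartition;

        private ToyGrouped(boolean skipRepartition) {
            this.skipRepartition = skipRepartition;
        }

        static ToyGrouped defaults() {
            return new ToyGrouped(false);
        }

        // Hypothetical option: the user asserts the data is already partitioned
        // by the new grouping key.
        ToyGrouped markAsPartitioned() {
            return new ToyGrouped(true);
        }
    }

    // groupBy changes the key, so it repartitions unless the config opts out.
    static List<String> groupByAndAggregate(ToyGrouped grouped) {
        List<String> plan = new ArrayList<>(List.of("source", "groupBy"));
        if (!grouped.skipRepartition) {
            plan.add("repartition");
        }
        plan.add("aggregate");
        return plan;
    }

    public static void main(String[] args) {
        // [source, groupBy, repartition, aggregate]
        System.out.println(groupByAndAggregate(ToyGrouped.defaults()));
        // [source, groupBy, aggregate]
        System.out.println(groupByAndAggregate(ToyGrouped.defaults().markAsPartitioned()));
    }
}
```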

Either way, thanks for picking up this KIP. It's been a long time coming :)

-Sophie





On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com> wrote:

Hi all,

It's been a few days, so I went ahead and edited the KIP; the main change
is the method name:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

I will follow up with a VOTE separately.

Best,
Shay

On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <mj...@apache.org> wrote:

Shay,

thanks for picking up this KIP. It's a pity that the discussion stalled
for such a long time.

As expressed previously, I am happy with the name `markAsPartitioned()`
and also believe it's ok to just document the impact and leave it to the
user to do the right thing.

If we really get a lot of users that ask about it, because they did not
do the right thing, we could still add something (eg, a reverse-mapper
function) in a follow-up KIP. But we don't know if it's necessary; thus,
making a small incremental step sounds like a good approach to me.

Let's see if others agree or not.


-Matthias

On 6/28/23 5:29 PM, Shay Lin wrote:
Hi all,

Great discussion thread. May I take this KIP up? If it’s alright, my plan is
to update the KIP with the operator `markAsPartitioned()`.

As you have discussed and pointed out, there are implications for downstream
joins or aggregation operations. Still, the operator is intended for
advanced users, so my two cents is that it would be a valuable addition
nonetheless. We could add this as a caution/consideration in the Javadoc.

Let me know, thanks.
Shay



