Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-31 Thread Shay Lin
Hi all,

It's been a few days. If there are no further comments or questions, I'd like
to call for a vote. There is an existing VOTE thread if you search for
KIP-759.

Thank you,
Shay


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-26 Thread Shay Lin
Very good catch, Matthias. I updated the KIP to state that the new
DSLOperation will return a new, mutated KStream.

Thank you,
Shay


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-26 Thread Matthias J. Sax

One last question. What should happen in the following case?

KStream myStream = build.stream(...).map(...);
myStream.markAsPartitioned().groupByKey().aggregate(...);
myStream.join(...);

The question is about the "fan-out" pattern. `myStream`, which is marked
as partitioned, is fed into two downstream operations. Thus, it's
clear that the aggregation won't trigger a repartitioning. However, the
fan-out happens before `markAsPartitioned` and thus I would assume
that the join would trigger a repartitioning?


This question is important because, if we follow what I said above,
`markAsPartitioned` returns a new KStream object but does not mutate the
upstream KStream object; these are semantically two different things. It
also has an impact on how we need to implement the feature. The KIP
should explicitly explain this case.



-Matthias
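
The fan-out case above can be made concrete with a toy model. The sketch below is not Kafka Streams code: `ToyStream` and `keyedOperationRepartitions` are hypothetical names, and the model assumes the semantics discussed in this thread, namely that the marking call returns a new stream object and leaves the upstream one untouched.

```java
/** Toy model of a DSL node; a hypothetical stand-in, NOT the real KStream. */
final class ToyStream {
    final String name;
    final boolean repartitionRequired;

    ToyStream(String name, boolean repartitionRequired) {
        this.name = name;
        this.repartitionRequired = repartitionRequired;
    }

    static ToyStream source(String topic) {
        // Data read straight from a topic is assumed to be partitioned by key.
        return new ToyStream(topic, false);
    }

    /** Key-changing operation: downstream key-based operations need a repartition. */
    ToyStream map() {
        return new ToyStream(name + ".map", true);
    }

    /** Assumed semantics: returns a NEW node with the flag cleared; `this` is unchanged. */
    ToyStream markAsPartitioned() {
        return new ToyStream(name + ".markAsPartitioned", false);
    }

    /** Would a key-based operation (aggregation, join) on this node repartition? */
    boolean keyedOperationRepartitions() {
        return repartitionRequired;
    }
}

final class FanOutDemo {
    public static void main(String[] args) {
        ToyStream myStream = ToyStream.source("input").map();
        ToyStream marked = myStream.markAsPartitioned();

        // Branch 1: aggregation downstream of the marked stream.
        System.out.println("aggregate repartitions: " + marked.keyedOperationRepartitions());
        // Branch 2: join directly on the unmarked upstream stream.
        System.out.println("join repartitions: " + myStream.keyedOperationRepartitions());
    }
}
```

Under that assumption, only the branch that goes through the marking call skips the repartition; the join on `myStream` still sees the flag set by `map()`.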


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-26 Thread Shay Lin
Hi John,

Thanks for your reply. I updated the KIP to reflect the changes we
discussed in the thread today.
#1 is duly noted, I learned from the examples Sophie sent earlier! =)

In the new version, I also explain why IQ and joins will not work with
the new interface and describe the mitigation. The proposal now
specifically states that we are solving the unneeded repartition problem
for Kafka Streams applications that do not also use IQ or joins. In the
concerns section, the proposal notes that a reverse mapping would make the
new interface compatible with IQ and joins again, but that this is subject
to demand.

Let me know what you think. Thanks!
Shay
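
To illustrate why a key-based lookup such as IQ can go wrong once a repartition is skipped, here is a minimal sketch. Everything in it is an assumption for illustration: `IqLookupSketch`, the four-partition topic, the hash-modulo "default" partitioner (a toy, not Kafka's actual hash function), and the fixed upstream placement table are all hypothetical.

```java
final class IqLookupSketch {
    static final int NUM_PARTITIONS = 4;

    /** Toy stand-in for a default partitioner: hash of the key modulo the partition count. */
    static int defaultPartition(String key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    /** Hypothetical upstream scheme: the producer placed records by region via a fixed table. */
    static int upstreamPartition(String region) {
        return "eu".equals(region) ? 1 : 3;
    }

    public static void main(String[] args) {
        // The application re-keys by region and then skips the repartition,
        // so records stay in whatever partition the upstream producer chose.
        String key = "eu";
        int actual = upstreamPartition(key);  // where the data and its state really live
        int assumed = defaultPartition(key);  // where a default key-based lookup would go
        System.out.println("actual=" + actual + ", assumed=" + assumed
                + ", lookup hits right partition: " + (actual == assumed));
    }
}
```

In this toy setup the two functions disagree, so a lookup that assumes default partitioning would query the wrong partition. A "reverse mapping" of the kind mentioned in the concerns section would amount to giving the lookup the same function the upstream producer used.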




Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-26 Thread John Roesler
Hello Shay,

Thanks for the KIP!

I just took a look in preparation to vote, and there are two small-ish things 
that I'd like to fix first. Apologies if this stuff has already come up in the 
discussion thread; I only skimmed it.

1. The KIP only mentions the name of the method instead of providing a code 
snippet showing exactly what the method signature will be in the interface. 
Normally, KIPs do the latter because it removes all ambiguity from the 
proposal. It also gives you an opportunity to write down the Javadoc you would 
add to the method instead of just mentioning the points that you plan to 
document.

2. The KIP lists some concerns, but not what you will do to mitigate them. For 
example, the concern about IQ not behaving correctly. Will you disable the use 
of the implicit partitioner downstream of one of these cancellations? Or 
provide a new interface to supply the "reverse mapping" you mentioned? Or 
include documentation in the Javadoc for how to deal with the situation? I 
think there is a range of options for each of those concerns, and we should
state up front what we plan to do.

Thanks again!
-John
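
As a rough illustration of point 1, a signature-plus-Javadoc snippet could look like the sketch below. This is not the text of the KIP: the interface name `PartitionAwareStream`, the helper class `SketchStream`, and the Javadoc wording are hypothetical, and the "returns a new stream" behaviour is taken from the later replies in this thread.

```java
/** Hypothetical stand-in for the DSL stream interface; only the proposed method is shown. */
interface PartitionAwareStream<K, V> {

    /**
     * Marks this stream as already partitioned by its current key, so that a
     * downstream key-based operation (for example {@code groupByKey().aggregate(...)})
     * does not insert a repartition step.
     *
     * <p>Use with care: the caller is responsible for the data actually being
     * partitioned correctly. Interactive queries and joins downstream of this
     * call may not behave correctly.
     *
     * @return a new stream; this stream is not modified
     */
    PartitionAwareStream<K, V> markAsPartitioned();
}

/** Minimal illustrative implementation that only tracks the repartition flag. */
final class SketchStream<K, V> implements PartitionAwareStream<K, V> {
    final boolean repartitionRequired;

    SketchStream(boolean repartitionRequired) {
        this.repartitionRequired = repartitionRequired;
    }

    @Override
    public PartitionAwareStream<K, V> markAsPartitioned() {
        // Return a fresh object with the flag cleared instead of mutating this one.
        return new SketchStream<>(false);
    }
}
```

Writing the snippet out this way forces decisions that prose can leave open, such as the return type and whether the receiver is mutated.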


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-24 Thread Sophie Blee-Goldman
Thanks Shay! You and Matthias have convinced me, I'm happy with the current
proposal. I think once you make the minor
updates to the KIP document this will be ready for voting again.

Cheers,
Sophie

> > > On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax wrote:
> > >
> > >> Some thoughts about the API question.
> > >>
> > >>
> > >>> A. kstream.groupBy(...).aggregate(...)
> > >>
> > >> This can be rewritten as
> > >>
> > >> kstream.selectKey(...)
> > >>  .markAsRepartitioned()
> > >>  .groupByKey()
> > >>  .aggregate()
> > >>
> > >> Given that `markAsRepartitioned` is an advanced feature, I think it would
> > >> be ok?
> > >>
> > >>
> > >>> B. ktable.groupBy(...).aggregate(...)
> > >>
> > >> For KTable aggregation, not sure how useful it would be? In the end, a
> > >> table aggregation only makes sense if we pick something from the
> > >> value, i.e., we indeed change the key?
> > >>
> > >>
> > >>> C. kstream.selectKey(...).join(ktable)
> > >>
> > >> We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
> > >> repartitioning of the left input KStream.
> > >>
> > >>
> > >>> KStream.selectKey(...).toTable().join(...)
> > >>
> > >> Not sure if I understand what you try to say with this example? In the
> > >> end, `selectKey(...).toTable()` would repartition. If I know that one can
> > >> upsert directly, one inserts a `markAsRepartitioned()` in between.
> > >>
> > >>
> > >> In general, the use case seems to be that the key is not in the right
> > >> "format", or there is no key, but data was partitioned by a
> > >> value-attribute upstream and we just

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-24 Thread Shay Lin
Hi Sophie and Matthias, thanks for your comments and replies.

1. Scope of change: KStream only or KStream/KTable
I took some time to digest your points, looking through how KStream
triggers repartitions today. I noticed that `repartitionRequired` is a flag
in KStreamImpl etc. and not in KTableImpl etc. Looking further, in the
case of KTable, instead of passing in a boolean flag, a repartition node
`TableRepartitionMapNode` is created directly. I went back and referenced
the two issue tickets, KAFKA-10844 and KAFKA-4835; both requests were
focused on KStream, i.e. not repartitioning when the input streams
are already correctly keyed. Is it possible that in the case of KTable,
users always intend to repartition (change key) when they call
aggregate? -- (this was written before I saw Matthias's comment)

Overall, based on the tickets, I see the benefit of doing a contained
change focusing on KStream, i.e. repartitionRequired, which would solve
the pain points nicely. If we run into similar complaints/optimization
requests for KTable down the line, we can address them on top of this (let
me know if we have these requests already, I might just be negligent).

2. API: markAsPartitioned() vs config
If we go with the KStream-only scope, markAsPartitioned() is the better
fit, i.e. it maps nicely to repartitionRequired. There is a list of
NamedOperations that may or may not trigger a repartition based on their
context (KStream or KTable), which would make a config-based approach
more confusing to implement.

3. KIP documentation: Thanks for providing the links to previous KIPs. I
will be adding the three use cases and javadoc. I will also document the
risks relating to IQ and joins.

Best,
Shay

On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax  wrote:

> I agree that it could easily be misused. There are a few Jira tickets for
> cases when people want to "cancel" a repartition step. I would hope
> those tickets are linked to the KIP (if not, we should do this, and
> maybe even copy those cases as motivation into the KIP itself)?
>
> It's always a tricky question to what extent we want to guide users, and
> to what extent we need to give levers for advanced cases (and how to
> design those levers...) It's for sure a good idea to call out "use with
> care" in the JavaDocs for the new method.
>
>
> -Matthias
>
> On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
> > I guess I felt a bit uneasy about how this could be used/abused while
> > reading the KIP, but if we truly believe this is an advanced feature, I'm
> > fine with the way things currently are. It doesn't feel like the best
> API,
> > but it does seem to be the best *possible* API given the way things are.
> >
> > W.r.t the KTable notes, that all makes sense to me. I just wanted to lay
> > out all the potential cases to make sure we had our bases covered.
> >
> > I still think an example or two would help, but the only thing I will
> > actually wait on before feeling comfortable enough to vote on this would
> be
> > a clear method signature (and maybe sample javadocs) in the "Public
> > Interfaces" section.
> >
> > Thanks again for the KIP Shay! Hope I haven't dragged it out too much
> >
> > On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax 
> wrote:
> >
> > >> Some thoughts about the API question.
> > >>
> > >>
> > >> >> A. kstream.groupBy(...).aggregate(...)
> > >>
> > >> This can be rewritten as
> > >>
> > >> kstream.selectKey(...)
> > >>  .markAsRepartitioned()
> > >>  .groupByKey()
> > >>  .aggregate()
> > >>
> > >> Given that `markAsRepartitioned` is an advanced feature, I think it would
> > >> be ok?
> > >>
> > >>
> > >> >> B. ktable.groupBy(...).aggregate(...)
> > >>
> > >> For KTable aggregation, not sure how useful it would be? In the end, a
> > >> table aggregation only makes sense if we pick something from the
> > >> value, i.e., we indeed change the key?
> > >>
> > >>
> > >> >> C. kstream.selectKey(...).join(ktable)
> > >>
> > >> We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
> > >> repartitioning of the left input KStream.
> > >>
> > >>
> > >>> KStream.selectKey(...).toTable().join(...)
> > >>
> > >> Not sure if I understand what you are trying to say with this example? In
> > >> the end, `selectKey(...).toTable()` would repartition. If I know that one
> > >> can upsert directly, one inserts a `markAsRepartitioned()` in between.
> > >>
> > >>
> > >> In general, the use case seems to be that the key is not in the right
> > >> "format", or there is no key, but data was partitioned by a
> > >> value-attribute upstream and we just want to extract this
> > >> value-attribute into the key. Both seem to be KStream cases?
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
> >>> Hey Shay, while I don't have any specific concerns about the new public
> >> API
> >>> in this KIP, I'd like to better understand how this feature will work
> >>> before I vote. We should document the behavior of this new operator
> >> clearly
> >>> in the KIP as well -- you don't 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Matthias J. Sax
I agree that it could easily be misused. There are a few Jira tickets for
cases when people want to "cancel" a repartition step. I would hope
those tickets are linked to the KIP (if not, we should do this, and
maybe even copy those cases as motivation into the KIP itself)?


It's always a tricky question to what extent we want to guide users, and
to what extent we need to give levers for advanced cases (and how to
design those levers...) It's for sure a good idea to call out "use with
care" in the JavaDocs for the new method.



-Matthias

On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:

I guess I felt a bit uneasy about how this could be used/abused while
reading the KIP, but if we truly believe this is an advanced feature, I'm
fine with the way things currently are. It doesn't feel like the best API,
but it does seem to be the best *possible* API given the way things are.

W.r.t the KTable notes, that all makes sense to me. I just wanted to lay
out all the potential cases to make sure we had our bases covered.

I still think an example or two would help, but the only thing I will
actually wait on before feeling comfortable enough to vote on this would be
a clear method signature (and maybe sample javadocs) in the "Public
Interfaces" section.

Thanks again for the KIP Shay! Hope I haven't dragged it out too much

On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax  wrote:


Some thoughts about the API question.


A. kstream.groupBy(...).aggregate(...)


This can be rewritten as

kstream.selectKey(...)
 .markAsRepartitioned()
 .groupByKey()
 .aggregate()

Given that `markAsRepartitioned` is an advanced feature, I think it would
be ok?


B. ktable.groupBy(...).aggregate(...)


For KTable aggregation, not sure how useful it would be? In the end, a
table aggregation only makes sense if we pick something from the
value, i.e., we indeed change the key?


C. kstream.selectKey(...).join(ktable)


We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
repartitioning of the left input KStream.


KStream.selectKey(...).toTable().join(...)


Not sure if I understand what you are trying to say with this example? In
the end, `selectKey(...).toTable()` would repartition. If I know that one
can upsert directly, one inserts a `markAsRepartitioned()` in between.


In general, the use case seems to be that the key is not in the right
"format", or there is no key, but data was partitioned by a
value-attribute upstream and we just want to extract this
value-attribute into the key. Both seem to be KStream cases?


-Matthias



On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:

Hey Shay, while I don't have any specific concerns about the new public API
in this KIP, I'd like to better understand how this feature will work
before I vote. We should document the behavior of this new operator clearly
in the KIP as well -- you don't necessarily need to write the complete
javadocs up front, but it should be possible for a user to read the KIP and
then understand how this feature will work and how they would need to apply
it.

To that end, I recommend framing this proposal with a few examples to help
clarify the semantics. When and where can you apply the markAsPartitioned()
operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka Streams
(selectKey, map, transform, etc.) now leads to automatic repartition before
an aggregation." We should change "aggregation" to "stateful operation" as
this is true for things like joins as well as aggregations
2. The callout on IQ makes me a bit uncomfortable -- basically it says this
should not be a concern "if we use markAsPartitioned correctly". Does this
mean if we, the devs implementing this, write the feature correctly? Or is
it saying that this won't be a problem as long as "we", the users of this
feature, use it correctly? Just wondering if you've put any thought into
how this would work yet (I personally have not)
3. The KIP should lay out the proposed API exactly, even if there's only
one new method. Check out this KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
(or this KIP
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
for a good reference on what the Public Interfaces section should include
4. Regarding the proposed API itself, I wonder if KStream is really the
most appropriate interface for the new operator. A repartition can be
triggered on just a KTable. Here's where some examples would help. Perhaps
we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional vital
examples, but at the very least, these are the three to consider: either a
KStream or KTable followed by a groupBy/aggregation, or a KStream with
key-changing 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Sophie Blee-Goldman
I guess I felt a bit uneasy about how this could be used/abused while
reading the KIP, but if we truly believe this is an advanced feature, I'm
fine with the way things currently are. It doesn't feel like the best API,
but it does seem to be the best *possible* API given the way things are.

W.r.t the KTable notes, that all makes sense to me. I just wanted to lay
out all the potential cases to make sure we had our bases covered.

I still think an example or two would help, but the only thing I will
actually wait on before feeling comfortable enough to vote on this would be
a clear method signature (and maybe sample javadocs) in the "Public
Interfaces" section.

Thanks again for the KIP Shay! Hope I haven't dragged it out too much

On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax  wrote:

> Some thoughts about the API question.
>
>
> >> A. kstream.groupBy(...).aggregate(...)
>
> This can be rewritten as
>
> kstream.selectKey(...)
>    .markAsRepartitioned()
>    .groupByKey()
>    .aggregate()
>
> Given that `markAsRepartitioned` is an advanced feature, I think it would
> be ok?
>
>
> >> B. ktable.groupBy(...).aggregate(...)
>
> For KTable aggregation, not sure how useful it would be? In the end, a
> table aggregation only makes sense if we pick something from the
> value, i.e., we indeed change the key?
>
>
> >> C. kstream.selectKey(...).join(ktable)
>
> We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
> repartitioning of the left input KStream.
>
>
> > KStream.selectKey(...).toTable().join(...)
>
> Not sure if I understand what you are trying to say with this example? In
> the end, `selectKey(...).toTable()` would repartition. If I know that one
> can upsert directly, one inserts a `markAsRepartitioned()` in between.
>
>
> In general, the use case seems to be that the key is not in the right
> "format", or there is no key, but data was partitioned by a
> value-attribute upstream and we just want to extract this
> value-attribute into the key. Both seem to be KStream cases?
>
>
> -Matthias
>
>
>
> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
> > Hey Shay, while I don't have any specific concerns about the new public
> API
> > in this KIP, I'd like to better understand how this feature will work
> > before I vote. We should document the behavior of this new operator
> clearly
> > in the KIP as well -- you don't necessarily need to write the complete
> > javadocs up front, but it should be possible for a user to read the KIP
> and
> > then understand how this feature will work and how they would need to
> apply
> > it.
> >
> > To that end, I recommend framing this proposal with a few examples to
> help
> > clarify the semantics. When and where can you apply the
> markAsPartitioned()
> > operator? Some suggestions below.
> >
> > Specific notes:
> >
> > 1. The KIP opens with "Each key changing operation in Kafka Streams
> > (selectKey, map, transform, etc.) now leads to automatic repartition
> before
> > an aggregation." We should change "aggregation" to "stateful operation"
> as
> > this is true for things like joins as well as aggregations
> > 2. The callout on IQ makes me a bit uncomfortable -- basically it says
> this
> > should not be a concern "if we use markAsPartitioned correctly". Does
> this
> > mean if we, the devs implementing this, write the feature correctly? Or
> is
> > it saying that this won't be a problem as long as "we", the users of this
> > feature, use it correctly? Just wondering if you've put any thought into
> > how this would work yet (I personally have not)
> > 3. The KIP should lay out the proposed API exactly, even if there's only
> > one new method. Check out this KIP
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> > (or this KIP
> > <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808
> >)
> > for a good reference on what the Public Interfaces section should include
> > 4. Regarding the proposed API itself, I wonder if KStream is really the
> > most appropriate interface for the new operator. A repartition can be
> > triggered on just a KTable. Here's where some examples would help.
> Perhaps
> > we could focus on these three cases:
> >
> > A. kstream.groupBy(...).aggregate(...)
> > B. ktable.groupBy(...).aggregate(...)
> > C. kstream.selectKey(...).join(ktable)
> >
> > I'm sure someone will correct me if I'm missing any additional vital
> > examples, but at the very least, these are the three to consider: either
> a
> > KStream or KTable followed by a groupBy/aggregation, or a KStream with
> > key-changing operator followed by a join. Note that you could have
> > something like KStream.selectKey(...).toTable().join(...) as well, but
> > since there are no pure key-changing operators (like #selectKey) on
> > KTables, only groupBy() which must always be followed by aggregation,
> this
> > 4th case can be reduced to an example like C of a KStream with
> 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Matthias J. Sax

Some thoughts about the API question.


>> A. kstream.groupBy(...).aggregate(...)

This can be rewritten as

kstream.selectKey(...)
   .markAsRepartitioned()
   .groupByKey()
   .aggregate()

Given that `markAsRepartitioned` is an advanced feature, I think it would
be ok?


>> B. ktable.groupBy(...).aggregate(...)

For KTable aggregation, not sure how useful it would be? In the end, a
table aggregation only makes sense if we pick something from the
value, i.e., we indeed change the key?


>> C. kstream.selectKey(...).join(ktable)

We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
repartitioning of the left input KStream.


> KStream.selectKey(...).toTable().join(...)

Not sure if I understand what you are trying to say with this example? In
the end, `selectKey(...).toTable()` would repartition. If I know that one
can upsert directly, one inserts a `markAsRepartitioned()` in between.


In general, the use case seems to be that the key is not in the right
"format", or there is no key, but data was partitioned by a
value-attribute upstream and we just want to extract this
value-attribute into the key. Both seem to be KStream cases?
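
The use case in the last paragraph can be illustrated with a small sketch, under stated assumptions: `Order`, `partition` and `recordsThatWouldMove` are made-up names, and the hashCode-modulo partitioner only stands in for the real default partitioner. It counts how many records would land in a different partition if the stream were repartitioned by the new key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy illustration only; none of these names exist in Kafka Streams.
final class ValueAttributePartitioning {

    // Hypothetical record value carrying two attributes.
    static final class Order {
        final int customerId;
        final int storeId;

        Order(int customerId, int storeId) {
            this.customerId = customerId;
            this.storeId = storeId;
        }
    }

    // Assumption: a simple hashCode-modulo partitioner stands in for the
    // real default partitioner, which hashes the serialized key bytes.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts records whose partition would change if the stream, which the
    // upstream producer partitioned by customerId, were repartitioned by newKey.
    static int recordsThatWouldMove(List<Order> orders,
                                    Function<Order, Integer> newKey,
                                    int numPartitions) {
        int moved = 0;
        for (Order order : orders) {
            int current = partition(order.customerId, numPartitions);
            int target = partition(newKey.apply(order), numPartitions);
            if (current != target) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order(1, 2), new Order(5, 5), new Order(6, 3));

        // Extracting the attribute the data is already partitioned by: nothing moves.
        System.out.println(recordsThatWouldMove(orders, o -> o.customerId, 4)); // 0
        // Re-keying by a different attribute: a real repartition is needed.
        System.out.println(recordsThatWouldMove(orders, o -> o.storeId, 4));    // 2
    }
}
```

In the first call the repartition step would be a no-op, which is the situation the new operator is meant to express; in the second call skipping it would leave records in the wrong partitions.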



-Matthias



On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:

Hey Shay, while I don't have any specific concerns about the new public API
in this KIP, I'd like to better understand how this feature will work
before I vote. We should document the behavior of this new operator clearly
in the KIP as well -- you don't necessarily need to write the complete
javadocs up front, but it should be possible for a user to read the KIP and
then understand how this feature will work and how they would need to apply
it.

To that end, I recommend framing this proposal with a few examples to help
clarify the semantics. When and where can you apply the markAsPartitioned()
operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka Streams
(selectKey, map, transform, etc.) now leads to automatic repartition before
an aggregation." We should change "aggregation" to "stateful operation" as
this is true for things like joins as well as aggregations
2. The callout on IQ makes me a bit uncomfortable -- basically it says this
should not be a concern "if we use markAsPartitioned correctly". Does this
mean if we, the devs implementing this, write the feature correctly? Or is
it saying that this won't be a problem as long as "we", the users of this
feature, use it correctly? Just wondering if you've put any thought into
how this would work yet (I personally have not)
3. The KIP should lay out the proposed API exactly, even if there's only
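one new method. Check out this KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
(or this KIP
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
for a good reference on what the Public Interfaces section should include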
4. Regarding the proposed API itself, I wonder if KStream is really the
most appropriate interface for the new operator. A repartition can be
triggered on just a KTable. Here's where some examples would help. Perhaps
we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional vital
examples, but at the very least, these are the three to consider: either a
KStream or KTable followed by a groupBy/aggregation, or a KStream with
key-changing operator followed by a join. Note that you could have
something like KStream.selectKey(...).toTable().join(...) as well, but
since there are no pure key-changing operators (like #selectKey) on
KTables, only groupBy() which must always be followed by aggregation, this
4th case can be reduced to an example like C of a KStream with key-changing
operation and downstream join -- ie there's no way to do this without
#toTable which is more like syntactic sugar for the purposes of this
repartitioning discussion.

I worry that making this a DSL operator on KStream is too generic, and we
would also need to add it to KTable for example B, despite KTables not
having any true pure key-changing operators outside of #groupBy. Would we
throw an exception if you invoked #markAsPartitioned on a KTable that
wasn't followed by a groupBy? If you have multiple key-changing operators,
would you need to add markAsPartitioned after each one? If not, what are
the semantics of that?  These are the main questions that got me thinking
here, and will definitely need to be clarified in the KIP if we do go with
the current proposal. But I wanted to throw out another idea for an API I
think would help with some of this awkwardness by having clearly defined
semantics:

Fundamentally it seems to me that these issues arise from the fact that "being
partitioned" is conceptually a property of other operations applied to a

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-15 Thread Sophie Blee-Goldman
Hey Shay, while I don't have any specific concerns about the new public API
in this KIP, I'd like to better understand how this feature will work
before I vote. We should document the behavior of this new operator clearly
in the KIP as well -- you don't necessarily need to write the complete
javadocs up front, but it should be possible for a user to read the KIP and
then understand how this feature will work and how they would need to apply
it.

To that end, I recommend framing this proposal with a few examples to help
clarify the semantics. When and where can you apply the markAsPartitioned()
operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka Streams
(selectKey, map, transform, etc.) now leads to automatic repartition before
an aggregation." We should change "aggregation" to "stateful operation" as
this is true for things like joins as well as aggregations
2. The callout on IQ makes me a bit uncomfortable -- basically it says this
should not be a concern "if we use markAsPartitioned correctly". Does this
mean if we, the devs implementing this, write the feature correctly? Or is
it saying that this won't be a problem as long as "we", the users of this
feature, use it correctly? Just wondering if you've put any thought into
how this would work yet (I personally have not)
3. The KIP should lay out the proposed API exactly, even if there's only
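one new method. Check out this KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
(or this KIP
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
for a good reference on what the Public Interfaces section should include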
4. Regarding the proposed API itself, I wonder if KStream is really the
most appropriate interface for the new operator. A repartition can be
triggered on just a KTable. Here's where some examples would help. Perhaps
we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional vital
examples, but at the very least, these are the three to consider: either a
KStream or KTable followed by a groupBy/aggregation, or a KStream with
key-changing operator followed by a join. Note that you could have
something like KStream.selectKey(...).toTable().join(...) as well, but
since there are no pure key-changing operators (like #selectKey) on
KTables, only groupBy() which must always be followed by aggregation, this
4th case can be reduced to an example like C of a KStream with key-changing
operation and downstream join -- ie there's no way to do this without
#toTable which is more like syntactic sugar for the purposes of this
repartitioning discussion.

I worry that making this a DSL operator on KStream is too generic, and we
would also need to add it to KTable for example B, despite KTables not
having any true pure key-changing operators outside of #groupBy. Would we
throw an exception if you invoked #markAsPartitioned on a KTable that
wasn't followed by a groupBy? If you have multiple key-changing operators,
would you need to add markAsPartitioned after each one? If not, what are
the semantics of that?  These are the main questions that got me thinking
here, and will definitely need to be clarified in the KIP if we do go with
the current proposal. But I wanted to throw out another idea for an API I
think would help with some of this awkwardness by having clearly defined
semantics:

Fundamentally it seems to me that these issues arise from the fact that
"being partitioned" is conceptually a property of other operations applied
to a KStream/KTable, rather than an operation itself. So rather than making
this a DSL operator itself, what if we added it to the Grouped and various
Joined configuration classes? It would allow us to more carefully hit only
the relevant parts of the DSL, so there are no questions about whether/when
to throw errors when the operator is incorrectly applied -- there would be
no way to apply it incorrectly. The main drawback I can think of is simply
that this touches on a larger surface area of the API. I personally don't
believe this is a good enough reason to make it a DSL operator, as one could
make that argument for nearly any kind of KStream or KTable operator
configuration going forward, and that would explode the KStream/KTable API
surface area instead. Perhaps this was discussed during the previous
iteration of this KIP, or I'm missing something here, so I just wanted to
put this out there and see what people think.
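
A rough sketch of what the configuration-object alternative could look like, purely for illustration: `ToyGrouped`, its `markAsPartitioned()` option and `groupByNeedsRepartition` are hypothetical names, and no such option exists on the real Grouped or Joined classes. The point is only that the hint travels with the stateful operation instead of being a separate operator.

```java
// Toy illustration only; ToyGrouped and its methods are hypothetical.
final class GroupedConfigSketch {

    // Stand-in for a Grouped-style configuration object.
    static final class ToyGrouped {
        final boolean alreadyPartitioned;

        private ToyGrouped(boolean alreadyPartitioned) {
            this.alreadyPartitioned = alreadyPartitioned;
        }

        static ToyGrouped defaults() {
            return new ToyGrouped(false);
        }

        // Hypothetical option: the caller asserts the data is already partitioned by the new key.
        ToyGrouped markAsPartitioned() {
            return new ToyGrouped(true);
        }
    }

    // groupBy changes the key, so it repartitions unless the config opts out.
    static boolean groupByNeedsRepartition(ToyGrouped grouped) {
        return !grouped.alreadyPartitioned;
    }

    public static void main(String[] args) {
        System.out.println(groupByNeedsRepartition(ToyGrouped.defaults()));                     // true
        System.out.println(groupByNeedsRepartition(ToyGrouped.defaults().markAsPartitioned())); // false
    }
}
```

Because the option can only be passed where a grouping or join is configured, there is no stream on which it could be applied at the wrong place, which is the property argued for above.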

Either way, thanks for picking up this KIP. It's been a long time coming :)

-Sophie





On Mon, Jul 10, 2023 at 2:05 PM Shay Lin  wrote:

> Hi all,
>
> It's been a few days so I went ahead with editing the KIP, the main change
> is on the method name
>
> 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-10 Thread Shay Lin
Hi all,

It's been a few days, so I went ahead and edited the KIP; the main change
is the method name:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
I will follow up with a VOTE separately.

Best,
Shay

On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax  wrote:

> Shay,
>
> thanks for picking up this KIP. It's a pity that the discussion stalled
> for such a long time.
>
> As expressed previously, I am happy with the name `markAsPartitioned()`
> and also believe it's ok to just document the impact and leave it to the
> user to do the right thing.
>
> If we really get a lot of users that ask about it, because they did not
> do the right thing, we could still add something (eg, a reverse-mapper
> function) in a follow-up KIP. But we don't know if it's necessary; thus,
> making a small incremental step sounds like a good approach to me.
>
> Let's see if others agree or not.
>
>
> -Matthias
>
> On 6/28/23 5:29 PM, Shay Lin wrote:
> > Hi all,
> >
> > Great discussion thread. May I take this KIP up? If it’s alright my plan
> is
> > to update the KIP with the operator `markAsPartitioned()`.
> >
> > As you have discussed and pointed out, there are implications to
> downstream
> > joins or aggregation operations. Still, the operator is intended for
> > advanced users so my two cents is it would be a valuable addition
> > nonetheless. We could add this as a caution/consideration as part of the
> > java doc.
> >
> > Let me know, thanks.
> > Shay
> >
>


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-06-29 Thread Matthias J. Sax

Shay,

thanks for picking up this KIP. It's a pity that the discussion stalled 
for such a long time.


As expressed previously, I am happy with the name `markAsPartitioned()` 
and also believe it's ok to just document the impact and leave it to the 
user to do the right thing.


If we really get a lot of users that ask about it, because they did not 
do the right thing, we could still add something (eg, a reverse-mapper 
function) in a follow-up KIP. But we don't know if it's necessary; thus, 
making a small incremental step sounds like a good approach to me.


Let's see if others agree or not.


-Matthias

On 6/28/23 5:29 PM, Shay Lin wrote:

Hi all,

Great discussion thread. May I take this KIP up? If it’s alright my plan is
to update the KIP with the operator `markAsPartitioned()`.

As you have discussed and pointed out, there are implications to downstream
joins or aggregation operations. Still, the operator is intended for
advanced users so my two cents is it would be a valuable addition
nonetheless. We could add this as a caution/consideration as part of the
java doc.

Let me know, thanks.
Shay



Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-06-28 Thread Shay Lin
Hi all,

Great discussion thread. May I take this KIP up? If it’s alright my plan is
to update the KIP with the operator `markAsPartitioned()`.

As you have discussed and pointed out, there are implications to downstream
joins or aggregation operations. Still, the operator is intended for
advanced users so my two cents is it would be a valuable addition
nonetheless. We could add this as a caution/consideration as part of the
java doc.

Let me know, thanks.
Shay


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-05-23 Thread Levani Kokhreidze
Hi all,

Since there was no activity around this KIP, I’ll pick it up in the coming
weeks and continue the discussion.

Best,
Levani

> On 27. Apr 2022, at 22:50, Matthias J. Sax  wrote:
> 
> Let's wait a couple of days to give Ivan a chance to reply. If he does not 
> reply, feel free to pick it up.
> 
> 
> -Matthias
> 
> On 4/26/22 3:58 AM, Levani Kokhreidze wrote:
>> Hi,
>> Sorry, maybe I am jumping the gun here, but if by any chance this KIP 
>> becomes dormant, I'd be interested in picking it up.
>> Levani
>>> On 23. Apr 2022, at 02:43, Matthias J. Sax  wrote:
>>> 
>>> Ivan,
>>> 
>>> are you still interested in this KIP? I think it would be a good addition.
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 8/16/21 5:30 PM, Matthias J. Sax wrote:
>>>> Your point about the IQ problem is an interesting one. I missed the
>>>> point that the "new key" would be a "superkey", and thus, it should
>>>> always be possible to compute the original key from the superkey. (As a
>>>> matter of fact, for windowed-table the windowed-key is also a superkey...)
>>>> 
>>>> I am not sure if we need to follow the "use the head idea" or if we need
>>>> a "CompositeKey" interface? It seems we can just allow for any types and
>>>> we can be agnostic to it?
>>>> 
>>>> KStream stream = ...
>>>> KStream stream2 =
>>>>    stream.selectKey(/*set superkey*/)
>>>>          .markAsPartitioned()
>>>> 
>>>> We only need a `Function` without any restrictions on the type,
>>>> to map the "superkey" to the original "partition key"?
>>>> 
>>>> Do you propose to provide the "reverse mapper" via the
>>>> `markAsPartitioned()` method (or config object), or via the IQ methods?
>>>> Not sure which one is better?
>>>> 
>>>> However, I am not sure if it would solve the join problem? At least not
>>>> easily: if one has two KStream and one is properly
>>>> partitioned by `Tuple` while the other one is "marked-as-partitioned",
>>>> the join would just fail. -- Similar for a stream-table join. -- The
>>>> only fix would be to do the re-partitioning anyway, effectively ignoring
>>>> the "user hint", but it seems to defeat the purpose? Again, I would
>>>> argue that it is ok to not handle this case, but leave it as the
>>>> responsibility for the user to not mess it up.
>>>> 
>>>> -Matthias
>>>> 
>>>> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
> Hi Matthias and Sophie!
> 
> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
> 
> I don't have a strong opinion here; both Sophie's and Matthias' points
> look convincing to me.
> 
> I think we should estimate the following: what is the probability that
> we will ever need to extend `selectKey` etc. with a config for
> purposes other than `markAsPartitioned`?
> 
> If we find this probability high, then it's just a refactoring to
> deprecate overloads with `Named` and introduce overloads with dedicated
> configs, and we should do it this way.
> 
> If it's low or zero, maybe it's better not to mess with the existing
> APIs and to introduce a single `markAsPartitioned()` method, which
> itself can be easily deprecated if we find a better solution later!
> 
> 
> ==2. The IQ problem==
> 
>> it then has to be the case that
> 
>> Partitioner.partition(key) == Partitioner.partition(map(key))
> 
> 
> Sophie, you got this wrong, and Matthias already explained why.
> 
> The actual required property for the mapping function is:
> 
> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
> 
> or, by contraposition law,
> 
> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
> 
> 
> (look at the whiteboard photo that I attached to the KIP).
> 
> There is a big class of such mappings: key -> Tuple(key, anyValue). This
> is actually what we often do before aggregation, and this mapping does
> not require repartition.
> 
> But of course we can extract the original key from Tuple(key, anyValue),
> and this can save IQ and joins!
> 
> This is what I'm talking about when I talk about 'CompositeKey' idea.
> 
> We can do the following:
> 
> 1. implement a 'partitioner wrapper' that recognizes tuples
> (CompositeKeys) and uses only the 'head' to calculate the partition,
> 
> 2. implement
> 
> selectCompositeKey(BiFunction tailSelector) {
>   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>   //MARK_AS_PARTITIONED call here,
>   //but this call is an implementation detail and we do not expose
>   //markAsPartitioned publicly!
> }
> 
> WDYT? (it's just a brainstorming idea)
> 
> 09.08.2021 2:38, Matthias J. Sax wrote:
>> Hi,
>> 
>> I originally had a similar thought about `markAsPartitioned()` vs
>> extending `selectKey()` et al. with a config. While I agree that it
>> might be conceptually cleaner to use a config object, I did not 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-04-27 Thread Matthias J. Sax
Let's wait a couple of days to give Ivan a chance to reply. If he does 
not reply, feel free to pick it up.



-Matthias

On 4/26/22 3:58 AM, Levani Kokhreidze wrote:

Hi,

Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes 
dormant, I'd be interested in picking it up.

Levani


On 23. Apr 2022, at 02:43, Matthias J. Sax  wrote:

Ivan,

are you still interested in this KIP? I think it would be a good addition.


-Matthias

On 8/16/21 5:30 PM, Matthias J. Sax wrote:

Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head idea" or if we need
a "CompositeKey" interface? It seems we can just allow for any types and
we can be agnostic to it?

KStream stream = ...
KStream stream2 =
   stream.selectKey(/*set superkey*/)
         .markAsPartitioned()

We only need a `Function` without any restrictions on the type,
to map the "superkey" to the original "partition key"?

Do you propose to provide the "reverse mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?
However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitoned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but it seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, but leave it as the
responsibility for the user to not mess it up.
-Matthias
On 8/9/21 2:32 PM, Ivan Ponomarev wrote:

Hi Matthias and Sophie!

==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here, both Sophie's and Matthias' points
look convincing for me.

I think we should estimate the following: what is the probability that
we will ever need to extend `selectKey` etc. with a config for the
purposes other than `markAsPartitioned`?

If we find this probability high, then it's just a refactoring to
deprecate overloads with `Named` and introduce overloads with dedicated
configs, and we should do it this way.

If it's low or zero, maybe it's better not to mess with the existing
APIs and to introduce a single `markAsPartitioned()` method, which
itself can be easily deprecated if we find a better solution later!


==2. The IQ problem==


it then has to be the case that



Partitioner.partition(key) == Partitioner.partition(map(key))



Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

\forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))

or, by contraposition law,

\forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )


(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This
is actually what we often do before aggregation, and this mapping does
not require repartition.

But of course we can extract the original key from Tuple(key, anyValue),
and this can save IQ and joins!

This is what I'm talking about when I talk about 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples
(CompositeKeys) and uses only the 'head' to calculate the partition,

2. implement

selectCompositeKey(BiFunction tailSelector) {
   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
   //MARK_AS_PARTITIONED call here,
   //but this call is an implementation detail and we do not expose
   //markAsPartitioned publicly!
}

WDYT? (it's just a brainstorming idea)

09.08.2021 2:38, Matthias J. Sax wrote:

Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it
though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to eg.
`selectKey().markAsPartitioned()` must always to together. If you have
some other place in 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-04-26 Thread Levani Kokhreidze
Hi,

Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes 
dormant, I'd be interested in picking it up.

Levani

> On 23. Apr 2022, at 02:43, Matthias J. Sax  wrote:
> 
> Ivan,
> 
> are you still interested in this KIP? I think it would be a good addition.
> 
> 
> -Matthias
> 
> On 8/16/21 5:30 PM, Matthias J. Sax wrote:
>> Your point about the IQ problem is an interesting one. I missed the
>> point that the "new key" would be a "superkey", and thus, it should
>> always be possible to compute the original key from the superkey. (As a
>> matter of fact, for windowed-table the windowed-key is also a superkey...)
>> I am not sure if we need to follow the "use the head idea" or if we need
>> a "CompositeKey" interface? It seems we can just allow for any types and
>> we can be agnostic to it?
>> KStream stream = ...
>> KStream stream2 =
>>   stream.selectKey(/*set superkey*/)
>> .markAsPartitioned()
>> We only need a `Function` without any restrictions on the type,
>> to map the "superkey" to the original "partition key"?
>> Do you propose to provide the "revers mapper" via the
>> `markAsPartitioned()` method (or config object), or via the IQ methods?
>> Not sure which one is better?
>> However, I am not sure if it would solve the join problem? At least not
>> easily: if one has two KStream and one is properly
>> partitioned by `Tuple` while the other one is "marked-as-partitoned",
>> the join would just fail. -- Similar for a stream-table join. -- The
>> only fix would be to do the re-partitioning anyway, effectively ignoring
>> the "user hint", but it seems to defeat the purpose? Again, I would
>> argue that it is ok to not handle this case, but leave it as the
>> responsibility for the user to not mess it up.
>> -Matthias
>> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
>>> Hi Matthias and Sophie!
>>> 
>>> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>>> 
>>> I don't have a strong opinion here, both Sophie's and Matthias' points
>>> look convincing for me.
>>> 
>>> I think we should estimate the following: what is the probability that
>>> we will ever need to extend `selectKey` etc. with a config for the
>>> purposes other than `markAsPartitioned`?
>>> 
>>> If we find this probability high, then it's just a refactoring to
>>> deprecate overloads with `Named` and introduce overloads with dedicated
>>> configs, and we should do it this way.
>>> 
>>> If it's low or zero, maybe it's better not to mess with the existing
>>> APIs and to introduce a single `markAsPartitioned()` method, which
>>> itself can be easily deprecated if we find a better solution later!
>>> 
>>> 
>>> ==2. The IQ problem==
>>> 
>>>> it then has to be the case that
>>> 
>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>> 
>>> 
>>> Sophie, you got this wrong, and Matthias already explained why.
>>> 
>>> The actual required property for the mapping function is:
>>> 
>>> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>>> 
>>> or, by contraposition law,
>>> 
>>> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
>>> 
>>> 
>>> (look at the whiteboard photo that I attached to the KIP).
>>> 
>>> There is a big class of such mappings: key -> Tuple(key, anyValue). This
>>> is actually what we often do before aggregation, and this mapping does
>>> not require repartition.
>>> 
>>> But of course we can extract the original key from Tuple(key, anyValue),
>>> and this can save IQ and joins!
>>> 
>>> This is what I'm talking about when I talk about 'CompositeKey' idea.
>>> 
>>> We can do the following:
>>> 
>>> 1. implement a 'partitioner wrapper' that recognizes tuples
>>> (CompositeKeys) and uses only the 'head' to calculate the partition,
>>> 
>>> 2. implement
>>> 
>>> selectCompositeKey(BiFunction tailSelector) {
>>>   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>>>   //MARK_AS_PARTITIONED call here,
>>>   //but this call is an implementation detail and we do not expose
>>>   //markAsPartitioned publicly!
>>> }
>>> 
>>> WDYT? (it's just a brainstorming idea)
>>> 
>>> 09.08.2021 2:38, Matthias J. Sax wrote:
>>>> Hi,
>>>>
>>>> I originally had a similar thought about `markAsPartitioned()` vs
>>>> extending `selectKey()` et al. with a config. While I agree that it
>>>> might be conceptually cleaner to use a config object, I did not propose
>>>> it as the API impact (deprecating stuff and adding new stuff) is quite
>>>> big... If we think it's an acceptable price to pay, I am ok with it
>>>> though.
>>>>
>>>> I also do think, that `markAsPartitioned()` could actually be
>>>> categorized as an operator... We don't expose it in the API as
>>>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>>>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>>>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>>>> one into the other.
>>>>
>>>> I also think that the 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-04-22 Thread Matthias J. Sax

Ivan,

are you still interested in this KIP? I think it would be a good addition.


-Matthias

On 8/16/21 5:30 PM, Matthias J. Sax wrote:

Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head idea" or if we need
a "CompositeKey" interface? It seems we can just allow for any types and
we can be agnostic to it?

KStream stream = ...
KStream stream2 =
   stream.selectKey(/*set superkey*/)
 .markAsPartitioned()

We only need a `Function` without any restrictions on the type,
to map the "superkey" to the original "partition key"?


Do you propose to provide the "revers mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?


However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitoned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but it seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, but leave it as the
responsibility for the user to not mess it up.


-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:

Hi Matthias and Sophie!

==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here, both Sophie's and Matthias' points
look convincing for me.

I think we should estimate the following: what is the probability that
we will ever need to extend `selectKey` etc. with a config for the
purposes other than `markAsPartitioned`?

If we find this probability high, then it's just a refactoring to
deprecate overloads with `Named` and introduce overloads with dedicated
configs, and we should do it this way.

If it's low or zero, maybe it's better not to mess with the existing
APIs and to introduce a single `markAsPartitioned()` method, which
itself can be easily deprecated if we find a better solution later!


==2. The IQ problem==


it then has to be the case that



Partitioner.partition(key) == Partitioner.partition(map(key))



Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

\forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))

or, by contraposition law,

\forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )


(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This
is actually what we often do before aggregation, and this mapping does
not require repartition.

But of course we can extract the original key from Tuple(key, anyValue),
and this can save IQ and joins!

This is what I'm talking about when I talk about 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples
(CompositeKeys) and uses only the 'head' to calculate the partition,

2. implement

selectCompositeKey(BiFunction tailSelector) {
   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
   //MARK_AS_PARTITIONED call here,
   //but this call is an implementation detail and we do not expose
   //markAsPartitioned publicly!
}

WDYT? (it's just a brainstorming idea)

09.08.2021 2:38, Matthias J. Sax wrote:

Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it
though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to eg.
`selectKey().markAsPartitioned()` must always to together. If you have
some other place in the code that get a `KStream` passed an input, it
would be "invalid" to blindly call `markAsPartitioned()` as you don't
know anything about the upstream code. Of course, it requires some
"coding discipline" to follow this pattern... Also, you can shoot
themselves into the foot if they want with the config object pattern,
too: if you get a 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-16 Thread Matthias J. Sax
Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head idea" or if we need
a "CompositeKey" interface? It seems we can just allow for any types and
we can be agnostic to it?

KStream stream = ...
KStream stream2 =
  stream.selectKey(/*set superkey*/)
        .markAsPartitioned()

We only need a `Function` without any restrictions on the type,
to map the "superkey" to the original "partition key"?


Do you propose to provide the "reverse mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?
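
To make the "reverse mapper" idea concrete, here is a minimal sketch under stated assumptions. The names `ReverseMapperRouting` and `partitionForSuperKey`, the `"<originalKey>|<extra>"` superkey layout, and the hash-modulo partitioner are all hypothetical illustrations; none of them is Kafka Streams API or part of the KIP. The sketch only shows that a lookup which knows the superkey can recover the partition by mapping back to the original partition key first.

```java
import java.util.function.Function;

final class ReverseMapperRouting {

    // Hypothetical stand-in for a partitioner: hash of the key modulo the
    // partition count. (Kafka's default partitioner hashes the serialized
    // key bytes instead; this is only an illustration.)
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Route a lookup for a superkey by first mapping it back to the original
    // partition key, then applying the partitioner to that original key.
    static <SK, K> int partitionForSuperKey(SK superKey,
                                            Function<SK, K> reverseMapper,
                                            int numPartitions) {
        return partition(reverseMapper.apply(superKey), numPartitions);
    }

    public static void main(String[] args) {
        // Assumed layout for illustration: superkey = "<originalKey>|<extra>".
        Function<String, String> reverseMapper =
            sk -> sk.substring(0, sk.indexOf('|'));

        int original = partition("user-1", 8);
        int routed = partitionForSuperKey("user-1|2021-08", reverseMapper, 8);

        // Both calls end up hashing "user-1", so the partitions agree.
        System.out.println(original == routed); // prints "true"
    }
}
```

Whether such a function would be handed to `markAsPartitioned()` or to the IQ methods is exactly the open question above; the sketch is agnostic to that choice.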


However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitioned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but that seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, but leave it as the
responsibility of the user to not mess it up.


-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
> Hi Matthias and Sophie!
> 
> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
> 
> I don't have a strong opinion here, both Sophie's and Matthias' points
> look convincing for me.
> 
> I think we should estimate the following: what is the probability that
> we will ever need to extend `selectKey` etc. with a config for the
> purposes other than `markAsPartitioned`?
> 
> If we find this probability high, then it's just a refactoring to
> deprecate overloads with `Named` and introduce overloads with dedicated
> configs, and we should do it this way.
> 
> If it's low or zero, maybe it's better not to mess with the existing
> APIs and to introduce a single `markAsPartitioned()` method, which
> itself can be easily deprecated if we find a better solution later!
> 
> 
> ==2. The IQ problem==
> 
>> it then has to be the case that
> 
>> Partitioner.partition(key) == Partitioner.partition(map(key))
> 
> 
> Sophie, you got this wrong, and Matthias already explained why.
> 
> The actual required property for the mapping function is:
> 
> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
> 
> or, by contraposition law,
> 
> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
> 
> 
> (look at the whiteboard photo that I attached to the KIP).
> 
> There is a big class of such mappings: key -> Tuple(key, anyValue). This
> is actually what we often do before aggregation, and this mapping does
> not require repartition.
> 
> But of course we can extract the original key from Tuple(key, anyValue),
> and this can save IQ and joins!
> 
> This is what I'm talking about when I talk about 'CompositeKey' idea.
> 
> We can do the following:
> 
> 1. implement a 'partitioner wrapper' that recognizes tuples
> (CompositeKeys) and uses only the 'head' to calculate the partition,
> 
> 2. implement
> 
> selectCompositeKey(BiFunction tailSelector) {
>   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>   //MARK_AS_PARTITIONED call here,
>   //but this call is an implementation detail and we do not expose
>   //markAsPartitioned publicly!   
> }
> 
> WDYT? (it's just a brainstorming idea)
> 
> 09.08.2021 2:38, Matthias J. Sax wrote:
>> Hi,
>>
>> I originally had a similar thought about `markAsPartitioned()` vs
>> extending `selectKey()` et al. with a config. While I agree that it
>> might be conceptually cleaner to use a config object, I did not propose
>> it as the API impact (deprecating stuff and adding new stuff) is quite
>> big... If we think it's an acceptable price to pay, I am ok with it
>> though.
>>
>> I also do think, that `markAsPartitioned()` could actually be
>> categorized as an operator... We don't expose it in the API as
>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>> one into the other.
>>
>> I also think that the raised concern about "forgetting to remove
>> `markAsPartitioned()`" might not be very strong though. If you have
>> different places in the code that link stuff together, a call to eg.
>> `selectKey().markAsPartitioned()` must always to together. If you have
>> some other place in the code that get a `KStream` passed an input, it
>> would be "invalid" to blindly call `markAsPartitioned()` as you don't
>> know anything about the upstream code. Of course, it requires some
>> "coding discipline" to follow this pattern... Also, you can shoot
>> themselves into the foot if they want 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-09 Thread Ivan Ponomarev

Hi Matthias and Sophie!

==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here; both Sophie's and Matthias' points
look convincing to me.


I think we should estimate the following: what is the probability that
we will ever need to extend `selectKey` etc. with a config for
purposes other than `markAsPartitioned`?


If we find this probability high, then it's just a refactoring to 
deprecate overloads with `Named` and introduce overloads with dedicated 
configs, and we should do it this way.


If it's low or zero, maybe it's better not to mess with the existing 
APIs and to introduce a single `markAsPartitioned()` method, which 
itself can be easily deprecated if we find a better solution later!



==2. The IQ problem==

> it then has to be the case that

> Partitioner.partition(key) == Partitioner.partition(map(key))


Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

\forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))

or, by the law of contraposition,

\forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )


(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This 
is actually what we often do before aggregation, and this mapping does 
not require repartition.
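
The property above can be checked mechanically over a sample of keys. The following is a minimal sketch, not Kafka code: `PartitionPreservationCheck`, `preservesPartitioning`, and the hash-modulo `partition` function are hypothetical names introduced only for illustration, and a check over a finite sample can refute the property but cannot prove it for all keys.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.ToIntFunction;

final class PartitionPreservationCheck {

    // Hypothetical stand-in for the partitioner: hash modulo partition count.
    static int partition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns true if, for every pair of sample keys,
    // map(k1) = map(k2) implies partition(k1) = partition(k2).
    static <K> boolean preservesPartitioning(List<K> keys,
                                             Function<K, ?> map,
                                             ToIntFunction<K> partitioner) {
        for (K k1 : keys) {
            for (K k2 : keys) {
                boolean sameNewKey = Objects.equals(map.apply(k1), map.apply(k2));
                boolean samePartition =
                    partitioner.applyAsInt(k1) == partitioner.applyAsInt(k2);
                if (sameNewKey && !samePartition) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");

        // key -> Tuple(key, anyValue) is injective, so the property holds.
        System.out.println(preservesPartitioning(
            keys, k -> List.of(k, "anyValue"), k -> partition(k, 4))); // true

        // Mapping every key to one constant merges keys that live in
        // different partitions, so the property is violated.
        System.out.println(preservesPartitioning(
            keys, k -> "same", k -> partition(k, 4))); // false
    }
}
```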


But of course we can extract the original key from Tuple(key, anyValue), 
and this can save IQ and joins!


This is what I'm talking about when I talk about the 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples 
(CompositeKeys) and uses only the 'head' to calculate the partition,


2. implement

selectCompositeKey(BiFunction tailSelector) {
  selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
  //MARK_AS_PARTITIONED call here,
  //but this call is an implementation detail and we do not expose
  //markAsPartitioned publicly!
}
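
Step 1, the 'partitioner wrapper', could look roughly like the sketch below. This is only a self-contained illustration under assumptions: `CompositeKeySketch`, the shape of `CompositeKey`, and the hash-modulo `partition` function are hypothetical and do not implement Kafka's `Partitioner` or `StreamPartitioner` interfaces.

```java
import java.util.Objects;

final class CompositeKeySketch {

    // Hypothetical composite key: 'head' is the original key,
    // 'tail' is any additional value chosen by the tail selector.
    static final class CompositeKey<H, T> {
        final H head;
        final T tail;

        CompositeKey(H head, T tail) {
            this.head = head;
            this.tail = tail;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof CompositeKey)) return false;
            CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
            return Objects.equals(head, other.head)
                && Objects.equals(tail, other.tail);
        }

        @Override
        public int hashCode() {
            return Objects.hash(head, tail);
        }
    }

    // 'Partitioner wrapper': for composite keys only the head decides the
    // partition; any other key is hashed as a whole.
    static int partition(Object key, int numPartitions) {
        Object routingKey = (key instanceof CompositeKey)
            ? ((CompositeKey<?, ?>) key).head
            : key;
        return Math.floorMod(Objects.hashCode(routingKey), numPartitions);
    }

    public static void main(String[] args) {
        int plain = partition("user-1", 8);
        int composite = partition(new CompositeKey<>("user-1", "2021-08"), 8);

        // The tail never influences the partition, so no repartition is needed
        // and a lookup can find the partition from the head alone.
        System.out.println(plain == composite); // prints "true"
    }
}
```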

WDYT? (it's just a brainstorming idea)

09.08.2021 2:38, Matthias J. Sax wrote:

Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to eg.
`selectKey().markAsPartitioned()` must always to together. If you have
some other place in the code that get a `KStream` passed an input, it
would be "invalid" to blindly call `markAsPartitioned()` as you don't
know anything about the upstream code. Of course, it requires some
"coding discipline" to follow this pattern... Also, you can shoot
themselves into the foot if they want with the config object pattern,
too: if you get a `KStream` passed in, you can skip repartitioning via
`selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
slightly prefer to add `markAsPartitioned()` as an operator.

(Maybe we should have expose a `PartitionedKStream` as first class
object to begin with... Hard to introduce now I guess...)


The concern about IQ is interesting -- I did not realize this impact.
Thanks for bringing it up.


a repartition would be a no-op, ie that the stream (and its partitioning)
would be the same
whether or not a repartition is inserted. For this to be true, it then has
to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))


@Sophie: I don't think this statement is correct. A `markAsPartition()`
only means, that the existing partitioning ensure that all messages of
the same new key are still in the same partition. Ie, it cannot happen
that two new keys (that are the same) are in a different partition.

However, if you would physically repartitiong on the new key using the
same hash-function as for the old key, there is no guarantee that the
same partitions would be picked... And that is why IQ breaks downstream.

Btw: using `markAsPartitioned()` could also be an issue for joins for
the same reason... I want to call out, that the Jira tickets that did
raise the concern about unnecessary repartitioning are about downstream
aggregations though...

Last but not least: we actually have a similar situation for
windowed-aggregations: The result of a window aggregation is partitioned
by the "plain 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-08 Thread Matthias J. Sax
Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it though.

I also think that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as a
first-class citizen at the moment, but in fact we have two types of
`KStream` -- a "PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to e.g.
`selectKey().markAsPartitioned()` must always go together. If you have
some other place in the code that gets a `KStream` passed as input, it
would be "invalid" to blindly call `markAsPartitioned()` as you don't
know anything about the upstream code. Of course, it requires some
"coding discipline" to follow this pattern... Also, users can shoot
themselves in the foot if they want with the config object pattern,
too: if you get a `KStream` passed in, you can skip repartitioning via
`selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
slightly prefer to add `markAsPartitioned()` as an operator.

(Maybe we should have exposed a `PartitionedKStream` as a first-class
object to begin with... Hard to introduce now I guess...)


The concern about IQ is interesting -- I did not realize this impact.
Thanks for bringing it up.

> a repartition would be a no-op, ie that the stream (and its partitioning)
> would be the same
> whether or not a repartition is inserted. For this to be true, it then has
> to be the case that
> 
> Partitioner.partition(key) == Partitioner.partition(map(key))

@Sophie: I don't think this statement is correct. A `markAsPartitioned()`
only means that the existing partitioning ensures that all messages of
the same new key are still in the same partition. I.e., it cannot happen
that two equal new keys end up in different partitions.

However, if you physically repartitioned on the new key using the
same hash-function as for the old key, there is no guarantee that the
same partitions would be picked... And that is why IQ breaks downstream.
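
A tiny worked example of this point, as a sketch under assumptions rather than actual Kafka code: `MarkAsPartitionedIqSketch`, the hash-modulo `partition` function, and the `map` function `k -> k + 1` are hypothetical and chosen only because the numbers are easy to follow.

```java
final class MarkAsPartitionedIqSketch {

    // Hypothetical stand-in for the partitioner: hash modulo partition count.
    static int partition(Integer key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical key-changing operation. It is injective, so two equal new
    // keys can only come from equal old keys and thus share a partition.
    static Integer map(Integer oldKey) {
        return oldKey + 1;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        Integer oldKey = 0;
        Integer newKey = map(oldKey);

        // Without a repartition, the record stays where the old key put it.
        int actualPartition = partition(oldKey, numPartitions);

        // A lookup that only knows the new key would hash the new key.
        int guessedPartition = partition(newKey, numPartitions);

        System.out.println(actualPartition);  // prints 0
        System.out.println(guessedPartition); // prints 1
    }
}
```

The mapping is perfectly valid for a downstream aggregation, yet hashing the new key points at a different partition than the one that actually holds the data.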

Btw: using `markAsPartitioned()` could also be an issue for joins for
the same reason... I want to call out that the Jira tickets that
raised the concern about unnecessary repartitioning are about downstream
aggregations though...

Last but not least: we actually have a similar situation for
windowed-aggregations: The result of a window aggregation is partitioned
by the "plain key": if we write the result into a topic using the same
partitioning function, we would write to different partitions... (I
guess it was never an issue so far, as we don't have KIP-300 in place
yet...)

It's also not an issue for IQ, because we know the plain key, and thus
can route to the right task.


About a solution: I think it might be ok to say we don't need to solve
this problem, but it's the user's responsibility to take IQ into account.
I.e., if they want to use IQ downstream, they need to repartition: for this
case, repartitioning is _NOT_ unnecessary... The same argument seems to
apply for the join case I mentioned above. -- Given that
`markAsPartitioned()` is an advanced feature, it seems ok to leave it to
the user to use correctly (we should of course call it out in the docs!).



-Matthias



On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
> Before I dive in to the question of IQ and the approaches you proposed, can
> you just
> elaborate on the problem itself? By definition, the `markAsPartitioned`
> flag means that
> a repartition would be a no-op, ie that the stream (and its partitioning)
> would be the same
> whether or not a repartition is inserted. For this to be true, it then has
> to be the case that
> 
> Partitioner.partition(key) == Partitioner.partition(map(key))
> 
> The left-hand side of the above is precisely how we determine the partition
> number that
> a key belongs to when using IQ. It shouldn't matter whether the user is
> querying a key
> in a store upstream of the key-changing operation or a mapped key
> downstream of it
> -- either way we just apply the given Partitioner.
> 
> See StreamsMetadataState#getKeyQueryMetadataForKey
> 
> for where this happens
> 
> 
> If we're concerned that users might try to abuse the new
> `markAsPartitioned` feature,
> or accidentally misuse it, then we could add a runtime check that applies
> the Partitioner
> associated with 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-07 Thread Sophie Blee-Goldman
Before I dive in to the question of IQ and the approaches you proposed, can
you just elaborate on the problem itself? By definition, the
`markAsPartitioned` flag means that a repartition would be a no-op, ie that
the stream (and its partitioning) would be the same whether or not a
repartition is inserted. For this to be true, it then has to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))

The left-hand side of the above is precisely how we determine the partition
number that a key belongs to when using IQ. It shouldn't matter whether the
user is querying a key in a store upstream of the key-changing operation or a
mapped key downstream of it -- either way we just apply the given Partitioner.

See StreamsMetadataState#getKeyQueryMetadataForKey for where this happens.


If we're concerned that users might try to abuse the new `markAsPartitioned`
feature, or accidentally misuse it, then we could add a runtime check that
applies the Partitioner associated with that subtopology to the key being
processed and the mapped key result to assert that they do indeed match. Imo
this is probably overkill, just putting it out there.

On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev wrote:

> Hi Sophie,
>
> thanks for your reply! So your proposal is:
>
> 1). For each key-changing operation, deprecate the existing overloads
> that accept a Named, and replace them with overloads that take an
> operator-specific config object.
> 2). Add `markAsPartitioned` flag to these configs.
>
> IMO, this looks much better than the original proposal, I like it very
> much and I think I will rewrite the KIP soon. I absolutely agree with
> your points. Repartition logic is not a part of the public contract, and
> it's much better to give it correct hints instead of telling explicitly
> what it should do.
>
> ...
>
> Since we're generating such bright ideas, maybe we should also
> brainstorm the interactive query problem?
>
> The problem is that interactive queries will not work properly when
> `markAsPartitioned` is used. Although original key and mapped key will
> be in the same partition, we will no longer be able to guess this
> partition given the mapped key only.
>
> The possible approaches are:
>
> 1) Give up and don't use interactive queries together with
> `markAsPartitioned`. This is what I suppose now. But can we do better?
>
> 2) Maybe we should ask the user to provide 'reverse mapping' that will
> allow IQ to restore the original key in order to choose the correct
> partition. We can place this mapping in our new configuration object. Of
> course, there is no way for KStreams to verify at compile time or startup
> time that this function is actually the reverse mapping that extracts
> the old key from the new one. Users will forget to provide this
> function. Users will provide wrong functions. This all looks too fragile.
>
> 3) Maybe there can be a completely different approach. Let's introduce a
> new entity -- composite keys, consisting of "head" and "tail". The
> partition for the composite key is calculated based on its 'head' value
> only. If we provide a key mapping in the form key -> CompositeKey(key,
> tail), then it's obvious that we do not need a repartition. When an
> interactive query needs to guess the partition for CompositeKey, it just
> extracts its head and calculates the correct partition.
>
> We can select CompositeKey before groupByKey() and aggregation
> operations, and this will not involve repartition. And IQ will work.
>
> Is this too daring an idea? WDYT? My concern: will it cover all the cases
> when we want to choose a different key, but also avoid repartition?
>
> Regards,
>
> Ivan
>
>
>
> On 06.08.2021 23:19, Sophie Blee-Goldman wrote:
> > Hey Ivan
> >
> > I completely agree that adding it as a config to Grouped/Joined/etc isn't
> > much better, I was just
> > listing it for completeness, and that I would prefer to make it a
> > configuration of the key-changing
> > operation itself -- that's what I meant by
> >
> > > a better alternative might be to introduce this ... to the config object of
> > > the operator that's actually doing the key changing operation
> >
> >
> > I personally believe this is the semantically "correct" way to approach
> > this, since "preserves partitioning"
> > or "does not preserve partitioning" is a property of a key-changing
> > operation and not an operation on the
> > stream itself. Also, this way the user need only tell Streams which
> > operations do or do not preserve the
> > partitioning, and Streams can figure out where to insert a repartition in
> > the topology as it does today.
> >
> > Otherwise, we're rendering this particularly useful feature of the DSL --
> > automatic repartitioning -- pretty
> > much useless, since the user now has to figure out whether a 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-07 Thread Ivan Ponomarev

Hi Sophie,

thanks for your reply! So your proposal is:

1). For each key-changing operation, deprecate the existing overloads
that accept a Named, and replace them with overloads that take an
operator-specific config object.
2). Add `markAsPartitioned` flag to these configs.

IMO, this looks much better than the original proposal, I like it very 
much and I think I will rewrite the KIP soon. I absolutely agree with 
your points. Repartition logic is not a part of the public contract, and 
it's much better to give it correct hints instead of telling explicitly 
what it should do.


...

Since we're generating such bright ideas, maybe we should also 
brainstorm the interactive query problem?


The problem is that interactive queries will not work properly when 
`markAsPartitioned` is used. Although the original key and the mapped key will
be in the same partition, we will no longer be able to guess this 
partition given the mapped key only.


The possible approaches are:

1) Give up and don't use interactive queries together with 
`markAsPartitioned`. This is what I assume for now. But can we do better?


2) Maybe we should ask the user to provide 'reverse mapping' that will 
allow IQ to restore the original key in order to choose the correct 
partition. We can place this mapping in our new configuration object. Of 
course, there is no way for KStreams to verify at compile time or startup
time that this function is actually the reverse mapping that extracts
the old key from the new one. Users will forget to provide this
function. Users will provide wrong functions. This all looks too fragile.


3) Maybe there can be a completely different approach. Let's introduce a 
new entity -- composite keys, consisting of "head" and "tail". The 
partition for the composite key is calculated based on its 'head' value 
only. If we provide a key mapping in the form key -> CompositeKey(key,
tail), then it's obvious that we do not need a repartition. When an 
interactive query needs to guess the partition for CompositeKey, it just 
extracts its head and calculates the correct partition.


We can select CompositeKey before groupByKey() and aggregation 
operations, and this will not involve repartition. And IQ will work.
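
A minimal sketch of the composite-key idea, assuming a hash-modulo partitioning scheme. `CompositeKey` and `partitionForHead` are hypothetical names taken from the proposal above, not an existing Kafka Streams class, and `Objects.hashCode` is only a stand-in for whatever hash a real partitioner would use on the serialized key.

```java
import java.util.Objects;

// Hypothetical composite key: only the head participates in partitioning.
final class CompositeKey<H, T> {
    private final H head;
    private final T tail;

    CompositeKey(H head, T tail) {
        this.head = head;
        this.tail = tail;
    }

    H head() { return head; }
    T tail() { return tail; }

    // Partition for a plain (non-composite) key; used for the original key.
    static int partitionForHead(Object head, int numPartitions) {
        return Math.floorMod(Objects.hashCode(head), numPartitions);
    }

    // Partition of the composite key: derived from the head only, tail is ignored.
    int partition(int numPartitions) {
        return partitionForHead(head, numPartitions);
    }

    // Equality still uses both parts, so aggregations can group by (head, tail).
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CompositeKey)) return false;
        CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
        return Objects.equals(head, other.head) && Objects.equals(tail, other.tail);
    }

    @Override
    public int hashCode() {
        return Objects.hash(head, tail);
    }
}

class CompositeKeyDemo {
    public static void main(String[] args) {
        int numPartitions = 8;
        CompositeKey<String, Integer> mapped = new CompositeKey<>("user-1", 42);
        // Mapping key -> CompositeKey(key, tail) keeps the record in its partition,
        // and a query can find that partition again from the head alone.
        System.out.println(CompositeKey.partitionForHead("user-1", numPartitions)
                == mapped.partition(numPartitions));
    }
}
```

Because the partition depends only on the head, a lookup needs no reverse mapping supplied by the user: the head is stored in the key itself.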


Is this too daring an idea? WDYT? My concern: will it cover all the cases
when we want to choose a different key, but also avoid repartition?


Regards,

Ivan



On 06.08.2021 23:19, Sophie Blee-Goldman wrote:

Hey Ivan

I completely agree that adding it as a config to Grouped/Joined/etc isn't
much better, I was just
listing it for completeness, and that I would prefer to make it a
configuration of the key-changing
operation itself -- that's what I meant by

> a better alternative might be to introduce this ... to the config object of
> the operator that's actually doing the key changing operation


I personally believe this is the semantically "correct" way to approach
this, since "preserves partitioning"
or "does not preserve partitioning" is a property of a key-changing
operation and not an operation on the
stream itself. Also, this way the user need only tell Streams which
operations do or do not preserve the
partitioning, and Streams can figure out where to insert a repartition in
the topology as it does today.

Otherwise, we're rendering this particularly useful feature of the DSL --
automatic repartitioning -- pretty
much useless, since the user now has to figure out whether a repartition is
needed. On top of that, they
need to have some understanding of where and when this internal automatic
repartitioning logic is going
to insert that repartition in order to cancel it in the appropriate place.
Which is pretty unfortunate, since
that logic is not part of the public contract: it can change at any time,
for example as it did when we introduced
the repartition merging optimization.

All that said, those are valid concerns regarding the expansion of the
API's surface area. Since none of
the key-changing operations currently have a config object like some other
operations (for example Grouped
or Consumed, etc), this would double the number of overloads. But maybe
this is a good opportunity to fix
that problem, rather than keep digging ourselves into holes by trying to
work around it.

It looks like all of those key-changing operations have two overloads at
the moment, one with no parameters
beyond the operation itself (eg KeyValueMapper for #selectKey) and the
other with an additional Named
parameter, which is itself another kind of configuration. What if we
instead deprecate the existing overloads
that accept a Named, and replace them with overloads that take an
operator-specific config object like we do
elsewhere (eg Grouped for #groupByKey)? Then we can have both Named and
this `markAsPartitioned` flag
be part of the general config object, which (a) does not expand the API
surface area at all in this KIP, and (b)
also protects future KIPs from needing to have this same conversation over
and over, because we can now

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-06 Thread Sophie Blee-Goldman
Hey Ivan

I completely agree that adding it as a config to Grouped/Joined/etc isn't
much better, I was just
listing it for completeness, and that I would prefer to make it a
configuration of the key-changing
operation itself -- that's what I meant by

> a better alternative might be to introduce this ... to the config object of
> the operator that's actually doing the key changing operation


I personally believe this is the semantically "correct" way to approach
this, since "preserves partitioning"
or "does not preserve partitioning" is a property of a key-changing
operation and not an operation on the
stream itself. Also, this way the user need only tell Streams which
operations do or do not preserve the
partitioning, and Streams can figure out where to insert a repartition in
the topology as it does today.

Otherwise, we're rendering this particularly useful feature of the DSL --
automatic repartitioning -- pretty
much useless, since the user now has to figure out whether a repartition is
needed. On top of that, they
need to have some understanding of where and when this internal automatic
repartitioning logic is going
to insert that repartition in order to cancel it in the appropriate place.
Which is pretty unfortunate, since
that logic is not part of the public contract: it can change at any time,
for example as it did when we introduced
the repartition merging optimization.

All that said, those are valid concerns regarding the expansion of the
API's surface area. Since none of
the key-changing operations currently have a config object like some other
operations (for example Grouped
or Consumed, etc), this would double the number of overloads. But maybe
this is a good opportunity to fix
that problem, rather than keep digging ourselves into holes by trying to
work around it.

It looks like all of those key-changing operations have two overloads at
the moment, one with no parameters
beyond the operation itself (eg KeyValueMapper for #selectKey) and the
other with an additional Named
parameter, which is itself another kind of configuration. What if we
instead deprecate the existing overloads
that accept a Named, and replace them with overloads that take an
operator-specific config object like we do
elsewhere (eg Grouped for #groupByKey)? Then we can have both Named and
this `markAsPartitioned` flag
be part of the general config object, which (a) does not expand the API
surface area at all in this KIP, and (b)
also protects future KIPs from needing to have this same conversation over
and over, because we can now
stick any additional operator properties into that same config object.
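
A rough sketch of what such an operator-specific config object could look like, following the static-factory plus `with...` style of existing config objects. The class name `KeyMapped` and all of its methods are hypothetical; nothing like it exists in the Kafka Streams API and the KIP may choose a different shape.

```java
// Hypothetical config object for key-changing operations (not part of Kafka Streams).
final class KeyMapped {
    private final String name;               // replaces the separate Named parameter
    private final boolean markAsPartitioned; // hint: this operation preserves partitioning

    private KeyMapped(String name, boolean markAsPartitioned) {
        this.name = name;
        this.markAsPartitioned = markAsPartitioned;
    }

    // Static factory mirroring the "as(name)" style; the flag defaults to false,
    // so today's behaviour (repartition after a key change) stays the default.
    static KeyMapped as(String name) {
        return new KeyMapped(name, false);
    }

    // Each "with" method returns a new immutable instance.
    KeyMapped withName(String newName) {
        return new KeyMapped(newName, markAsPartitioned);
    }

    KeyMapped withMarkAsPartitioned(boolean flag) {
        return new KeyMapped(name, flag);
    }

    String name() { return name; }

    boolean isMarkedAsPartitioned() { return markAsPartitioned; }
}

class KeyMappedDemo {
    public static void main(String[] args) {
        KeyMapped config = KeyMapped.as("rekey-by-user").withMarkAsPartitioned(true);
        System.out.println(config.name() + " markAsPartitioned=" + config.isMarkedAsPartitioned());
    }
}
```

Further operator properties would become extra fields on the same object instead of extra overloads on KStream.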

WDYT? By the way, the above idea (introducing a single config object to
wrap all operator properties) was also
raised by John Roesler a while back. Let's hope he hasn't changed his mind
since then :)


On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev wrote:

> Hi Matthias,
>
> Concerning the naming: I like `markAsPartitioned`, because it describes
> what this operation is actually doing!
>
> Hi Sophie,
>
> I see the concern about poor code cohesion. We declare key mapping in
> one place of code, then later in another place we say
> "markAsPartitioned()". When we change the code six months later, we
> might forget to remove markAsPartitioned(), especially if it's placed in
> another method or class. But I don't understand why you propose to
> include this config in Grouped/Joined/StreamJoined, since from this
> point of view it's not a better solution.
>
> The best approach regarding the cohesion might be to add an extra
> 'preservePartition' flag to every key-changing operation, that is
>
> 1) selectKey
> 2) map
> 3) flatMap
> 4) transform
> 5) flatTransform
>
> in order to tell if the provided mapping requires repartitioning or not.
> Indeed, this is a mapping operation property, not a grouping one! BTW: the
> idea of adding an extra parameter to `selectKey` was once suggested by John
> Roesler.
>
> Arguments in favour of this approach: 1) better code cohesion from the
> point of view of the user, 2) 'smarter' code (the decision is taken
> depending on metadata provided for all the upstream mappings), 3)
> overall safer for the user.
>
> Arguments against: invasive KStreams API change, 5 more method
> overloads. Further on, when we add a new key-changing operation to
> KStream, we must add an overloaded version with 'preservePartition'.
> When we add a new overloaded version for existing operation, we actually
> might need to add two or more overloaded versions. This will soon become
> a mess.
>
> I thought that since `markAsPartitioned` is intended for advanced users,
> they will use it with care. When you're in a position where every
> serialization/deserialization round matters for the latency, you're
> extremely careful with the topology and you will not thoughtlessly add
> new key-changing operations without controlling how it's going to change
> the overall topology.
>
> By the way, if we later find a better solution, 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-06 Thread Ivan Ponomarev

Hi Matthias,

Concerning the naming: I like `markAsPartitioned`, because it describes 
what this operation is actually doing!


Hi Sophie,

I see the concern about poor code cohesion. We declare key mapping in 
one place of code, then later in another place we say 
"markAsPartitioned()". When we change the code six months later, we 
might forget to remove markAsPartitioned(), especially if it's placed in 
another method or class. But I don't understand why you propose to
include this config in Grouped/Joined/StreamJoined, since from this
point of view it's not a better solution.


The best approach regarding the cohesion might be to add an extra
'preservePartition' flag to every key-changing operation, that is


1) selectKey
2) map
3) flatMap
4) transform
5) flatTransform

in order to tell if the provided mapping requires repartitioning or not.
Indeed, this is a mapping operation property, not a grouping one! BTW: the
idea of adding an extra parameter to `selectKey` was once suggested by John
Roesler.


Arguments in favour of this approach: 1) better code cohesion from the
point of view of the user, 2) 'smarter' code (the decision is taken 
depending on metadata provided for all the upstream mappings), 3) 
overall safer for the user.
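
To illustrate point 2), here is a small sketch of how the decision could be derived from metadata on all upstream mappings. `KeyChangingOp` and `RepartitionPlanner` are hypothetical names, and this is not how the Streams topology builder is actually implemented; it only shows the rule that one non-preserving operation anywhere upstream is enough to require a repartition.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical metadata attached to each key-changing operation.
final class KeyChangingOp {
    final String name;
    final boolean preservesPartition;

    KeyChangingOp(String name, boolean preservesPartition) {
        this.name = name;
        this.preservesPartition = preservesPartition;
    }
}

final class RepartitionPlanner {
    // A downstream stateful operation needs a repartition if ANY upstream
    // key-changing operation (since the last repartition) does not preserve
    // the partitioning. No key-changing operations means no repartition.
    static boolean repartitionRequired(List<KeyChangingOp> upstreamOps) {
        for (KeyChangingOp op : upstreamOps) {
            if (!op.preservesPartition) {
                return true;
            }
        }
        return false;
    }
}

class RepartitionPlannerDemo {
    public static void main(String[] args) {
        List<KeyChangingOp> ops = Arrays.asList(
                new KeyChangingOp("map", false),       // really changes the partitioning
                new KeyChangingOp("selectKey", true)); // preserves it
        // The final selectKey preserves partitioning, but the earlier map does not,
        // so the repartition must stay.
        System.out.println(RepartitionPlanner.repartitionRequired(ops));
    }
}
```

With a single stream-level `markAsPartitioned()` call the earlier `map` would be silently covered as well, which is the safety difference between the two designs.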


Arguments against: invasive KStreams API change, 5 more method 
overloads. Further on, when we add a new key-changing operation to 
KStream, we must add an overloaded version with 'preservePartition'. 
When we add a new overloaded version for an existing operation, we actually 
might need to add two or more overloaded versions. This will soon become 
a mess.


I thought that since `markAsPartitioned` is intended for advanced users, 
they will use it with care. When you're in a position where every 
serialization/deserialization round matters for the latency, you're 
extremely careful with the topology and you will not thoughtlessly add 
new key-changing operations without controlling how it's going to change 
the overall topology.


By the way, if we later find a better solution, it's much easier to
deprecate a single `markAsPartitioned` operation than 5 method overloads.


What do you think?




On 04.08.2021 4:23, Sophie Blee-Goldman wrote:

Do we really need a whole DSL operator for this? I think the original name
for this
operator -- `cancelRepartition()` -- is itself a sign that this is not an
operation on the
stream itself but rather a command/request to whichever operator would have
otherwise triggered this repartition.

What about instead adding a new field to the Grouped/Joined/StreamJoined
config
objects that signals them to skip the repartitioning?

The one downside to this specific proposal is that you would then need to
specify
this for every stateful operation downstream of the key-changing operation.
So a
better alternative might be to introduce this `skipRepartition` field, or
whatever we
want to call it, to the config object of the operator that's actually doing
the key
changing operation which is apparently preserving the partitioning.

Imo this would be more "safe" relative to the current proposal, as the user
has to
explicitly consider whether every key changing operation is indeed
preserving the
partitioning. Otherwise you could code up a topology with several key
changing
operations at the beginning which do require repartitioning. Then you get
to the end
of the topology and insert one final key changing operation that doesn't,
assume
you can just cancel the repartition, and suddenly you're wondering why your
results
are all screwed up.

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax  wrote:


Thanks for the KIP Ivan!

I think it's a good feature to give advanced users more control, and
allow them to build more efficient applications.

Not sure if I like the proposed name though (the good old "naming
things" discussion :))

Did you consider alternatives? What about

  - markAsPartitioned()
  - markAsKeyed()
  - skipRepartition()

Not sure if there are other ideas for a good name?



-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:

Hello,

I'd like to start a discussion for KIP-759:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling



This is an offshoot of the discussion of KIP-655 for a `distinct`
operator, which turned out to be a separate proposal.

The proposal is quite trivial, however, we still might consider
alternatives (see 'Possible Alternatives' section).

Regards,

Ivan


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-03 Thread Sophie Blee-Goldman
Do we really need a whole DSL operator for this? I think the original name
for this
operator -- `cancelRepartition()` -- is itself a sign that this is not an
operation on the
stream itself but rather a command/request to whichever operator would have
otherwise triggered this repartition.

What about instead adding a new field to the Grouped/Joined/StreamJoined
config
objects that signals them to skip the repartitioning?

The one downside to this specific proposal is that you would then need to
specify
this for every stateful operation downstream of the key-changing operation.
So a
better alternative might be to introduce this `skipRepartition` field, or
whatever we
want to call it, to the config object of the operator that's actually doing
the key
changing operation which is apparently preserving the partitioning.

Imo this would be more "safe" relative to the current proposal, as the user
has to
explicitly consider whether every key changing operation is indeed
preserving the
partitioning. Otherwise you could code up a topology with several key
changing
operations at the beginning which do require repartitioning. Then you get
to the end
of the topology and insert one final key changing operation that doesn't,
assume
you can just cancel the repartition, and suddenly you're wondering why your
results
are all screwed up.

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax  wrote:

> Thanks for the KIP Ivan!
>
> I think it's a good feature to give advanced users more control, and
> allow them to build more efficient applications.
>
> Not sure if I like the proposed name though (the good old "naming
> things" discussion :))
>
> Did you consider alternatives? What about
>
>  - markAsPartitioned()
>  - markAsKeyed()
>  - skipRepartition()
>
> Not sure if there are other ideas for a good name?
>
>
>
> -Matthias
>
> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
> > Hello,
> >
> > I'd like to start a discussion for KIP-759:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> >
> >
> > This is an offshoot of the discussion of KIP-655 for a `distinct`
> > operator, which turned out to be a separate proposal.
> >
> > The proposal is quite trivial, however, we still might consider
> > alternatives (see 'Possible Alternatives' section).
> >
> > Regards,
> >
> > Ivan
>


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-03 Thread Matthias J. Sax
Thanks for the KIP Ivan!

I think it's a good feature to give advanced users more control, and
allow them to build more efficient applications.

Not sure if I like the proposed name though (the good old "naming
things" discussion :))

Did you consider alternatives? What about

 - markAsPartitioned()
 - markAsKeyed()
 - skipRepartition()

Not sure if there are other ideas for a good name?



-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
> Hello,
> 
> I'd like to start a discussion for KIP-759:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> 
> 
> This is an offshoot of the discussion of KIP-655 for a `distinct`
> operator, which turned out to be a separate proposal.
> 
> The proposal is quite trivial, however, we still might consider
> alternatives (see 'Possible Alternatives' section).
> 
> Regards,
> 
> Ivan


[DISCUSS] KIP-759: Unneeded repartition canceling

2021-06-24 Thread Ivan Ponomarev

Hello,

I'd like to start a discussion for KIP-759: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling


This is an offshoot of the discussion of KIP-655 for a `distinct` 
operator, which turned out to be a separate proposal.


The proposal is quite trivial; however, we might still consider
alternatives (see 'Possible Alternatives' section).


Regards,

Ivan