Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-30 Thread Walker Carlson
Hey Bruno,

Thanks for the feedback. Sorry for the late reply, I was hoping others
might weigh in as well and it got away from me.

a) I like this but I think we should separate this out. This KIP has
already dragged on more than it should and I think that is a big enough
change to get done by itself.

b) I'm a bit resistant to adding a new type of processor or store for this
change. It feels excessively complicated for what should be a small change.
I think that it might be good but I don't want to expand the scope more
than absolutely necessary here.

best,
walker

On Wed, Apr 10, 2024 at 4:34 AM Bruno Cadonna wrote:

> Hi Walker,
>
> Thanks for the updates!
>
>
> (1) While I like naming the methods differently, I also have to say that
> I do not like addIsomorphicGlobalStore() because it does not really tell
> what the method does. I could also not come up with a better name than
> addGlobalStoreWithReprocessingOnRestore(). However, I had two ideas on
> which I would like to have your opinion.
>
> (a) Add a new GlobalStoreBuilder in which users can set if the global
> state store should reprocess on restore. In addition to the option to
> enable or disable reprocessing on restore, you could also NOT offer a
> way to enable or disable logging in the GlobalStoreBuilder. Currently,
> if users enable logging for a store builder that they pass into
> addGlobalStore(), Kafka Streams needs to explicitly disable it again,
> which is not ideal.
>
> (b) Add a new GlobalProcessorSupplier in which users can set if the
> global state store should reprocess on restore. Another ugliness that
> could be fixed with this is passing Void, Void to ProcessorSupplier. The
> GlobalProcessorSupplier would just have two type parameters.
> The nice aspect of this idea is that the option to enable/disable
> reprocessing on restore is only needed when a processor supplier is
> passed into the methods. That is not true for idea (a).
>
>
> (2) Yes, that was my intent.
>
>
> Best,
> Bruno
>
> On 4/9/24 9:33 PM, Walker Carlson wrote:
> > Hey all,
> >
> > (1) no I hadn't considered just naming the methods differently. I actually
> > really like this idea and am for it. Except we need 3 different methods
> > now. One for no processor, one for a processor that should restore and one
> > that reprocesses. How about `addCustomGlobalStore` and
> > `addIsomorphicGlobalStore` and then just `addGlobalStateStore` for the no
> > processor case? If everyone likes that I can add that to the KIP and rename
> > the methods.
> >
> > (2) we can have the built-in case use a StoreBuilder for a KeyValueStore
> > and manually check for the TimestampedKeyValueStore. That is fine with me.
> >
> > Bruno I hope that was what you were intending.
> >
> > (3) For the Scala API, do we need to make it match the Java API or are we
> > just making the minimum changes? If we take point 1, I don't know how
> > much we need to change.
> >
> > Thanks,
> > Walker
> >
> >
> > On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax wrote:
> >
> >> One more thing:
> >>
> >> I was just looking into the WIP PR, and it seems we will also need to
> >> change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.
> >>
> >>
> >> -Matthias
> >>
> >> On 4/1/24 10:33 PM, Bruno Cadonna wrote:
> >>> Hi Walker and Matthias,
> >>>
> >>> (2)
> >>> That is exactly my point about having a compile time error versus a
> >>> runtime error. The added flexibility as proposed by Matthias sounds
> >>> good to me.
> >>>
> >>> Regarding the Named parameter, I was not aware that the processor that
> >>> writes records to the global state store is named according to the name
> >>> passed in by Consumed. I thought Consumed strictly specifies the names
> >>> of source processors. So I am fine with not having an overload with a
> >>> Named parameter.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 3/31/24 11:30 AM, Matthias J. Sax wrote:
> >>>> Two more follow up thoughts:
> >>>>
> >>>> (1) I am still not a big fan of the boolean parameter we introduce.
> >>>> Did you consider using different method names, like
> >>>> `addReadOnlyGlobalStore()` (for the optimized method, that would not
> >>>> reprocess data on restore), and maybe add `addModifiableGlobalStore()`
> >>>> (not a good name, but we cannot re-use existing `addGlobalStore()` --
> >>>> maybe somebody else has a good idea about a better `addXxxGlobalStore`
> >>>> that would describe it well).
> >>>>
> >>>> (2) I was thinking about Bruno's comment to limit the scope of the store
> >>>> builder for the optimized case. I think we should actually do
> >>>> something about it, because in the end, the runtime (ie, the
> >>>> `Processor` we hard wire) would need to pick a store it supports and
> >>>> cast to the corresponding store? If the cast fails, we hit a runtime
> >>>> exception, but by putting the store we cast to into the signature we
> >>>> can actually convert it into a compile time error, which seems better.
> >>>> -- If we

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-10 Thread Bruno Cadonna

Hi Walker,

Thanks for the updates!


(1) While I like naming the methods differently, I also have to say that
I do not like addIsomorphicGlobalStore() because it does not really tell 
what the method does. I could also not come up with a better name than 
addGlobalStoreWithReprocessingOnRestore(). However, I had two ideas on 
which I would like to have your opinion.


(a) Add a new GlobalStoreBuilder in which users can set if the global 
state store should reprocess on restore. In addition to the option to
enable or disable reprocessing on restore, you could also NOT offer a 
way to enable or disable logging in the GlobalStoreBuilder. Currently, 
if users enable logging for a store builder that they pass into 
addGlobalStore(), Kafka Streams needs to explicitly disable it again, 
which is not ideal.


(b) Add a new GlobalProcessorSupplier in which users can set if the 
global state store should reprocess on restore. Another ugliness that 
could be fixed with this is passing Void, Void to ProcessorSupplier. The 
GlobalProcessorSupplier would just have two type parameters.
The nice aspect of this idea is that the option to enable/disable 
reprocessing on restore is only needed when a processor supplier is 
passed into the methods. That is not true for idea (a).
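
For readers following along, idea (b) could look roughly like the Java sketch below. It is only an illustration under assumptions: `GlobalProcessor`, `GlobalProcessorSupplier`, `reprocessOnRestore()` and `restore()` are hypothetical names that do not exist in Kafka Streams, and the store and topic are modelled as plain maps. It shows the supplier carrying only two type parameters and the restore option deciding between reprocessing and bulk loading.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch only: none of these types exist in Kafka Streams.
final class GlobalRestoreSketch {

    // Stand-in for a processor that maintains a global store (modelled as a Map).
    interface GlobalProcessor<K, V> {
        void process(K key, V value, Map<K, V> store);
    }

    // Two type parameters only, with the restore option on the supplier itself.
    interface GlobalProcessorSupplier<K, V> {
        GlobalProcessor<K, V> get();
        boolean reprocessOnRestore();
    }

    // Restore either by running every record through the processor again,
    // or by bulk loading the topic contents straight into the store.
    static <K, V> Map<K, V> restore(Map<K, V> topicRecords,
                                    GlobalProcessorSupplier<K, V> supplier) {
        Map<K, V> store = new LinkedHashMap<>();
        if (supplier.reprocessOnRestore()) {
            GlobalProcessor<K, V> processor = supplier.get();
            for (Map.Entry<K, V> record : topicRecords.entrySet()) {
                processor.process(record.getKey(), record.getValue(), store);
            }
        } else {
            store.putAll(topicRecords);
        }
        return store;
    }

    // Example supplier whose processor upper-cases values before storing them.
    static GlobalProcessorSupplier<String, String> upperCasing(final boolean reprocess) {
        return new GlobalProcessorSupplier<String, String>() {
            @Override
            public GlobalProcessor<String, String> get() {
                return (key, value, store) -> store.put(key, value.toUpperCase());
            }

            @Override
            public boolean reprocessOnRestore() {
                return reprocess;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> topic = new LinkedHashMap<>();
        topic.put("k1", "v1");
        System.out.println(restore(topic, upperCasing(true)));  // {k1=V1}
        System.out.println(restore(topic, upperCasing(false))); // {k1=v1}
    }
}
```

The two results differ because the processor in this sketch transforms the value, which is exactly the case where skipping reprocessing on restore would change what ends up in the store.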



(2) Yes, that was my intent.


Best,
Bruno

On 4/9/24 9:33 PM, Walker Carlson wrote:

Hey all,

(1) no I hadn't considered just naming the methods differently. I actually
really like this idea and am for it. Except we need 3 different methods
now. One for no processor, one for a processor that should restore and one
that reprocesses. How about `addCustomGlobalStore` and
`addIsomorphicGlobalStore` and then just `addGlobalStateStore` for the no
processor case? If everyone likes that I can add that to the KIP and rename
the methods.

(2) we can have the built-in case use StoreBuilder and manually check for the
TimestampedKeyValueStore. That is fine with me.

Bruno I hope that was what you were intending.

(3) For the Scala API, do we need to make it match the Java API or are we
just making the minimum changes? If we take point 1, I don't know how
much we need to change.

Thanks,
Walker


On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax wrote:


One more thing:

I was just looking into the WIP PR, and it seems we will also need to
change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.


-Matthias

On 4/1/24 10:33 PM, Bruno Cadonna wrote:

Hi Walker and Matthias,

(2)
That is exactly my point about having a compile time error versus a
runtime error. The added flexibility as proposed by Matthias sounds good
to me.

Regarding the Named parameter, I was not aware that the processor that
writes records to the global state store is named according to the name
passed in by Consumed. I thought Consumed strictly specifies the names
of source processors. So I am fine with not having an overload with a
Named parameter.

Best,
Bruno

On 3/31/24 11:30 AM, Matthias J. Sax wrote:

Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce.
Did you consider using different method names, like
`addReadOnlyGlobalStore()` (for the optimized method, that would not
reprocess data on restore), and maybe add `addModifiableGlobalStore()`
(not a good name, but we cannot re-use existing `addGlobalStore()` --
maybe somebody else has a good idea about a better `addXxxGlobalStore`
that would describe it well).

(2) I was thinking about Bruno's comment to limit the scope of the store
builder for the optimized case. I think we should actually do
something about it, because in the end, the runtime (ie, the
`Processor` we hard wire) would need to pick a store it supports and
cast to the corresponding store? If the cast fails, we hit a runtime
exception, but by putting the store we cast to into the signature we
can actually convert it into a compile time error, which seems better.
-- If we want, we could make it somewhat flexible and support both
`KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature
would be `KeyValueStore` but we explicitly check if the builder gives
us a `TimestampedKeyValueStore` instance and use it properly.

If putting the signature does not work for some reason, we should at
least clearly call it out in the JavaDocs what store type is expected.



-Matthias



On 3/28/24 5:05 PM, Walker Carlson wrote:

Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthias. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken care
of.

Bruno and Matthias: The Named parameter doesn't really make sense to me to
put it here. The store in the Store builder is already named through what
Matthias described and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter (in
the DSL)

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-09 Thread Walker Carlson
Hey all,

(1) no I hadn't considered just naming the methods differently. I actually
really like this idea and am for it. Except we need 3 different methods
now. One for no processor, one for a processor that should restore and one
that reprocesses. How about `addCustomGlobalStore` and
`addIsomorphicGlobalStore` and then just `addGlobalStateStore` for the no
processor case? If everyone likes that I can add that to the KIP and rename
the methods.

(2) we can have the built-in case use StoreBuilder and manually check for the
TimestampedKeyValueStore. That is fine with me.

Bruno I hope that was what you were intending.

(3) For the Scala API, do we need to make it match the Java API or are we
just making the minimum changes? If we take point 1, I don't know how
much we need to change.

Thanks,
Walker


On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax wrote:

> One more thing:
>
> I was just looking into the WIP PR, and it seems we will also need to
> change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.
>
>
> -Matthias
>
> On 4/1/24 10:33 PM, Bruno Cadonna wrote:
> > Hi Walker and Matthias,
> >
> > (2)
> > That is exactly my point about having a compile time error versus a
> > runtime error. The added flexibility as proposed by Matthias sounds good
> > to me.
> >
> > Regarding the Named parameter, I was not aware that the processor that
> > writes records to the global state store is named according to the name
> > passed in by Consumed. I thought Consumed strictly specifies the names
> > of source processors. So I am fine with not having an overload with a
> > Named parameter.
> >
> > Best,
> > Bruno
> >
> > On 3/31/24 11:30 AM, Matthias J. Sax wrote:
> >> Two more follow up thoughts:
> >>
> >> (1) I am still not a big fan of the boolean parameter we introduce.
> >> Did you consider using different method names, like
> >> `addReadOnlyGlobalStore()` (for the optimized method, that would not
> >> reprocess data on restore), and maybe add `addModifiableGlobalStore()`
> >> (not a good name, but we cannot re-use existing `addGlobalStore()` --
> >> maybe somebody else has a good idea about a better `addXxxGlobalStore`
> >> that would describe it well).
> >>
> >> (2) I was thinking about Bruno's comment to limit the scope of the store
> >> builder for the optimized case. I think we should actually do
> >> something about it, because in the end, the runtime (ie, the
> >> `Processor` we hard wire) would need to pick a store it supports and
> >> cast to the corresponding store? If the cast fails, we hit a runtime
> >> exception, but by putting the store we cast to into the signature we
> >> can actually convert it into a compile time error, which seems better.
> >> -- If we want, we could make it somewhat flexible and support both
> >> `KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature
> >> would be `KeyValueStore` but we explicitly check if the builder gives
> >> us a `TimestampedKeyValueStore` instance and use it properly.
> >>
> >> If putting the signature does not work for some reason, we should at
> >> least clearly call it out in the JavaDocs what store type is expected.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 3/28/24 5:05 PM, Walker Carlson wrote:
> >>> Hey all,
> >>>
> >>> Thanks for the feedback Bruno, Almog and Matthias!
> >>>
> >>> Almog: I like the idea, but I agree with Matthias. I actually looked at
> >>> that ticket a bit when doing this and found that while similar they are
> >>> actually pretty unrelated codewise. I would love to see it get taken
> >>> care of.
> >>>
> >>> Bruno and Matthias: The Named parameter doesn't really make sense to
> >>> me to put it here. The store in the Store builder is already named
> >>> through what Matthias described and the processor doesn't actually have
> >>> a name. That would be the processor node that gets named via the Named
> >>> parameter (in the DSL) and the internal streams builder uses the
> >>> consumed object to make a source name. I think we should just keep the
> >>> Consumed object and use that for the processor node name.
> >>>
> >>> As for the limitation of the store builder interface I don't think it
> >>> is necessary. It could be something we add later if we really want to.
> >>>
> >>> Anyways I think we are getting close enough to consensus that I'm
> >>> going to open a vote and hopefully we can get it voted on soon!
> >>>
> >>> best,
> >>> Walker
> >>>
> >>> On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax wrote:
> >>>
> >>>> Hey,
> >>>>
> >>>> looking into the API, I am wondering why we would need to add an
> >>>> overload taking a `Named` parameter?
> >>>>
> >>>> StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a
> >>>> `Consumed` parameter that allows setting a name.
> >>>>
> >>>>
> >>>> > 2.
> >>>> > I do not understand what you mean with "maximum flexibility". The
> >>>> > built-in processor needs to assume a given state store

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-02 Thread Matthias J. Sax

One more thing:

I was just looking into the WIP PR, and it seems we will also need to 
change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.



-Matthias

On 4/1/24 10:33 PM, Bruno Cadonna wrote:

Hi Walker and Matthias,

(2)
That is exactly my point about having a compile time error versus a 
runtime error. The added flexibility as proposed by Matthias sounds good 
to me.


Regarding the Named parameter, I was not aware that the processor that 
writes records to the global state store is named according to the name 
passed in by Consumed. I thought Consumed strictly specifies the names 
of source processors. So I am fine with not having an overload with a 
Named parameter.


Best,
Bruno

On 3/31/24 11:30 AM, Matthias J. Sax wrote:

Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce. 
Did you consider using different method names, like
`addReadOnlyGlobalStore()` (for the optimized method, that would not 
reprocess data on restore), and maybe add `addModifiableGlobalStore()` 
(not a good name, but we cannot re-use existing `addGlobalStore()` -- 
maybe somebody else has a good idea about a better `addXxxGlobalStore` 
that would describe it well).


(2) I was thinking about Bruno's comment to limit the scope of the store
builder for the optimized case. I think we should actually do
something about it, because in the end, the runtime (ie, the
`Processor` we hard wire) would need to pick a store it supports and
cast to the corresponding store? If the cast fails, we hit a runtime
exception, but by putting the store we cast to into the signature we
can actually convert it into a compile time error, which seems better.
-- If we want, we could make it somewhat flexible and support both 
`KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature 
would be `KeyValueStore` but we explicitly check if the builder gives 
us a `TimestampedKeyValueStore` instance and use it properly.


If putting the signature does not work for some reason, we should at 
least clearly call it out in the JavaDocs what store type is expected.




-Matthias



On 3/28/24 5:05 PM, Walker Carlson wrote:

Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthias. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken care
of.

Bruno and Matthias: The Named parameter doesn't really make sense to me to
put it here. The store in the Store builder is already named through what
Matthias described and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter (in
the DSL) and the internal streams builder uses the consumed object to make
a source name. I think we should just keep the Consumed object and use
that for the processor node name.

As for the limitation of the store builder interface I don't think it is
necessary. It could be something we add later if we really want to.

Anyways I think we are getting close enough to consensus that I'm going to
open a vote and hopefully we can get it voted on soon!

best,
Walker

On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax wrote:



Hey,

looking into the API, I am wondering why we would need to add an
overload taking a `Named` parameter?

StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a
`Consumed` parameter that allows setting a name.



2.
I do not understand what you mean with "maximum flexibility". The
built-in processor needs to assume a given state store interface. That
means, users have to provide a state store that offers that interface. If
they do not they will get a runtime exception. If we require a store
builder for a given interface, we can catch the mistake at compile time.
Let me know whether I misunderstood something.

Yes, we could catch it at runtime. But I guess what I was trying to say
is different: I was trying to say, we should not limit the API to always
require a specific store, such that global stores can only be of a
certain type. Global Stores should be allowed to be of any type. Hence,
if we add a built-in processor, it can only be one option, and we always
need to support custom processors, and might also want to try to allow
the restore optimization for custom processors (and thus other store
types), not just for our built-in processor (and our built-in stores).
Coupling the optimization to built-in stores would prevent us from applying
the optimization to custom stores.



@Almog: interesting idea. I tend to think that both issues are
orthogonal. If users pick to apply the optimization "added" by this KIP,
the bug you mentioned would still apply to global stores, and thus this
KIP is not addressing the issue you mentioned.

Personally, I also think that we don't need a KIP to fix the ticket you
mentioned? In the end, we need to skip 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-01 Thread Bruno Cadonna

Hi Walker and Matthias,

(2)
That is exactly my point about having a compile time error versus a 
runtime error. The added flexibility as proposed by Matthias sounds good 
to me.


Regarding the Named parameter, I was not aware that the processor that 
writes records to the global state store is named according to the name 
passed in by Consumed. I thought Consumed strictly specifies the names 
of source processors. So I am fine with not having an overload with a 
Named parameter.


Best,
Bruno

On 3/31/24 11:30 AM, Matthias J. Sax wrote:

Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce. Did
you consider using different method names, like
`addReadOnlyGlobalStore()` (for the optimized method, that would not 
reprocess data on restore), and maybe add `addModifiableGlobalStore()` 
(not a good name, but we cannot re-use existing `addGlobalStore()` -- 
maybe somebody else has a good idea about a better `addXxxGlobalStore` 
that would describe it well).


(2) I was thinking about Bruno's comment to limit the scope of the store
builder for the optimized case. I think we should actually do something
about it, because in the end, the runtime (ie, the `Processor` we hard
wire) would need to pick a store it supports and cast to the
corresponding store? If the cast fails, we hit a runtime exception, but
by putting the store we cast to into the signature we can actually
convert it into a compile time error, which seems better. -- If we want,
we could make it somewhat flexible and support both `KeyValueStore` and 
`TimestampedKeyValueStore` -- ie, the signature would be `KeyValueStore` 
but we explicitly check if the builder gives us a 
`TimestampedKeyValueStore` instance and use it properly.


If putting the signature does not work for some reason, we should at 
least clearly call it out in the JavaDocs what store type is expected.




-Matthias



On 3/28/24 5:05 PM, Walker Carlson wrote:

Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthias. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken care
of.

Bruno and Matthias: The Named parameter doesn't really make sense to me to
put it here. The store in the Store builder is already named through what
Matthias described and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter (in
the DSL) and the internal streams builder uses the consumed object to make
a source name. I think we should just keep the Consumed object and use
that for the processor node name.

As for the limitation of the store builder interface I don't think it is
necessary. It could be something we add later if we really want to.

Anyways I think we are getting close enough to consensus that I'm going to
open a vote and hopefully we can get it voted on soon!

best,
Walker

On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax wrote:


Hey,

looking into the API, I am wondering why we would need to add an
overload taking a `Named` parameter?

StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a
`Consumed` parameter that allows setting a name.



2.
I do not understand what you mean with "maximum flexibility". The
built-in processor needs to assume a given state store interface. That
means, users have to provide a state store that offers that interface. If
they do not they will get a runtime exception. If we require a store
builder for a given interface, we can catch the mistake at compile time.
Let me know whether I misunderstood something.

Yes, we could catch it at runtime. But I guess what I was trying to say
is different: I was trying to say, we should not limit the API to always
require a specific store, such that global stores can only be of a
certain type. Global Stores should be allowed to be of any type. Hence,
if we add a built-in processor, it can only be one option, and we always
need to support custom processor, and might also want to try to allow
the restore optimization for custom processor (and thus other store
types), not just for our built-in processor (and our built-in stores).
Coupling the optimization to built-in stores would prevent us from applying
the optimization to custom stores.



@Almog: interesting idea. I tend to think that both issues are
orthogonal. If users pick to apply the optimization "added" by this KIP,
the bug you mentioned would still apply to global stores, and thus this
KIP is not addressing the issue you mentioned.

Personally, I also think that we don't need a KIP to fix the ticket you
mentioned? In the end, we need to skip records during restore, and it
seems it does not make sense to make this configurable?



-Matthias


On 3/26/24 4:24 PM, Almog Gavra wrote:

Thanks for the thoughts Bruno!


Do you mean an API to configure restoration instead of boolean

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-31 Thread Matthias J. Sax

Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce. Did
you consider using different method names, like
`addReadOnlyGlobalStore()` (for the optimized method, that would not 
reprocess data on restore), and maybe add `addModifiableGlobalStore()` 
(not a good name, but we cannot re-use existing `addGlobalStore()` -- 
maybe somebody else has a good idea about a better `addXxxGlobalStore` 
that would describe it well).


(2) I was thinking about Bruno's comment to limit the scope of the store
builder for the optimized case. I think we should actually do something
about it, because in the end, the runtime (ie, the `Processor` we hard
wire) would need to pick a store it supports and cast to the
corresponding store? If the cast fails, we hit a runtime exception, but
by putting the store we cast to into the signature we can actually
convert it into a compile time error, which seems better. -- If we want,
we could make it somewhat flexible and support both `KeyValueStore` and 
`TimestampedKeyValueStore` -- ie, the signature would be `KeyValueStore` 
but we explicitly check if the builder gives us a 
`TimestampedKeyValueStore` instance and use it properly.


If putting the signature does not work for some reason, we should at 
least clearly call it out in the JavaDocs what store type is expected.
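
To illustrate point (2), a minimal Java sketch of the difference between the two signatures follows. It rests on assumptions: the interfaces below are simplified, hypothetical stand-ins declared inside the sketch (the real Kafka Streams type hierarchy differs in detail), and `wireUntyped()` and `wireTyped()` are made-up method names. The untyped variant accepts any store builder and only fails on the cast at runtime, while the typed variant rejects an unsupported store at compile time and still checks explicitly for the timestamped flavour.

```java
// Hypothetical stand-ins, NOT the real Kafka Streams interfaces.
final class StoreSignatureSketch {

    interface StateStore { }
    interface KeyValueStore<K, V> extends StateStore { }
    interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, V> { }
    interface StoreBuilder<T extends StateStore> { T build(); }

    static final class PlainStore implements KeyValueStore<String, String> { }
    static final class TimestampedStore implements TimestampedKeyValueStore<String, String> { }
    static final class OtherStore implements StateStore { }

    // Untyped signature: any builder compiles, a wrong store only fails at runtime.
    @SuppressWarnings("unchecked")
    static String wireUntyped(StoreBuilder<?> builder) {
        KeyValueStore<String, String> store =
            (KeyValueStore<String, String>) builder.build(); // may throw ClassCastException
        return describe(store);
    }

    // Typed signature: a builder for OtherStore is rejected by the compiler.
    static String wireTyped(StoreBuilder<? extends KeyValueStore<String, String>> builder) {
        return describe(builder.build());
    }

    // Explicit check for the timestamped flavour behind the KeyValueStore signature.
    private static String describe(KeyValueStore<String, String> store) {
        return store instanceof TimestampedKeyValueStore ? "timestamped" : "plain";
    }

    public static void main(String[] args) {
        System.out.println(wireTyped(PlainStore::new));       // plain
        System.out.println(wireTyped(TimestampedStore::new)); // timestamped
        try {
            wireUntyped(OtherStore::new);
        } catch (ClassCastException e) {
            System.out.println("runtime failure for unsupported store");
        }
        // wireTyped(OtherStore::new); // would not compile
    }
}
```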




-Matthias



On 3/28/24 5:05 PM, Walker Carlson wrote:

Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthias. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken care
of.

Bruno and Matthias: The Named parameter doesn't really make sense to me to
put it here. The store in the Store builder is already named through what
Matthias described and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter (in
the DSL) and the internal streams builder uses the consumed object to make
a source name. I think we should just keep the Consumed object and use
that for the processor node name.

As for the limitation of the store builder interface I don't think it is
necessary. It could be something we add later if we really want to.

Anyways I think we are getting close enough to consensus that I'm going to
open a vote and hopefully we can get it voted on soon!

best,
Walker

On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax wrote:


Hey,

looking into the API, I am wondering why we would need to add an
overload taking a `Named` parameter?

StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a
`Consumed` parameter that allows setting a name.



2.
I do not understand what you mean with "maximum flexibility". The
built-in processor needs to assume a given state store interface. That
means, users have to provide a state store that offers that interface. If
they do not they will get a runtime exception. If we require a store
builder for a given interface, we can catch the mistake at compile time.
Let me know whether I misunderstood something.

Yes, we could catch it at runtime. But I guess what I was trying to say
is different: I was trying to say, we should not limit the API to always
require a specific store, such that global stores can only be of a
certain type. Global Stores should be allowed to be of any type. Hence,
if we add a built-in processor, it can only be one option, and we always
need to support custom processor, and might also want to try to allow
the restore optimization for custom processor (and thus other store
types), not just for our built-in processor (and our built-in stores).
Coupling the optimization to built-in stores would prevent us from applying
the optimization to custom stores.



@Almog: interesting idea. I tend to think that both issues are
orthogonal. If users pick to apply the optimization "added" by this KIP,
the bug you mentioned would still apply to global stores, and thus this
KIP is not addressing the issue you mentioned.

Personally, I also think that we don't need a KIP to fix the ticket you
mentioned? In the end, we need to skip records during restore, and it
seems it does not make sense to make this configurable?



-Matthias


On 3/26/24 4:24 PM, Almog Gavra wrote:

Thanks for the thoughts Bruno!


Do you mean an API to configure restoration instead of boolean flag
reprocessOnRestore?

Yes, this is exactly the type of thing I was musing (but I don't have any
concrete suggestions). It feels like that would give the flexibility to do
things like the motivation section of the KIP (allow bulk loading of
records without reprocessing) while also solving other limitations.

I'm supportive of the KIP as-is but was hoping somebody with more
experience would have a sudden inspiration for how to solve both issues
with one API! Anyway, I'll slide back into the lurking shadows for now and
let the discussion

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-28 Thread Walker Carlson
Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthias. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken care
of.

Bruno and Matthias: The Named parameter doesn't really make sense to me to
put it here. The store in the Store builder is already named through what
Matthias described and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter (in
the DSL) and the internal streams builder uses the consumed object to make
a source name. I think we should just keep the Consumed object and use
that for the processor node name.

As for the limitation of the store builder interface I don't think it is
necessary. It could be something we add later if we really want to.

Anyways I think we are getting close enough to consensus that I'm going to
open a vote and hopefully we can get it voted on soon!

best,
Walker

On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax wrote:

> Hey,
>
> looking into the API, I am wondering why we would need to add an
> overload taking a `Named` parameter?
>
> StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a
> `Consumed` parameter that allows setting a name.
>
>
> > 2.
> > I do not understand what you mean with "maximum flexibility". The
> built-in processor needs to assume a given state store interface. That
> means, users have to provide a state store that offers that interface. If
> they do not they will get a runtime exception. If we require a store
> builder for a given interface, we can catch the mistake at compile time.
> Let me know whether I misunderstood something.
>
> Yes, we could catch it at runtime. But I guess what I was trying to say
> is different: I was trying to say, we should not limit the API to always
> require a specific store, such that global stores can only be of a
> certain type. Global Stores should be allowed to be of any type. Hence,
> if we add a built-in processor, it can only be one option, and we always
> need to support custom processor, and might also want to try to allow
> the restore optimization for custom processor (and thus other store
> types), not just for our built-in processor (and our built-in stores).
> Coupling the optimization to built-in stores would prevent us from
> applying the optimization to custom stores.
>
>
>
> @Almog: interesting idea. I tend to think that both issues are
> orthogonal. If users pick to apply the optimization "added" by this KIP,
> the bug you mentioned would still apply to global stores, and thus this
> KIP is not addressing the issue you mentioned.
>
> Personally, I also think that we don't need a KIP to fix the ticket you
> mentioned? In the end, we need to skip records during restore, and it
> seems it does not make sense to make this configurable?
>
>
>
> -Matthias
>
>
> On 3/26/24 4:24 PM, Almog Gavra wrote:
> > Thanks for the thoughts Bruno!
> >
> >> Do you mean a API to configure restoration instead of boolean flag
> > reprocessOnRestore?
> >
> > Yes, this is exactly the type of thing I was musing (but I don't have any
> > concrete suggestions). It feels like that would give the flexibility to
> do
> > things like the motivation section of the KIP (allow bulk loading of
> > records without reprocessing) while also solving other limitations.
> >
> > I'm supportive of the KIP as-is but was hoping somebody with more
> > experience would have a sudden inspiration for how to solve both issues
> > with one API! Anyway, I'll slide back into the lurking shadows for now
> and
> > let the discussion continue :)
> >
> > Cheers,
> > Almog
> >
> > On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna 
> wrote:
> >
> >> Hi Almog,
> >>
> >> Do you mean a API to configure restoration instead of boolean flag
> >> reprocessOnRestore?
> >>
> >> Do you already have an idea?
> >>
> >> The proposal in the KIP is focused on the processor that updates the
> >> global state whereas in the case of GlobalKTable and source KTable the
> >> issues lies in the deserialization of records from the input topics, but
> >> only if the deserialization error handler is configured to drop the
> >> problematic record. Additionally, for source KTable the source topic
> >> optimization must be turned on to run into the issue. I am wondering how
> >> a unified API for global stores, GlobalKTable, and source KTable might
> >> look like.
> >>
> >> While it is an interesting question, I am in favor of deferring this to
> >> a separate KIP.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 3/26/24 12:49 AM, Almog Gavra wrote:
> >>> Hello Folk!
> >>>
> >>> Glad to see improvements to the GlobalKTables in discussion! I think
> they
> >>> deserve more love :)
> >>>
> >>> Scope creep alert (which I'm generally against and certainly still
> >> support
> >>> this KIP without but I want to see if there's an elegant way to address
> 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-28 Thread Matthias J. Sax

Hey,

looking into the API, I am wondering why we would need to add an
overload taking a `Named` parameter?


StreamsBuilder.addGlobalStore() (and .globalTable()) already takes a
`Consumed` parameter that allows setting a name.




> 2.
> I do not understand what you mean with "maximum flexibility". The
> built-in processor needs to assume a given state store interface. That
> means, users have to provide a state store that offers that interface.
> If they do not they will get a runtime exception. If we require a store
> builder for a given interface, we can catch the mistake at compile time.
> Let me know whether I misunderstood something.


Yes, we could catch it at runtime. But I guess what I was trying to say 
is different: I was trying to say, we should not limit the API to always 
require a specific store, such that global stores can only be of a 
certain type. Global Stores should be allowed to be of any type. Hence, 
if we add a built-in processor, it can only be one option, and we always 
need to support custom processor, and might also want to try to allow 
the restore optimization for custom processor (and thus other store 
types), not just for our built-in processor (and our built-in stores). 
Coupling the optimization to built-in stores would prevent us from
applying the optimization to custom stores.




@Almog: interesting idea. I tend to think that both issues are 
orthogonal. If users pick to apply the optimization "added" by this KIP, 
the bug you mentioned would still apply to global stores, and thus this 
KIP is not addressing the issue you mentioned.


Personally, I also think that we don't need a KIP to fix the ticket you 
mentioned? In the end, we need to skip records during restore, and it 
seems it does not make sense to make this configurable?




-Matthias


On 3/26/24 4:24 PM, Almog Gavra wrote:

Thanks for the thoughts Bruno!


> Do you mean a API to configure restoration instead of boolean flag
> reprocessOnRestore?

Yes, this is exactly the type of thing I was musing (but I don't have any
concrete suggestions). It feels like that would give the flexibility to do
things like the motivation section of the KIP (allow bulk loading of
records without reprocessing) while also solving other limitations.

I'm supportive of the KIP as-is but was hoping somebody with more
experience would have a sudden inspiration for how to solve both issues
with one API! Anyway, I'll slide back into the lurking shadows for now and
let the discussion continue :)

Cheers,
Almog

On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna  wrote:


Hi Almog,

Do you mean a API to configure restoration instead of boolean flag
reprocessOnRestore?

Do you already have an idea?

The proposal in the KIP is focused on the processor that updates the
global state whereas in the case of GlobalKTable and source KTable the
issues lies in the deserialization of records from the input topics, but
only if the deserialization error handler is configured to drop the
problematic record. Additionally, for source KTable the source topic
optimization must be turned on to run into the issue. I am wondering how
a unified API for global stores, GlobalKTable, and source KTable might
look like.

While it is an interesting question, I am in favor of deferring this to
a separate KIP.

Best,
Bruno

On 3/26/24 12:49 AM, Almog Gavra wrote:

Hello Folk!

Glad to see improvements to the GlobalKTables in discussion! I think they
deserve more love :)

Scope creep alert (which I'm generally against and certainly still support
this KIP without but I want to see if there's an elegant way to address
both problems). The KIP mentions that "Now the restore is done by
reprocessing using an instance from the customer processor supplier" which
I suppose fixed a long-standing bug (
https://issues.apache.org/jira/browse/KAFKA-8037) but only for
GlobalKTables and not for normal KTables that use the source-changelog
optimization. Since this API could be used to signal "I want to reprocess
on restore" I'm wondering whether it makes sense to design this API in a
way that could be extended for KTables as well so a fix for KAFKA-8037
would be possible with the same mechanism. Thoughts?

Cheers,
Almog

On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
 wrote:


Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match
the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined processors but then
they might as well use the `globalTable` method. I think the add state
store option should go for maximum flexibility.

Best,
Walker



On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna wrote:



Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I
understand the code 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-26 Thread Almog Gavra
Thanks for the thoughts Bruno!

> Do you mean a API to configure restoration instead of boolean flag
reprocessOnRestore?

Yes, this is exactly the type of thing I was musing (but I don't have any
concrete suggestions). It feels like that would give the flexibility to do
things like the motivation section of the KIP (allow bulk loading of
records without reprocessing) while also solving other limitations.

I'm supportive of the KIP as-is but was hoping somebody with more
experience would have a sudden inspiration for how to solve both issues
with one API! Anyway, I'll slide back into the lurking shadows for now and
let the discussion continue :)

Cheers,
Almog

On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna  wrote:

> Hi Almog,
>
> Do you mean a API to configure restoration instead of boolean flag
> reprocessOnRestore?
>
> Do you already have an idea?
>
> The proposal in the KIP is focused on the processor that updates the
> global state whereas in the case of GlobalKTable and source KTable the
> issues lies in the deserialization of records from the input topics, but
> only if the deserialization error handler is configured to drop the
> problematic record. Additionally, for source KTable the source topic
> optimization must be turned on to run into the issue. I am wondering how
> a unified API for global stores, GlobalKTable, and source KTable might
> look like.
>
> While it is an interesting question, I am in favor of deferring this to
> a separate KIP.
>
> Best,
> Bruno
>
> On 3/26/24 12:49 AM, Almog Gavra wrote:
> > Hello Folk!
> >
> > Glad to see improvements to the GlobalKTables in discussion! I think they
> > deserve more love :)
> >
> > Scope creep alert (which I'm generally against and certainly still
> support
> > this KIP without but I want to see if there's an elegant way to address
> > both problems). The KIP mentions that "Now the restore is done by
> > reprocessing using an instance from the customer processor supplier"
> which
> > I suppose fixed a long-standing bug (
> > https://issues.apache.org/jira/browse/KAFKA-8037) but only for
> > GlobalKTables and not for normal KTables that use the source-changelog
> > optimization. Since this API could be used to signal "I want to reprocess
> > on restore" I'm wondering whether it makes sense to design this API in a
> > way that could be extended for KTables as well so a fix for KAFKA-8037
> > would be possible with the same mechanism. Thoughts?
> >
> > Cheers,
> > Almog
> >
> > On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
> >  wrote:
> >
> >> Hey Bruno,
> >>
> >> 1) I'm actually not sure why that is in there. It certainly doesn't
> match
> >> the convention. Best to remove it and match the other methods.
> >>
> >> 2) Yeah, I thought about it but I'm not convinced it is a necessary
> >> restriction. It might be useful for the already defined processors but
> then
> >> they might as well use the `globalTable` method. I think the add state
> >> store option should go for maximum flexibility.
> >>
> >> Best,
> >> Walker
> >>
> >>
> >>
> >> On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna 
> wrote:
> >>
> >>> Hi Walker,
> >>>
> >>> A couple of follow-up questions.
> >>>
> >>> 1.
> >>> Why do you propose to explicitly pass a parameter "storeName" in
> >>> StreamsBuilder#addGlobalStore?
> >>> The StoreBuilder should already provide a name for the store, if I
> >>> understand the code correctly.
> >>> I would avoid using the same name for the source node and the state
> >>> store, because it limits the flexibility in naming. Why do you not use
> >>> Named for the name of the source node?
> >>>
> >>> 2.
> >>> Did you consider Matthias' proposal to restrict the type of the store
> >>> builder to `StoreBuilder` (or even
> >>> `StoreBuilder`) for the case where
> >>> the processor is built-in?
> >>>
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 3/13/24 11:05 PM, Walker Carlson wrote:
> >>>> Thanks for the feedback Bruno, Matthias, and Lucas!
> >>>>
> >>>> There is a decent amount but I'm going to try and just hit the major points
> >>>> as I would like to keep this change simple.
> >>>>
> >>>> I've made corrections for the mistakes pointed out. Thanks for the
> >>>> suggestions everyone.
> >>>>
> >>>> The main sticking point seems to be with the method of signalling the
> >>>> restore behavior. It seems we can all agree with how the API should look
> >>>> with the default option we are adding. I think keeping the option to load
> >>>> directly from the topic into the store is a good idea. It is much more
> >>>> performant and could make a simple metric collector processor much simpler.
> >>>>
> >>>> I think something that Matthias said about creating a special class of
> >>>> processors for the global stores helps me think about the issue. I tend to
> >>>> fall into the category that we should keep global stores open to the
> >>>> possibility of having child nodes in the future. I don't really see the
> >>>> downside of having

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-26 Thread Bruno Cadonna

Hi Almog,

Do you mean an API to configure restoration instead of the boolean flag
reprocessOnRestore?


Do you already have an idea?

The proposal in the KIP is focused on the processor that updates the
global state, whereas in the case of GlobalKTable and source KTable the
issue lies in the deserialization of records from the input topics, but
only if the deserialization error handler is configured to drop the
problematic record. Additionally, for source KTable the source topic
optimization must be turned on to run into the issue. I am wondering what
a unified API for global stores, GlobalKTable, and source KTable might
look like.
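
A minimal sketch of the mismatch described above. This is not Kafka Streams
code: every name here (`RestoreMismatchSketch`, `processPath`, `restorePath`)
is hypothetical, and `Integer.parseInt` merely stands in for a deserializer
whose error handler is assumed to drop bad records. Normal processing
deserializes each record and skips failures, while a restore that copies raw
bytes never runs the deserializer, so the two paths can end up with different
store contents.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the two code paths; none of this is Kafka Streams API.
final class RestoreMismatchSketch {

    // A raw topic record: key plus value bytes that have not been deserialized yet.
    static final class RawRecord {
        final String key;
        final byte[] value;

        RawRecord(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    static RawRecord raw(String key, String value) {
        return new RawRecord(key, value.getBytes(StandardCharsets.UTF_8));
    }

    // Normal processing: deserialize each value and drop the record if that fails.
    static Map<String, Integer> processPath(List<RawRecord> topic) {
        Map<String, Integer> store = new LinkedHashMap<>();
        for (RawRecord r : topic) {
            try {
                // Integer.parseInt stands in for the configured deserializer.
                store.put(r.key, Integer.parseInt(new String(r.value, StandardCharsets.UTF_8)));
            } catch (NumberFormatException e) {
                // The error handler is assumed to skip the problematic record.
            }
        }
        return store;
    }

    // Restore by bulk loading: bytes are copied into the store without deserialization.
    static Map<String, byte[]> restorePath(List<RawRecord> topic) {
        Map<String, byte[]> store = new LinkedHashMap<>();
        for (RawRecord r : topic) {
            store.put(r.key, r.value);
        }
        return store;
    }

    public static void main(String[] args) {
        List<RawRecord> topic = Arrays.asList(raw("a", "1"), raw("b", "oops"), raw("c", "3"));
        System.out.println("keys after processing: " + processPath(topic).keySet());
        System.out.println("keys after restore:    " + restorePath(topic).keySet());
    }
}
```

In this toy model the undeserializable record `b` is present only in the
store built by the restore path.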


While it is an interesting question, I am in favor of deferring this to 
a separate KIP.


Best,
Bruno

On 3/26/24 12:49 AM, Almog Gavra wrote:

Hello Folk!

Glad to see improvements to the GlobalKTables in discussion! I think they
deserve more love :)

Scope creep alert (which I'm generally against and certainly still support
this KIP without but I want to see if there's an elegant way to address
both problems). The KIP mentions that "Now the restore is done by
reprocessing using an instance from the customer processor supplier" which
I suppose fixed a long-standing bug (
https://issues.apache.org/jira/browse/KAFKA-8037) but only for
GlobalKTables and not for normal KTables that use the source-changelog
optimization. Since this API could be used to signal "I want to reprocess
on restore" I'm wondering whether it makes sense to design this API in a
way that could be extended for KTables as well so a fix for KAFKA-8037
would be possible with the same mechanism. Thoughts?

Cheers,
Almog

On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
 wrote:


Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match
the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined processors but then
they might as well use the `globalTable` method. I think the add state
store option should go for maximum flexibility.

Best,
Walker



On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna  wrote:


Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I
understand the code correctly.
I would avoid using the same name for the source node and the state
store, because it limits the flexibility in naming. Why do you not use
Named for the name of the source node?

2.
Did you consider Matthias' proposal to restrict the type of the store
builder to `StoreBuilder` (or even
`StoreBuilder`) for the case where
the processor is built-in?


Best,
Bruno

On 3/13/24 11:05 PM, Walker Carlson wrote:

Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major points
as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should look
with the default option we are adding. I think keeping the option to load
directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much simpler.


I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend to
fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide
to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not that seems excessive.

As of right now I don't see a better option than having a boolean flag for
the reprocessOnRestore option. I expanded the description in the docs so I
hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker









Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-26 Thread Bruno Cadonna

Hi Walker,

I have follow-up comments.

1.
I think we should add an overload to the StreamsBuilder class that
allows naming the processor with Named. That makes that processor
consistent with all other processors in the DSL regarding naming.


2.
I do not understand what you mean by "maximum flexibility". The
built-in processor needs to assume a given state store interface. That
means users have to provide a state store that offers that interface.
If they do not, they will get a runtime exception. If we require a store
builder for a given interface, we can catch the mistake at compile time.
Let me know whether I misunderstood something.
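
To make the trade-off concrete, here is a small sketch using hypothetical
stand-in types (`Store`, `KeyValueLikeStore`, `Builder`, `addLoose`, and
`addTyped` are invented for this example and are not Kafka Streams API).
Accepting any builder defers the type check to a cast at runtime, whereas a
bounded generic parameter lets the compiler reject a mismatched store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; these are not the Kafka Streams StateStore/StoreBuilder types.
final class StoreTypingSketch {

    interface Store {
        String name();
    }

    interface KeyValueLikeStore extends Store {
        void put(String key, String value);
        String get(String key);
    }

    interface Builder<T extends Store> {
        T build();
    }

    static final class MapStore implements KeyValueLikeStore {
        private final Map<String, String> data = new HashMap<>();
        public String name() { return "map-store"; }
        public void put(String key, String value) { data.put(key, value); }
        public String get(String key) { return data.get(key); }
    }

    // A custom store that offers no key-value interface at all.
    static final class OpaqueStore implements Store {
        public String name() { return "opaque-store"; }
    }

    // Loose signature: any store compiles, so a built-in processor has to cast,
    // and a mismatch only shows up as a ClassCastException at runtime.
    static KeyValueLikeStore addLoose(Builder<? extends Store> builder) {
        KeyValueLikeStore store = (KeyValueLikeStore) builder.build();
        store.put("k", "v");
        return store;
    }

    // Bounded signature: only builders of key-value stores are accepted by the compiler.
    static KeyValueLikeStore addTyped(Builder<? extends KeyValueLikeStore> builder) {
        KeyValueLikeStore store = builder.build();
        store.put("k", "v");
        return store;
    }

    public static void main(String[] args) {
        Builder<MapStore> good = MapStore::new;
        Builder<OpaqueStore> opaque = OpaqueStore::new;

        System.out.println(addTyped(good).get("k"));
        // addTyped(opaque);  // would not compile: OpaqueStore is not a KeyValueLikeStore
        try {
            addLoose(opaque);
        } catch (ClassCastException e) {
            System.out.println("loose signature failed at runtime");
        }
    }
}
```

The flip side, raised elsewhere in this thread, is that the bounded signature
rules out custom store types that do not implement the required interface.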


Best,
Bruno


On 3/25/24 7:05 PM, Walker Carlson wrote:

Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match
the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined processors but then
they might as well use the `globalTable` method. I think the add state
store option should go for maximum flexibility.

Best,
Walker



On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna  wrote:


Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I
understand the code correctly.
I would avoid using the same name for the source node and the state
store, because it limits the flexibility in naming. Why do you not use
Named for the name of the source node?

2.
Did you consider Matthias' proposal to restrict the type of the store
builder to `StoreBuilder` (or even
`StoreBuilder`) for the case where
the processor is built-in?


Best,
Bruno

On 3/13/24 11:05 PM, Walker Carlson wrote:

Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major points
as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should look
with the default option we are adding. I think keeping the option to load
directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much simpler.


I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend to
fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide
to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not that seems excessive.

As of right now I don't see a better option than having a boolean flag for
the reprocessOnRestore option. I expanded the description in the docs so I
hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker







Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-25 Thread Almog Gavra
Hello Folk!

Glad to see improvements to the GlobalKTables in discussion! I think they
deserve more love :)

Scope creep alert (which I'm generally against and certainly still support
this KIP without but I want to see if there's an elegant way to address
both problems). The KIP mentions that "Now the restore is done by
reprocessing using an instance from the customer processor supplier" which
I suppose fixed a long-standing bug (
https://issues.apache.org/jira/browse/KAFKA-8037) but only for
GlobalKTables and not for normal KTables that use the source-changelog
optimization. Since this API could be used to signal "I want to reprocess
on restore" I'm wondering whether it makes sense to design this API in a
way that could be extended for KTables as well so a fix for KAFKA-8037
would be possible with the same mechanism. Thoughts?

Cheers,
Almog

On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
 wrote:

> Hey Bruno,
>
> 1) I'm actually not sure why that is in there. It certainly doesn't match
> the convention. Best to remove it and match the other methods.
>
> 2) Yeah, I thought about it but I'm not convinced it is a necessary
> restriction. It might be useful for the already defined processors but then
> they might as well use the `globalTable` method. I think the add state
> store option should go for maximum flexibility.
>
> Best,
> Walker
>
>
>
> On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna  wrote:
>
> > Hi Walker,
> >
> > A couple of follow-up questions.
> >
> > 1.
> > Why do you propose to explicitly pass a parameter "storeName" in
> > StreamsBuilder#addGlobalStore?
> > The StoreBuilder should already provide a name for the store, if I
> > understand the code correctly.
> > I would avoid using the same name for the source node and the state
> > store, because it limits the flexibility in naming. Why do you not use
> > Named for the name of the source node?
> >
> > 2.
> > Did you consider Matthias' proposal to restrict the type of the store
> > builder to `StoreBuilder` (or even
> > `StoreBuilder`) for the case where
> > the processor is built-in?
> >
> >
> > Best,
> > Bruno
> >
> > On 3/13/24 11:05 PM, Walker Carlson wrote:
> > > Thanks for the feedback Bruno, Matthias, and Lucas!
> > >
> > > There is a decent amount but I'm going to try and just hit the major
> > points
> > > as I would like to keep this change simple.
> > >
> > > I've made corrections for the mistakes pointed out. Thanks for the
> > > suggestions everyone.
> > >
> > > The main sticking point seems to be with the method of signalling the
> > > restore behavior. It seems we can all agree with how the API should
> look
> > > with the default option we are adding. I think keeping the option to
> load
> > > directly from the topic into the store is a good idea. It is much more
> > > performant and could make a simple metric collector processor much
> > simpler.
> > >
> > > > I think something that Matthias said about creating a special class of
> > > > processors for the global stores helps me think about the issue. I tend to
> > > fall into the category that we should keep global stores open to the
> > > possibility of having child nodes in the future. I don't really see the
> > > downside of having that as an option. It might not be best for a lot of
> > > cases, but something simple could be very useful to put in the PAPI.
> > >
> > > > I like the idea of having a `GlobalStoreParameters` but only if we decide
> > > > to make the processor need to extend an interface like
> > > > `GlobalStoreProcessor`. If not that seems excessive.
> > >
> > > As of right now I don't see a better option than having a boolean flag
> > for
> > > the reprocessOnRestore option. I expanded the description in the docs
> so
> > I
> > > hope that helps.
> > >
> > > I am more than willing to take other ideas on it.
> > >
> > > thanks,
> > > Walker
> > >
> >
>


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-25 Thread Walker Carlson
Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match
the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined processors but then
they might as well use the `globalTable` method. I think the add state
store option should go for maximum flexibility.

Best,
Walker



On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna  wrote:

> Hi Walker,
>
> A couple of follow-up questions.
>
> 1.
> Why do you propose to explicitly pass a parameter "storeName" in
> StreamsBuilder#addGlobalStore?
> The StoreBuilder should already provide a name for the store, if I
> understand the code correctly.
> I would avoid using the same name for the source node and the state
> store, because it limits the flexibility in naming. Why do you not use
> Named for the name of the source node?
>
> 2.
> Did you consider Matthias' proposal to restrict the type of the store
> builder to `StoreBuilder` (or even
> `StoreBuilder`) for the case where
> the processor is built-in?
>
>
> Best,
> Bruno
>
> On 3/13/24 11:05 PM, Walker Carlson wrote:
> > Thanks for the feedback Bruno, Matthias, and Lucas!
> >
> > There is a decent amount but I'm going to try and just hit the major
> points
> > as I would like to keep this change simple.
> >
> > I've made corrections for the mistakes pointed out. Thanks for the
> > suggestions everyone.
> >
> > The main sticking point seems to be with the method of signalling the
> > restore behavior. It seems we can all agree with how the API should look
> > with the default option we are adding. I think keeping the option to load
> > directly from the topic into the store is a good idea. It is much more
> > performant and could make a simple metric collector processor much
> simpler.
> >
> > I think something that Matthias said about creating a special class of
> > processors for the global stores helps me think about the issue. I tend to
> > fall into the category that we should keep global stores open to the
> > possibility of having child nodes in the future. I don't really see the
> > downside of having that as an option. It might not be best for a lot of
> > cases, but something simple could be very useful to put in the PAPI.
> >
> > I like the idea of having a `GlobalStoreParameters` but only if we decide
> > to make the processor need to extend an interface like
> > `GlobalStoreProcessor`. If not that seems excessive.
> >
> > As of right now I don't see a better option than having a boolean flag
> for
> > the reprocessOnRestore option. I expanded the description in the docs so
> I
> > hope that helps.
> >
> > I am more than willing to take other ideas on it.
> >
> > thanks,
> > Walker
> >
>


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-22 Thread Bruno Cadonna

Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in 
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I 
understand the code correctly.
I would avoid using the same name for the source node and the state 
store, because it limits the flexibility in naming. Why do you not use 
Named for the name of the source node?


2.
Did you consider Matthias' proposal to restrict the type of the store 
builder to `StoreBuilder` (or even 
`StoreBuilder`) for the case where 
the processor is built-in?



Best,
Bruno

On 3/13/24 11:05 PM, Walker Carlson wrote:

Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major points
as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should look
with the default option we are adding. I think keeping the option to load
directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much simpler.

I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend to
fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide
to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not that seems excessive.

As of right now I don't see a better option than having a boolean flag for
the reprocessOnRestore option. I expanded the description in the docs so I
hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker



Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-13 Thread Walker Carlson
Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major points
as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should look
with the default option we are adding. I think keeping the option to load
directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much simpler.

I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend to
fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide
to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not, that seems excessive.

As of right now I don't see a better option than having a boolean flag for
the reprocessOnRestore option. I expanded the description in the docs so I
hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-13 Thread Matthias J. Sax
If the custom store is a key-value store, yes, we could do this. But the 
interface does not enforce a key-value store, it's just the most generic 
`StateStore` that we pass in, and thus it could be something totally 
unknown to us, and we cannot apply a cast...


The underlying idea is really about 100% flexibility in the PAPI layer.

That's also the reason why all stores need to provide a callback for the 
restore path. Kafka Streams runtime can only read the record from the 
changelog, but it cannot put it into the store, as the runtime only sees 
the `StateStore` interface -- thus, we invoke a store-specific callback 
(`StateRestoreCallback` interface) that needs to actually put the data 
into the store for us. For our built-in stores, we of course provide 
these callbacks, but the point is, that the runtime does not know 
anything about the nature of the store but is fully agnostic to it, to 
allow the plugin of any custom store with any custom interface (which 
just needs to implement `StateStore`).
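
A minimal sketch of the callback idea described above, in plain Java so it
runs without Kafka on the classpath. `OpaqueStore` and `RestoreCallback` are
simplified stand-ins for `StateStore` and `StateRestoreCallback`, and
`AppendOnlyStore` and `RestoreLoop` are hypothetical names used only for
illustration. The point is that the restore loop only ever hands raw bytes to
the callback, and the store decides what to do with them.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stand-in for StateStore: the runtime only sees this opaque type.
interface OpaqueStore {
    String name();
}

// Stand-in with the same shape as a restore callback: raw bytes in.
interface RestoreCallback {
    void restore(byte[] key, byte[] value);
}

// Hypothetical custom store that is NOT a key-value store.
final class AppendOnlyStore implements OpaqueStore {
    private final List<String> entries = new ArrayList<>();

    @Override
    public String name() {
        return "append-only";
    }

    List<String> entries() {
        return entries;
    }

    // The store supplies the code that knows how to write restored bytes.
    RestoreCallback restoreCallback() {
        return (key, value) -> {
            if (value != null) { // skip tombstones
                entries.add(new String(value, StandardCharsets.UTF_8));
            }
        };
    }
}

// Stand-in for the runtime's restore loop: it never touches the store itself.
final class RestoreLoop {
    static void restoreAll(List<byte[][]> changelog, RestoreCallback callback) {
        for (byte[][] record : changelog) {
            callback.restore(record[0], record[1]);
        }
    }
}
```

Because `RestoreLoop` depends only on the callback, any store with any
interface can be plugged in, which is the flexibility argued for above.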



Not sure if I understand what you mean by this transformation step?



-Matthias


On 3/12/24 3:04 AM, Lucas Brutschy wrote:

@Matthias:

Thanks, I didn't realize that we need processors for any custom store.
Are we sure we cannot build a generic processor to load data into a
custom key-value store? I'm not sure, but you know the code better
than me.

One other alternative is to allow the user to provide a state
transformer `Function,
ConsumerRecord>` to adapt the state before loading it,
defaulting to identity. This would provide the ability to do
efficient, non-deserializing transformations like  => 

On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax  wrote:


@Bruno:

(1), I think you are spot on for the ts-extractor: on the restore code
path, we only support record-ts, but there is no need for a custom-ts
because for regular changelog topics KS sets the ts, and thus, the
optimization this KIP proposes requires that the global topic follows the
changelog format, i.e., the ts must be in the record-ts.

However, for the regular processing path, I am not sure if we can omit
deserializers. The way the PAPI is wired up, seems to require that we
give proper types to _other_ Processors that read from the global state
store. For this reason, the store (which takes `Serdes` with proper
types) is wrapped with a `MeteredStore` (like all others) to do the
Serde work, and this MeteredStore is also exposed to the
global-Processor? Might be good for Walker to dig into this to find out
the details?

It would of course be nice if we could avoid the unnecessary
deserialization on topic read, and re-serialization on global-store put
for this case, but it seems not to be straightforward to do...


(2). Is this about the PAPI/Topology? For this case, we don't have any
config object across the board. We only do this in the DSL. Hence, I
would propose to just follow the existing pattern in this KIP to keep
the API consistent. For the DSL, it could make sense of course. -- Of
course, if we think the PAPI could be improved with config objects, we
could do this in a dedicated KIP.


@Lucas:

The PAPI is unfortunately (by design) much more open and less
restrictive. If a user has a custom state store, we need some
`Processor` code from them, because we cannot provide a built-in
processor for an unknown store. The overload which won't take a
processor would only work for the built-in key-value store, which I
assume would cover most use-cases, however, we should keep the door open
for other use cases. Otherwise, we disallow this optimization for custom
stores. PAPI is really about flexibility, and yes, with great power
comes great responsibility for the users :)

But this actually highlights a different aspect: the overload not
accepting a custom `Processor` but using a built-in processor, should
not accept a generic `StoreBuilder` but should restrict the type to
`StoreBuilder`?


-Matthias



On 3/6/24 1:14 PM, Lucas Brutschy wrote:

Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
   1) a copy-restore variant without custom processing, as you propose.
   2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I think the 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-12 Thread Lucas Brutschy
@Matthias:

Thanks, I didn't realize that we need processors for any custom store.
Are we sure we cannot build a generic processor to load data into a
custom key-value store? I'm not sure, but you know the code better
than me.

One other alternative is to allow the user to provide a state
transformer `Function,
ConsumerRecord>` to adapt the state before loading it,
defaulting to identity. This would provide the ability to do
efficient, non-deserializing transformations like  => 
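
A rough sketch of what such a state transformer could look like, written in
plain Java. The type parameters of the proposed `Function` did not survive in
this archive, so raw `byte[]` keys and values are an assumption here.
`RawRecord` is a hypothetical stand-in for `ConsumerRecord`, and
`prependTimestamp()` is just one made-up example of a byte-level
transformation that never deserializes the value.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical stand-in for a consumer record holding raw bytes.
final class RawRecord {
    final byte[] key;
    final byte[] value;
    final long timestamp;

    RawRecord(byte[] key, byte[] value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class StateTransformers {
    // Default: load the record into the store unchanged.
    static Function<RawRecord, RawRecord> identity() {
        return Function.identity();
    }

    // Example transformation: prefix the value bytes with the 8-byte record
    // timestamp. The value itself is copied, never deserialized.
    static Function<RawRecord, RawRecord> prependTimestamp() {
        return record -> {
            if (record.value == null) {
                return record; // leave tombstones untouched
            }
            byte[] out = ByteBuffer.allocate(Long.BYTES + record.value.length)
                    .putLong(record.timestamp)
                    .put(record.value)
                    .array();
            return new RawRecord(record.key, out, record.timestamp);
        };
    }
}
```

With `identity()` as the default, callers that do not need any adaptation pay
nothing extra on the restore path.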

On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax  wrote:
>
> @Bruno:
>
> (1), I think you are spot on for the ts-extractor: on the restore code
> path, we only support record-ts, but there is no need for a custom-ts
> because for regular changelog topics KS sets the ts, and thus, the
> optimization this KIP proposes requires that the global topic follows the
> changelog format, i.e., the ts must be in the record-ts.
>
> However, for the regular processing path, I am not sure if we can omit
> deserializers. The way the PAPI is wired up, seems to require that we
> give proper types to _other_ Processors that read from the global state
> store. For this reason, the store (which takes `Serdes` with proper
> types) is wrapped with a `MeteredStore` (like all others) to do the
> Serde work, and this MeteredStore is also exposed to the
> global-Processor? Might be good for Walker to dig into this to find out
> the details?
>
> It would of course be nice if we could avoid the unnecessary
> deserialization on topic read, and re-serialization on global-store put
> for this case, but it seems not to be straightforward to do...
>
>
> (2). Is this about the PAPI/Topology? For this case, we don't have any
> config object across the board. We only do this in the DSL. Hence, I
> would propose to just follow the existing pattern in this KIP to keep
> the API consistent. For the DSL, it could make sense of course. -- Of
> course, if we think the PAPI could be improved with config objects, we
> could do this in a dedicated KIP.
>
>
> @Lucas:
>
> The PAPI is unfortunately (by design) much more open and less
> restrictive. If a user has a custom state store, we need some
> `Processor` code from them, because we cannot provide a built-in
> processor for an unknown store. The overload which won't take a
> processor would only work for the built-in key-value store, which I
> assume would cover most use-cases, however, we should keep the door open
> for other use cases. Otherwise, we disallow this optimization for custom
> stores. PAPI is really about flexibility, and yes, with great power
> comes great responsibility for the users :)
>
> But this actually highlights a different aspect: the overload not
> accepting a custom `Processor` but using a built-in processor, should
> not accept a generic `StoreBuilder` but should restrict the type to
> `StoreBuilder`?
>
>
> -Matthias
>
>
>
> On 3/6/24 1:14 PM, Lucas Brutschy wrote:
> > Hey Walker
> >
> > Thanks for the KIP, and congrats on the KiBiKIP ;)
> >
> > My main point is that I'd vote against introducing
> > `reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
> > just incorrect and should be removed or deprecated. If we think we
> > need to keep the old behavior around, renaming the methods, e.g., to
> > `addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
> > behavior. But at first glance, the old behavior just looks like a
> > bug to me and should just be removed.
> >
> > So for this KIP, I'd keep two variants as you propose and drop the
> > boolean parameter, but the two variants will be
> >   1) a copy-restore variant without custom processing, as you propose.
> >   2) a process-restore variant with custom processing (parameters the
> > same as before). This should be combined with a clear warning in the
> > Javadoc of the performance downside of this approach.
> >
> > Presentation:
> > 1) I wonder if you could make another pass on the motivation section.
> > I was lacking some context on this problem, and I think the nature of
> > the restore issue only became clear to me when I read through the
> > comments in the JIRA ticket you linked.
> > 2) If we decide to keep the parameter `reprocessOnRestore`, the
> > Javadoc on it should be extended. This is a somewhat subtle issue, and
> > I don't think `restore by reprocessing` is enough of an explanation.
> >
> > Nits:
> >
> > `{@link ValueTransformer ValueTransformer}` -> `{@link
> > ValueTransformer ValueTransformers}`
> > `user defined` -> `user-defined`
> >
> > Cheers,
> > Lucas
> >
> > On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna  wrote:
> >>
> >> Hi Walker,
> >>
> >> Thanks for the KIP!
> >>
> >> Great that you are going to fix this long-standing issue!
> >>
> >> 1.
> >> I was wondering if we need the timestamp extractor as well as the key
> >> and value deserializer in Topology#addGlobalStore() that do not take a
> >> ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
> >> Since those methods 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-07 Thread Matthias J. Sax

@Bruno:

(1), I think you are spot on for the ts-extractor: on the restore code 
path, we only support record-ts, but there is no need for a custom-ts 
because for regular changelog topics KS sets the ts, and thus, the 
optimization this KIP proposes requires that the global topic follows the 
changelog format, i.e., the ts must be in the record-ts.


However, for the regular processing path, I am not sure if we can omit 
deserializers. The way the PAPI is wired up, seems to require that we 
give proper types to _other_ Processors that read from the global state 
store. For this reason, the store (which takes `Serdes` with proper 
types) is wrapped with a `MeteredStore` (like all others) to do the 
Serde work, and this MeteredStore is also exposed to the 
global-Processor? Might be good for Walker to dig into this to find out 
the details?


It would of course be nice if we could avoid the unnecessary 
deserialization on topic read, and re-serialization on global-store put 
for this case, but it seems not to be straightforward to do...



(2). Is this about the PAPI/Topology? For this case, we don't have any 
config object across the board. We only do this in the DSL. Hence, I 
would propose to just follow the existing pattern in this KIP to keep 
the API consistent. For the DSL, it could make sense of course. -- Of 
course, if we think the PAPI could be improved with config objects, we 
could do this in a dedicated KIP.



@Lucas:

The PAPI is unfortunately (by design) much more open and less 
restrictive. If a user has a custom state store, we need some 
`Processor` code from them, because we cannot provide a built-in 
processor for an unknown store. The overload which won't take a 
processor would only work for the built-in key-value store, which I 
assume would cover most use-cases, however, we should keep the door open 
for other use cases. Otherwise, we disallow this optimization for custom 
stores. PAPI is really about flexibility, and yes, with great power 
comes great responsibility for the users :)


But this actually highlights a different aspect: the overload not 
accepting a custom `Processor` but using a built-in processor, should 
not accept a generic `StoreBuilder` but should restrict the type to 
`StoreBuilder`?



-Matthias



On 3/6/24 1:14 PM, Lucas Brutschy wrote:

Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
  1) a copy-restore variant without custom processing, as you propose.
  2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I think the nature of
the restore issue only became clear to me when I read through the
comments in the JIRA ticket you linked.
2) If we decide to keep the parameter `reprocessOnRestore`, the
Javadoc on it should be extended. This is a somewhat subtle issue, and
I don't think `restore by reprocessing` is enough of an explanation.

Nits:

`{@link ValueTransformer ValueTransformer}` -> `{@link
ValueTransformer ValueTransformers}`
`user defined` -> `user-defined`

Cheers,
Lucas

On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna  wrote:


Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering if we need the timestamp extractor as well as the key
and value deserializer in Topology#addGlobalStore() that do not take a
ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
Since those methods set up a global state store that does not process any
records, do they still need to deserialize records and extract
timestamps? Name might still be needed, right?

2.
  From an API point of view, it might make sense to put all
processor-related arguments into a parameter object. Something like:
GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.

3.
Could you please go over the KIP and correct typos and other mistakes in
the KIP?


Best,
Bruno



On 3/2/24 1:43 AM, Matthias J. Sax wrote:

Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on
"restore reprocessing" is certainly a good improvement.

  From an API design POV, I like the idea to not require passing in a

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-06 Thread Lucas Brutschy
Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
 1) a copy-restore variant without custom processing, as you propose.
 2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.
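
To make the difference between the two variants concrete, here is a small
self-contained sketch. It does not use the Kafka Streams API: the topic is a
plain list, the store is a `Map`, and `process()` is a hypothetical custom
processor invented for illustration. It shows why restoring by copy gives a
different store than restoring through the processor whenever the processor
transforms or filters records.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

final class RestoreVariants {
    // Hypothetical custom processor: drops keys starting with "tmp" and
    // upper-cases all other values before writing them to the store.
    static void process(String key, String value, Map<String, String> store) {
        if (!key.startsWith("tmp")) {
            store.put(key, value.toUpperCase(Locale.ROOT));
        }
    }

    // Variant 1, copy-restore: records go from the topic to the store as-is.
    static Map<String, String> copyRestore(List<String[]> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (String[] record : topic) {
            store.put(record[0], record[1]);
        }
        return store;
    }

    // Variant 2, process-restore: every record passes through the processor,
    // which costs more but matches what regular processing would have stored.
    static Map<String, String> processRestore(List<String[]> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (String[] record : topic) {
            process(record[0], record[1], store);
        }
        return store;
    }
}
```

If the processor is effectively the identity, both variants produce the same
store and the cheaper copy-restore is sufficient.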

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I think the nature of
the restore issue only became clear to me when I read through the
comments in the JIRA ticket you linked.
2) If we decide to keep the parameter `reprocessOnRestore`, the
Javadoc on it should be extended. This is a somewhat subtle issue, and
I don't think `restore by reprocessing` is enough of an explanation.

Nits:

`{@link ValueTransformer ValueTransformer}` -> `{@link
ValueTransformer ValueTransformers}`
`user defined` -> `user-defined`

Cheers,
Lucas

On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna  wrote:
>
> Hi Walker,
>
> Thanks for the KIP!
>
> Great that you are going to fix this long-standing issue!
>
> 1.
> I was wondering if we need the timestamp extractor as well as the key
> and value deserializer in Topology#addGlobalStore() that do not take a
> ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
> Since those methods set up a global state store that does not process any
> records, do they still need to deserialize records and extract
> timestamps? Name might still be needed, right?
>
> 2.
>  From an API point of view, it might make sense to put all
> processor-related arguments into a parameter object. Something like:
> GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
> Just an idea, open for discussion.
>
> 3.
> Could you please go over the KIP and correct typos and other mistakes in
> the KIP?
>
>
> Best,
> Bruno
>
>
>
> On 3/2/24 1:43 AM, Matthias J. Sax wrote:
> > Thanks for the KIP Walker.
> >
> > Fixing this issue, and providing users some flexibility to opt-in/out on
> > "restore reprocessing" is certainly a good improvement.
> >
> >  From an API design POV, I like the idea to not require passing in a
> > ProcessorSupplier to begin with. Given the current implementation of the
> > restore process, this might have been the better API from the beginning
> > on... Well, better late than never :)
> >
> > For this new method w/o a supplier, I am wondering if we want to keep
> > `addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar
> > thing via KIP-813. Just an idea.
> >
> > However, I am not convinced that adding a new boolean parameter is the
> > best way to design the API. Unfortunately, I don't have any elegant
> > proposal myself. Just a somewhat crazy idea to do a larger API change:
> >
> > Taking a step back, a global store is by definition a terminal node --
> > we don't support adding child nodes. Hence, while we expose a full
> > `ProcessorContext` interface, we actually limit what functionality it
> > supports. Thus, I am wondering if we should stop using the generic
> > `Processor` interface to begin with, but design a new one which is
> > tailored to the needs of global stores? -- This would of course be of
> > much larger scope than originally intended by this KIP, but it might be
> > a great opportunity to kill two birds with one stone?
> >
> > The only other question to consider is: do we believe that global stores
> > will never have child nodes, or could we actually allow for child nodes
> > in the future? If yes, it might not be smart to move off using the
> > `Processor` interface. In general, I could imagine, especially as we
> > now want to support "process on restore" to allow simple stateless
> > operators like `map()` or `filter()` on a `GlobalTable` (or allow to add
> > custom global processors) at some point in the future?
> >
> > Just wanted to put this out to see what people think...
> >
> >
> > -Matthias
> >
> >
> > On 2/29/24 1:26 PM, Walker Carlson wrote:
> >> Hello everybody,
> >>
> >> I wanted to propose a change to our addGlobalStore methods so that the
> >> restore behavior can be controlled on a preprocessor level. This should
> >> help Kafka Streams users to better tune Global stores added with the
> >> processor API to better fit their needs.
> >>
> >> Details are in the kip here: https://cwiki.apache.org/confluence/x/E4t3EQ
> >>
> >> Thanks,
> >> Walker
> >>


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-06 Thread Bruno Cadonna

Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering if we need the timestamp extractor as well as the key 
and value deserializer in Topology#addGlobalStore() that do not take a 
ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
Since those methods set up a global state store that does not process any 
records, do they still need to deserialize records and extract 
timestamps? Name might still be needed, right?


2.
From an API point of view, it might make sense to put all 
processor-related arguments into a parameter object. Something like:

GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.
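
A minimal sketch of how such a parameter object might be shaped. Only the
names `GlobalStoreParameters`, `globalStore()`, `withKeySerde()` and
`disableReprocessOnRestore()` come from the suggestion above. Everything else
is an assumption: the key serde is represented by a plain `String`
placeholder instead of a real `Serde`, the accessors are hypothetical, and
reprocessing on restore is assumed to be enabled by default.

```java
// Hypothetical immutable parameter object with a fluent API.
final class GlobalStoreParameters {
    private final String keySerde;            // placeholder for a real Serde
    private final boolean reprocessOnRestore;

    private GlobalStoreParameters(String keySerde, boolean reprocessOnRestore) {
        this.keySerde = keySerde;
        this.reprocessOnRestore = reprocessOnRestore;
    }

    // Entry point. Assumption: reprocessing on restore is on by default.
    static GlobalStoreParameters globalStore() {
        return new GlobalStoreParameters(null, true);
    }

    // Each "with"/"disable" call returns a new instance, so a partially
    // configured object can be shared safely.
    GlobalStoreParameters withKeySerde(String keySerde) {
        return new GlobalStoreParameters(keySerde, reprocessOnRestore);
    }

    GlobalStoreParameters disableReprocessOnRestore() {
        return new GlobalStoreParameters(keySerde, false);
    }

    String keySerde() {
        return keySerde;
    }

    boolean reprocessOnRestore() {
        return reprocessOnRestore;
    }
}
```

Compared with a boolean argument, the call site names the behavior that is
being switched off, which is the readability benefit of this approach.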

3.
Could you please go over the KIP and correct typos and other mistakes in 
the KIP?



Best,
Bruno



On 3/2/24 1:43 AM, Matthias J. Sax wrote:

Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on 
"restore reprocessing" is certainly a good improvement.


 From an API design POV, I like the idea to not require passing in a 
ProcessorSupplier to begin with. Given the current implementation of the 
restore process, this might have been the better API from the beginning 
on... Well, better late than never :)


For this new method w/o a supplier, I am wondering if we want to keep 
`addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar 
thing via KIP-813. Just an idea.


However, I am not convinced that adding a new boolean parameter is the 
best way to design the API. Unfortunately, I don't have any elegant 
proposal myself. Just a somewhat crazy idea to do a larger API change:


Taking a step back, a global store is by definition a terminal node -- 
we don't support adding child nodes. Hence, while we expose a full 
`ProcessorContext` interface, we actually limit what functionality it 
supports. Thus, I am wondering if we should stop using the generic 
`Processor` interface to begin with, but design a new one which is 
tailored to the needs of global stores? -- This would of course be of 
much larger scope than originally intended by this KIP, but it might be 
a great opportunity to kill two birds with one stone?


The only other question to consider is: do we believe that global stores 
will never have child nodes, or could we actually allow for child nodes 
in the future? If yes, it might not be smart to move off using the 
`Processor` interface. In general, I could imagine, especially as we 
now want to support "process on restore" to allow simple stateless 
operators like `map()` or `filter()` on a `GlobalTable` (or allow to add 
custom global processors) at some point in the future?


Just wanted to put this out to see what people think...


-Matthias


On 2/29/24 1:26 PM, Walker Carlson wrote:

Hello everybody,

I wanted to propose a change to our addGlobalStore methods so that the
restore behavior can be controlled on a preprocessor level. This should
help Kafka Streams users to better tune Global stores added with the
processor API to better fit their needs.

Details are in the kip here: https://cwiki.apache.org/confluence/x/E4t3EQ

Thanks,
Walker



Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-01 Thread Matthias J. Sax

Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on 
"restore reprocessing" is certainly a good improvement.


From an API design POV, I like the idea to not require passing in a 
ProcessorSupplier to begin with. Given the current implementation of the 
restore process, this might have been the better API from the beginning 
on... Well, better late than never :)


For this new method w/o a supplier, I am wondering if we want to keep 
`addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar 
thing via KIP-813. Just an idea.


However, I am not convinced that adding a new boolean parameter is the 
best way to design the API. Unfortunately, I don't have any elegant 
proposal myself. Just a somewhat crazy idea to do a larger API change:


Taking a step back, a global store is by definition a terminal node -- 
we don't support adding child nodes. Hence, while we expose a full 
`ProcessorContext` interface, we actually limit what functionality it 
supports. Thus, I am wondering if we should stop using the generic 
`Processor` interface to begin with, but design a new one which is 
tailored to the needs of global stores? -- This would of course be of 
much larger scope than originally intended by this KIP, but it might be 
a great opportunity to kill two birds with one stone?


The only other question to consider is: do we believe that global stores 
will never have child nodes, or could we actually allow for child nodes 
in the future? If yes, it might not be smart to move off using the 
`Processor` interface. In general, I could imagine, especially as we 
now want to support "process on restore" to allow simple stateless 
operators like `map()` or `filter()` on a `GlobalTable` (or allow to add 
custom global processors) at some point in the future?


Just wanted to put this out to see what people think...


-Matthias


On 2/29/24 1:26 PM, Walker Carlson wrote:

Hello everybody,

I wanted to propose a change to our addGlobalStore methods so that the
restore behavior can be controlled on a preprocessor level. This should
help Kafka Streams users to better tune Global stores added with the
processor API to better fit their needs.

Details are in the kip here: https://cwiki.apache.org/confluence/x/E4t3EQ

Thanks,
Walker