Hey Bruno,

Thanks for the feedback. Sorry for the late reply; I was hoping others
might weigh in as well, and it got away from me.

a) I like this, but I think we should separate it out. This KIP has
already dragged on longer than it should, and I think that is a big enough
change to be done on its own.

b) I'm a bit resistant to adding a new type of processor or store for this
change. It feels excessively complicated for what should be a small change.
It might be a good idea, but I don't want to expand the scope more than
absolutely necessary here.

best,
walker

On Wed, Apr 10, 2024 at 4:34 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Walker,
>
> Thanks for the updates!
>
>
> (1) While I like naming the methods differently, I also have to say that
> I do not like addIsomorphicGlobalStore() because it does not really tell
> what the method does. I could also not come up with a better name than
> addGlobalStoreWithReprocessingOnRestore(). However, I had two ideas on
> which I would like to have your opinion.
>
> (a) Add a new GlobalStoreBuilder in which users can set if the global
> state store should reprocess on restore. In addition to the option to
> enable or disable reprocessing on restore, you could also NOT offer a
> way to enable or disable logging in the GlobalStoreBuilder. Currently,
> if users enable logging for a store builder that they pass into
> addGlobalStore(), Kafka Streams needs to explicitly disable it again,
> which is not ideal.
>
> (b) Add a new GlobalProcessorSupplier in which users can set if the
> global state store should reprocess on restore. Another ugliness that
> could be fixed with this is passing Void, Void to ProcessorSupplier. The
> GlobalProcessorSupplier would just have two type parameters <KIn, VIn>.
> The nice aspect of this idea is that the option to enable/disable
> reprocessing on restore is only needed when a processor supplier is
> passed into the methods. That is not true for idea (a).
>
>
> (2) Yes, that was my intent.
>
>
> Best,
> Bruno
>
> On 4/9/24 9:33 PM, Walker Carlson wrote:
> > Hey all,
> >
> > (1) No, I hadn't considered just naming the methods differently. I
> > actually really like this idea and am for it. Except we need 3
> > different methods now: one for no processor, one for a processor that
> > should restore, and one that reprocesses. How about
> > `addCustomGlobalStore` and `addIsomorphicGlobalStore`, and then just
> > `addGlobalStateStore` for the no-processor case? If everyone likes that
> > I can add that to the KIP and rename the methods.
> >
> > (2) We can have the built-in case use StoreBuilder<? extends
> > KeyValueStore> and manually check for the TimestampedKeyValueStore.
> > That is fine with me.
> >
> > Bruno I hope that was what you were intending.
> >
> > (3) For the Scala API, do we need to make it match the Java API or are
> > we just making the minimum changes? If we take point 1, I don't know
> > how much we need to change.
> >
> > Thanks,
> > Walker
> >
> >
> > On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> One more thing:
> >>
> >> I was just looking into the WIP PR, and it seems we will also need to
> >> change `StreamsBuilder.scala`. The KIP needs to cover these changes as
> >> well.
> >>
> >>
> >> -Matthias
> >>
> >> On 4/1/24 10:33 PM, Bruno Cadonna wrote:
> >>> Hi Walker and Matthias,
> >>>
> >>> (2)
> >>> That is exactly my point about having a compile time error versus a
> >>> runtime error. The added flexibility as proposed by Matthias sounds
> >>> good to me.
> >>>
> >>> Regarding the Named parameter, I was not aware that the processor that
> >>> writes records to the global state store is named according to the name
> >>> passed in by Consumed. I thought Consumed strictly specifies the names
> >>> of source processors. So I am fine with not having an overload with a
> >>> Named parameter.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 3/31/24 11:30 AM, Matthias J. Sax wrote:
> >>>> Two more follow up thoughts:
> >>>>
> >>>> (1) I am still not a big fan of the boolean parameter we introduce.
> >>>> Did you consider to use different method names, like
> >>>> `addReadOnlyGlobalStore()` (for the optimized method, that would not
> >>>> reprocess data on restore), and maybe add `addModifiableGlobalStore()`
> >>>> (not a good name, but we cannot re-use existing `addGlobalStore()` --
> >>>> maybe somebody else has a good idea about a better `addXxxGlobalStore`
> >>>> that would describe it well).
> >>>>
> >>>> (2) I was thinking about Bruno's comment to limit the scope of the
> >>>> store builder for the optimized case. I think we should actually do
> >>>> something about it, because in the end, the runtime (ie, the
> >>>> `Processor` we hard wire) would need to pick a store it supports and
> >>>> cast to the corresponding store? If the cast fails, we hit a runtime
> >>>> exception, but by putting the store we cast to into the signature we
> >>>> can actually convert it into a compile time error, which seems better.
> >>>> -- If we want, we could make it somewhat flexible and support both
> >>>> `KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature
> >>>> would be `KeyValueStore` but we explicitly check if the builder gives
> >>>> us a `TimestampedKeyValueStore` instance and use it properly.
> >>>>
> >>>> If putting the signature does not work for some reason, we should at
> >>>> least clearly call it out in the JavaDocs what store type is expected.
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 3/28/24 5:05 PM, Walker Carlson wrote:
> >>>>> Hey all,
> >>>>>
> >>>>> Thanks for the feedback Bruno, Almog and Matthias!
> >>>>>
> >>>>> Almog: I like the idea, but I agree with Matthias. I actually looked
> >>>>> at that ticket a bit when doing this and found that while similar
> >>>>> they are actually pretty unrelated codewise. I would love to see it
> >>>>> get taken care of.
> >>>>>
> >>>>> Bruno and Matthias: The Named parameter doesn't really make sense to
> >>>>> me to put here. The store in the store builder is already named
> >>>>> through what Matthias described and the processor doesn't actually
> >>>>> have a name. That would be the processor node that gets named via
> >>>>> the Named parameter (in the DSL) and the internal streams builder
> >>>>> uses the Consumed object to make a source name. I think we should
> >>>>> just keep the Consumed object and use that for the processor node
> >>>>> name.
> >>>>>
> >>>>> As for the limitation of the store builder interface, I don't think
> >>>>> it is necessary. It could be something we add later if we really
> >>>>> want to.
> >>>>>
> >>>>> Anyways I think we are getting close enough to consensus that I'm
> >>>>> going to open a vote and hopefully we can get it voted on soon!
> >>>>>
> >>>>> best,
> >>>>> Walker
> >>>>>
> >>>>> On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax <mj...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Hey,
> >>>>>>
> >>>>>> looking into the API, I am wondering why we would need to add an
> >>>>>> overload taking a `Named` parameter?
> >>>>>>
> >>>>>> StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already
> >>>>>> takes a `Consumed` parameter that allows setting a name.
> >>>>>>
> >>>>>>
> >>>>>>> 2.
> >>>>>>> I do not understand what you mean with "maximum flexibility". The
> >>>>>>> built-in processor needs to assume a given state store interface.
> >>>>>>> That means, users have to provide a state store that offers that
> >>>>>>> interface. If they do not they will get a runtime exception. If we
> >>>>>>> require a store builder for a given interface, we can catch the
> >>>>>>> mistake at compile time. Let me know whether I misunderstood
> >>>>>>> something.
> >>>>>>
> >>>>>> Yes, we could catch it at runtime. But I guess what I was trying to
> >>>>>> say is different: I was trying to say, we should not limit the API
> >>>>>> to always require a specific store, such that global stores can only
> >>>>>> be of a certain type. Global Stores should be allowed to be of any
> >>>>>> type. Hence, if we add a built-in processor, it can only be one
> >>>>>> option, and we always need to support custom processors, and might
> >>>>>> also want to try to allow the restore optimization for custom
> >>>>>> processors (and thus other store types), not just for our built-in
> >>>>>> processor (and our built-in stores). Coupling the optimization to
> >>>>>> built-in stores would prevent us from applying the optimization to
> >>>>>> custom stores.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> @Almog: interesting idea. I tend to think that both issues are
> >>>>>> orthogonal. If users pick to apply the optimization "added" by this
> >>>>>> KIP, the bug you mentioned would still apply to global stores, and
> >>>>>> thus this KIP is not addressing the issue you mentioned.
> >>>>>>
> >>>>>> Personally, I also think that we don't need a KIP to fix the ticket
> >>>>>> you mentioned? In the end, we need to skip records during restore,
> >>>>>> and it seems it does not make sense to make this configurable?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 3/26/24 4:24 PM, Almog Gavra wrote:
> >>>>>>> Thanks for the thoughts Bruno!
> >>>>>>>
> >>>>>>>> Do you mean an API to configure restoration instead of boolean flag
> >>>>>>>> reprocessOnRestore?
> >>>>>>>
> >>>>>>> Yes, this is exactly the type of thing I was musing (but I don't
> >>>>>>> have any concrete suggestions). It feels like that would give the
> >>>>>>> flexibility to do things like the motivation section of the KIP
> >>>>>>> (allow bulk loading of records without reprocessing) while also
> >>>>>>> solving other limitations.
> >>>>>>>
> >>>>>>> I'm supportive of the KIP as-is but was hoping somebody with more
> >>>>>>> experience would have a sudden inspiration for how to solve both
> >>>>>>> issues with one API! Anyway, I'll slide back into the lurking
> >>>>>>> shadows for now and let the discussion continue :)
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Almog
> >>>>>>>
> >>>>>>> On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna <cado...@apache.org>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Almog,
> >>>>>>>>
> >>>>>>>> Do you mean an API to configure restoration instead of boolean flag
> >>>>>>>> reprocessOnRestore?
> >>>>>>>>
> >>>>>>>> Do you already have an idea?
> >>>>>>>>
> >>>>>>>> The proposal in the KIP is focused on the processor that updates
> >>>>>>>> the global state whereas in the case of GlobalKTable and source
> >>>>>>>> KTable the issue lies in the deserialization of records from the
> >>>>>>>> input topics, but only if the deserialization error handler is
> >>>>>>>> configured to drop the problematic record. Additionally, for source
> >>>>>>>> KTable the source topic optimization must be turned on to run into
> >>>>>>>> the issue. I am wondering what a unified API for global stores,
> >>>>>>>> GlobalKTable, and source KTable might look like.
> >>>>>>>>
> >>>>>>>> While it is an interesting question, I am in favor of deferring
> >>>>>>>> this to a separate KIP.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On 3/26/24 12:49 AM, Almog Gavra wrote:
> >>>>>>>>> Hello Folk!
> >>>>>>>>>
> >>>>>>>>> Glad to see improvements to the GlobalKTables in discussion! I
> >>>>>>>>> think they deserve more love :)
> >>>>>>>>>
> >>>>>>>>> Scope creep alert (which I'm generally against and certainly
> >>>>>>>>> still support this KIP without but I want to see if there's an
> >>>>>>>>> elegant way to address both problems). The KIP mentions that "Now
> >>>>>>>>> the restore is done by reprocessing using an instance from the
> >>>>>>>>> customer processor supplier" which I suppose fixed a long-standing
> >>>>>>>>> bug (https://issues.apache.org/jira/browse/KAFKA-8037) but only
> >>>>>>>>> for GlobalKTables and not for normal KTables that use the
> >>>>>>>>> source-changelog optimization. Since this API could be used to
> >>>>>>>>> signal "I want to reprocess on restore" I'm wondering whether it
> >>>>>>>>> makes sense to design this API in a way that could be extended for
> >>>>>>>>> KTables as well so a fix for KAFKA-8037 would be possible with the
> >>>>>>>>> same mechanism. Thoughts?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Almog
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
> >>>>>>>>> <wcarl...@confluent.io.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hey Bruno,
> >>>>>>>>>>
> >>>>>>>>>> 1) I'm actually not sure why that is in there. It certainly
> >>>>>>>>>> doesn't match the convention. Best to remove it and match the
> >>>>>>>>>> other methods.
> >>>>>>>>>>
> >>>>>>>>>> 2) Yeah, I thought about it but I'm not convinced it is a
> >>>>>>>>>> necessary restriction. It might be useful for the already defined
> >>>>>>>>>> processors but then they might as well use the `globalTable`
> >>>>>>>>>> method. I think the add state store option should go for maximum
> >>>>>>>>>> flexibility.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Walker
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna <
> >> cado...@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Walker,
> >>>>>>>>>>>
> >>>>>>>>>>> A couple of follow-up questions.
> >>>>>>>>>>>
> >>>>>>>>>>> 1.
> >>>>>>>>>>> Why do you propose to explicitly pass a parameter "storeName" in
> >>>>>>>>>>> StreamsBuilder#addGlobalStore?
> >>>>>>>>>>> The StoreBuilder should already provide a name for the store, if
> >>>>>>>>>>> I understand the code correctly.
> >>>>>>>>>>> I would avoid using the same name for the source node and the
> >>>>>>>>>>> state store, because it limits the flexibility in naming. Why do
> >>>>>>>>>>> you not use Named for the name of the source node?
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> Did you consider Matthias' proposal to restrict the type of the
> >>>>>>>>>>> store builder to `StoreBuilder<TimestampedKeyValueStore>` (or
> >>>>>>>>>>> even `StoreBuilder<? extends TimestampedKeyValueStore>`) for the
> >>>>>>>>>>> case where the processor is built-in?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/13/24 11:05 PM, Walker Carlson wrote:
> >>>>>>>>>>>> Thanks for the feedback Bruno, Matthias, and Lucas!
> >>>>>>>>>>>>
> >>>>>>>>>>>> There is a decent amount but I'm going to try and just hit the
> >>>>>>>>>>>> major points as I would like to keep this change simple.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I've made corrections for the mistakes pointed out. Thanks for
> >>>>>>>>>>>> the suggestions everyone.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The main sticking point seems to be with the method of
> >>>>>>>>>>>> signalling the restore behavior. It seems we can all agree with
> >>>>>>>>>>>> how the API should look with the default option we are adding. I
> >>>>>>>>>>>> think keeping the option to load directly from the topic into
> >>>>>>>>>>>> the store is a good idea. It is much more performant and could
> >>>>>>>>>>>> make a simple metric collector processor much simpler.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think something that Matthias said about creating a special
> >>>>>>>>>>>> class of processors for the global stores helps me think about
> >>>>>>>>>>>> the issue. I tend to fall into the category that we should keep
> >>>>>>>>>>>> global stores open to the possibility of having child nodes in
> >>>>>>>>>>>> the future. I don't really see the downside of having that as an
> >>>>>>>>>>>> option. It might not be best for a lot of cases, but something
> >>>>>>>>>>>> simple could be very useful to put in the PAPI.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I like the idea of having a `GlobalStoreParameters` but only if
> >>>>>>>>>>>> we decide to make the processor need to extend an interface like
> >>>>>>>>>>>> `GlobalStoreProcessor`. If not that seems excessive.
> >>>>>>>>>>>>
> >>>>>>>>>>>> As of right now I don't see a better option than having a
> >>>>>>>>>>>> boolean flag for the reprocessOnRestore option. I expanded the
> >>>>>>>>>>>> description in the docs so I hope that helps.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I am more than willing to take other ideas on it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> thanks,
> >>>>>>>>>>>> Walker
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> >
>
