Hey Bruno,

Thanks for the feedback. Sorry for the late reply; I was hoping others might weigh in as well, and it got away from me.

a) I like this, but I think we should separate it out. This KIP has already dragged on longer than it should, and I think that is a big enough change to be done by itself.

b) I'm a bit resistant to adding a new type of processor or store for this change. It feels excessively complicated for what should be a small change. It might be a good idea, but I don't want to expand the scope more than absolutely necessary here.

Best,
Walker

On Wed, Apr 10, 2024 at 4:34 AM Bruno Cadonna <cado...@apache.org> wrote:
> Hi Walker,
>
> Thanks for the updates!
>
> (1) While I like naming the methods differently, I also have to say that I do not like addIsomorphicGlobalStore() because it does not really tell what the method does. I could not come up with a better name than addGlobalStoreWithReprocessingOnRestore() either. However, I had two ideas on which I would like to have your opinion.
>
> (a) Add a new GlobalStoreBuilder in which users can set whether the global state store should reprocess on restore. In addition to the option to enable or disable reprocessing on restore, you could also NOT offer a way to enable or disable logging in the GlobalStoreBuilder. Currently, if users enable logging for a store builder that they pass into addGlobalStore(), Kafka Streams needs to explicitly disable it again, which is not ideal.
>
> (b) Add a new GlobalProcessorSupplier in which users can set whether the global state store should reprocess on restore. Another ugliness that could be fixed with this is passing Void, Void to ProcessorSupplier. The GlobalProcessorSupplier would just have two type parameters <KIn, VIn>. The nice aspect of this idea is that the option to enable/disable reprocessing on restore is only needed when a processor supplier is passed into the methods. That is not true for idea (a).
>
> (2) Yes, that was my intent.
>
> Best,
> Bruno
>
> On 4/9/24 9:33 PM, Walker Carlson wrote:
> > Hey all,
> >
> > (1) No, I hadn't considered just naming the methods differently. I actually really like this idea and am for it, except that we now need three different methods: one for no processor, one for a processor that should restore, and one that reprocesses. How about `addCustomGlobalStore` and `addIsomorphicGlobalStore`, and then just `addGlobalStateStore` for the no-processor case? If everyone likes that, I can add it to the KIP and rename the methods.
> >
> > (2) We can have the built-in case use StoreBuilder<? extends KeyValueStore> and manually check for the TimestampedKeyValueStore. That is fine with me.
> >
> > Bruno, I hope that was what you intended.
> >
> > (3) For the Scala API, do we need to make it match the Java API, or are we just making the minimum changes? If we take point (1), I don't know how much we need to change.
> >
> > Thanks,
> > Walker
> >
> > On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> One more thing:
> >>
> >> I was just looking into the WIP PR, and it seems we will also need to change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.
> >>
> >> -Matthias
> >>
> >> On 4/1/24 10:33 PM, Bruno Cadonna wrote:
> >>> Hi Walker and Matthias,
> >>>
> >>> (2)
> >>> That is exactly my point about having a compile-time error versus a runtime error. The added flexibility as proposed by Matthias sounds good to me.
> >>>
> >>> Regarding the Named parameter, I was not aware that the processor that writes records to the global state store is named according to the name passed in by Consumed. I thought Consumed strictly specifies the names of source processors. So I am fine with not having an overload with a Named parameter.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 3/31/24 11:30 AM, Matthias J. Sax wrote:
> >>>> Two more follow-up thoughts:
> >>>>
> >>>> (1) I am still not a big fan of the boolean parameter we introduce. Did you consider using different method names, like `addReadOnlyGlobalStore()` (for the optimized method that would not reprocess data on restore), and maybe adding `addModifiableGlobalStore()`? (It is not a good name, but we cannot re-use the existing `addGlobalStore()`. Maybe somebody else has a good idea for a better `addXxxGlobalStore` name that would describe it well.)
> >>>>
> >>>> (2) I was thinking about Bruno's comment to limit the scope of the store builder for the optimized case. I think we should actually do something about it, because in the end, the runtime (i.e., the `Processor` we hard-wire) would need to pick a store it supports and cast to the corresponding store type. If the cast fails, we hit a runtime exception, but by putting the store type we cast to into the signature, we can convert it into a compile-time error, which seems better. If we want, we could make it somewhat flexible and support both `KeyValueStore` and `TimestampedKeyValueStore`; i.e., the signature would be `KeyValueStore`, but we explicitly check whether the builder gives us a `TimestampedKeyValueStore` instance and use it properly.
> >>>>
> >>>> If putting the store type into the signature does not work for some reason, we should at least clearly call out in the JavaDocs which store type is expected.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 3/28/24 5:05 PM, Walker Carlson wrote:
> >>>>> Hey all,
> >>>>>
> >>>>> Thanks for the feedback, Bruno, Almog, and Matthias!
> >>>>>
> >>>>> Almog: I like the idea, but I agree with Matthias. I actually looked at that ticket a bit when doing this and found that, while similar, they are actually pretty unrelated code-wise. I would love to see it get taken care of.
> >>>>>
> >>>>> Bruno and Matthias: The Named parameter doesn't really make sense to me here. The store in the StoreBuilder is already named through what Matthias described, and the processor doesn't actually have a name. The processor node is what would get named via the Named parameter (in the DSL), and the internal streams builder uses the Consumed object to make a source name. I think we should just keep the Consumed object and use that for the processor node name.
> >>>>>
> >>>>> As for the limitation of the store builder interface, I don't think it is necessary. It could be something we add later if we really want to.
> >>>>>
> >>>>> Anyway, I think we are getting close enough to consensus that I'm going to open a vote, and hopefully we can get it voted on soon!
> >>>>>
> >>>>> Best,
> >>>>> Walker
> >>>>>
> >>>>> On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>
> >>>>>> Hey,
> >>>>>>
> >>>>>> Looking into the API, I am wondering why we would need to add an overload taking a `Named` parameter?
> >>>>>>
> >>>>>> StreamsBuilder.addGlobalStore() (and .globalTable()) already takes a `Consumed` parameter that allows setting a name.
> >>>>>>
> >>>>>>> 2.
> >>>>>>> I do not understand what you mean by "maximum flexibility". The built-in processor needs to assume a given state store interface. That means users have to provide a state store that offers that interface. If they do not, they will get a runtime exception. If we require a store builder for a given interface, we can catch the mistake at compile time. Let me know whether I misunderstood something.
> >>>>>>
> >>>>>> Yes, we could catch it at runtime.
> >>>>>> But I guess what I was trying to say is different: we should not limit the API to always require a specific store, such that global stores can only be of a certain type. Global stores should be allowed to be of any type. Hence, if we add a built-in processor, it can only be one option; we always need to support custom processors, and we might also want to allow the restore optimization for custom processors (and thus other store types), not just for our built-in processor (and our built-in stores). Coupling the optimization to built-in stores would prevent us from applying the optimization to custom stores.
> >>>>>>
> >>>>>> @Almog: Interesting idea. I tend to think that both issues are orthogonal. If users choose to apply the optimization "added" by this KIP, the bug you mentioned would still apply to global stores, and thus this KIP does not address the issue you mentioned.
> >>>>>>
> >>>>>> Personally, I also think that we don't need a KIP to fix the ticket you mentioned? In the end, we need to skip records during restore, and it does not seem to make sense to make this configurable?
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 3/26/24 4:24 PM, Almog Gavra wrote:
> >>>>>>> Thanks for the thoughts, Bruno!
> >>>>>>>
> >>>>>>>> Do you mean an API to configure restoration instead of the boolean flag reprocessOnRestore?
> >>>>>>>
> >>>>>>> Yes, this is exactly the type of thing I was musing about (but I don't have any concrete suggestions). It feels like that would give the flexibility to do things like what the motivation section of the KIP describes (allow bulk loading of records without reprocessing) while also solving other limitations.
> >>>>>>>
> >>>>>>> I'm supportive of the KIP as-is, but I was hoping somebody with more experience would have a sudden inspiration for how to solve both issues with one API! Anyway, I'll slide back into the lurking shadows for now and let the discussion continue :)
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Almog
> >>>>>>>
> >>>>>>> On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Hi Almog,
> >>>>>>>>
> >>>>>>>> Do you mean an API to configure restoration instead of the boolean flag reprocessOnRestore?
> >>>>>>>>
> >>>>>>>> Do you already have an idea?
> >>>>>>>>
> >>>>>>>> The proposal in the KIP is focused on the processor that updates the global state, whereas in the case of GlobalKTable and source KTable the issue lies in the deserialization of records from the input topics, but only if the deserialization error handler is configured to drop the problematic record. Additionally, for source KTable the source topic optimization must be turned on to run into the issue. I am wondering what a unified API for global stores, GlobalKTable, and source KTable might look like.
> >>>>>>>>
> >>>>>>>> While it is an interesting question, I am in favor of deferring this to a separate KIP.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On 3/26/24 12:49 AM, Almog Gavra wrote:
> >>>>>>>>> Hello folks!
> >>>>>>>>>
> >>>>>>>>> Glad to see improvements to GlobalKTables under discussion! I think they deserve more love :)
> >>>>>>>>>
> >>>>>>>>> Scope creep alert (which I'm generally against, and I'd certainly still support this KIP without it, but I want to see if there's an elegant way to address both problems).
> >>>>>>>>> The KIP mentions that "Now the restore is done by reprocessing using an instance from the custom processor supplier", which I suppose fixed a long-standing bug (https://issues.apache.org/jira/browse/KAFKA-8037), but only for GlobalKTables and not for normal KTables that use the source-changelog optimization. Since this API could be used to signal "I want to reprocess on restore", I'm wondering whether it makes sense to design this API in a way that could be extended to KTables as well, so that a fix for KAFKA-8037 would be possible with the same mechanism. Thoughts?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Almog
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson <wcarl...@confluent.io.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hey Bruno,
> >>>>>>>>>>
> >>>>>>>>>> 1) I'm actually not sure why that is in there. It certainly doesn't match the convention. Best to remove it and match the other methods.
> >>>>>>>>>>
> >>>>>>>>>> 2) Yeah, I thought about it, but I'm not convinced it is a necessary restriction. It might be useful for the already defined processors, but then they might as well use the `globalTable` method. I think the add-state-store option should go for maximum flexibility.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Walker
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Walker,
> >>>>>>>>>>>
> >>>>>>>>>>> A couple of follow-up questions.
> >>>>>>>>>>>
> >>>>>>>>>>> 1.
> >>>>>>>>>>> Why do you propose to explicitly pass a parameter "storeName" in StreamsBuilder#addGlobalStore?
> >>>>>>>>>>> The StoreBuilder should already provide a name for the store, if I understand the code correctly. I would avoid using the same name for the source node and the state store, because it limits the flexibility in naming. Why do you not use Named for the name of the source node?
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> Did you consider Matthias' proposal to restrict the type of the store builder to `StoreBuilder<TimestampedKeyValueStore>` (or even `StoreBuilder<? extends TimestampedKeyValueStore>`) for the case where the processor is built-in?
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/13/24 11:05 PM, Walker Carlson wrote:
> >>>>>>>>>>>> Thanks for the feedback, Bruno, Matthias, and Lucas!
> >>>>>>>>>>>>
> >>>>>>>>>>>> There is a decent amount, but I'm going to try to just hit the major points, as I would like to keep this change simple.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I've made corrections for the mistakes pointed out. Thanks for the suggestions, everyone.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The main sticking point seems to be the method of signaling the restore behavior. It seems we can all agree on how the API should look with the default option we are adding. I think keeping the option to load directly from the topic into the store is a good idea. It is much more performant and could make a simple metric-collector processor much simpler.
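The trade-off in the last quoted paragraph (loading records straight from the topic into the store versus replaying them through the processor on restore) can be illustrated with a small, self-contained Java 16+ model. Everything in it is hypothetical and deliberately simplified: `Rec`, `GlobalRestoreSketch`, and its methods are not Kafka Streams APIs, a `List` stands in for the source topic, and a `HashMap` stands in for the state store. The `reprocessOnRestore` parameter only borrows the name of the flag discussed in this thread.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a source-topic record (not a Kafka type).
record Rec(String key, String value) {}

// Simplified model of the two restore strategies discussed in the thread.
final class GlobalRestoreSketch {

    private GlobalRestoreSketch() {}

    // Bulk load: copy every record from the "topic" into the store as-is.
    // Fast, but only equivalent to normal processing if the processor
    // writes each record to the store unchanged.
    static Map<String, String> restoreDirect(List<Rec> topic) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : topic) {
            store.put(r.key(), r.value());
        }
        return store;
    }

    // Reprocess: replay every record through the processor logic, so the
    // restored store matches what normal processing would have produced.
    static Map<String, String> restoreByReprocessing(
            List<Rec> topic, BiConsumer<Map<String, String>, Rec> processor) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : topic) {
            processor.accept(store, r);
        }
        return store;
    }

    // Choose a strategy with a boolean, mirroring the reprocessOnRestore idea.
    static Map<String, String> restore(
            List<Rec> topic,
            BiConsumer<Map<String, String>, Rec> processor,
            boolean reprocessOnRestore) {
        return reprocessOnRestore
                ? restoreByReprocessing(topic, processor)
                : restoreDirect(topic);
    }
}
```

For example, with `topic = List.of(new Rec("a", "x"))` and a processor `(store, r) -> store.put(r.key(), r.value().toUpperCase())`, reprocessing restores `"X"` for key `"a"`, whereas the direct load restores the raw `"x"`. That divergence is why skipping reprocessing is only safe when the processor writes records to the store unchanged.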
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think something that Matthias said about creating a special class of processors for the global stores helps me think about the issue. I tend to fall into the camp that we should keep global stores open to the possibility of having child nodes in the future. I don't really see the downside of having that as an option. It might not be best for a lot of cases, but something simple could be very useful to put in the PAPI.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I like the idea of having a `GlobalStoreParameters`, but only if we decide to require the processor to extend an interface like `GlobalStoreProcessor`. If not, that seems excessive.
> >>>>>>>>>>>>
> >>>>>>>>>>>> As of right now, I don't see a better option than having a boolean flag for the reprocessOnRestore option. I expanded the description in the docs, so I hope that helps.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I am more than willing to take other ideas on it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Walker
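Point (2) in the thread above (Bruno on 3/22/24, Matthias on 3/31/24, Walker on 4/9/24) is about putting the store type the built-in processor needs into the method signature, so that passing the wrong kind of store fails at compile time, while still accepting a timestamped store through an explicit runtime check. The Java 16+ sketch below models that idea with simplified, hypothetical stand-ins: `KvStore`, `TimestampedKvStore`, `Timestamped`, `InMemoryKvStore`, `InMemoryTimestampedKvStore`, and `BuiltInGlobalUpdater` are not Kafka Streams types, and a `Supplier` plays the role of `StoreBuilder`. They only loosely mirror the fact that, in Kafka Streams, `TimestampedKeyValueStore<K, V>` extends `KeyValueStore<K, ValueAndTimestamp<V>>`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified stand-ins; NOT the real org.apache.kafka.streams.state types.
interface KvStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// Value plus record timestamp, loosely analogous to ValueAndTimestamp.
record Timestamped<V>(V value, long timestamp) {}

// A timestamped store is a key-value store whose values carry timestamps.
interface TimestampedKvStore<K, V> extends KvStore<K, Timestamped<V>> {}

class InMemoryKvStore<K, V> implements KvStore<K, V> {
    private final Map<K, V> map = new HashMap<>();
    @Override public void put(K key, V value) { map.put(key, value); }
    @Override public V get(K key) { return map.get(key); }
}

class InMemoryTimestampedKvStore<K, V>
        extends InMemoryKvStore<K, Timestamped<V>>
        implements TimestampedKvStore<K, V> {}

// Hypothetical built-in processor that writes each input record to the store.
final class BuiltInGlobalUpdater<K, V> {
    private final KvStore<K, ?> store;

    // Compile-time check: the supplier must produce some KvStore.
    BuiltInGlobalUpdater(Supplier<? extends KvStore<K, ?>> storeSupplier) {
        this.store = storeSupplier.get();
    }

    // Runtime check: wrap the value with its timestamp if the store is
    // timestamped; otherwise store the plain value.
    @SuppressWarnings("unchecked")
    void process(K key, V value, long timestamp) {
        if (store instanceof TimestampedKvStore<?, ?>) {
            ((TimestampedKvStore<K, V>) store).put(key, new Timestamped<>(value, timestamp));
        } else {
            // Unchecked: assumes the plain store's value type is V.
            ((KvStore<K, V>) store).put(key, value);
        }
    }
}
```

Because the parameter accepts any `KvStore`, the plain-value branch still relies on an unchecked cast. Narrowing the parameter to a single store type, as in the stricter `StoreBuilder<TimestampedKeyValueStore>` signature raised on 3/22/24, would remove that cast at the cost of the flexibility discussed on 3/31/24.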