Hey all,

(1) No, I hadn't considered just naming the methods differently. I actually
really like this idea and am for it, except that we now need three different
methods: one for no processor, one for a processor that should restore, and
one that reprocesses. How about `addCustomGlobalStore` and
`addIsomorphicGlobalStore`, and then just `addGlobalStateStore` for the
no-processor case? If everyone likes that, I can add it to the KIP and rename
the methods.
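
A rough sketch of how three method names could replace the boolean flag. The
types below are stand-ins so the snippet compiles on its own; they are not the
Kafka Streams classes, and the parameter lists are hypothetical. Which of the
two processor variants restores directly and which reprocesses is also an
assumption here (guessing that "isomorphic" means the store mirrors the topic),
not something settled in the KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types so the sketch compiles on its own; NOT the Kafka Streams classes.
class GlobalStoreNamingSketch {
    interface StoreBuilder { String name(); }
    interface ProcessorSupplier { }

    // Hypothetical builder: one method per behavior instead of a boolean flag.
    static class Builder {
        final List<String> plan = new ArrayList<>();

        // No processor supplied by the caller.
        Builder addGlobalStateStore(StoreBuilder store, String topic) {
            plan.add(store.name() + ":" + topic + ":no-processor");
            return this;
        }

        // Assumed meaning: custom processor, records are reprocessed on restore.
        Builder addCustomGlobalStore(StoreBuilder store, String topic, ProcessorSupplier p) {
            plan.add(store.name() + ":" + topic + ":reprocess-on-restore");
            return this;
        }

        // Assumed meaning: store mirrors the topic, so restore loads records directly.
        Builder addIsomorphicGlobalStore(StoreBuilder store, String topic, ProcessorSupplier p) {
            plan.add(store.name() + ":" + topic + ":restore-directly");
            return this;
        }
    }

    // Registers one store per variant and returns the recorded plan.
    static List<String> demo() {
        StoreBuilder store = () -> "my-store";
        ProcessorSupplier supplier = new ProcessorSupplier() { };
        return new Builder()
            .addGlobalStateStore(store, "topic-a")
            .addCustomGlobalStore(store, "topic-b", supplier)
            .addIsomorphicGlobalStore(store, "topic-c", supplier)
            .plan;
    }
}
```

With distinct names, the restore behavior is visible at the call site rather
than hidden behind a `true`/`false` argument.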

(2) We can have the built-in case use StoreBuilder<? extends
KeyValueStore> and manually check for the TimestampedKeyValueStore. That is
fine with me.

Bruno, I hope that was what you were intending.
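
A minimal sketch of the idea in (2): the signature is bounded by
`KeyValueStore`, and the timestamped variant is detected with a manual check.
The interfaces below are simplified stand-ins that only mirror the subtype
relationship (the real `TimestampedKeyValueStore` has a different value type),
and `describe` is a hypothetical helper, not a method from the KIP.

```java
// Stand-in interfaces that mirror only the subtype relationship;
// NOT the real Kafka Streams types.
class StoreTypeCheckSketch {
    interface KeyValueStore<K, V> { }
    interface TimestampedKeyValueStore<K, V> extends KeyValueStore<K, V> { }
    interface StoreBuilder<T> { T build(); }

    // The bound rejects builders for unrelated store kinds at compile time.
    static <K, V> String describe(StoreBuilder<? extends KeyValueStore<K, V>> builder) {
        KeyValueStore<K, V> store = builder.build();
        // Manual runtime check for the timestamped variant.
        if (store instanceof TimestampedKeyValueStore) {
            return "timestamped";
        }
        return "plain";
    }

    // A builder that yields a plain key-value store.
    static String plainCase() {
        StoreBuilder<KeyValueStore<String, Long>> b =
            () -> new KeyValueStore<String, Long>() { };
        return describe(b);
    }

    // A builder that yields the timestamped variant.
    static String timestampedCase() {
        StoreBuilder<TimestampedKeyValueStore<String, Long>> b =
            () -> new TimestampedKeyValueStore<String, Long>() { };
        return describe(b);
    }
}
```

A builder for a store type outside the `KeyValueStore` hierarchy would not
compile against `describe`, which is the compile-time error Bruno and Matthias
asked for, while both supported variants are still accepted.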

(3) For the Scala API, do we need to make it match the Java API, or are we
just making the minimum changes? If we take point 1, I don't know how
much we need to change.

Thanks,
Walker


On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax <mj...@apache.org> wrote:

> One more thing:
>
> I was just looking into the WIP PR, and it seems we will also need to
> change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.
>
>
> -Matthias
>
> On 4/1/24 10:33 PM, Bruno Cadonna wrote:
> > Hi Walker and Matthias,
> >
> > (2)
> > That is exactly my point about having a compile time error versus a
> > runtime error. The added flexibility as proposed by Matthias sounds good
> > to me.
> >
> > Regarding the Named parameter, I was not aware that the processor that
> > writes records to the global state store is named according to the name
> > passed in by Consumed. I thought Consumed strictly specifies the names
> > of source processors. So I am fine with not having an overload with a
> > Named parameter.
> >
> > Best,
> > Bruno
> >
> > On 3/31/24 11:30 AM, Matthias J. Sax wrote:
> >> Two more follow up thoughts:
> >>
> >> (1) I am still not a big fan of the boolean parameter we introduce.
> >> Did you consider using different method names, like
> >> `addReadOnlyGlobalStore()` (for the optimized method, that would not
> >> reprocess data on restore), and maybe add `addModifiableGlobalStore()`
> >> (not a good name, but we cannot re-use existing `addGlobalStore()` --
> >> maybe somebody else has a good idea about a better `addXxxGlobalStore`
> >> that would describe it well).
> >>
> >> (2) I was thinking about Bruno's comment to limit the scope of the store
> >> builder for the optimized case. I think we should actually do
> >> something about it, because in the end, the runtime (ie, the
> >> `Processor` we hard wire) would need to pick a store it supports and
> >> cast to the corresponding store? If the cast fails, we hit a runtime
> >> exception, but by putting the store we cast to into the signature we
> >> can actually convert it into a compile time error, which seems better.
> >> -- If we want, we could make it somewhat flexible and support both
> >> `KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature
> >> would be `KeyValueStore` but we explicitly check if the builder gives
> >> us a `TimestampedKeyValueStore` instance and use it properly.
> >>
> >> If putting the signature does not work for some reason, we should at
> >> least clearly call it out in the JavaDocs what store type is expected.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 3/28/24 5:05 PM, Walker Carlson wrote:
> >>> Hey all,
> >>>
> >>> Thanks for the feedback Bruno, Almog and Matthias!
> >>>
> >>> Almog: I like the idea, but I agree with Matthias. I actually looked at
> >>> that ticket a bit when doing this and found that while similar they are
> >>> actually pretty unrelated codewise. I would love to see it get taken
> >>> care of.
> >>>
> >>> Bruno and Matthias: The Named parameter doesn't really make sense to
> >>> me to put here. The store in the store builder is already named through
> >>> what Matthias described and the processor doesn't actually have a name.
> >>> That would be the processor node that gets named via the Named parameter
> >>> (in the DSL) and the internal streams builder uses the Consumed object to
> >>> make a source name. I think we should just keep the Consumed object and
> >>> use that for the processor node name.
> >>>
> >>> As for the limitation of the store builder interface I don't think it
> >>> is necessary. It could be something we add later if we really want to.
> >>>
> >>> Anyways I think we are getting close enough to consensus that I'm
> >>> going to open a vote and hopefully we can get it voted on soon!
> >>>
> >>> best,
> >>> Walker
> >>>
> >>> On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>
> >>>> Hey,
> >>>>
> >>>> looking into the API, I am wondering why we would need to add an
> >>>> overload taking a `Named` parameter?
> >>>>
> >>>> StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes
> >>>> a `Consumed` parameter that allows setting a name.
> >>>>
> >>>>
> >>>>> 2.
> >>>>> I do not understand what you mean with "maximum flexibility". The
> >>>>> built-in processor needs to assume a given state store interface. That
> >>>>> means, users have to provide a state store that offers that
> >>>>> interface. If they do not they will get a runtime exception. If we
> >>>>> require a store builder for a given interface, we can catch the
> >>>>> mistake at compile time.
> >>>>> Let me know whether I misunderstood something.
> >>>>
> >>>> Yes, we could catch it at runtime. But I guess what I was trying to
> >>>> say is different: I was trying to say, we should not limit the API to
> >>>> always require a specific store, such that global stores can only be
> >>>> of a certain type. Global Stores should be allowed to be of any type.
> >>>> Hence, if we add a built-in processor, it can only be one option, and
> >>>> we always need to support custom processors, and might also want to
> >>>> try to allow the restore optimization for custom processors (and thus
> >>>> other store types), not just for our built-in processor (and our
> >>>> built-in stores). Coupling the optimization to built-in stores would
> >>>> prevent us from applying the optimization to custom stores.
> >>>>
> >>>>
> >>>>
> >>>> @Almog: interesting idea. I tend to think that both issues are
> >>>> orthogonal. If users choose to apply the optimization "added" by this
> >>>> KIP, the bug you mentioned would still apply to global stores, and
> >>>> thus this KIP is not addressing the issue you mentioned.
> >>>>
> >>>> Personally, I also think that we don't need a KIP to fix the ticket
> >>>> you mentioned? In the end, we need to skip records during restore, and
> >>>> it seems it does not make sense to make this configurable?
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 3/26/24 4:24 PM, Almog Gavra wrote:
> >>>>> Thanks for the thoughts Bruno!
> >>>>>
> >>>>>> Do you mean an API to configure restoration instead of boolean flag
> >>>>>> reprocessOnRestore?
> >>>>>
> >>>>> Yes, this is exactly the type of thing I was musing (but I don't
> >>>>> have any concrete suggestions). It feels like that would give the
> >>>>> flexibility to do things like the motivation section of the KIP (allow
> >>>>> bulk loading of records without reprocessing) while also solving other
> >>>>> limitations.
> >>>>>
> >>>>> I'm supportive of the KIP as-is but was hoping somebody with more
> >>>>> experience would have a sudden inspiration for how to solve both
> >>>>> issues with one API! Anyway, I'll slide back into the lurking shadows
> >>>>> for now and let the discussion continue :)
> >>>>>
> >>>>> Cheers,
> >>>>> Almog
> >>>>>
> >>>>> On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna <cado...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Almog,
> >>>>>>
> >>>>>> Do you mean an API to configure restoration instead of boolean flag
> >>>>>> reprocessOnRestore?
> >>>>>>
> >>>>>> Do you already have an idea?
> >>>>>>
> >>>>>> The proposal in the KIP is focused on the processor that updates the
> >>>>>> global state whereas in the case of GlobalKTable and source KTable
> >>>>>> the issue lies in the deserialization of records from the input
> >>>>>> topics, but only if the deserialization error handler is configured
> >>>>>> to drop the problematic record. Additionally, for source KTable the
> >>>>>> source topic optimization must be turned on to run into the issue. I
> >>>>>> am wondering what a unified API for global stores, GlobalKTable, and
> >>>>>> source KTable might look like.
> >>>>>>
> >>>>>> While it is an interesting question, I am in favor of deferring
> >>>>>> this to a separate KIP.
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 3/26/24 12:49 AM, Almog Gavra wrote:
> >>>>>>> Hello Folk!
> >>>>>>>
> >>>>>>> Glad to see improvements to the GlobalKTables in discussion! I
> >>>>>>> think they deserve more love :)
> >>>>>>>
> >>>>>>> Scope creep alert (which I'm generally against, and I certainly still
> >>>>>>> support this KIP without it, but I want to see if there's an elegant
> >>>>>>> way to address both problems). The KIP mentions that "Now the restore
> >>>>>>> is done by reprocessing using an instance from the customer processor
> >>>>>>> supplier" which I suppose fixed a long-standing bug
> >>>>>>> (https://issues.apache.org/jira/browse/KAFKA-8037) but only for
> >>>>>>> GlobalKTables and not for normal KTables that use the
> >>>>>>> source-changelog optimization. Since this API could be used to signal
> >>>>>>> "I want to reprocess on restore" I'm wondering whether it makes sense
> >>>>>>> to design this API in a way that could be extended for KTables as
> >>>>>>> well so a fix for KAFKA-8037 would be possible with the same
> >>>>>>> mechanism. Thoughts?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Almog
> >>>>>>>
> >>>>>>> On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
> >>>>>>> <wcarl...@confluent.io.invalid> wrote:
> >>>>>>>
> >>>>>>>> Hey Bruno,
> >>>>>>>>
> >>>>>>>> 1) I'm actually not sure why that is in there. It certainly
> >>>>>>>> doesn't match the convention. Best to remove it and match the other
> >>>>>>>> methods.
> >>>>>>>>
> >>>>>>>> 2) Yeah, I thought about it but I'm not convinced it is a
> >>>>>>>> necessary restriction. It might be useful for the already defined
> >>>>>>>> processors but then they might as well use the `globalTable` method.
> >>>>>>>> I think the add state store option should go for maximum
> >>>>>>>> flexibility.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Walker
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna
> >>>>>>>> <cado...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Walker,
> >>>>>>>>>
> >>>>>>>>> A couple of follow-up questions.
> >>>>>>>>>
> >>>>>>>>> 1.
> >>>>>>>>> Why do you propose to explicitly pass a parameter "storeName" in
> >>>>>>>>> StreamsBuilder#addGlobalStore?
> >>>>>>>>> The StoreBuilder should already provide a name for the store, if
> >>>>>>>>> I understand the code correctly.
> >>>>>>>>> I would avoid using the same name for the source node and the
> >>>>>>>>> state store, because it limits the flexibility in naming. Why do
> >>>>>>>>> you not use Named for the name of the source node?
> >>>>>>>>>
> >>>>>>>>> 2.
> >>>>>>>>> Did you consider Matthias' proposal to restrict the type of the
> >>>>>>>>> store builder to `StoreBuilder<TimestampedKeyValueStore>` (or even
> >>>>>>>>> `StoreBuilder<? extends TimestampedKeyValueStore>`) for the case
> >>>>>>>>> where the processor is built-in?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>> On 3/13/24 11:05 PM, Walker Carlson wrote:
> >>>>>>>>>> Thanks for the feedback Bruno, Matthias, and Lucas!
> >>>>>>>>>>
> >>>>>>>>>> There is a decent amount but I'm going to try and just hit the
> >>>>>>>>>> major points as I would like to keep this change simple.
> >>>>>>>>>>
> >>>>>>>>>> I've made corrections for the mistakes pointed out. Thanks for
> >>>>>>>>>> the suggestions everyone.
> >>>>>>>>>>
> >>>>>>>>>> The main sticking point seems to be with the method of
> >>>>>>>>>> signalling the restore behavior. It seems we can all agree with
> >>>>>>>>>> how the API should look with the default option we are adding. I
> >>>>>>>>>> think keeping the option to load directly from the topic into the
> >>>>>>>>>> store is a good idea. It is much more performant and could make a
> >>>>>>>>>> simple metric collector processor much simpler.
> >>>>>>>>>>
> >>>>>>>>>> I think something that Matthias said about creating a special
> >>>>>>>>>> class of processors for the global stores helps me think about
> >>>>>>>>>> the issue. I tend to fall into the category that we should keep
> >>>>>>>>>> global stores open to the possibility of having child nodes in
> >>>>>>>>>> the future. I don't really see the downside of having that as an
> >>>>>>>>>> option. It might not be best for a lot of cases, but something
> >>>>>>>>>> simple could be very useful to put in the PAPI.
> >>>>>>>>>>
> >>>>>>>>>> I like the idea of having a `GlobalStoreParameters` but only
> >>>>>>>>>> if we decide to make the processor need to extend an interface
> >>>>>>>>>> like `GlobalStoreProcessor`. If not that seems excessive.
> >>>>>>>>>>
> >>>>>>>>>> As of right now I don't see a better option than having a
> >>>>>>>>>> boolean flag for the reprocessOnRestore option. I expanded the
> >>>>>>>>>> description in the docs so I hope that helps.
> >>>>>>>>>>
> >>>>>>>>>> I am more than willing to take other ideas on it.
> >>>>>>>>>>
> >>>>>>>>>> thanks,
> >>>>>>>>>> Walker
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
>
