@Matthias:

Thanks, I didn't realize that we need processors for any custom store.
Are we sure we cannot build a generic processor to load data into a
custom key-value store? I'm not sure, but you know the code better
than me.

Another alternative is to allow the user to provide a state
transformer `Function<ConsumerRecord<byte[], byte[]>,
ConsumerRecord<byte[], byte[]>>` to adapt the state before loading it,
defaulting to identity. This would provide the ability to do
efficient, non-deserializing transformations like <K,V> => <V,K>.
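
A minimal sketch of what such a transformer could look like. To keep it
self-contained, `RawRecord` is a hypothetical stand-in for
`ConsumerRecord<byte[], byte[]>`, and the names `IDENTITY` and `SWAP` are
made up for illustration; none of this is an existing or proposed Kafka
Streams API.

```java
import java.util.Arrays;
import java.util.function.Function;

final class StateTransformerSketch {

    // Hypothetical stand-in for ConsumerRecord<byte[], byte[]>; only the raw
    // key and value bytes matter for this illustration.
    static final class RawRecord {
        final byte[] key;
        final byte[] value;

        RawRecord(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Assumed default: load the record into the store unchanged.
    static final Function<RawRecord, RawRecord> IDENTITY = Function.identity();

    // <K,V> => <V,K>: swaps the serialized bytes without deserializing them.
    static final Function<RawRecord, RawRecord> SWAP =
        r -> new RawRecord(r.value, r.key);

    public static void main(String[] args) {
        RawRecord in = new RawRecord(new byte[] {1}, new byte[] {2, 3});
        RawRecord out = SWAP.apply(in);
        // prints "[2, 3] -> [1]"
        System.out.println(Arrays.toString(out.key) + " -> " + Arrays.toString(out.value));
    }
}
```

Since the function only ever sees byte arrays, no Serde would be involved
on the restore path.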

On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> @Bruno:
>
> (1), I think you are spot on for the ts-extractor: on the restore code
> path, we only support record-ts, but there is no need for a custom-ts
> because for regular changelog topics KS sets the ts, and thus, the
> optimization this KIP proposes requires that the global topic follow the
> changelog format, ie, the ts must be in the record-ts.
>
> However, for the regular processing path, I am not sure if we can omit
> deserializers. The way the PAPI is wired up seems to require that we
> give proper types to _other_ Processors that read from the global state
> store. For this reason, the store (which takes `Serdes` with proper
> types) is wrapped with a `MeteredStore` (like all others) to do the
> Serde work, and this MeteredStore is also exposed to the
> global-Processor? Might be good for Walker to dig into this to find out
> the details?
>
> It would of course be nice if we could avoid the unnecessary
> deserialization on topic read, and re-serialization on global-store put
> for this case, but it seems not to be straightforward to do...
>
>
> (2). Is this about the PAPI/Topology? For this case, we don't have any
> config object across the board. We only do this in the DSL. Hence, I
> would propose to just follow the existing pattern in this KIP to keep
> the API consistent. For the DSL, it could make sense of course. -- Of
> course, if we think the PAPI could be improved with config objects, we
> could do this in a dedicated KIP.
>
>
> @Lucas:
>
> The PAPI is unfortunately (by design) much more open and less
> restrictive. If a user has a custom state store, we need some
> `Processor` code from them, because we cannot provide a built-in
> processor for an unknown store. The overload which won't take a
> processor would only work for the built-in key-value store, which I
> assume would cover most use-cases; however, we should keep the door open
> for other use cases. Otherwise, we disallow this optimization for custom
> stores. PAPI is really about flexibility, and yes, with great power
> comes great responsibility for the users :)
>
> But this actually highlights a different aspect: the overload not
> accepting a custom `Processor` but using a built-in processor, should
> not accept a generic `StoreBuilder<?>` but should restrict the type to
> `StoreBuilder<TimestampedKeyValueStore>`?
>
>
> -Matthias
>
>
>
> On 3/6/24 1:14 PM, Lucas Brutschy wrote:
> > Hey Walker
> >
> > Thanks for the KIP, and congrats on the KiBiKIP ;)
> >
> > My main point is that I'd vote against introducing
> > `reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
> > just incorrect and should be removed or deprecated. If we think we
> > need to keep the old behavior around, renaming the methods, e.g., to
> > `addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
> > behavior. But at first glance, the old behavior just looks like a
> > bug to me and should just be removed.
> >
> > So for this KIP, I'd keep two variants as you propose and drop the
> > boolean parameter, but the two variants will be
> >   1) a copy-restore variant without custom processing, as you propose.
> >   2) a process-restore variant with custom processing (parameters the
> > same as before). This should be combined with a clear warning in the
> > Javadoc of the performance downside of this approach.
> >
> > Presentation:
> > 1) I wonder if you could make another pass on the motivation section.
> > I was lacking some context on this problem, and I think the nature of
> > the restore issue only became clear to me when I read through the
> > comments in the JIRA ticket you linked.
> > 2) If we decide to keep the parameter `reprocessOnRestore`, the
> > Javadoc on it should be extended. This is a somewhat subtle issue, and
> > I don't think `restore by reprocessing` is enough of an explanation.
> >
> > Nits:
> >
> > `{@link ValueTransformer ValueTransformer}` -> `{@link
> > ValueTransformer ValueTransformers}`
> > `user defined` -> `user-defined`
> >
> > Cheers,
> > Lucas
> >
> > On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >> Hi Walker,
> >>
> >> Thanks for the KIP!
> >>
> >> Great that you are going to fix this long-standing issue!
> >>
> >> 1.
> >> I was wondering if we need the timestamp extractor as well as the key
> >> and value deserializer in Topology#addGlobalStore() that do not take a
> >> ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
> >> Since those methods set up a global state store that does not process any
> >> records, do they still need to deserialize records and extract
> >> timestamps? Name might still be needed, right?
> >>
> >> 2.
> >> From an API point of view, it might make sense to put all
> >> processor-related arguments into a parameter object. Something like:
> >> GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
> >> Just an idea, open for discussion.
> >>
> >> 3.
> >> Could you please go over the KIP and correct typos and other mistakes in
> >> the KIP?
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >> On 3/2/24 1:43 AM, Matthias J. Sax wrote:
> >>> Thanks for the KIP Walker.
> >>>
> >>> Fixing this issue, and providing users some flexibility to opt-in/out on
> >>> "restore reprocessing" is certainly a good improvement.
> >>>
> >>> From an API design POV, I like the idea to not require passing in a
> >>> ProcessorSupplier to begin with. Given the current implementation of the
> >>> restore process, this might have been the better API from the beginning
> >>> on... Well, better late than never :)
> >>>
> >>> For this new method w/o a supplier, I am wondering if we want to keep
> >>> `addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar
> >>> thing via KIP-813. Just an idea.
> >>>
> >>> However, I am not convinced that adding a new boolean parameter is the
> >>> best way to design the API. Unfortunately, I don't have any elegant
> >>> proposal myself. Just a somewhat crazy idea to do a larger API change:
> >>>
> >>> Taking a step back, a global store is by definition a terminal node --
> >>> we don't support adding child nodes. Hence, while we expose a full
> >>> `ProcessorContext` interface, we actually limit what functionality it
> >>> supports. Thus, I am wondering if we should stop using the generic
> >>> `Processor` interface to begin with, but design a new one which is
> >>> tailored to the needs of global stores? -- This would of course be of
> >>> much larger scope than originally intended by this KIP, but it might be
> >>> a great opportunity to kill two birds with one stone?
> >>>
> >>> The only other question to consider is: do we believe that global stores
> >>> will never have child nodes, or could we actually allow for child nodes
> >>> in the future? If yes, it might not be smart to move off using
> >>> `Processor` interface.... In general, I could imagine, especially as we
> >>> now want to support "process on restore" to allow simple stateless
> >>> operators like `map()` or `filter()` on a `GlobalTable` (or allow to add
> >>> custom global processors) at some point in the future?
> >>>
> >>> Just wanted to put this out to see what people think...
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 2/29/24 1:26 PM, Walker Carlson wrote:
> >>>> Hello everybody,
> >>>>
> >>>> I wanted to propose a change to our addGlobalStore methods so that the
> >>>> restore behavior can be controlled on a preprocessor level. This should
> >>>> help Kafka Streams users to better tune Global stores added with the
> >>>> processor API to better fit their needs.
> >>>>
> >>>> Details are in the KIP here: https://cwiki.apache.org/confluence/x/E4t3EQ
> >>>>
> >>>> Thanks,
> >>>> Walker
> >>>>
