@Matthias: Thanks, I didn't realize that we need processors for any custom store. Are we sure we cannot build a generic processor to load data into a custom key-value store? I'm not sure, but you know the code better than me.
One other alternative is to allow the user to provide a state transformer `Function<ConsumerRecord<byte[], byte[]>, ConsumerRecord<byte[], byte[]>>` to adapt the state before loading it, defaulting to identity. This would provide the ability to do efficient, non-deserializing transformations like <K,V> => <V,K>.

On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> @Bruno:
>
> (1), I think you are spot on for the ts-extractor: on the restore code
> path, we only support record-ts, but there is no need for a custom-ts
> because for regular changelog topics KS sets the ts, and thus, the
> optimization this KIP proposes requires that the global topic follow the
> changelog format, i.e., the ts must be in the record-ts.
>
> However, for the regular processing path, I am not sure if we can omit
> deserializers. The way the PAPI is wired up seems to require that we
> give proper types to _other_ Processors that read from the global state
> store. For this reason, the store (which takes `Serdes` with proper
> types) is wrapped with a `MeteredStore` (like all others) to do the
> Serde work, and this MeteredStore is also exposed to the
> global-Processor? Might be good for Walker to dig into this to find out
> the details?
>
> It would of course be nice if we could avoid the unnecessary
> deserialization on topic read, and re-serialization on global-store put
> for this case, but it seems not to be straightforward to do...
>
>
> (2). Is this about the PAPI/Topology? For this case, we don't have any
> config object across the board. We only do this in the DSL. Hence, I
> would propose to just follow the existing pattern in this KIP to keep
> the API consistent. For the DSL, it could make sense of course. -- Of
> course, if we think the PAPI could be improved with config objects, we
> could do this in a dedicated KIP.
>
>
> @Lucas:
>
> The PAPI is unfortunately (by design) much more open and less
> restrictive.
> If a user has a custom state store, we need some
> `Processor` code from them, because we cannot provide a built-in
> processor for an unknown store. The overload which won't take a
> processor would only work for the built-in key-value store, which I
> assume would cover most use-cases; however, we should keep the door open
> for other use cases. Otherwise, we disallow this optimization for custom
> stores. PAPI is really about flexibility, and yes, with great power
> comes great responsibility for the users :)
>
> But this actually highlights a different aspect: the overload not
> accepting a custom `Processor` but using a built-in processor, should
> not accept a generic `StoreBuilder<?>` but should restrict the type to
> `StoreBuilder<TimestampedKeyValueStore>`?
>
>
> -Matthias
>
>
>
> On 3/6/24 1:14 PM, Lucas Brutschy wrote:
> > Hey Walker
> >
> > Thanks for the KIP, and congrats on the KiBiKIP ;)
> >
> > My main point is that I'd vote against introducing
> > `reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
> > just incorrect and should be removed or deprecated. If we think we
> > need to keep the old behavior around, renaming the methods, e.g., to
> > `addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
> > behavior. But at first glance, the old behavior just looks like a
> > bug to me and should just be removed.
> >
> > So for this KIP, I'd keep two variants as you propose and drop the
> > boolean parameter, but the two variants will be
> > 1) a copy-restore variant without custom processing, as you propose.
> > 2) a process-restore variant with custom processing (parameters the
> > same as before). This should be combined with a clear warning in the
> > Javadoc of the performance downside of this approach.
> >
> > Presentation:
> > 1) I wonder if you could make another pass on the motivation section.
> > I was lacking some context on this problem, and I think the nature of
> > the restore issue only became clear to me when I read through the
> > comments in the JIRA ticket you linked.
> > 2) If we decide to keep the parameter `reprocessOnRestore`, the
> > Javadoc on it should be extended. This is a somewhat subtle issue, and
> > I don't think `restore by reprocessing` is enough of an explanation.
> >
> > Nits:
> >
> > `{@link ValueTransformer ValueTransformer}` -> `{@link
> > ValueTransformer ValueTransformers}`
> > `user defined` -> `user-defined`
> >
> > Cheers,
> > Lucas
> >
> > On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >> Hi Walker,
> >>
> >> Thanks for the KIP!
> >>
> >> Great that you are going to fix this long-standing issue!
> >>
> >> 1.
> >> I was wondering if we need the timestamp extractor as well as the key
> >> and value deserializer in Topology#addGlobalStore() that do not take a
> >> ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
> >> Since those methods set up a global state store that does not process any
> >> records, do they still need to deserialize records and extract
> >> timestamps? Name might still be needed, right?
> >>
> >> 2.
> >> From an API point of view, it might make sense to put all
> >> processor-related arguments into a parameter object. Something like:
> >> GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
> >> Just an idea, open for discussion.
> >>
> >> 3.
> >> Could you please go over the KIP and correct typos and other mistakes in
> >> the KIP?
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >> On 3/2/24 1:43 AM, Matthias J. Sax wrote:
> >>> Thanks for the KIP Walker.
> >>>
> >>> Fixing this issue, and providing users some flexibility to opt-in/out on
> >>> "restore reprocessing" is certainly a good improvement.
> >>>
> >>> From an API design POV, I like the idea to not require passing in a
> >>> ProcessorSupplier to begin with. Given the current implementation of the
> >>> restore process, this might have been the better API from the
> >>> beginning... Well, better late than never :)
> >>>
> >>> For this new method w/o a supplier, I am wondering if we want to keep
> >>> `addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar
> >>> thing via KIP-813. Just an idea.
> >>>
> >>> However, I am not convinced that adding a new boolean parameter is the
> >>> best way to design the API. Unfortunately, I don't have any elegant
> >>> proposal myself. Just a somewhat crazy idea to do a larger API change:
> >>>
> >>> Taking a step back, a global store is by definition a terminal node --
> >>> we don't support adding child nodes. Hence, while we expose a full
> >>> `ProcessorContext` interface, we actually limit what functionality it
> >>> supports. Thus, I am wondering if we should stop using the generic
> >>> `Processor` interface to begin with, but design a new one which is
> >>> tailored to the needs of global stores? -- This would of course be of
> >>> much larger scope than originally intended by this KIP, but it might be
> >>> a great opportunity to kill two birds with one stone?
> >>>
> >>> The only other question to consider is: do we believe that global stores
> >>> will never have child nodes, or could we actually allow for child nodes
> >>> in the future? If yes, it might not be smart to move off using the
> >>> `Processor` interface.... In general, I could imagine, especially as we
> >>> now want to support "process on restore", allowing simple stateless
> >>> operators like `map()` or `filter()` on a `GlobalTable` (or allowing
> >>> custom global processors to be added) at some point in the future?
> >>>
> >>> Just wanted to put this out to see what people think...
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 2/29/24 1:26 PM, Walker Carlson wrote:
> >>>> Hello everybody,
> >>>>
> >>>> I wanted to propose a change to our addGlobalStore methods so that the
> >>>> restore behavior can be controlled on a preprocessor level. This should
> >>>> help Kafka Streams users to better tune global stores added with the
> >>>> processor API to better fit their needs.
> >>>>
> >>>> Details are in the KIP here: https://cwiki.apache.org/confluence/x/E4t3EQ
> >>>>
> >>>> Thanks,
> >>>> Walker
> >>>>