If the custom store is a key-value store, yes, we could do this. But the interface does not enforce a key-value store, it's just a most generic `StateStore` that we pass in, and thus it could be something totally unknown to us, and we cannot apply a cast...

The underlying idea is really about 100% flexibility in the PAPI layer.

That's also the reason why all stores need to provide a callback for the restore path. Kafka Streams runtime can only read the record from the changelog, but it cannot put it into the store, as the runtime only sees the `StateStore` interface -- thus, we invoke a store specific callback (`StateRestoreCallback` interface) that needs to actually put the data into the store for us. For our built-in store, we of course provide these callbacks, but the point is, that the runtime does not know anything about the nature of the store but is fully agnostic to it, to allow the plugin of any custom store with any custom interface (which just needs to implement `StateStore`).


Not sure if I understand what you mean by this transformation step?



-Matthias


On 3/12/24 3:04 AM, Lucas Brutschy wrote:
@Matthias:

Thanks, I didn't realize that we need processors for any custom store.
Are we sure we cannot build a generic processor to load data into a
custom key-value store? I'm not sure, but you know the code better
than me.

One other alternative is to allow the user to provide a state
transformer `Function<ConsumerRecord<byte[], byte[]>,
ConsumerRecord<byte[], byte[]>>` to adapt the state before loading it,
defaulting to identity. This would provide the ability to do
efficient, non-deserializing transformations like <K,V> => <V,K>

On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax <mj...@apache.org> wrote:

@Bruno:

(1), I think you are spot for the ts-extractor: on the restore code
path, we only support record-ts, but there is no need for a custom-ts
because for regular changelog topics KS sets the ts, and thus, the
optimization this KIP proposes required that the global topic follow the
changelog format, ie, the ts must be in the record-ts.

However, for the regular processing path, I am not sure if we can omit
deserializers. The way the PAPI is wired up, seems to require that we
give proper types to _other_ Processor that read from the global state
store. For this reason, the store (which takes `Serdes` with proper
types) is wrapped with a `MeteredStore` (like all others) to do the
Serde work, and this MeteredStore is also exposed to the
global-Processor? Might be good for Walker to dig into this to find out
the details?

If would of course be nice if we could avoid the unnecessary
deserialization on topic read, and re-serialization on global-store put
for this case, but it seems not to be straightforward to do...


(2). Is this about the PAPI/Topology? For this case, we don't have any
config object across the board. We only do this in the DSL. Hence, I
would propose to just follow the existing pattern in this KIP to keep
the API consistent. For the DSL, it could make sense of course. -- Of
course, if we think the PAPI could be improved with config objects, we
could do this in a dedicate KIP.


@Lucas:

The PAPI is unfortunately (by design) much more open and less
restrictive. If a users has a custom state store, we need some
`Processor` code from them, because we cannot provide a built-in
processor for an unknown store. The overload which won't take a
processor would only work for the built-in key-value store, what I
assume would cover most use-cases, however, we should keep the door open
for other use cases. Otherwise, we disallow this optimization for custom
stores. PAPI is really about flexibility, and yes, with great power
comes great responsibility for the users :)

But this actually highlights a different aspect: the overload not
accepting a custom `Processor` but using a built-in processor, should
not accept a generic `StoreBuilder<?>` but should restrict the type to
`StoreBuilder<TimestampedKeyValueStore>`?


-Matthias



On 3/6/24 1:14 PM, Lucas Brutschy wrote:
Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at a first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
   1) a copy-restore variant without custom processing, as you propose.
   2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I think the nature of
the restore issue only became clear to me when I read through the
comments in the JIRA ticket you linked.
2) If we decide to keep the parameter `reprocessOnRestore`, the
Javadoc on it should be extended. This is a somewhat subtle issue, and
I don't think `restore by reprocessing` is enough of an explanation.

Nits:

`{@link ValueTransformer ValueTransformer}` -> `{@link
ValueTransformer ValueTransformers}`
`user defined` -> `user-defined`

Cheers,
Lucas

On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering if we need the timestamp extractor as well as the key
and value deserializer in Topology#addGlobalStore() that do not take a
ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
Since those methods setup a global state store that does not process any
records, do they still need to deserialize records and extract
timestamps? Name might still be needed, right?

2.
   From an API point of view, it might make sense to put all
processor-related arguments into a parameter object. Something like:
GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.

3.
Could you please go over the KIP and correct typos and other mistakes in
the KIP?


Best,
Bruno



On 3/2/24 1:43 AM, Matthias J. Sax wrote:
Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on
"restore reprocessing" is certainly a good improvement.

   From an API design POV, I like the idea to not require passing in a
ProcessorSupplier to begin with. Given the current implementation of the
restore process, this might have been the better API from the beginning
on... Well, better late than never :)

For this new method w/o a supplier, I am wondering if we want to keep
`addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar
thing via KIP-813. Just an idea.

However, I am not convinced that adding a new boolean parameter is the
best way to design the API. Unfortunately, I don't have any elegant
proposal myself. Just a somewhat crazy idea to do a larger API change:

Making a step back, a global store, is by definition a terminal node --
we don't support to add child nodes. Hence, while we expose a full
`ProcessorContext` interface, we actually limit what functionality it
supports. Thus, I am wondering if we should stop using the generic
`Processor` interface to begin with, but design a new one which is
tailored to the needs of global stores? -- This would of course be of
much larger scope than originally intended by this KIP, but it might be
a great opportunity to kill two birds with one stone?

The only other question to consider is: do we believe that global stores
will never have child nodes, or could we actually allow for child nodes
in the future? If yes, it might not be smart to move off using
`Processor` interface.... In general, I could imagine, especially as we
now want to support "process on restore" to allow simple stateless
operators like `map()` or `filter()` on a `GlobalTable` (or allow to add
custom global processors) at some point in the future?

Just wanted to put this out to see what people think...


-Matthias


On 2/29/24 1:26 PM, Walker Carlson wrote:
Hello everybody,

I wanted to propose a change to our addGlobalStore methods so that the
restore behavior can be controlled on a preprocessor level. This should
help Kafka Stream users to better tune Global stores added with the
processor API to better fit their needs.

Details are in the kip here: https://cwiki.apache.org/confluence/x/E4t3EQ

Thanks,
Walker

Reply via email to