@Bruno:

(1) I think you are spot on about the ts-extractor: on the restore code path, we only support record-ts, and there is no need for a custom ts, because for regular changelog topics KS sets the ts. Thus, the optimization this KIP proposes requires that the global topic follow the changelog format, i.e., the ts must be in the record-ts.

However, for the regular processing path, I am not sure if we can omit deserializers. The way the PAPI is wired up seems to require that we give proper types to _other_ Processors that read from the global state store. For this reason, the store (which takes `Serdes` with proper types) is wrapped with a `MeteredStore` (like all others) to do the Serde work, and this `MeteredStore` is also exposed to the global Processor? It might be good for Walker to dig into this to find out the details.

It would of course be nice if we could avoid the unnecessary deserialization on topic read and the re-serialization on global-store put for this case, but it does not seem to be straightforward to do...


(2) Is this about the PAPI/Topology? For this case, we don't have any config objects across the board. We only do this in the DSL. Hence, I would propose to just follow the existing pattern in this KIP to keep the API consistent. For the DSL, it could of course make sense. -- And if we think the PAPI could be improved with config objects, we could do this in a dedicated KIP.


@Lucas:

The PAPI is unfortunately (by design) much more open and less restrictive. If a user has a custom state store, we need some `Processor` code from them, because we cannot provide a built-in processor for an unknown store. The overload which won't take a processor would only work for the built-in key-value store, which I assume would cover most use cases; however, we should keep the door open for other use cases. Otherwise, we disallow this optimization for custom stores. PAPI is really about flexibility, and yes, with great power comes great responsibility for the users :)

But this actually highlights a different aspect: the overload that does not accept a custom `Processor` but uses a built-in processor should not accept a generic `StoreBuilder<?>` but should restrict the type to `StoreBuilder<TimestampedKeyValueStore>`?
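
A minimal sketch of what such a type restriction could look like, using plain Java generics. Everything here is an assumption made for illustration: `StateStore`, `StoreBuilder`, and `TimestampedKeyValueStore` are simplified stand-ins and not the real Kafka Streams interfaces, and `loadGlobalStore` and `Updater` are hypothetical names, not the API proposed in the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins (assumptions), NOT the real Kafka Streams types.
interface StateStore {
    String name();
}

interface StoreBuilder<S extends StateStore> {
    S build();
}

final class TimestampedKeyValueStore<K, V> implements StateStore {
    private final String name;
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Long> timestamps = new HashMap<>();

    TimestampedKeyValueStore(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    void put(K key, V value, long timestamp) {
        values.put(key, value);
        timestamps.put(key, timestamp);
    }

    V get(K key) {
        return values.get(key);
    }

    Long timestampOf(K key) {
        return timestamps.get(key);
    }
}

final class GlobalStoreSketch {
    private GlobalStoreSketch() { }

    // Hypothetical callback standing in for a user-supplied Processor.
    interface Updater<S extends StateStore> {
        void update(S store, String key, String value, long timestamp);
    }

    // Variant with user code: any store type is accepted, because the
    // caller supplies the logic that knows how to write into it.
    static <S extends StateStore> S loadGlobalStore(
            StoreBuilder<S> builder, Updater<S> updater,
            String key, String value, long timestamp) {
        S store = builder.build();
        updater.update(store, key, value, timestamp);
        return store;
    }

    // Variant without user code: the built-in update logic only knows the
    // timestamped key-value store, so the parameter type is restricted.
    static TimestampedKeyValueStore<String, String> loadGlobalStore(
            StoreBuilder<TimestampedKeyValueStore<String, String>> builder,
            String key, String value, long timestamp) {
        TimestampedKeyValueStore<String, String> store = builder.build();
        store.put(key, value, timestamp);
        return store;
    }
}
```

With this shape, passing a builder for any other store type to the four-argument overload fails at compile time, while the five-argument overload stays open for custom stores.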


-Matthias



On 3/6/24 1:14 PM, Lucas Brutschy wrote:
Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
  1) a copy-restore variant without custom processing, as you propose.
  2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I think the nature of
the restore issue only became clear to me when I read through the
comments in the JIRA ticket you linked.
2) If we decide to keep the parameter `reprocessOnRestore`, the
Javadoc on it should be extended. This is a somewhat subtle issue, and
I don't think `restore by reprocessing` is enough of an explanation.

Nits:

`{@link ValueTransformer ValueTransformer}` -> `{@link
ValueTransformer ValueTransformers}`
`user defined` -> `user-defined`

Cheers,
Lucas

On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering if we need the timestamp extractor as well as the key
and value deserializer in Topology#addGlobalStore() that do not take a
ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
Since those methods set up a global state store that does not process any
records, do they still need to deserialize records and extract
timestamps? Name might still be needed, right?

2.
From an API point of view, it might make sense to put all
processor-related arguments into a parameter object. Something like:
GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.
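
A minimal sketch of how such a parameter object could be shaped. This is purely illustrative: `GlobalStoreParameters` does not exist in Kafka Streams, the `Serde` interface below is a simplified stand-in for the real one, and the default of reprocessing on restore being enabled is an assumption inferred from the method name `disableReprocessOnRestore()`.

```java
// Simplified stand-in (assumption), NOT org.apache.kafka.common.serialization.Serde.
interface Serde<T> {
    String describe();
}

// Hypothetical immutable parameter object; each "with"/"disable" call
// returns a new instance so calls can be chained safely.
final class GlobalStoreParameters {
    private final Serde<?> keySerde;
    private final boolean reprocessOnRestore;

    private GlobalStoreParameters(Serde<?> keySerde, boolean reprocessOnRestore) {
        this.keySerde = keySerde;
        this.reprocessOnRestore = reprocessOnRestore;
    }

    // Entry point; assumes reprocessing on restore is on unless disabled.
    static GlobalStoreParameters globalStore() {
        return new GlobalStoreParameters(null, true);
    }

    GlobalStoreParameters withKeySerde(Serde<?> keySerde) {
        return new GlobalStoreParameters(keySerde, reprocessOnRestore);
    }

    GlobalStoreParameters disableReprocessOnRestore() {
        return new GlobalStoreParameters(keySerde, false);
    }

    Serde<?> keySerde() {
        return keySerde;
    }

    boolean reprocessOnRestore() {
        return reprocessOnRestore;
    }
}
```

The chained call from the text would then read `GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()`, and new options could be added later without adding further overloads.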

3.
Could you please go over the KIP and correct typos and other mistakes in
the KIP?


Best,
Bruno



On 3/2/24 1:43 AM, Matthias J. Sax wrote:
Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on
"restore reprocessing" is certainly a good improvement.

From an API design POV, I like the idea to not require passing in a
ProcessorSupplier to begin with. Given the current implementation of the
restore process, this might have been the better API from the beginning
on... Well, better late than never :)

For this new method w/o a supplier, I am wondering if we want to keep
`addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar
thing via KIP-813. Just an idea.

However, I am not convinced that adding a new boolean parameter is the
best way to design the API. Unfortunately, I don't have any elegant
proposal myself. Just a somewhat crazy idea to do a larger API change:

Taking a step back, a global store is by definition a terminal node --
we don't support adding child nodes. Hence, while we expose a full
`ProcessorContext` interface, we actually limit what functionality it
supports. Thus, I am wondering if we should stop using the generic
`Processor` interface to begin with, but design a new one which is
tailored to the needs of global stores? -- This would of course be of
much larger scope than originally intended by this KIP, but it might be
a great opportunity to kill two birds with one stone?

The only other question to consider is: do we believe that global stores
will never have child nodes, or could we actually allow for child nodes
in the future? If yes, it might not be smart to move away from the
`Processor` interface... In general, I could imagine, especially as we
now want to support "process on restore", that we allow simple stateless
operators like `map()` or `filter()` on a `GlobalTable` (or allow adding
custom global processors) at some point in the future?

Just wanted to put this out to see what people think...


-Matthias


On 2/29/24 1:26 PM, Walker Carlson wrote:
Hello everybody,

I wanted to propose a change to our addGlobalStore methods so that the
restore behavior can be controlled on a preprocessor level. This should
help Kafka Streams users to better tune global stores added with the
Processor API to fit their needs.

Details are in the KIP here: https://cwiki.apache.org/confluence/x/E4t3EQ

Thanks,
Walker
