Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering whether we need the timestamp extractor and the key and value deserializers in the Topology#addGlobalStore() overloads that do not take a ProcessorSupplier. What about Consumed in StreamsBuilder#addGlobalStore()? Since those methods set up a global state store that does not process any records, do they still need to deserialize records and extract timestamps? The name might still be needed, right?

2.
From an API point of view, it might make sense to put all processor-related arguments into a parameter object. Something like:
GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.
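To make the idea a bit more concrete, here is a rough sketch of what such a parameter object could look like. Everything in it is hypothetical: GlobalStoreParameters and its methods do not exist in Kafka Streams, the Serde interface below is only a stand-in for the real one so that the snippet compiles on its own, and the default of reprocessing on restore being enabled is an assumption.

```java
import java.util.Objects;

// Hypothetical sketch: none of these types exist in Kafka Streams.
final class GlobalStoreParametersDemo {
    public static void main(String[] args) {
        Serde<String> keySerde = new Serde<String>() {};
        GlobalStoreParameters<String, Long> params =
            GlobalStoreParameters.<String, Long>globalStore()
                .withName("global-source")
                .withKeySerde(keySerde)
                .disableReprocessOnRestore();
        System.out.println(params.name() + ", reprocessOnRestore=" + params.reprocessOnRestore());
    }
}

// Stand-in for org.apache.kafka.common.serialization.Serde (assumption, for self-containment only).
interface Serde<T> {}

// Immutable parameter object: every with*/disable* call returns a modified copy.
final class GlobalStoreParameters<K, V> {
    private final String name;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final boolean reprocessOnRestore;

    private GlobalStoreParameters(String name, Serde<K> keySerde, Serde<V> valueSerde,
                                  boolean reprocessOnRestore) {
        this.name = name;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.reprocessOnRestore = reprocessOnRestore;
    }

    // Entry point; assumes reprocessing on restore is enabled unless switched off.
    static <K, V> GlobalStoreParameters<K, V> globalStore() {
        return new GlobalStoreParameters<>(null, null, null, true);
    }

    GlobalStoreParameters<K, V> withName(String name) {
        return new GlobalStoreParameters<>(Objects.requireNonNull(name), keySerde, valueSerde, reprocessOnRestore);
    }

    GlobalStoreParameters<K, V> withKeySerde(Serde<K> keySerde) {
        return new GlobalStoreParameters<>(name, keySerde, valueSerde, reprocessOnRestore);
    }

    GlobalStoreParameters<K, V> withValueSerde(Serde<V> valueSerde) {
        return new GlobalStoreParameters<>(name, keySerde, valueSerde, reprocessOnRestore);
    }

    GlobalStoreParameters<K, V> disableReprocessOnRestore() {
        return new GlobalStoreParameters<>(name, keySerde, valueSerde, false);
    }

    String name() { return name; }
    Serde<K> keySerde() { return keySerde; }
    Serde<V> valueSerde() { return valueSerde; }
    boolean reprocessOnRestore() { return reprocessOnRestore; }
}
```

The point of this shape would be that new options can be added later without adding yet another addGlobalStore() overload.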

3.
Could you please go over the KIP and correct the typos and other mistakes?


Best,
Bruno



On 3/2/24 1:43 AM, Matthias J. Sax wrote:
Thanks for the KIP Walker.

Fixing this issue and giving users the flexibility to opt in to or out of "restore reprocessing" is certainly a good improvement.

From an API design POV, I like the idea of not requiring a ProcessorSupplier to be passed in to begin with. Given the current implementation of the restore process, this might have been the better API from the beginning... Well, better late than never :)

For this new method w/o a supplier, I am wondering if we want to keep `addGlobalStore` or name it `addGlobalReadOnlyStore` -- we did something similar in KIP-813. Just an idea.

However, I am not convinced that adding a new boolean parameter is the best way to design the API. Unfortunately, I don't have any elegant proposal myself. Just a somewhat crazy idea to do a larger API change:

Taking a step back, a global store is by definition a terminal node -- we don't support adding child nodes. Hence, while we expose the full `ProcessorContext` interface, we actually limit what functionality it supports. Thus, I am wondering if we should stop using the generic `Processor` interface to begin with, and instead design a new one that is tailored to the needs of global stores? -- This would of course be of much larger scope than originally intended by this KIP, but it might be a great opportunity to kill two birds with one stone?
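As a rough illustration of what "tailored to the needs of global stores" could mean, here is a minimal sketch. All names (`GlobalStoreUpdater`, `UpsertOrDeleteUpdater`) are hypothetical and not part of Kafka Streams, and a plain `Map` stands in for the state store so the snippet compiles on its own. The interface deliberately has no `forward()` and no way to reach child nodes; it can only update the store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these types exist in Kafka Streams.
final class GlobalStoreUpdaterDemo {
    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        GlobalStoreUpdater<String, Long> updater = new UpsertOrDeleteUpdater<>(store);
        updater.update("a", 1L);
        updater.update("b", 2L);
        updater.update("a", null); // a null value is treated as a delete
        System.out.println(store);
    }
}

// Narrow alternative to the generic Processor interface: a terminal node
// that can only apply records to its store, with nothing to forward to.
interface GlobalStoreUpdater<K, V> {
    void update(K key, V value);

    default void close() {}
}

// Example implementation; the Map is a stand-in for a real state store (assumption).
final class UpsertOrDeleteUpdater<K, V> implements GlobalStoreUpdater<K, V> {
    private final Map<K, V> store;

    UpsertOrDeleteUpdater(Map<K, V> store) {
        this.store = store;
    }

    @Override
    public void update(K key, V value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }
}
```

Whether such an interface would also need a restricted context object (for record metadata, for example) is left open here.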

The only other question to consider is: do we believe that global stores will never have child nodes, or could we actually allow for child nodes in the future? If the latter, it might not be smart to move away from the `Processor` interface... In general, I could imagine, especially as we now want to support "process on restore", that we allow simple stateless operators like `map()` or `filter()` on a `GlobalKTable` (or allow adding custom global processors) at some point in the future.

Just wanted to put this out to see what people think...


-Matthias


On 2/29/24 1:26 PM, Walker Carlson wrote:
Hello everybody,

I wanted to propose a change to our addGlobalStore methods so that the
restore behavior can be controlled on a per-processor level. This should
help Kafka Streams users tune global stores added with the
Processor API to better fit their needs.

Details are in the KIP here: https://cwiki.apache.org/confluence/x/E4t3EQ

Thanks,
Walker
