Daan,

thanks for the KIP. I personally find the motivation section a little bit confusing. If I understand the KIP correctly, you want to read a topic into a state store (ie, materialize it). This is already possible today.

Of course, today a "second" changelog topic would be created. It seems the KIP aims to avoid the additional changelog topic and to allow re-using the original input topic as the changelog (this optimization is already available for the DSL, but not for the PAPI).

If my observation is correct, we can simplify the motivation accordingly (the fact that you want to use this feature to share state across different applications more efficiently seems to be secondary and we could omit it IMHO to keep the motivation focused).

As a result, we also don't need the concept of "leader" and "follower". In the end, Kafka Streams cannot reason about or enforce any usage patterns across different apps; we can only guarantee behavior within a single application (ie, don't create a changelog but reuse an input topic as changelog). It would simplify the KIP if we remove these parts.



For the API, I am wondering why you propose to pass in `processorNames`? To me, it seems more reasonable to pass a `ProcessorSupplier` instead (similar to what we do for `addGlobalStore`). The provided `Processor` must implement a certain pattern, ie, take each input record and apply it unmodified to the state store (ie, the Processor will be solely responsible for maintaining the state store). We might also need to pass other arguments into this method, similar to `addGlobalStore`. (More below.)

If other processors need to read the state store, they can be connected to it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid approach to keep `processorName` would also be possible, but IMHO all those should only _read_ the state store (but not modify it), to keep a clear conceptual separation.
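
As a rough illustration of the separation described above (one processor that maintains the store, all others read-only), here is a plain-Java toy model. It does not use any Kafka Streams classes: `StoreUpdater`, `Reader`, and `readOnlyView()` are hypothetical names invented for this sketch, and treating a null value as a delete is an assumption borrowed from tombstone semantics.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java toy model; none of these classes are Kafka Streams API.
final class ReadOnlyStoreSketch {

    // The one processor that owns the store and is allowed to write to it.
    static final class StoreUpdater {
        private final Map<String, String> store = new HashMap<>();

        // Apply each input record to the store unmodified.
        // Assumption: a null value means "delete", as with tombstones.
        void process(String key, String value) {
            if (value == null) {
                store.remove(key);
            } else {
                store.put(key, value);
            }
        }

        // Other processors only ever get this unmodifiable view.
        Map<String, String> readOnlyView() {
            return Collections.unmodifiableMap(store);
        }
    }

    // A downstream processor that reads the store but cannot modify it.
    static final class Reader {
        private final Map<String, String> view;

        Reader(Map<String, String> view) {
            this.view = view;
        }

        String lookup(String key) {
            return view.get(key);
        }
    }

    public static void main(String[] args) {
        StoreUpdater updater = new StoreUpdater();
        Reader reader = new Reader(updater.readOnlyView());

        updater.process("k1", "v1");
        System.out.println(reader.lookup("k1"));

        updater.process("k1", null);
        System.out.println(reader.lookup("k1"));
    }
}
```

The point of the sketch is only the ownership split: writes go through a single updater, and any attempt to write through the view fails.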

About the method name: I am wondering if we should use a different name to be more explicit about what the method does? Maybe `addReadOnlyStateStore`?



Btw: please omit any code snippets and only put the newly added method signature in the KIP.

What I don't yet understand is the section "Allow state stores to continue listening for changes from their changelog". Can you elaborate?

About:

Since a changelog topic is created with the application id in it’s name, it 
would allow us to check in the follower if the changelog topic starts with our 
application id. If it doesn’t, we are not allowed to send a log.

The DSL implements this differently, and just disables the changelog for the state store (ie, for the "follower"). We could do the same thing (either by enforcing that the provided `StoreBuilder` has changelogging disabled, or by just ignoring the setting and disabling it in a hard-coded way).
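
The two options in the parenthesis can be sketched in plain Java. This is a toy model, not Kafka Streams code: `StoreSpec` is a hypothetical stand-in for a `StoreBuilder`, and both method names are made up for illustration. In Kafka Streams itself the relevant `StoreBuilder` methods would be `loggingEnabled()` and `withLoggingDisabled()`.

```java
// Plain-Java toy model; StoreSpec is hypothetical and stands in for a StoreBuilder.
final class LoggingPolicySketch {

    static final class StoreSpec {
        final String name;
        final boolean loggingEnabled;

        StoreSpec(String name, boolean loggingEnabled) {
            this.name = name;
            this.loggingEnabled = loggingEnabled;
        }

        // Returns a copy of this spec with changelogging turned off.
        StoreSpec withLoggingDisabled() {
            return new StoreSpec(name, false);
        }
    }

    // Option 1: reject a store that still has changelogging enabled.
    static StoreSpec requireLoggingDisabled(StoreSpec spec) {
        if (spec.loggingEnabled) {
            throw new IllegalArgumentException(
                "Store " + spec.name + " must have changelogging disabled");
        }
        return spec;
    }

    // Option 2: ignore what the caller configured and always disable it.
    static StoreSpec forceLoggingDisabled(StoreSpec spec) {
        return spec.loggingEnabled ? spec.withLoggingDisabled() : spec;
    }

    public static void main(String[] args) {
        StoreSpec logged = new StoreSpec("my-store", true);

        System.out.println(forceLoggingDisabled(logged).loggingEnabled);

        try {
            requireLoggingDisabled(logged);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Option 1 surfaces a misconfiguration to the user early, while option 2 is more forgiving but silently changes what the user passed in.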


Ie, overall I would prefer the "source-processor approach" that you put into rejected alternatives. Note that the problem you call out, namely

Problem with this approach is the lack of having restoring support within the 
state store

does not apply. A restore is absolutely possible and the DSL already supports it.


Or is your concern with regard to performance? The "source-processor approach" would have the disadvantage that input data is first deserialized, fed into the Processor, and then serialized again when put into the state store. Re-using the state restore code is a good idea from a performance point of view, but it might require quite some internal changes (your proposal to "not stop restoring" might not work, as it could trigger quite some undesired side-effects given the current architecture of Kafka Streams).
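
To make the performance trade-off concrete, here is a plain-Java toy comparison of the two paths. It is only a sketch under simplifying assumptions: keys and values are UTF-8 strings, the "store" is an in-memory map of byte buffers, and all names (`viaProcessor`, `viaRestore`, `serdeCalls`) are hypothetical. It says nothing about how Kafka Streams implements restoration internally.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java toy model; not Kafka Streams code.
final class RestoreVsProcess {
    // Counts (de)serialization calls so the two paths can be compared.
    static int serdeCalls = 0;

    static ByteBuffer serialize(String s) {
        serdeCalls++;
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    static String deserialize(ByteBuffer b) {
        serdeCalls++;
        // Decode a duplicate so the original buffer's position is untouched.
        return StandardCharsets.UTF_8.decode(b.duplicate()).toString();
    }

    // Raw topic bytes as they would arrive from the broker (not counted).
    static ByteBuffer raw(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    static List<ByteBuffer[]> sampleTopic() {
        List<ByteBuffer[]> topic = new ArrayList<>();
        topic.add(new ByteBuffer[] {raw("k1"), raw("v1")});
        topic.add(new ByteBuffer[] {raw("k2"), raw("v2")});
        return topic;
    }

    // "Source-processor" path: bytes -> objects -> processor -> bytes -> store.
    static Map<ByteBuffer, ByteBuffer> viaProcessor(List<ByteBuffer[]> topic) {
        Map<ByteBuffer, ByteBuffer> store = new LinkedHashMap<>();
        for (ByteBuffer[] rec : topic) {
            String key = deserialize(rec[0]);
            String value = deserialize(rec[1]);
            store.put(serialize(key), serialize(value));
        }
        return store;
    }

    // Restore-style path: raw bytes are copied into the store untouched.
    static Map<ByteBuffer, ByteBuffer> viaRestore(List<ByteBuffer[]> topic) {
        Map<ByteBuffer, ByteBuffer> store = new LinkedHashMap<>();
        for (ByteBuffer[] rec : topic) {
            store.put(rec[0], rec[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<ByteBuffer[]> topic = sampleTopic();

        serdeCalls = 0;
        Map<ByteBuffer, ByteBuffer> processed = viaProcessor(topic);
        System.out.println("processor path serde calls: " + serdeCalls);

        serdeCalls = 0;
        Map<ByteBuffer, ByteBuffer> restored = viaRestore(topic);
        System.out.println("restore path serde calls: " + serdeCalls);

        System.out.println("same store contents: " + processed.equals(restored));
    }
}
```

Both paths end up with the same store contents; the difference is that the processor path pays four serde calls per record in this model, while the restore-style path pays none.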


-Matthias




On 1/16/22 11:52 PM, Daan Gertis wrote:
Hey everyone,

Just created a KIP on sharing statestore state across multiple applications 
without duplicating the data on multiple changelog topics. Have a look and tell 
me what you think or what to improve. This is my first one, so please be gentle 
😉

https://cwiki.apache.org/confluence/x/q53kCw

Cheers!
D.
