What you say is correct. The API is not ideal, unfortunately (something we hope to improve on eventually).
An important thing to keep in mind is that the Processor should not modify
the data; it should only `put` each record into the state store as-is.
Otherwise, store recovery breaks (cf.
https://issues.apache.org/jira/browse/KAFKA-7663).

The store restore callback is for informational purposes only during
restore.

What is unclear to me: why do you want to write a custom state store? You
already use the in-memory state store (which is internally also just an
in-memory map).


-Matthias

On 11/4/20 6:15 AM, Ross Black wrote:
> I think I figured it out after digging through the source code ...
>
>
>    - The Processor is called for each new message in the global topic, and
>    *must* write updated values to the state store
>    - The deserializers are invoked to decode each message key/value bytes
>    before passing the decoded values to the Processor.
>    - Restoring from existing messages in the global topic occurs through a
>    different path (via registering a StateRestoreCallback on context).
>
>
> For my use-case it seems easiest to write a custom state store that
> contains a hash-map updated by both the Processor update, and the
> StateRestoreCallback.
>
> Any better ideas / suggestions ?
>
> Thanks,
> Ross
>
>
> On Wed, 4 Nov 2020 at 20:41, Ross Black <[email protected]> wrote:
>
>> Hi,
>>
>> I am trying to implement some functionality using a global state store,
>> and would like some help so that I can understand it.
>>
>> I am using the Processor API with Kafka 2.6.
>>
>> The code I have so far is :
>>
>> final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
>>         Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"),
>>                                     Serdes.String(),
>>                                     Serdes.String())
>>               .withCachingEnabled()
>>               .withLoggingDisabled();
>>
>> return new Topology()
>>         .addGlobalStore(storeBuilder,
>>                         "global-source",
>>                         Serdes.Integer().deserializer(),
>>                         Serdes.Integer().deserializer(),
>>                         "global-topic",
>>                         "global-store-processor",
>>                         StoreProcessor::new)
>>
>>
>> private static class StoreProcessor implements Processor<String, String> {
>>     @Override
>>     public void init(final ProcessorContext context) {
>>         System.out.println("## init");
>>     }
>>
>>     @Override
>>     public void process(final String key, final String value) {
>>         System.out.println(String.format("## process: %s,%s", key, value));
>>     }
>>
>>     @Override
>>     public void close() {
>>         System.out.println("## close");
>>     }
>> }
>>
>>
>>
>> Can anybody provide information about the purpose of the Processor
>> parameter passed to addGlobalStore ?
>> My code seems to work ok without any functionality in StoreProcessor.
>> Should it be interacting in any way with the store, or is it just a
>> listener to be notified of new elements on the global topic?
>> It does not appear to be notified of existing messages.
>>
>> I am also confused as to the deserializers passed into the
>> addGlobalStore.  How are they used?  (I could only see the Serde from the
>> storeBuilder being used)
>>
>> In my use-case, the global topic is very small, and the entire set of
>> messages from that topic should easily fit in memory.
>> I essentially just want an in-memory hash-map that holds all values from
>> the global topic and allows for fast lookup by key.
>> Ideally, all messages from the global topic should be loaded into memory
>> at startup (and on updated by new messages from the global topic).
>> Any suggestions as to how to best implement this?
>>
>> Thanks,
>> Ross
>>
>>
>
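
To illustrate the point in the reply at the top of this thread (the Processor should only `put` records as-is, because restore takes a different path that does not go through the Processor), here is a minimal plain-Java sketch. It is only a model and does not use Kafka Streams at all: `GlobalStoreModel`, `onNewRecord`, `restoreFromTopic`, and `get` are hypothetical names invented for this example, the "topic" is a simple list, and the "store" is a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model only: none of these names are Kafka Streams APIs.
final class GlobalStoreModel {
    // Stand-in for the global topic: an ordered log of {key, value} records.
    private final List<String[]> topic = new ArrayList<>();
    // Stand-in for the in-memory state store.
    private final Map<String, String> store = new HashMap<>();
    // What the "processor" does to a value before writing it to the store.
    private final UnaryOperator<String> processorTransform;

    GlobalStoreModel(UnaryOperator<String> processorTransform) {
        this.processorTransform = processorTransform;
    }

    // Live path: a new record lands on the topic and goes through the processor.
    void onNewRecord(String key, String value) {
        topic.add(new String[] {key, value});
        store.put(key, processorTransform.apply(value));
    }

    // Restore path: the store is rebuilt straight from the topic,
    // without calling the processor (assumption modelled on KAFKA-7663).
    void restoreFromTopic() {
        store.clear();
        for (String[] record : topic) {
            store.put(record[0], record[1]);
        }
    }

    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        // Processor that writes values unmodified: state survives a restore.
        GlobalStoreModel asIs = new GlobalStoreModel(v -> v);
        asIs.onNewRecord("k", "v");
        String beforeAsIs = asIs.get("k");
        asIs.restoreFromTopic();
        System.out.println("as-is consistent: " + beforeAsIs.equals(asIs.get("k")));

        // Processor that modifies values: state differs after a restore.
        GlobalStoreModel modifying = new GlobalStoreModel(String::toUpperCase);
        modifying.onNewRecord("k", "v");
        String beforeModifying = modifying.get("k");
        modifying.restoreFromTopic();
        System.out.println("modifying consistent: " + beforeModifying.equals(modifying.get("k")));
    }
}
```

In this model the as-is variant holds the same value before and after the restore, while the modifying variant does not, which is the kind of divergence the reply warns about. In the real Processor API the equivalent of the as-is variant would be a `process` method that simply writes the incoming key and value to the store obtained from the context.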
