I think I figured it out after digging through the source code ...
- The Processor is called for each new message in the global topic, and
  *must* write updated values to the state store.
- The deserializers are invoked to decode each message's key/value bytes
  before the decoded values are passed to the Processor.
- Restoring from existing messages in the global topic occurs through a
  different path (via registering a StateRestoreCallback on the context).

For my use-case it seems easiest to write a custom state store that
contains a hash-map updated by both the Processor and the
StateRestoreCallback.

Any better ideas / suggestions?

Thanks,
Ross

On Wed, 4 Nov 2020 at 20:41, Ross Black <[email protected]> wrote:

> Hi,
>
> I am trying to implement some functionality using a global state store,
> and would like some help so that I can understand it.
>
> I am using the Processor API with Kafka 2.6.
>
> The code I have so far is:
>
> final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
>     Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"),
>             Serdes.String(),
>             Serdes.String())
>         .withCachingEnabled()
>         .withLoggingDisabled();
>
> return new Topology()
>     .addGlobalStore(storeBuilder,
>         "global-source",
>         Serdes.Integer().deserializer(),
>         Serdes.Integer().deserializer(),
>         "global-topic",
>         "global-store-processor",
>         StoreProcessor::new);
>
>
> private static class StoreProcessor implements Processor<String, String> {
>     @Override
>     public void init(final ProcessorContext context) {
>         System.out.println("## init");
>     }
>
>     @Override
>     public void process(final String key, final String value) {
>         System.out.println(String.format("## process: %s,%s", key, value));
>     }
>
>     @Override
>     public void close() {
>         System.out.println("## close");
>     }
> }
>
>
> Can anybody provide information about the purpose of the Processor
> parameter passed to addGlobalStore?
> My code seems to work ok without any functionality in StoreProcessor.
> Should it be interacting in any way with the store, or is it just a
> listener to be notified of new elements on the global topic?
> It does not appear to be notified of existing messages.
>
> I am also confused as to the deserializers passed into
> addGlobalStore. How are they used? (I could only see the Serde from the
> storeBuilder being used.)
>
> In my use-case, the global topic is very small, and the entire set of
> messages from that topic should easily fit in memory.
> I essentially just want an in-memory hash-map that holds all values from
> the global topic and allows for fast lookup by key.
> Ideally, all messages from the global topic should be loaded into memory
> at startup (and updated by new messages from the global topic).
> Any suggestions as to how best to implement this?
>
> Thanks,
> Ross
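
P.S. To make the hash-map idea from the top of this message concrete, here is a minimal sketch in plain Java with no Kafka dependency. It only models the two update paths; it is not a working Kafka Streams state store. The class name GlobalMapSketch and the methods update/restore are made up for illustration. restore() is meant to mirror the shape of StateRestoreCallback.restore(byte[] key, byte[] value), which receives raw bytes, whereas the Processor path receives already-decoded values. Decoding as UTF-8 and treating a null value as a delete are my assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one in-memory map fed by two different paths.
final class GlobalMapSketch {
    private final Map<String, String> map = new ConcurrentHashMap<>();

    // Live path: what a Processor's process(key, value) would call.
    // Key and value have already been decoded by the deserializers.
    void update(String key, String value) {
        if (key == null) {
            return; // ConcurrentHashMap rejects null keys; skip such records
        }
        if (value == null) {
            map.remove(key); // assumption: null value means "delete this key"
        } else {
            map.put(key, value);
        }
    }

    // Restore path: same shape as StateRestoreCallback.restore(byte[], byte[]).
    // The bytes are raw, so this path has to decode them itself.
    void restore(byte[] key, byte[] value) {
        update(decode(key), decode(value));
    }

    String get(String key) {
        return map.get(key);
    }

    int size() {
        return map.size();
    }

    // Assumption: keys and values are UTF-8 encoded strings.
    private static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        GlobalMapSketch store = new GlobalMapSketch();
        // Startup: an existing message is replayed as raw bytes.
        store.restore("a".getBytes(StandardCharsets.UTF_8),
                      "1".getBytes(StandardCharsets.UTF_8));
        // Afterwards: a new message arrives already decoded.
        store.update("a", "2");
        System.out.println(store.get("a")); // prints "2"
    }
}
```

In a real implementation the map would sit inside the custom store, with process() delegating to something like update() and the callback registered in the store's init() delegating to something like restore().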
