I think I've figured it out after digging through the source code:

   - The Processor is called for each new message on the global topic, and
   *must* write the updated values to the state store itself.
   - The deserializers are invoked to decode each message's key and value
   bytes before the decoded values are passed to the Processor.
   - Restoring from messages already in the global topic takes a different
   path (via a StateRestoreCallback registered on the context), which
   bypasses both the deserializers and the Processor.
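The three points above can be illustrated with a minimal plain-Java model that has no Kafka dependency. The class and method names (`GlobalTopicModel`, `restore`, `onNewRecord`, `utf8`) are hypothetical; `restore` only mirrors the shape of `StateRestoreCallback#restore(byte[], byte[])`, and the `BiConsumer` stands in for `Processor#process(K, V)`. It sketches why a no-op processor appears to work: existing records reach the store through the restore path, but later records never do unless the processor writes them.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Plain-Java model of how a global store is fed; no Kafka classes are used.
class GlobalTopicModel {

    // Stands in for the in-memory key/value store built from storeBuilder.
    final Map<String, String> store = new HashMap<>();

    // Stands in for the Processor passed to addGlobalStore; it receives
    // already-deserialized keys and values.
    private final BiConsumer<String, String> processor;

    GlobalTopicModel(boolean processorWritesToStore) {
        this.processor = processorWritesToStore
                ? store::put              // like a processor whose process() calls store.put(key, value)
                : (key, value) -> { };    // like the no-op StoreProcessor in the question
    }

    // Helper for building raw record bytes.
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Stands in for the key/value deserializers passed to addGlobalStore.
    private static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Startup: records already in the topic go from raw bytes straight into the
    // store. The Processor and the addGlobalStore deserializers are not called;
    // reads later go through the store's own serdes (from storeBuilder),
    // modelled here by decoding UTF-8 directly.
    void restore(byte[] key, byte[] value) {
        store.put(new String(key, StandardCharsets.UTF_8),
                  new String(value, StandardCharsets.UTF_8));
    }

    // Runtime: each new record is decoded by the deserializers, then handed to
    // the Processor, which alone decides what is written to the store.
    void onNewRecord(byte[] key, byte[] value) {
        processor.accept(deserialize(key), deserialize(value));
    }
}
```

With `new GlobalTopicModel(false)`, a record passed to `restore` ends up in `store` but one passed to `onNewRecord` does not; with `new GlobalTopicModel(true)`, both do.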


For my use case, it seems easiest to write a custom state store that wraps
a hash map updated both by the Processor and by the
StateRestoreCallback.
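A sketch of that custom store, again in plain Java so it runs without Kafka on the classpath. `HashMapGlobalStore` and its nested `RestoreCallback` interface are hypothetical stand-ins: in a real implementation the class would implement `org.apache.kafka.streams.processor.StateStore`, and its `init` method would pass a `StateRestoreCallback` (whose method is `restore(byte[] key, byte[] value)`) to `ProcessorContext#register`. A `ConcurrentHashMap` is assumed because the global store is written by the global stream thread while other threads read from it; treating a null value as a delete is a design choice, not something stated above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical hash-map-backed global store, updated by both the restore
// callback (existing records at startup) and the Processor (new records).
class HashMapGlobalStore {

    // Same shape as Kafka's StateRestoreCallback#restore(byte[], byte[]).
    interface RestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    // Written by one thread, read by others: use a concurrent map.
    // Keys are assumed to be non-null (ConcurrentHashMap rejects null keys).
    private final Map<String, String> values = new ConcurrentHashMap<>();

    // Restore path: receives raw bytes, so the store decodes them itself.
    // Assumes keys and values were produced as UTF-8 strings.
    final RestoreCallback restoreCallback = (key, value) ->
            update(new String(key, StandardCharsets.UTF_8),
                   value == null ? null : new String(value, StandardCharsets.UTF_8));

    // Processor path: the Processor's process(key, value) would call this
    // with the already-deserialized key and value.
    void update(String key, String value) {
        if (value == null) {
            values.remove(key);    // treat a tombstone as a delete
        } else {
            values.put(key, value);
        }
    }

    // Fast lookup by key; returns null if the key is absent.
    String get(String key) {
        return values.get(key);
    }

    int size() {
        return values.size();
    }
}
```

Both update paths funnel into `update`, so restored and live records are handled identically once decoded.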

Any better ideas or suggestions?

Thanks,
Ross


On Wed, 4 Nov 2020 at 20:41, Ross Black <[email protected]> wrote:

> Hi,
>
> I am trying to implement some functionality using a global state store,
> and would like some help understanding how it works.
>
> I am using the Processor API with Kafka 2.6.
>
> The code I have so far is:
>
> final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
>         Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"),
>                 Serdes.String(),
>                 Serdes.String())
>         .withCachingEnabled()
>         .withLoggingDisabled();  // required: the global topic itself acts as the changelog
>
> return new Topology()
>         .addGlobalStore(storeBuilder,
>                 "global-source",
>                 Serdes.String().deserializer(),  // key deserializer; must match Processor<String, String>
>                 Serdes.String().deserializer(),  // value deserializer
>                 "global-topic",
>                 "global-store-processor",
>                 StoreProcessor::new);
>
>
> private static class StoreProcessor implements Processor<String, String> {
>     @Override
>     public void init(final ProcessorContext context) {
>         System.out.println("## init");
>     }
>
>     @Override
>     public void process(final String key, final String value) {
>         System.out.println(String.format("## process: %s,%s", key, value));
>     }
>
>     @Override
>     public void close() {
>         System.out.println("## close");
>     }
> }
>
>
>
> Can anybody explain the purpose of the Processor parameter passed to
> addGlobalStore?
> My code seems to work OK without any functionality in StoreProcessor.
> Should it interact with the store in some way, or is it just a
> listener that is notified of new elements on the global topic?
> It does not appear to be notified of existing messages.
>
> I am also confused about the deserializers passed into
> addGlobalStore. How are they used? (I could only see the Serde from the
> storeBuilder being used.)
>
> In my use case, the global topic is very small, and the entire set of
> messages from that topic should easily fit in memory.
> I essentially just want an in-memory hash map that holds all values from
> the global topic and allows fast lookup by key.
> Ideally, all messages from the global topic should be loaded into memory
> at startup (and then updated by new messages from the global topic).
> Any suggestions as to how best to implement this?
>
> Thanks,
> Ross
>
>
