Hi,
I am trying to implement some functionality using a global state store, and
would like some help so that I can understand it.
I am using the Processor API with Kafka 2.6.
The code I have so far is:
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"),
                                Serdes.String(),
                                Serdes.String())
          .withCachingEnabled()
          .withLoggingDisabled();

return new Topology()
    .addGlobalStore(storeBuilder,
                    "global-source",
                    Serdes.Integer().deserializer(),
                    Serdes.Integer().deserializer(),
                    "global-topic",
                    "global-store-processor",
                    StoreProcessor::new);

private static class StoreProcessor implements Processor<String, String> {
    @Override
    public void init(final ProcessorContext context) {
        System.out.println("## init");
    }

    @Override
    public void process(final String key, final String value) {
        System.out.println(String.format("## process: %s,%s", key, value));
    }

    @Override
    public void close() {
        System.out.println("## close");
    }
}
Can anybody explain the purpose of the Processor
parameter passed to addGlobalStore?
My code seems to work ok without any functionality in StoreProcessor.
Should it be interacting in any way with the store, or is it just a
listener to be notified of new elements on the global topic?
It does not appear to be notified of existing messages.
I am also confused about the deserializers passed into addGlobalStore.
How are they used? (I could only see the Serdes from the storeBuilder being
used.)
In my use-case, the global topic is very small, and the entire set of
messages from that topic should easily fit in memory.
I essentially just want an in-memory hash-map that holds all values from
the global topic and allows for fast lookup by key.
Ideally, all messages from the global topic should be loaded into memory at
startup (and then kept up to date by new messages from the global topic).
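To make the desired behaviour concrete, here is a plain-Java sketch of the
lookup table I have in mind. It uses no Kafka APIs at all; the class name
GlobalLookup and its methods are hypothetical, and treating a null value as
a delete is my own assumption rather than something I have confirmed for
global stores.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java model of the lookup table I am after; no Kafka APIs involved.
// GlobalLookup and its method names are hypothetical, for illustration only.
class GlobalLookup {
    private final Map<String, String> table = new ConcurrentHashMap<>();

    // Would be called once per existing message at startup,
    // and again for every new message on the global topic.
    // Assumption: a null value marks the key as deleted.
    public void apply(final String key, final String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value); // last write for a key wins
        }
    }

    // Fast lookup by key; returns null if the key is absent.
    public String get(final String key) {
        return table.get(key);
    }

    public int size() {
        return table.size();
    }

    public static void main(final String[] args) {
        final GlobalLookup lookup = new GlobalLookup();
        lookup.apply("a", "1");   // existing messages replayed at startup
        lookup.apply("b", "2");
        lookup.apply("a", "3");   // later update overwrites the earlier value
        lookup.apply("b", null);  // assumed delete marker
        System.out.println(lookup.get("a") + " " + lookup.size()); // prints "3 1"
    }
}
```

What I am unsure about is which part of the topology is supposed to play
the role of apply() here: the store itself, or my StoreProcessor.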
Any suggestions as to how to best implement this?
Thanks,
Ross