Well, the store must be byte-based to get fault-tolerance working. If we lose a global state store and recover it, recovery uses a different code path that does not go through the processor: we take the plain bytes from the Kafka topic and put them directly into the state store. This only works if the state store is byte-based.
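To make the two code paths concrete, here is a minimal toy model in plain Java. It is only a sketch under stated assumptions: ordinary maps stand in for the global topic and the byte-based store, keys are kept as `String` for brevity, and none of the names (`GlobalStoreModel`, `processLive`, `restore`, `restoreMatchesLive`) are Kafka Streams APIs. It only illustrates why a processor that modifies data leaves the store in a different state than a restore would.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model only: plain Java maps stand in for the global topic and the
// byte-based store. None of these names are Kafka Streams APIs.
class GlobalStoreModel {

    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Live path: the record is deserialized, handed to the processor, and
    // whatever the processor returns is serialized into the store.
    static void processLive(Map<String, byte[]> store, String key, byte[] raw,
                            UnaryOperator<String> processor) {
        String value = deserialize(raw);
        store.put(key, serialize(processor.apply(value)));
    }

    // Restore path: raw bytes are copied from the topic straight into the
    // store. The processor is never involved.
    static void restore(Map<String, byte[]> store, Map<String, byte[]> topic) {
        store.putAll(topic);
    }

    // Returns true if a store built via the live path holds the same bytes
    // as a store rebuilt via the restore path.
    static boolean restoreMatchesLive(Map<String, byte[]> topic,
                                      UnaryOperator<String> processor) {
        Map<String, byte[]> live = new HashMap<>();
        topic.forEach((key, raw) -> processLive(live, key, raw, processor));

        Map<String, byte[]> restored = new HashMap<>();
        restore(restored, topic);

        for (String key : topic.keySet()) {
            if (!Arrays.equals(live.get(key), restored.get(key))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, byte[]> topic = new HashMap<>();
        topic.put("k1", serialize("hello"));

        System.out.println("as-is processor:     " + restoreMatchesLive(topic, v -> v));
        System.out.println("modifying processor: " + restoreMatchesLive(topic, v -> v.toUpperCase()));
    }
}
```

In this model the as-is processor yields identical live and restored stores, while the upper-casing processor does not: after a restore the store would silently hold the unmodified topic bytes.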
And yes, a `GlobalKTable` just implements the `Processor` and does the upsert of the data for you, which could give a wrong impression of how it works under the hood.

-Matthias

On 11/4/20 1:24 PM, Ross Black wrote:
> Hi Matthias,
>
> Thanks for your response. The Kafka issue you linked provides me lots of
> useful context.
>
>> why do you want to write a custom state store?
>
> I really only wrote a custom store because I was struggling to understand
> the behaviour when using the existing stores.
> I initially started with a GlobalKTable (via StreamsBuilder.globalTable),
> which led me to make some incorrect assumptions about the behaviour of the
> global store.
> Going through the exercise of writing the custom store helped me understand
> how everything worked.
>
> One advantage with the custom store is that I can deserialise *all* data
> from the global topic and store it in a memory structure to achieve the
> fastest read path (with minimal locking).
> In my code several lookups are made using the store for every incoming
> event, with the most frequent path being that each store lookup will *not*
> return a value.
> The in-memory store is byte-based and so has some additional overhead of
> deserialisation when not using the cache, and the cache adds some
> additional overhead for lookup on demand.
> Having said that though, I suspect that the in-memory store would probably
> be fast enough.
>
> Thanks,
> Ross
>
>
> On Thu, 5 Nov 2020 at 05:16, Matthias J. Sax wrote:
>
>> What you say is correct. The API is not ideal unfortunately (something we
>> hope to improve on eventually).
>>
>> An important thing to keep in mind is that the Processor should not
>> modify the data but only `put` them into the state store as-is.
>> Otherwise, store recovery breaks (cf
>> https://issues.apache.org/jira/browse/KAFKA-7663).
>>
>> The store restore callback is for informational purposes only during restore.
>>
>> What is unclear to me: why do you want to write a custom state store?
>> You already use the in-memory state store (that is internally also just
>> a HashMap).
>>
>>
>> -Matthias
>>
>> On 11/4/20 6:15 AM, Ross Black wrote:
>>> I think I figured it out after digging through the source code ...
>>>
>>>
>>> - The Processor is called for each new message in the global topic, and
>>>   *must* write updated values to the state store
>>> - The deserializers are invoked to decode each message key/value bytes
>>>   before passing the decoded values to the Processor.
>>> - Restoring from existing messages in the global topic occurs through a
>>>   different path (via registering a StateRestoreCallback on context).
>>>
>>>
>>> For my use-case it seems easiest to write a custom state store that
>>> contains a hash-map updated by both the Processor update, and the
>>> StateRestoreCallback.
>>>
>>> Any better ideas / suggestions ?
>>>
>>> Thanks,
>>> Ross
>>>
>>>
>>> On Wed, 4 Nov 2020 at 20:41, Ross Black wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to implement some functionality using a global state store,
>>>> and would like some help so that I can understand it.
>>>>
>>>> I am using the Processor API with Kafka 2.6.
>>>>
>>>> The code I have so far is :
>>>>
>>>> final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
>>>>     Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"),
>>>>             Serdes.String(),
>>>>             Serdes.String())
>>>>         .withCachingEnabled()
>>>>         .withLoggingDisabled();
>>>>
>>>> return new Topology()
>>>>     .addGlobalStore(storeBuilder,
>>>>         "global-source",
>>>>         Serdes.Integer().deserializer(),
>>>>         Serdes.Integer().deserializer(),
>>>>         "global-topic",
>>>>         "global-store-processor",
>>>>         StoreProcessor::new);
>>>>
>>>>
>>>> private static class StoreProcessor implements Processor<String, String> {
>>>>     @Override
>>>>     public void init(final ProcessorContext context) {
>>>>         System.out.println("## init");
>>>>     }
>>>>
>>>>     @Override
>>>>     public void process(final String key, final String value) {
>>>>         System.out.println(String.format("## process: %s,%s", key, value));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void close() {
>>>>         System.out.println("## close");
>>>>     }
>>>> }
>>>>
>>>>
>>>> Can anybody provide information about the purpose of the Processor
>>>> parameter passed to addGlobalStore ?
>>>> My code seems to work ok without any functionality in StoreProcessor.
>>>> Should it be interacting in any way with the store, or is it just a
>>>> listener to be notified of new elements on the global topic?
>>>> It does not appear to be notified of existing messages.
>>>>
>>>> I am also confused as to the deserializers passed into the
>>>> addGlobalStore. How are they used? (I could only see the Serde from the
>>>> storeBuilder being used)
>>>>
>>>> In my use-case, the global topic is very small, and the entire set of
>>>> messages from that topic should easily fit in memory.
>>>> I essentially just want an in-memory hash-map that holds all values from
>>>> the global topic and allows for fast lookup by key.
>>>> Ideally, all messages from the global topic should be loaded into memory
>>>> at startup (and then updated by new messages from the global topic).
>>>> Any suggestions as to how to best implement this?
>>>>
>>>> Thanks,
>>>> Ross
