Hi Bruno,

Thanks for your quick reply!

I decided to use a global state store for two reasons. First, if the 
application crashes, the store is repopulated properly once the cause of the 
crash has been fixed and the app starts again, i.e. I feel that it gives me a 
certain resiliency. Second, we will be running multiple instances of the 
application, and a global state store makes the state available across all 
instances.
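
As a point of comparison, a local state store is not lost on a crash either: by default Kafka Streams backs it with a changelog topic and replays that topic on restart. The sketch below is a plain-Java model of that idea with no Kafka dependency. The class `ChangelogRestoreSketch`, its methods, and the sample key and values are hypothetical names chosen for illustration, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a local store backed by a changelog; not Kafka Streams code.
class ChangelogRestoreSketch {

    // Stand-in for the changelog topic: it survives a crash of the application.
    private final List<String[]> changelog;
    // Stand-in for the local store: it is lost when the application crashes.
    private final Map<String, String> store = new HashMap<>();

    ChangelogRestoreSketch(List<String[]> changelog) {
        this.changelog = changelog;
        // On (re)start, replay the changelog in order to rebuild the store.
        for (String[] entry : changelog) {
            store.put(entry[0], entry[1]);
        }
    }

    // Every write goes to the store and is also appended to the changelog.
    void put(String key, String value) {
        store.put(key, value);
        changelog.add(new String[] {key, value});
    }

    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        List<String[]> changelog = new ArrayList<>();
        ChangelogRestoreSketch beforeCrash = new ChangelogRestoreSketch(changelog);
        beforeCrash.put("machine-1", "state-v1");
        beforeCrash.put("machine-1", "state-v2");

        // Simulated crash: the old instance and its in-memory store are discarded.
        ChangelogRestoreSketch afterRestart = new ChangelogRestoreSketch(changelog);
        System.out.println(afterRestart.get("machine-1"));
    }
}
```

With several instances, records with the same key are handled by the same instance as long as the input topic is partitioned by that key, so each instance only needs the state for its own keys.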

I am fairly new to Kafka and Kafka Streams, so I am very much open to 
suggestions on better ways to handle the flow I need.

Best regards

Georg Schmidt-Dumont
BCI/ESW17
Bosch Connected Industry

Tel. +49 711 811-49893 

► Take a look: https://bgn.bosch.com/alias/bci



-----Original Message-----
From: Bruno Cadonna <br...@confluent.io> 
Sent: Tuesday, 19 May 2020 10:52
To: Users <users@kafka.apache.org>
Subject: Re: Question regarding Kafka Streams Global State Store

Hi Georg,

From your description, I do not see why you need to use a global state store 
instead of a local one. Are there any specific reasons for that? With a local 
state store you would have the previous record immediately available.
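
To illustrate the difference, here is a minimal plain-Java model (no Kafka dependency) of the two update paths. It assumes the worst case, in which the global store's consumer has not yet read the first merged record back from the topic when the second record is transformed. The class `StoreTimingSketch`, its method names, and the sample fields `temp` and `pressure` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Plain-Java model of the two store update paths; not Kafka Streams code.
class StoreTimingSketch {

    // Merge the changed values (delta) into a copy of the previous full state.
    static Map<String, Object> merge(Map<String, ?> previous, Map<String, ?> delta) {
        Map<String, Object> merged = new HashMap<>(previous);
        merged.putAll(delta);
        return merged;
    }

    // Global-store path: the merged state is only enqueued on a topic, and the
    // store changes when a separate consumer reads it back. Assumption: that
    // consumer does not catch up until all deltas have been processed.
    static Map<String, Object> withLaggingGlobalStore(List<? extends Map<String, ?>> deltas) {
        Map<String, Object> store = new HashMap<>();
        Queue<Map<String, Object>> topic = new ArrayDeque<>();
        Map<String, Object> lastOutput = new HashMap<>();
        for (Map<String, ?> delta : deltas) {
            lastOutput = merge(store, delta); // reads stale (here: empty) state
            topic.add(lastOutput);            // store is not updated yet
        }
        while (!topic.isEmpty()) {
            store = topic.poll();             // consumer catches up too late
        }
        return lastOutput;
    }

    // Local-store path: the store is written in the same step that reads it.
    static Map<String, Object> withLocalStore(List<? extends Map<String, ?>> deltas) {
        Map<String, Object> store = new HashMap<>();
        for (Map<String, ?> delta : deltas) {
            store = merge(store, delta);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> deltas = List.of(
            Map.<String, Object>of("temp", 20, "pressure", 1),
            Map.<String, Object>of("temp", 21));
        System.out.println("global (lagging): " + withLaggingGlobalStore(deltas));
        System.out.println("local:            " + withLocalStore(deltas));
    }
}
```

In this model the lagging path ends with only `temp`, having lost `pressure`, while the local path keeps both fields.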

Best,
Bruno

On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
<georg.schmidt-dum...@de.bosch.com.invalid> wrote:
>
> Good morning,
>
> I have set up a Kafka Streams application with the following logic. The 
> incoming messages are validated and transformed. The transformed messages are 
> then published to a global state store via topic A as well as to an 
> additional topic B for consumption by other applications further down the 
> processing pipeline.
>
> As part of the transformation I access the global state store in order to get 
> the values from the previous message and use them in the transformation of 
> the current message. The messages only contain changed values and these 
> changes are merged with the complete data set before being sent on, hence I 
> always hold the latest state in the global store in order to merge it with 
> the incoming changed values.
>
> Unfortunately, when I access the store in the transformation I do not get the 
> latest state. The update of the store takes too long so when I access it in 
> the transformation I either get no values or values which do not represent 
> the latest state.
>
> The following shows the build-up of my streams app:
>
> // set up global state store
> final KeyValueBytesStoreSupplier storeSupplier =
>     Stores.persistentKeyValueStore( "global-store" );
> final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
>     Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new JSONObjectSerde() );
> builder.addGlobalStore( storeBuilder, "global-store-topic",
>     Consumed.with( Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );
>
> // store processor
>
> private KeyValueStore<String, JSONObject> stateStore;
>
> @Override
> public void init( final ProcessorContext context ) {
>    stateStore = (KeyValueStore<String, JSONObject>)
>        context.getStateStore( "global-store" );
> }
>
> @Override
> public void process( final String key, final JSONObject state ) {
>    log.info( "Update state store for {}: {}.", key, state );
>    stateStore.put( key, state );
> }
>
>
> // streams setup
>
> final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
>
> final KStream<String, JSONObject> stream = builder
>     .stream( "input-topic", Consumed.with( Serdes.String(), jsonObjectSerde ) )
>     .transformValues( ValueTransformer::new );
>
> stream.to( "global-store-topic", Produced.valueSerde( jsonObjectSerde ) );
>
> stream.to( "output-topic", Produced.valueSerde( jsonObjectSerde ) );
>
> //global state store access in ValueTransformer
>
> JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
>                                    .orElse( new JSONObject() );
>
>
> I have set the acknowledgement property ("acks") for the producers to "all".
>
> I have tried to disable caching by setting "cache.max.bytes.buffering" to 
> 0 and by disabling the cache on the store using ".withCachingDisabled()". I 
> also tried setting the commit interval to 0. All without success.
>
> How can I set up a global state store that meets the requirements described 
> in the scenario above?
>
> Thank you!
>
> Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候
>
> Mr. Georg Schmidt-Dumont
> Bosch Connected Industry – BCI/ESW17
> Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
> www.bosch.com<http://www.bosch.com/>
> Phone +49 711 811-49893  | 
> georg.schmidt-dum...@bosch.com<mailto:georg.schmidt-dum...@bosch.com>
>
> Registered Office: Stuttgart, Registration Court: Amtsgericht Stuttgart, HRB 14000;
> Chairman of the Supervisory Board: Franz Fehrenbach; Managing Directors: 
> Dr. Volkmar Denner,
> Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
> Markus Heyn, Dr. Dirk Hoheisel,
> Christoph Kübel, Uwe Raschke, Peter Tyroller
>
