Hi Guozhang,

Thanks for the information.

My requirement is something like this:

1. I want to read data from one topic (which is continuously fed), so I
   thought of using Kafka Streams with multiple threads.
2. I want to store the data in one in-memory database (not the local state
   store per thread).

If I have to write my own StateStore logic that handles synchronization, is
that equivalent to having my own global data structure shared by all threads?
Will our own synchronization have any performance impact?

Can you please share any sample programs or links describing this?

Thanks & Regards,
Ranjit

On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Ranjit,
>
> Note that the "testStore" instance you are passing is a StateStoreSupplier
> which will generate a new StateStore instance for each thread's task.
>
> If you really want to have all the threads share the same state store you
> should implement your own StateStoreSupplier that only returns the same
> StateStore instance in its "get()" call; however, keep in mind that in this
> case this state store could be concurrently accessed by multiple threads,
> which is not protected by the library itself (by default single-thread
> access is guaranteed on the state stores).
>
>
> Guozhang
>
> On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar <ranjit...@gmail.com> wrote:
>
> > Hi All,
> >
> > I want to use one state store in all my Kafka Streams threads in my
> > application. How can I do it?
> >
> > 1. I created one topic (name: test2) with 3 partitions.
> > 2. I wrote a Kafka Streams application with num.stream.threads = 3 in
> >    Java code.
> > 3. I am using a state store (name: count2) in my application.
> >
> > But the state store (count2) is acting as if it were local to each
> > thread, whereas it should be unique to the entire application, with the
> > same value reflected everywhere. How can I do it?
> >
> > Do I need to take care of any synchronization as well?
> >
> > Code:
> > ====
> > package com.javatpoint;
> >
> > import org.apache.kafka.common.serialization.Serdes;
> > import org.apache.kafka.streams.KafkaStreams;
> > import org.apache.kafka.streams.StreamsConfig;
> > import org.apache.kafka.streams.processor.Processor;
> > import org.apache.kafka.streams.processor.ProcessorContext;
> > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > import org.apache.kafka.streams.processor.TopologyBuilder;
> > import org.apache.kafka.streams.state.Stores;
> >
> > import org.apache.kafka.streams.kstream.KStreamBuilder;
> > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > import org.apache.kafka.streams.state.KeyValueStore;
> >
> > import java.util.Properties;
> > import java.lang.*;
> >
> > /**
> >  * Hello world!
> >  *
> >  */
> > public class App
> > {
> >     public static void main( String[] args )
> >     {
> >         /* StateStoreSupplier testStore = Stores.create("count2")
> >                 .withKeys(Serdes.String())
> >                 .withValues(Serdes.Long())
> >                 .persistent()
> >                 .build();*/
> >         StateStoreSupplier testStore = Stores.create("count2")
> >                 .withStringKeys()
> >                 .withLongValues()
> >                 .persistent()
> >                 .build();
> >
> >         // TopologyBuilder builder = new TopologyBuilder();
> >         final KStreamBuilder builder = new KStreamBuilder();
> >
> >         builder.addSource("source", "test2")
> >                .addProcessor("process", TestProcessor::new, "source")
> >                .addStateStore(testStore, "process");
> >
> >         Properties props = new Properties();
> >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
> >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >         // props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> >         // props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> >
> >         props.put("auto.offset.reset", "latest");
> >         props.put("num.stream.threads", 3);
> >
> >         System.out.println("test1");
> >         KafkaStreams streams = new KafkaStreams(builder, props);
> >         System.out.println("test2");
> >         streams.start();
> >     }
> >
> >     // public static class TestProcessor implements Processor<byte[], byte[]> {
> >     public static class TestProcessor implements Processor<String, String>
> >     {
> >         private KeyValueStore<String, Long> kvStore;
> >         private ProcessorContext context;
> >
> >         @Override
> >         public void init(ProcessorContext context) {
> >             this.context = context;
> >             // context.getStateStore("count2");
> >             System.out.println("Initialized");
> >             this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("count2");
> >         }
> >
> >         @Override
> >         public void process(String k, String v) {
> >         // public void process(byte[] k, byte[] v) {
> >             System.out.println("Processing " + k + " -> " + v);
> >             try {
> >                 Long oldValue = this.kvStore.get(v);
> >                 System.out.println("Oldval " + oldValue + " -> Key " + v);
> >                 if (oldValue == null) {
> >                     this.kvStore.put(v, 1L);
> >                 }
> >                 else
> >                 {
> >                     this.kvStore.put(v, oldValue + 1L);
> >                 }
> >                 Thread.sleep(10000);
> >             } catch (Exception e) {
> >                 System.out.println(e);
> >             }
> >         }
> >
> >         @Override
> >         public void punctuate(long timestamp) {
> >
> >         }
> >
> >         @Override
> >         public void close() {
> >
> >         }
> >     }
> > }
> >
> >
>
> --
> -- Guozhang
>
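
For reference, the "own global data structure" option raised in this thread can be sketched in plain Java. This is only an illustration under stated assumptions, not the Kafka Streams API: the class name SharedCounts and its methods are hypothetical, and the three worker threads merely stand in for num.stream.threads = 3. It shows why the get-then-put pattern in process() above would need protection once the store is shared: the increment has to be a single atomic step.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Kafka Streams: one in-memory map
// shared by every thread in the same JVM.
public class SharedCounts {
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    // Atomic read-modify-write. A separate get() followed by put() could
    // lose updates when two threads increment the same key concurrently.
    public long increment(String key) {
        return counts.merge(key, 1L, Long::sum);
    }

    // Returns 0 for keys that have never been incremented.
    public long get(String key) {
        return counts.getOrDefault(key, 0L);
    }

    public static void main(String[] args) throws InterruptedException {
        SharedCounts shared = new SharedCounts();
        Thread[] workers = new Thread[3]; // stands in for 3 stream threads
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    shared.increment("word");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join(); // wait until every worker has finished
        }
        System.out.println("word -> " + shared.get("word"));
    }
}
```

Because merge() is atomic per key on a ConcurrentHashMap, the three workers together should leave the count at 3000. Note the limits of this sketch: the map lives only in one JVM, so a second application instance would have its own copy, and nothing here is backed up or restored by Kafka Streams.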