Hi Guozhang,

Thanks for the information.

My requirement is something like this:

1. I want to read the data from one topic (which is continuously being fed),
so I thought of using Kafka Streams with multiple threads.
2. I want to store the data in one in-memory database (not the local state
store per thread).

If I have to write my own StateStore logic and handle the synchronization
myself, is that equivalent to having my own global data structure shared
across all threads?

Will there be any performance impact from doing our own synchronization? Could
you please share any sample programs or links describing this?
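
For reference, what I mean by a "global data structure in all threads" is
roughly a single shared map like the sketch below (the class and method names
are just illustrative, not from any library):

import java.util.concurrent.ConcurrentHashMap;

public class SharedCounts {
    // One map for the whole application; every stream thread / Processor
    // instance in this JVM sees the same instance.
    private static final ConcurrentHashMap<String, Long> COUNTS = new ConcurrentHashMap<>();

    // merge() does the read-modify-write atomically, so no explicit locking is needed.
    public static void increment(String key) {
        COUNTS.merge(key, 1L, Long::sum);
    }

    public static Long get(String key) {
        return COUNTS.get(key);
    }
}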

Thanks & Regards,
Ranjit

On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Ranjit,
>
> Note that the "testStore" instance you are passing is a StateStoreSupplier,
> which will generate a new StateStore instance for each thread's task.
>
> If you really want to have all the threads share the same state store, you
> should implement your own StateStoreSupplier that always returns the same
> StateStore instance from its "get()" call; however, keep in mind that in
> this case the state store could be accessed concurrently by multiple
> threads, which is not protected by the library itself (by default,
> single-threaded access to the state stores is guaranteed).
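>
> As a rough, untested sketch (assuming the pre-1.0 StateStoreSupplier
> interface with just name() and get(); depending on your Kafka version there
> may be additional methods, such as loggingEnabled(), that also need to be
> delegated), such a supplier could wrap an existing one and memoize the store
> it creates:
>
> import org.apache.kafka.streams.processor.StateStore;
> import org.apache.kafka.streams.processor.StateStoreSupplier;
>
> public class SharedStoreSupplier implements StateStoreSupplier {
>     private final StateStoreSupplier delegate;
>     private StateStore instance;
>
>     public SharedStoreSupplier(StateStoreSupplier delegate) {
>         this.delegate = delegate;
>     }
>
>     @Override
>     public String name() {
>         return delegate.name();
>     }
>
>     @Override
>     public synchronized StateStore get() {
>         // Build the underlying store once and hand the same instance to every task.
>         if (instance == null) {
>             instance = delegate.get();
>         }
>         return instance;
>     }
> }
>
> You could then pass new SharedStoreSupplier(testStore) to addStateStore(),
> but as noted above, all access to that store would have to be synchronized
> by your own code.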
>
>
> Guozhang
>
> On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar <ranjit...@gmail.com> wrote:
>
> > Hi All,
> >
> > I want to use one state store across all of my Kafka Streams threads in my
> > application. How can I do that?
> >
> > 1. I created one topic (name: test2) with 3 partitions.
> > 2. I wrote a Kafka Streams application with num.stream.threads = 3 in Java code.
> > 3. I am using a state store (name: count2) in my application.
> >
> > But the state store (count2) is acting as if it is local to each thread; it
> > should be unique to the entire application, with the same value reflected
> > everywhere. How can I do that?
> >
> > Do I need to take care of any synchronization as well?
> >
> > Code:
> > ====
> > package com.javatpoint;
> > import org.apache.kafka.common.serialization.Serdes;
> > import org.apache.kafka.streams.KafkaStreams;
> > import org.apache.kafka.streams.StreamsConfig;
> > import org.apache.kafka.streams.processor.Processor;
> > import org.apache.kafka.streams.processor.ProcessorContext;
> > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > import org.apache.kafka.streams.processor.TopologyBuilder;
> > import org.apache.kafka.streams.state.Stores;
> >
> > import org.apache.kafka.streams.kstream.KStreamBuilder;
> > import org.apache.kafka.streams.state.KeyValueStore;
> >
> > import java.util.Properties;
> > import java.lang.*;
> >
> > /**
> >  * Hello world!
> >  *
> >  */
> > public class App
> > {
> >     public static void main( String[] args )
> >     {
> > /*        StateStoreSupplier testStore = Stores.create("count2")
> >                 .withKeys(Serdes.String())
> >                 .withValues(Serdes.Long())
> >                 .persistent()
> >                 .build();*/
> >         StateStoreSupplier testStore = Stores.create("count2")
> >                 .withStringKeys()
> >                 .withLongValues()
> >                 .persistent()
> >                 .build();
> >
> > //        TopologyBuilder builder = new TopologyBuilder();
> >         final KStreamBuilder builder = new KStreamBuilder();
> >
> >         builder.addSource("source", "test2")
> >                .addProcessor("process", TestProcessor::new, "source")
> >                .addStateStore(testStore, "process");
> >
> >         Properties props = new Properties();
> >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
> >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> >         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> > //      props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> > //      props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> >
> >         props.put("auto.offset.reset", "latest");
> >         props.put("num.stream.threads", 3);
> >
> >         System.out.println("test1");
> >         KafkaStreams streams = new KafkaStreams(builder, props);
> >         System.out.println("test2");
> >         streams.start();
> >     }
> >
> > //  public static class TestProcessor implements Processor<byte[], byte[]> {
> >     public static class TestProcessor implements Processor<String, String> {
> >         private KeyValueStore<String, Long> kvStore;
> >         private ProcessorContext context;
> >
> >         @Override
> >         public void init(ProcessorContext context) {
> >             this.context = context;
> > //          context.getStateStore("count2");
> >             System.out.println("Initialized");
> >             this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("count2");
> >         }
> >
> >         @Override
> >         public void process(String k, String v) {
> > //      public void process(byte[] k, byte[] v) {
> >             System.out.println("Processing " + k + " -> " + v);
> >             try {
> >                 Long oldValue = this.kvStore.get(v);
> >                 System.out.println("Oldval " + oldValue + " -> Key " + v);
> >                 if (oldValue == null) {
> >                     this.kvStore.put(v, 1L);
> >                 } else {
> >                     this.kvStore.put(v, oldValue + 1L);
> >                 }
> >                 Thread.sleep(10000);
> >             } catch (Exception e) {
> >                 System.out.println(e);
> >             }
> >         }
> >
> >         @Override
> >         public void punctuate(long timestamp) {
> >
> >         }
> >
> >         @Override
> >         public void close() {
> >
> >         }
> >     }
> > }
> >
>
>
>
> --
> -- Guozhang
>
