Hello All, I am revisiting this topic as now I am actually configuring a partitioned topic and would like multiple threads of my streams application running on different instances to process this partitioned topic in parallel.
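For concreteness, the per-instance setup in question can be sketched as below. This is only a sketch under stated assumptions: the application id `my-streams-app` and the broker address `localhost:9092` are placeholders, plain string keys stand in for the `StreamsConfig` constants so that it compiles without the Kafka jars, and the task arithmetic assumes one stream task per source partition. The figures (40 partitions, 3 machines, 8 threads each) are the ones used in this message.

```java
import java.util.Properties;

final class StreamsSetupSketch {

    // Settings that every instance of the application would share.
    // The string keys are the values of the matching StreamsConfig constants.
    static Properties buildProps(String applicationId, String bootstrapServers, int threads) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // same value on every machine
        props.put("bootstrap.servers", bootstrapServers); // placeholder broker address
        props.put("num.stream.threads", threads);         // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        return props;
    }

    // Total number of stream threads across all machines.
    static int totalThreads(int machines, int threadsPerMachine) {
        return machines * threadsPerMachine;
    }

    // Largest number of tasks a single thread would hold if tasks were spread evenly
    // (ceiling of tasks / threads). The real assignment is decided by Kafka Streams.
    static int maxTasksPerThread(int tasks, int threads) {
        return (tasks + threads - 1) / threads;
    }

    public static void main(String[] args) {
        Properties props = buildProps("my-streams-app", "localhost:9092", 8);
        int threads = totalThreads(3, 8);
        System.out.println("application.id=" + props.get("application.id")
                + " totalThreads=" + threads
                + " maxTasksPerThread=" + maxTasksPerThread(40, threads));
    }
}
```

With 40 partitions and 24 threads, an even spread would leave every thread with one or two partitions to process; how the partitions are actually assigned is up to Kafka Streams, not this sketch.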
So I have one source topic partitioned into 40 partitions. The messages it receives are all keyed messages, and on the producer side I make sure that a particular key's messages are sent to one particular partition only. Since my processing is based on that key, I can process the individual partitions independently.

To process these in parallel I am planning to run a cluster of three machines, each running, say, 8 threads. So I have configured

    streamsProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);

1. For separate machines running the same streams application, my application.id would be the same, right? If yes, then how does the Kafka cluster know which partition to assign to which machine and which thread? What I see is that on one machine each thread has a unique name, so it will get messages from its given partition(s) only, but how does the Kafka cluster know that one machine's threads are different from another machine's? For example, how does it distinguish thread-1 on machine A from thread-1 on machine B? Do I need to configure something here?

2. My stream processing also creates an internal changelog topic for the RocksDB state store. Should I partition that topic into the same number of partitions as my source topic? (Here I am creating the changelog topic manually.)

3. If I don't create the changelog topic manually and let Kafka Streams create it automatically, how many partitions does it use?

4. Suppose my changelog topic has a single partition (if that is allowed) and multiple threads now access it. Is there any deadlock situation I need to worry about?

5. Also, multiple threads will now access the same state store and attempt to recreate it from the changelog topic if there is a need. How does this work?

6. Lastly, say one instance fails and the other instances try to balance the load among themselves. When I bring that instance back up, how do partitions get reassigned to it?
That is, at what point does the old thread stop processing a partition and a thread of the new instance take over? Is there any configuration needed here?

Thanks
Sachin

On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Sachin,
>
> What you have suggested will never happen. If there is only 1 partition
> there will only ever be one consumer of that partition. So if you had 2
> instances of your streams application, and only a single input partition,
> only 1 instance would be processing the data.
> If you are running like this, then you might want to set
> StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
> State Store that is generated by the aggregation is kept up to date on the
> instance that is not processing the data. So in the event that the active
> instance fails, the standby instance should be able to continue without too
> much of a gap in processing time.
>
> Thanks,
> Damian
>
> On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I followed the document and I have a few questions.
> > Say I have a single-partition keyed input topic and say I run 2 streams
> > applications from machine1 and machine2.
> > Both applications have the same application id and have identical code.
> > Say topic1 has messages like
> > (k1, v11)
> > (k1, v12)
> > (k1, v13)
> > (k2, v21)
> > (k2, v22)
> > (k2, v23)
> > When I was running a single application I was getting results like
> > (k1, agg(v11, v12, v13))
> > (k2, agg(v21, v22, v23))
> >
> > Now when 2 applications are run and say messages are read in round robin
> > fashion.
> > v11 v13 v22 - machine 1
> > v12 v21 v23 - machine 2
> >
> > The aggregation at machine 1 would be
> > (k1, agg(v11, v13))
> > (k2, agg(v22))
> >
> > The aggregation at machine 2 would be
> > (k1, agg(v12))
> > (k2, agg(v21, v23))
> >
> > So now where do I join the independent results of these 2 aggregations to
> > get the final result as expected when a single instance was running?
> >
> > Note my high level DSL is something like
> > srcSTopic.aggregate(...).foreach(key, aggregation) {
> > //process aggregated value and push it to some external storage
> > }
> >
> > So I want this foreach to be running against the final set of aggregated
> > values. Do I need to add another step before foreach to make sure the
> > different results from the 2 machines are joined to get the final one as
> > expected? If yes, what would that step be?
> >
> > Thanks
> > Sachin
> >
> > On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
> >
> > > Hi Sachin,
> > >
> > > Some quick answers, and a link to some documentation to read more:
> > >
> > > - If you restart the application, it will start from the point it crashed
> > > (possibly reprocessing a small window of records).
> > >
> > > - You can run more than one instance of the application. They'll
> > > coordinate by virtue of being part of a Kafka consumer group; if one
> > > crashes, the partitions that it was reading from will be picked up by other
> > > instances.
> > >
> > > - When running more than one instance, the tasks will be distributed
> > > between the instances.
> > >
> > > Confluent's docs on the Kafka Streams architecture go into a lot more
> > > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
> > >
> > > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > > We were able to run a stream processing application against a fairly decent
> > > > load of messages in a production environment.
> > > >
> > > > To make the system robust: say the stream processing application crashes, is
> > > > there a way to make it auto start from the point when it crashed?
> > > >
> > > > Also, is there any concept like running the same application in a cluster,
> > > > where if one fails, another takes over until we bring back up the failed node
> > > > of the streams application?
> > > >
> > > > If yes, are there any guidelines or some knowledge base we can look at to
> > > > understand how this would work?
> > > >
> > > > Is there a way, like in Spark where the driver program distributes the tasks
> > > > across various nodes in a cluster, to do something similar in Kafka
> > > > Streams too?
> > > >
> > > > Thanks
> > > > Sachin