AFAIK, there are two levels of parallelism related to the Spark Kafka consumer:
At the JVM level: for each receiver, one can specify the number of threads
for a given topic, provided as a map [topic -> nThreads]. This will
effectively start nThreads JVM threads consuming partitions of that Kafka
topic.

At the cluster level: one can create several DStreams; each DStream will
have its own receiver (as defined in the previous point), and each
receiver will use 1 executor core in Spark.

What you need to ensure is that there's a consumer attached to each
partition of your Kafka topic. That is:

nThreads * nReceivers = #kafka_partitions(topic)

e.g. given:
nPartitions = #partitions of your topic
nThreads = #threads per receiver

val kafkaStreams = (1 to nPartitions / nThreads).map { i =>
  KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> nThreads),
    StorageLevel.MEMORY_ONLY_SER)
}

For this to work, you need at least (nPartitions / nThreads + 1) cores in
your Spark cluster, although I would recommend having 2-3x
(nPartitions / nThreads).

(and don't forget to union the streams after creation)
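As an illustration, here is a minimal end-to-end sketch of that pattern
with the union included. It assumes a StreamingContext `ssc`, a
`kafkaConf: Map[String, String]`, and the `topic`, `nPartitions` and
`nThreads` values above; the byte-array key/value types and DefaultDecoder
are one possible choice (matching the Java code further down), not the
only one:

// A sketch only: assumes ssc, kafkaConf, topic, nPartitions and nThreads
// are already defined as above.
import kafka.serializer.DefaultDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// One DStream per receiver; each receiver runs nThreads consumer threads,
// so nPartitions / nThreads receivers together cover every partition.
val kafkaStreams = (1 to nPartitions / nThreads).map { _ =>
  KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, kafkaConf, Map(topic -> nThreads), StorageLevel.MEMORY_ONLY_SER)
}

// Union the per-receiver streams so downstream processing sees one DStream.
val messages = ssc.union(kafkaStreams)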
-kr, Gerard.

On Wed, Jan 7, 2015 at 4:43 PM, <francois.garil...@typesafe.com> wrote:

> - You are launching up to 10 threads/topic per Receiver. Are you sure
> your receivers can support 10 threads each? (i.e. in the default
> configuration, do they have 10 cores?) If they have 2 cores, that would
> explain why this works with 20 partitions or less.
>
> - If you have 90 partitions, why start 10 streams, each consuming 10
> partitions, and then remove the stream at index 0? Why not simply start
> 10 streams with 9 partitions each? Or, more simply:
>
> val kafkaStreams = (1 to numPartitions).map { _ =>
>   KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
>     StorageLevel.MEMORY_ONLY_SER)
> }
>
> - You're running up to 10 local consumer threads *per topic* on each of
> your 10 receivers. That's a lot of threads (10 * size of
> kafkaTopicsList) co-located on a single machine. You mentioned having a
> single Kafka topic with 90 partitions. Why not have a single-element
> topicMap?
>
> —
> FG
>
> On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha <me.mukesh....@gmail.com>
> wrote:
>
>> I understand that I have to create 10 parallel streams. My code is
>> running fine when the number of partitions is ~20, but when I increase
>> the number of partitions I keep running into this issue.
>>
>> Below is my code to create the Kafka streams, along with the configs
>> used.
>>
>> Map<String, String> kafkaConf = new HashMap<String, String>();
>> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>> kafkaConf.put("group.id", kafkaConsumerGroup);
>> kafkaConf.put("consumer.timeout.ms", "30000");
>> kafkaConf.put("auto.offset.reset", "largest");
>> kafkaConf.put("fetch.message.max.bytes", "20000000");
>> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>> kafkaConf.put("zookeeper.sync.time.ms", "2000");
>> kafkaConf.put("rebalance.backoff.ms", "10000");
>> kafkaConf.put("rebalance.max.retries", "20");
>>
>> String[] topics = kafkaTopicsList;
>> int numStreams = numKafkaThreads; // this is 10
>> Map<String, Integer> topicMap = new HashMap<>();
>> for (String topic : topics) {
>>   topicMap.put(topic, numStreams);
>> }
>>
>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>>     new ArrayList<>(numStreams);
>> for (int i = 0; i < numStreams; i++) {
>>   kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
>>       byte[].class, DefaultDecoder.class, DefaultDecoder.class,
>>       kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()));
>> }
>> JavaPairDStream<byte[], byte[]> ks =
>>     sc.union(kafkaStreams.remove(0), kafkaStreams);
>>
>> On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Could you add the code where you create the Kafka consumer?
>>>
>>> -kr, Gerard.
>>>
>>> On Wed, Jan 7, 2015 at 3:43 PM, <francois.garil...@typesafe.com> wrote:
>>>
>>>> Hi Mukesh,
>>>>
>>>> If my understanding is correct, each Stream only has a single
>>>> Receiver. So, if you have each receiver consuming 9 partitions, you
>>>> need 10 input DStreams to create 10 concurrent receivers:
>>>>
>>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>>
>>>> Would you mind sharing a bit more on how you achieve this?
>>>>
>>>> —
>>>> FG
>>>>
>>>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me.mukesh....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> I have a Kafka topic with 90 partitions, and I am running Spark
>>>>> Streaming (1.2.0) reading from Kafka via KafkaUtils, creating 10
>>>>> kafka-receivers.
>>>>>
>>>>> My streaming job is running fine and there is no delay in
>>>>> processing, it's just that some partitions' data is never getting
>>>>> picked up. From the Kafka console I can see that each receiver is
>>>>> consuming data from 9 partitions, but the lag for some offsets
>>>>> keeps on increasing.
>>>>>
>>>>> Below are my kafka-consumer parameters.
>>>>>
>>>>> Have any of you faced this kind of issue? If so, do you have any
>>>>> pointers to fix it?
>>>>>
>>>>> Map<String, String> kafkaConf = new HashMap<String, String>();
>>>>> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>>> kafkaConf.put("group.id", kafkaConsumerGroup);
>>>>> kafkaConf.put("consumer.timeout.ms", "30000");
>>>>> kafkaConf.put("auto.offset.reset", "largest");
>>>>> kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>>> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>>> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>>> kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>>> kafkaConf.put("rebalance.backoff.ms", "10000");
>>>>> kafkaConf.put("rebalance.max.retries", "20");
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>>
>>>>> Mukesh Jha <me.mukesh....@gmail.com>
>>
>> --
>> Thanks & Regards,
>>
>> Mukesh Jha <me.mukesh....@gmail.com>