- You are launching up to 10 threads per topic on each Receiver. Are you sure your receivers can support 10 threads each? (I.e., in the default configuration, do they have 10 cores?) If they have 2 cores, that would explain why this works with 20 partitions or fewer.
- If you have 90 partitions, why start 10 streams, each consuming 10 partitions, and then remove the stream at index 0? Why not simply start 10 streams with 9 partitions each? Or, more simply:

    val kafkaStreams = (1 to numPartitions).map { _ =>
      KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
    }

- You're consuming up to 10 local threads *per topic* on each of your 10 receivers. That's a lot of threads (10 * size of kafkaTopicsList) co-located on a single machine. You mentioned having a single Kafka topic with 90 partitions. Why not have a single-element topicMap?

--
FG

On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha <me.mukesh....@gmail.com> wrote:

> I understand that I have to create 10 parallel streams. My code runs
> fine when the number of partitions is ~20, but when I increase the number
> of partitions I keep running into this issue.
>
> Below is my code to create the Kafka streams, along with the configs used.
>
> Map<String, String> kafkaConf = new HashMap<String, String>();
> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
> kafkaConf.put("group.id", kafkaConsumerGroup);
> kafkaConf.put("consumer.timeout.ms", "30000");
> kafkaConf.put("auto.offset.reset", "largest");
> kafkaConf.put("fetch.message.max.bytes", "20000000");
> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
> kafkaConf.put("zookeeper.sync.time.ms", "2000");
> kafkaConf.put("rebalance.backoff.ms", "10000");
> kafkaConf.put("rebalance.max.retries", "20");
>
> String[] topics = kafkaTopicsList;
> int numStreams = numKafkaThreads; // this is *10*
> Map<String, Integer> topicMap = new HashMap<>();
> for (String topic: topics) {
>   topicMap.put(topic, numStreams);
> }
>
> List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
> for (int i = 0; i < numStreams; i++) {
>   kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
>       DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
>       StorageLevel.MEMORY_ONLY_SER()));
> }
> JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);
>
> On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
>> Hi,
>>
>> Could you add the code where you create the Kafka consumer?
>>
>> -kr, Gerard.
>>
>> On Wed, Jan 7, 2015 at 3:43 PM, <francois.garil...@typesafe.com> wrote:
>>
>>> Hi Mukesh,
>>>
>>> If my understanding is correct, each Stream only has a single Receiver.
>>> So, if you have each receiver consuming 9 partitions, you need 10 input
>>> DStreams to create 10 concurrent receivers:
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>>
>>> Would you mind sharing a bit more on how you achieve this?
>>>
>>> --
>>> FG
>>>
>>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me.mukesh....@gmail.com>
>>> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> I have a Kafka topic with 90 partitions, and I am running
>>>> Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10
>>>> kafka-receivers.
>>>>
>>>> My streaming job is running fine and there is no delay in processing,
>>>> except that data from some partitions is never picked up. From the Kafka
>>>> console I can see that each receiver is consuming data from 9 partitions,
>>>> but the lag for some offsets keeps increasing.
>>>>
>>>> Below are my kafka-consumer parameters.
>>>>
>>>> Have any of you faced this kind of issue? If so, do you have any
>>>> pointers to fix it?
>>>>
>>>> Map<String, String> kafkaConf = new HashMap<String, String>();
>>>> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>>> kafkaConf.put("group.id", kafkaConsumerGroup);
>>>> kafkaConf.put("consumer.timeout.ms", "30000");
>>>> kafkaConf.put("auto.offset.reset", "largest");
>>>> kafkaConf.put("fetch.message.max.bytes", "20000000");
>>>> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>>> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>>> kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>>> kafkaConf.put("rebalance.backoff.ms", "10000");
>>>> kafkaConf.put("rebalance.max.retries", "20");
>>>>
>>>> --
>>>> Thanks & Regards,
>>>>
>>>> Mukesh Jha <me.mukesh....@gmail.com>
>
> --
> Thanks & Regards,
> *Mukesh Jha <me.mukesh....@gmail.com>*
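
For reference, the thread-count arithmetic raised in the reply at the top of this thread can be sketched as follows. This is only a minimal, self-contained Java sketch, not Spark or Kafka API code: the class name ReceiverThreadMath and its methods are hypothetical, a single topic is assumed, and it relies on the high-level Kafka consumer behaviour that a partition is read by at most one thread within a consumer group, so threads beyond the partition count receive no data.

```java
// Hypothetical helper for illustration only; none of these names come from Spark or Kafka.
public class ReceiverThreadMath {

    // Consumer threads started inside one receiver: each topic in the
    // topicMap contributes `threadsPerTopic` threads.
    static int threadsPerReceiver(int numTopics, int threadsPerTopic) {
        return numTopics * threadsPerTopic;
    }

    // Consumer threads across all receivers (one receiver per createStream call).
    static int totalConsumerThreads(int numStreams, int numTopics, int threadsPerTopic) {
        return numStreams * threadsPerReceiver(numTopics, threadsPerTopic);
    }

    // Threads that can never own a partition, since a partition is read by
    // at most one thread in a consumer group.
    static int idleThreads(int totalThreads, int numPartitions) {
        return Math.max(0, totalThreads - numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 90; // from the original post
        int numStreams = 10;    // from the original post
        int numTopics = 1;      // assumption: a single topic in kafkaTopicsList

        // Setup from the posted code: topicMap maps the topic to 10 threads.
        int posted = totalConsumerThreads(numStreams, numTopics, 10);
        // Setup suggested in the reply: 10 streams with 9 partitions each.
        int suggested = totalConsumerThreads(numStreams, numTopics, 9);

        System.out.println("posted setup: " + posted + " threads, "
                + idleThreads(posted, numPartitions) + " idle");
        System.out.println("suggested setup: " + suggested + " threads, "
                + idleThreads(suggested, numPartitions) + " idle");
    }
}
```

The sketch only counts threads; it says nothing about whether each receiver's executor has enough cores to run them, which is the other question asked above.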