I create my DStream very simply as:

val kInMsg = KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))

Eventually, before I operate on the DStream, I repartition it:

kInMsg.repartition(512)
Are you saying that repartition doesn't split my DStream into multiple
smaller streams? Should I manually create multiple DStreams like this?

val kInputs = (1 to 10).map { _ => KafkaUtils.createStream(........) }

Then I apply some custom logic to it as:

val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
// where normalizeLog takes a String and a Map of regexes and returns a String

In my case, I think I have traced the issue to the receiver executor
being killed by YARN:

14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on node-dn1-4-acme.com: remote Akka client disassociated

Could this be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121

On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen <so...@cloudera.com> wrote:
> Are you using multiple DStreams? Repartitioning does not affect how
> many receivers you have. It's on 2 nodes for each receiver. You need
> multiple partitions in the queue, each consumed by a DStream, if you
> mean to parallelize consuming the queue.
>
> On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith <secs...@gmail.com> wrote:
> > Good to see I am not the only one who cannot get incoming DStreams to
> > repartition. I tried repartition(512) but still no luck - the app
> > stubbornly runs only on two nodes. Now this is 1.0.0, but looking at
> > the release notes for 1.0.1 and 1.0.2, I don't see anything that says
> > this was an issue that has been fixed.
> >
> > How do I debug the repartition() statement to see what the flow is
> > after the job hits that statement?
> >
> >
> > On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat <bvenkat.sp...@gmail.com> wrote:
> >> Chris,
> >>
> >> I did the DStream.repartition mentioned in the document on parallelism in
> >> receiving, as well as set "spark.default.parallelism", and it still uses
> >> only 2 nodes in my cluster.
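To make Sean's point concrete, here is a minimal sketch of the multiple-receiver pattern he describes: one createStream call per receiver, the resulting streams unioned, and repartition applied only to the downstream processing. This assumes Spark Streaming 1.x with spark-streaming-kafka on the classpath and an existing SparkContext `sc`; the receiver count and per-stream thread count are illustrative placeholders, not values from this thread.

```scala
// Sketch only: assumes Spark Streaming 1.x, spark-streaming-kafka on the
// classpath, a reachable ZooKeeper quorum, and an existing SparkContext `sc`.
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(5))

// Each createStream call allocates its own receiver on an executor;
// calling repartition() on a single stream does NOT add receivers.
val numReceivers = 4 // placeholder: tune to your topic's partition count
val kInputs = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp",
    Map("rawunstruct" -> 2))
}

// Union the per-receiver streams into one, then repartition the
// processing side so downstream tasks spread across the cluster.
val unified = ssc.union(kInputs).repartition(512)
```

Note that repartition here only redistributes the received blocks for processing; the number of nodes actually consuming from Kafka is still set by how many receiver streams you create.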
> >> I notice there is another email thread on the same topic:
> >>
> >> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
> >>
> >> My code is in Java and here is what I have:
> >>
> >> JavaPairReceiverInputDStream<String, String> messages =
> >>     KafkaUtils.createStream(ssc, zkQuorum,
> >>         "cse-job-play-consumer", kafkaTopicMap);
> >>
> >> JavaPairDStream<String, String> newMessages =
> >>     messages.repartition(partitionSize); // partitionSize = 30
> >>
> >> JavaDStream<String> lines = newMessages.map(
> >>     new Function<Tuple2<String, String>, String>() {
> >>         ...
> >>         public String call(Tuple2<String, String> tuple2) {
> >>             return tuple2._2();
> >>         }
> >>     });
> >>
> >> JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());
> >>
> >> JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> >>     new PairFunction<String, String, Integer>() {
> >>         ...
> >>     }
> >> );
> >>
> >> wordCounts.foreachRDD(
> >>     new Function<JavaPairRDD<String, Integer>, Void>() {...});
> >>
> >> Thanks,
> >> Bharat
> >>
> >>
> >> --
> >> View this message in context:
> >> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org