I create my DStream very simply as:
val kInMsg =
KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
-> 8))
.
.
eventually, before I operate on the DStream, I repartition it:
kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split by dstream into multiple
smaller streams? Should I manually create multiple Dstreams like this?:
val kInputs = (1 to 10).map {_=> KafkaUtils.createStream(........)}

Then I apply some custom logic to it as:
val outdata = kInMsg.map(x=>normalizeLog(x._2,configMap)) //where
normalizeLog takes a String and Map of regex and returns a string

In my case, I think I have traced the issue to the receiver executor being
killed by Yarn:
14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
node-dn1-4-acme.com: remote Akka client disassociated

This be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121





On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen <so...@cloudera.com> wrote:

> Are you using multiple Dstreams? repartitioning does not affect how
> many receivers you have. It's on 2 nodes for each receiver. You need
> multiple partitions in the queue, each consumed by a DStream, if you
> mean to parallelize consuming the queue.
>
> On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith <secs...@gmail.com> wrote:
> > Good to see I am not the only one who cannot get incoming Dstreams to
> > repartition. I tried repartition(512) but still no luck - the app
> > stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
> > release notes for 1.0.1 and 1.0.2, I don't see anything that says this
> > was an issue and has been fixed.
> >
> > How do I debug the repartition() statement to see what's the flow
> > after the job hits that statement?
> >
> >
> > On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat <bvenkat.sp...@gmail.com>
> wrote:
> >> Chris,
> >>
> >> I did the Dstream.repartition mentioned in the document on parallelism
> in
> >> receiving, as well as set "spark.default.parallelism" and it still uses
> only
> >> 2 nodes in my cluster.  I notice there is another email thread on the
> same
> >> topic:
> >>
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
> >>
> >> My code is in Java and here is what I have:
> >>
> >>        JavaPairReceiverInputDStream<String, String> messages =
> >>
> >>                 KafkaUtils.createStream(ssc, zkQuorum,
> >> "cse-job-play-consumer", kafkaTopicMap);
> >>
> >>         JavaPairDStream<String, String> newMessages =
> >> messages.repartition(partitionSize);// partitionSize=30
> >>
> >>         JavaDStream<String> lines = newMessages.map(new
> >> Function<Tuple2&lt;String, String>, String>() {
> >>             ...
> >>
> >>             public String call(Tuple2<String, String> tuple2) {
> >>               return tuple2._2();
> >>             }
> >>           });
> >>
> >>         JavaDStream<String> words = lines.flatMap(new
> >> MetricsComputeFunction()
> >>                         );
> >>
> >>         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> >>             new PairFunction<String, String, Integer>() {
> >>                ...
> >>             }
> >>         );
> >>
> >>          wordCounts.foreachRDD(new Function<JavaPairRDD&lt;String,
> Integer>,
> >> Void>() {...});
> >>
> >> Thanks,
> >> Bharat
> >>
> >>
> >>
> >> --
> >> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>

Reply via email to