Ok, so I did this:
val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
val kInMsg = ssc.union(kInStreams)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This has improved parallelism. Earlier I would only get a "Stream 0"; now I
have "Streams [0-9]". Of course, since the Kafka topic has only three
partitions, only three of those streams are active, but I am seeing more
blocks being pulled across the three streams in total than what the single
stream was pulling earlier. Also, four nodes are now actively processing
tasks (vs. only two earlier), which actually has me confused: if streams are
active on only three nodes, then how/why did a fourth node get work? And if
a fourth got work, why aren't more nodes getting work?
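
For reference, here is a cleaned-up sketch of where I've landed: one receiver
per Kafka partition, union the streams, then repartition so downstream tasks
aren't pinned to the receiver nodes. The batch interval and the 512 factor
are placeholders I'm still tuning; normalizeLog and configMap are the same
as in my earlier mail below:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("testApp"), Seconds(10))

// One receiver per Kafka partition ("rawunstruct" has 3); receivers beyond
// the partition count would just sit idle.
val kInStreams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
val kInMsg = ssc.union(kInStreams)

// repartition() returns a new DStream rather than modifying kInMsg in place,
// so the result must be captured. After the shuffle, map tasks can run on any
// node, which would explain work showing up beyond the three receiver nodes.
val outdata = kInMsg.repartition(512).map(x => normalizeLog(x._2, configMap))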






On Fri, Aug 29, 2014 at 4:11 PM, Tim Smith <secs...@gmail.com> wrote:

> I create my DStream very simply as:
> val kInMsg = KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
> .
> .
> Eventually, before I operate on the DStream, I repartition it:
> kInMsg.repartition(512)
>
> Are you saying that ^^ repartition doesn't split my DStream into multiple
> smaller streams? Should I manually create multiple DStreams like this?:
> val kInputs = (1 to 10).map { _ => KafkaUtils.createStream(........) }
>
> Then I apply some custom logic to it as:
> val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where
> normalizeLog takes a String and a Map of regexes and returns a String
>
> In my case, I think I have traced the issue to the receiver executor being
> killed by YARN:
> 14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
> node-dn1-4-acme.com: remote Akka client disassociated
>
> Could this be the root cause?
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
> https://issues.apache.org/jira/browse/SPARK-2121
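>
> If it is the same container-memory problem as that JIRA, the first thing
> I'll try is giving the executors more headroom. A minimal sketch, assuming
> memory really is the culprit (the 4g figure is a guess for my workload):
>
> import org.apache.spark.SparkConf
>
> val conf = new SparkConf()
>   .setAppName("testApp")
>   .set("spark.executor.memory", "4g") // a guess; a bigger heap also raises the YARN container request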
>
>
>
>
>
> On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> Are you using multiple DStreams? Repartitioning does not affect how
>> many receivers you have; it's on two nodes for each receiver. You need
>> multiple partitions in the queue, each consumed by a DStream, if you
>> mean to parallelize consuming the queue.
>>
>> On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith <secs...@gmail.com> wrote:
>> > Good to see I am not the only one who cannot get incoming DStreams to
>> > repartition. I tried repartition(512) but still no luck - the app
>> > stubbornly runs on only two nodes. This is on 1.0.0, but looking at the
>> > release notes for 1.0.1 and 1.0.2, I don't see anything saying this was
>> > a known issue that has been fixed.
>> >
>> > How do I debug the repartition() statement to see what the flow is
>> > after the job hits that statement?
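>> >
>> > Would printing the partition count per batch be a sane way to check,
>> > something like this (untested sketch; kInMsg is my input DStream)?
>> >
>> > kInMsg.repartition(512).foreachRDD { rdd =>
>> >   println("partitions after repartition: " + rdd.partitions.size)
>> > }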
>> >
>> >
>> > On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat <bvenkat.sp...@gmail.com>
>> wrote:
>> >> Chris,
>> >>
>> >> I did the DStream.repartition mentioned in the documentation on
>> >> parallelism in receiving, as well as set "spark.default.parallelism",
>> >> and it still uses only 2 nodes in my cluster.  I notice there is
>> >> another email thread on the same topic:
>> >>
>> >> http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
>> >>
>> >> My code is in Java and here is what I have:
>> >>
>> >>         JavaPairReceiverInputDStream<String, String> messages =
>> >>                 KafkaUtils.createStream(ssc, zkQuorum,
>> >>                         "cse-job-play-consumer", kafkaTopicMap);
>> >>
>> >>         JavaPairDStream<String, String> newMessages =
>> >>                 messages.repartition(partitionSize); // partitionSize = 30
>> >>
>> >>         JavaDStream<String> lines = newMessages.map(
>> >>                 new Function<Tuple2<String, String>, String>() {
>> >>             ...
>> >>             public String call(Tuple2<String, String> tuple2) {
>> >>                 return tuple2._2();
>> >>             }
>> >>         });
>> >>
>> >>         JavaDStream<String> words = lines.flatMap(
>> >>                 new MetricsComputeFunction());
>> >>
>> >>         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>> >>                 new PairFunction<String, String, Integer>() {
>> >>                     ...
>> >>                 });
>> >>
>> >>         wordCounts.foreachRDD(
>> >>                 new Function<JavaPairRDD<String, Integer>, Void>() {...});
>> >>
>> >> Thanks,
>> >> Bharat
>> >>
>> >>
>> >>
>> >
>> >
>>
>
>
