The receivers are submitted as tasks. They are supposed to be assigned
to the executors in a round-robin manner by
TaskSchedulerImpl.resourceOffers(). However, sometimes not all of the
executors have registered by the time the receivers are submitted, so
the receivers pile up on the executors that have already registered
instead of spreading across the cluster. Spark 1.0.0 tries to mitigate
this by running a dummy batch job before submitting the receivers, in
ReceiverTracker.startReceivers():

      // Run the dummy Spark job to ensure that all slaves have registered.
      // This avoids all the receivers to be scheduled on the same node.
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

Apparently, this doesn't always work. You can work around the problem
by running a similar dummy job at a much larger scale before starting
the streaming job, for example:

ssc.sparkContext.makeRDD(1 to 10000, 10000).map(x => (x, 1)).reduceByKey(_ + _, 1000).collect()
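
If even the larger dummy job is not reliable, another option is to
block until the expected number of executors has actually registered
before creating any receivers. Here is a minimal sketch, assuming Spark
1.x, where getExecutorMemoryStatus returns one entry per executor plus
one for the driver (hence the "+ 1"); the helper name waitForExecutors
is made up:

    import org.apache.spark.SparkContext

    // Poll until `numExecutors` executors have registered with the driver.
    // getExecutorMemoryStatus includes the driver itself, so wait for
    // numExecutors + 1 entries.
    def waitForExecutors(sc: SparkContext, numExecutors: Int): Unit = {
      while (sc.getExecutorMemoryStatus.size < numExecutors + 1) {
        Thread.sleep(1000) // re-check once per second
      }
    }

    waitForExecutors(ssc.sparkContext, 10) // call before creating receiver streams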

On Sun, Jun 1, 2014 at 6:06 PM, Guang Gao <birdeey...@gmail.com> wrote:
> Dear All,
>
> I'm running Spark Streaming (1.0.0) with Yarn (2.2.0) on a 10-node cluster.
> I set up 10 custom receivers to listen to 10 data streams. I want one
> receiver per node in order to maximize the network bandwidth. However, if I
> set "--executor-cores 4", the 10 receivers run on only 3 of the nodes in the
> cluster (4, 4, and 2 receivers respectively); if I set "--executor-cores 1",
> each node runs exactly one receiver, but then Spark doesn't seem to make any
> progress processing these streams.
>
> I read the configuration documentation and also googled, but didn't find a
> clue. Is there a way to configure how the receivers are distributed?
>
> Thanks!
>
> Here are some details:
> ================================
> How I created 10 receivers:
>
>     import org.apache.spark.SparkConf
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>     import org.apache.spark.streaming.dstream.DStream
>
>     val conf = new SparkConf().setAppName(jobId)
>     val sc = new StreamingContext(conf, Seconds(1))
>     // Start with one receiver stream, then union in nine more.
>     var lines: DStream[String] =
>       sc.receiverStream(new CustomReceiver(...))
>     for (i <- 1 to 9) {
>       lines = lines.union(
>         sc.receiverStream(new CustomReceiver(...))
>       )
>     }
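>
> The same thing can be written more compactly with StreamingContext.union
> over a Seq of streams (a sketch, assuming all ten receivers take the same
> constructor arguments):
>
>     val streams = (1 to 10).map(_ => sc.receiverStream(new CustomReceiver(...)))
>     val lines: DStream[String] = sc.union(streams)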
>
> How I submit a job to Yarn:
>
> spark-submit \
>     --class $JOB_CLASS \
>     --master yarn-client \
>     --num-executors 10 \
>     --driver-memory 1g \
>     --executor-memory 2g \
>     --executor-cores 4 \
>     $JAR_NAME
>
