I also have this problem. The total time needed to launch the receivers
seems to depend on the total number of executors. In my case, when I run
400 executors with 200 receivers, it takes about a minute for all receivers
to become active, but with 800 executors it takes 3 minutes to activate all
receivers.

I am running on YARN on EMR 4.7.2, with Spark 1.6.2.

2015-10-21 12:15 GMT-07:00 Budde, Adam <bu...@amazon.com>:

> Hi all,
>
> My team uses Spark Streaming to implement the batch processing component
> of a lambda architecture with 5 min intervals. We process roughly 15 TB/day
> using three discrete Spark clusters and about 250 receivers per cluster.
> We've been having some issues migrating our platform from Spark 1.4.x to
> Spark 1.5.x.
>
> The first issue we've been having relates to receiver scheduling. Under
> Spark 1.4.x, each receiver becomes active almost immediately and the
> application quickly reaches its peak input throughput. Under the new
> receiver scheduling mechanism introduced in Spark 1.5.x (SPARK-8882
> <https://issues.apache.org/jira/browse/SPARK-8882>), we see that it takes
> quite a while for our receivers to become active. I haven't spent too much
> time gathering hard numbers on this, but my estimate would be that it takes
> over half an hour for half the receivers to become active and well over an
> hour for all of them to become active.
>
> I spent some time digging into the code for the *ReceiverTracker*,
> *ReceiverSchedulingPolicy*, and *ReceiverSupervisor* classes and
> recompiling Spark with some added debug logging. As far as I can tell, this
> is what is happening:
>
>    - On program start, the ReceiverTracker RPC endpoint receives a
>    *StartAllReceivers* message via its own *launchReceivers()* method
>    (itself invoked by *start()*)
>    - The handler for StartAllReceivers invokes
>    *ReceiverSchedulingPolicy.scheduleReceivers()* to generate the desired
>    receiver-to-executor mapping and calls
>    *ReceiverTracker.startReceiver()* for each receiver
>    - *startReceiver()* uses the SparkContext to submit a job that creates
>    an instance of ReceiverSupervisorImpl to run the receiver on a random
>    executor
>    - While bootstrapping the receiver,
>    *ReceiverSupervisorImpl.onReceiverStart()* sends a *RegisterReceiver*
>    message to the ReceiverTracker RPC endpoint
>    - The handler for RegisterReceiver checks if the randomly-selected
>    executor was the one the receiver was assigned to by
>    ReceiverSchedulingPolicy.scheduleReceivers() and fails the job if it
>    isn't
>    - ReceiverTracker restarts the failed receiver job and this process
>    continues until all receivers are assigned to their proper executor
>
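
A rough way to see why the loop described above is slow is to simulate it.
The sketch below is a toy model, not Spark code. It assumes that each launch
attempt lands the receiver job on an executor chosen uniformly at random, that
all pending receivers are retried together in synchronized rounds, and it uses
a hypothetical round-robin assignment as a stand-in for
ReceiverSchedulingPolicy.scheduleReceivers(). The function name
simulate_receiver_startup and the 100-executor example are made up for
illustration.

```python
import random


def simulate_receiver_startup(num_receivers, num_executors, seed=0, max_rounds=100_000):
    """Toy model (not Spark code): rounds until each receiver lands on its assigned executor.

    Assumptions: every launch attempt picks an executor uniformly at random, and
    all pending receivers are retried together once per round.
    """
    rng = random.Random(seed)
    # Hypothetical stand-in for ReceiverSchedulingPolicy.scheduleReceivers():
    # a simple round-robin receiver -> executor assignment.
    assigned = {r: r % num_executors for r in range(num_receivers)}
    pending = set(assigned)
    activated_at = {}  # receiver id -> round in which it became active
    for round_no in range(1, max_rounds + 1):
        for r in sorted(pending):  # sorted() copies, so discarding below is safe
            # Stand-in for startReceiver(): the job runs on a random executor.
            launched_on = rng.randrange(num_executors)
            if launched_on == assigned[r]:
                # RegisterReceiver accepted: the receiver is now active.
                activated_at[r] = round_no
                pending.discard(r)
            # Otherwise the tracker rejects it and the job is relaunched next round.
        if not pending:
            return activated_at
    raise RuntimeError("receivers still pending after max_rounds")


if __name__ == "__main__":
    # Hypothetical cluster: 250 receivers (as in this thread) on 100 executors.
    rounds = simulate_receiver_startup(num_receivers=250, num_executors=100)
    # Assumes roughly 35 s per attempt, the midpoint of the 30-40 s reported here.
    print(max(rounds.values()) * 35 / 60, "minutes until the last receiver is active")
```

Under this model a receiver needs about as many attempts as there are
executors, so startup time grows with the executor count, which is in line
with the report at the top of this thread.
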
> Assuming this order of operations is correct, I have the following
> questions:
>
>    1. Is there any way to coerce SparkContext.submitJob() into scheduling
>    a job on a specific executor? Put another way, is there a mechanism we can
>    use to ensure that each receiver job is run on the executor it was assigned
>    to on the first call to ReceiverSchedulingPolicy.scheduleReceivers()?
>    2. If (1) is not possible, is there anything we can do to speed up the
>    StartReceiver -> RegisterReceiver -> RestartReceiver loop? Right now, it
>    seems to take about 30-40 sec between attempts to invoke RegisterReceiver
>    on a given receiver.
>
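
To put the 30-40 sec per attempt into perspective, the same toy model gives a
closed form. If each attempt independently hits the assigned executor with
probability p = 1/E (E executors), a receiver's attempt count is geometric,
and for n receivers the expected attempts until the last one is placed is
E[max] = sum over k >= 0 of [1 - (1 - q^k)^n], with q = 1 - p. The uniform
placement assumption and the function name below are hypothetical; the real
task scheduler does not necessarily pick executors uniformly.

```python
def expected_attempts_for_all(num_receivers, num_executors, tol=1e-12):
    """Expected attempts until the slowest receiver lands on its assigned executor.

    Toy-model assumption: each attempt succeeds independently with probability
    p = 1 / num_executors, so
        E[max] = sum_{k>=0} P(max > k) = sum_{k>=0} [1 - (1 - q**k) ** n]
    with q = 1 - p and n = num_receivers.
    """
    q = 1.0 - 1.0 / num_executors
    total, k = 0.0, 0
    while True:
        tail = 1.0 - (1.0 - q ** k) ** num_receivers  # P(max > k)
        total += tail
        if tail < tol:  # remaining terms are negligible
            return total
        k += 1


if __name__ == "__main__":
    # Hypothetical cluster: 250 receivers (as in this thread) on 100 executors.
    attempts = expected_attempts_for_all(250, 100)
    for secs in (30, 40):  # the 30-40 s retry interval reported above
        print(f"{secs} s/attempt: ~{attempts * secs / 60:.0f} min for all receivers")
```

With a single receiver the formula reduces to E attempts on average, so under
this model both a shorter retry interval (question 2) and reliable placement on
the assigned executor (question 1) would cut startup time.
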
> Thanks for the help!
>
> Adam
>
