Thanks, TD and Jerry, for the suggestions. I have done some experiments and worked 
out a reasonable solution to the problem of spreading receivers to a set of 
worker hosts. It would be a bit too tedious to document in email, so I discuss 
the solution in a blog post: 
http://scala4fun.tumblr.com/post/113172936582/how-to-distribute-receivers-over-worker-hosts-in
Please feel free to give me feedback if you see any issues.

Thanks,
Du

     On Friday, March 6, 2015 4:10 PM, Tathagata Das <t...@databricks.com> 
wrote:
   

 Aaah, good point, about the same node. All right. Can you post this on the 
user mailing list for future reference to the community? Might be a good idea 
to post both methods with pros and cons, as different users may have different 
constraints. :)

Thanks :)
TD
On Fri, Mar 6, 2015 at 4:07 PM, Du Li <l...@yahoo-inc.com> wrote:

Yes, but the caveat may not exist if we do this when the streaming app is 
launched, since we're trying to start the receivers before any other tasks. 
The hosts discovered in this way will only include the worker hosts.
By using the API, we may need some extra effort to single out the worker hosts. 
Sometimes we run the master and worker daemons on the same host and sometimes 
we don't, depending on configuration.
Du 

     On Friday, March 6, 2015 3:59 PM, Tathagata Das <t...@databricks.com> 
wrote:
   

 That can definitely be done. In fact, I have done that. Just one caveat: if one 
of the executors is fully occupied with a previous, very long job, then these 
fake tasks may not capture that worker even if there are lots of tasks. The 
executor storage status will work for sure, as long as you can filter out the 
master. 
TD
On Fri, Mar 6, 2015 at 3:49 PM, Du Li <l...@yahoo-inc.com> wrote:

Hi TD,
Thanks for your response and the information.
I just tried out the SparkContext getExecutorMemoryStatus and 
getExecutorStorageStatus methods. Given their purpose, they do not 
differentiate between master and worker nodes. However, for the performance of 
my app, I prefer to distribute receivers only to the worker nodes.
This morning I worked out another solution: create an accumulator and a fake 
workload, then parallelize the workload with a high level of parallelism; each 
task does nothing but add its hostname to the accumulator. Repeat this until 
the accumulator value stops growing. In the end I get the set of worker 
hostnames. It worked pretty well!
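A minimal sketch of this trick in Scala (the probe size, the parallelism level, 
and the stopping condition are arbitrary choices on my part, not anything Spark 
prescribes):

import java.net.InetAddress
import scala.collection.mutable
import org.apache.spark.SparkContext

// Launch rounds of fake tasks that only record the hostname they run on,
// and stop once a round adds no new hostnames to the accumulator.
def discoverWorkerHosts(sc: SparkContext): Set[String] = {
  val hosts = sc.accumulableCollection(mutable.HashSet[String]())
  var previousSize = -1
  while (hosts.value.size != previousSize) {
    previousSize = hosts.value.size
    sc.parallelize(1 to 1000, 100).foreach { _ =>
      hosts += InetAddress.getLocalHost.getHostName
    }
  }
  hosts.value.toSet
}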
Thanks,
Du

     On Friday, March 6, 2015 3:11 PM, Tathagata Das <t...@databricks.com> 
wrote:
   

 What Saisai said is correct. There is no good API. However, there are hacky 
ways of finding out the current workers. See 
sparkContext.getExecutorStorageStatus(), from which you can get the host names 
of the current executors. You could use those. 
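For reference, a rough sketch of that approach (against Spark 1.x; filtering 
the driver out by the "<driver>" executor id is an assumption about how the 
driver's block manager registers):

// Host names of the currently registered executors, excluding the driver.
val workerHosts = sparkContext.getExecutorStorageStatus
  .filter(_.blockManagerId.executorId != "<driver>")
  .map(_.blockManagerId.host)
  .distinct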
TD
On Thu, Mar 5, 2015 at 6:55 AM, Du Li <l...@yahoo-inc.com> wrote:


Hi TD. Do you have any suggestion? Thanks /Du


Sent from Yahoo Mail for iPhone
---- Begin Forwarded Message ----
From: Shao, Saisai <saisai.s...@intel.com>
Date: Mar 4, 2015, 10:35:44 PM
To: Du Li <l...@yahoo-inc.com>, User <user@spark.apache.org>
Subject: RE: distribution of receivers in spark streaming
Yes, hostname is enough. I think currently it is hard for user code to get the 
worker list from the standalone master. If you can get the Master object, you 
could get the worker list, but AFAIK it may be difficult to get this object. 
All you could do is manually get the worker list and assign a hostname to each 
receiver.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com]
Sent: Thursday, March 5, 2015 2:29 PM
To: Shao, Saisai; User
Subject: Re: distribution of receivers in spark streaming

Hi Jerry,

Thanks for your response.

Is there a way to get the list of currently registered/live workers? Even in 
order to provide preferredLocation, it would be safer to know which workers 
are active. I guess I only need to provide the hostname, right?

Thanks,
Du

On Wednesday, March 4, 2015 10:08 PM, "Shao, Saisai" <saisai.s...@intel.com> wrote:

Hi Du,

You could try to sleep for several seconds after creating the streaming 
context to let all the executors register; then the receivers can be 
distributed across the nodes more evenly.
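Something along these lines, for example (the 10-second wait and the setup of 
sparkConf are arbitrary illustrations; numReceivers and MyKafkaReceiver are the 
names from your original mail):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("receiver-distribution")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Give all executors a few seconds to register with the driver before
// ssc.start(), so the receivers can be scheduled across more nodes.
Thread.sleep(10000)

val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)
ssc.start()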
Setting locality is another way, as you mentioned.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override the preferredLocation() method in the 
MyReceiver class.
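For example, something like this (the host constructor parameter and the empty 
onStart/onStop bodies are just illustrative):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyKafkaReceiver(host: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  // Hint to the scheduler that this receiver should be launched on the given host.
  override def preferredLocation: Option[String] = Some(host)

  def onStart(): Unit = { /* connect to Kafka and call store(...) for each message */ }
  def onStop(): Unit = { /* close connections */ }
}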
On Wednesday, March 4, 2015 3:35 PM, Du Li <l...@yahoo-inc.com.INVALID> wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:

val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.

I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?

Thanks,
Du