Thanks TD and Jerry for suggestions. I have done some experiments and worked out a reasonable solution to the problem of spreading receivers to a set of worker hosts. It would be a bit too tedious to document in email. So I discuss the solution in a blog: http://scala4fun.tumblr.com/post/113172936582/how-to-distribute-receivers-over-worker-hosts-in Please be free to give me feedback if you see any issue. Thanks,Du
On Friday, March 6, 2015 4:10 PM, Tathagata Das <t...@databricks.com> wrote: Aaah, good point, about the same node. All right. Can you post this on the user mailing list for future reference to the community? Might be a good idea to post both methods with pros and cons, as different users may have different constraints. :)Thanks :) TD On Fri, Mar 6, 2015 at 4:07 PM, Du Li <l...@yahoo-inc.com> wrote: Yes but the caveat may not exist if we do this when the streaming app is launched, since we're trying the start receivers before any other tasks. Discovered in this way will only include the worker hosts. By using the API we may need some extra efforts to single out worker hosts. Sometimes we may run master and worker daemons on the same host while some other times we don't, depending on configuration. Du On Friday, March 6, 2015 3:59 PM, Tathagata Das <t...@databricks.com> wrote: That can definitely be done. In fact I have done that. Just one caveat. If one of the executors is fully occupied with a previous very-long job, then these fake tasks may not capture that worker even if there are lots of tasks. The executor storage status will work for sure, as long as you can filter out the master. TD On Fri, Mar 6, 2015 at 3:49 PM, Du Li <l...@yahoo-inc.com> wrote: Hi TD, Thanks for your response and the information. I just tried out the SparkContext getExecutorMemoryStatus and getExecutorStorageStatus methods. Due to their purposes, they do not differentiate master and worker nodes. However, for performance of my app, I prefer to distribute receivers only to the worker nodes. This morning I worked out another solution: Create an accumulator and a fake workload, parallelize the workload with a high level of parallelism which does nothing but adds its hostname to the accumulator. Repeat this until the accumulator value stops growing. In the end I get the set of worker hostnames. It worked pretty well! Thanks,Du On Friday, March 6, 2015 3:11 PM, Tathagata Das <t...@databricks.com> wrote: What Saisai said is correct. There is no good API. However there are jacky ways of finding out the current workers. See sparkContext.getExecutorStorageStatus() and you can get the host names of the current executors. You could use those. TD On Thu, Mar 5, 2015 at 6:55 AM, Du Li <l...@yahoo-inc.com> wrote: | Hi TD. Do you have any suggestion? Thanks /Du Sent from Yahoo Mail for iPhone ---- Begin Forwarded Message ---- From: Shao, Saisai<'saisai.s...@intel.com'> Date: Mar 4, 2015, 10:35:44 PM To: Du Li<'l...@yahoo-inc.com'>, User<'user@spark.apache.org'> Subject: RE: distribution of receivers in spark streaming Yes, hostname is enough. I think currently it is hard for user code to get the worker list from standalone master. If you can get the Master object, you could get the worker list, but AFAIK may be it is difficult to get this object. All you could do is to manually get the worker list and assigned its hostname to each receiver. Thanks Jerry From: Du Li [mailto:l...@yahoo-inc.com] Sent: Thursday, March 5, 2015 2:29 PM To: Shao, Saisai; User Subject: Re: distribution of receivers in spark streaming Hi Jerry, Thanks for your response. Is there a way to get the list of currently registered/live workers? Even in order to provide preferredLocation, it would be safer to know which workers are active. Guess I only need to provide the hostname, right? Thanks, Du On Wednesday, March 4, 2015 10:08 PM, "Shao, Saisai" <saisai.s...@intel.com> wrote: Hi Du, You could try to sleep for several seconds after creating streaming context to let all the executors registered, then all the receivers can distribute to the nodes more evenly. Also setting locality is another way as you mentioned. Thanks Jerry From: Du Li [mailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, March 5, 2015 1:50 PM To: User Subject: Re: distribution of receivers in spark streaming Figured it out: I need to override method preferredLocation() in MyReceiver class. On Wednesday, March 4, 2015 3:35 PM, Du Li <l...@yahoo-inc.com.INVALID> wrote: Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of kafka receivers on those machines. In my code I did something like the following, as suggested in the spark docs: val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver())) ssc.union(streams) However, from the spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in spark? Thanks, Du |