In the specific example stated, the user had two tasksets, if I
understood right: the first taskset reads off the db (dfs in your
example), applies some filters, etc., and caches the result.
The second works off the cached data (so its tasks can now be
PROCESS_LOCAL) to do the map, group, etc.
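
Roughly this shape, to make it concrete (a minimal sketch with made-up
names and paths, not the user's actual code):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    val sc = new SparkContext(new SparkConf().setAppName("locality-example"))

    // Taskset 1: load + filter + cache. These tasks can at best be
    // NODE_LOCAL/RACK_LOCAL with respect to the input blocks - never
    // PROCESS_LOCAL.
    val cached = sc.textFile("hdfs:///some/input/path")
      .filter(_.nonEmpty)
      .cache()
    cached.count()  // materializes the cached blocks on the executors

    // Taskset 2: runs against the cached blocks, so its tasks can be
    // PROCESS_LOCAL on the executors that hold those blocks.
    val result = cached
      .map(line => (line.split(",")(0), 1))
      .reduceByKey(_ + _)
      .collect()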

The taskset(s) which work off the cached data would be sensitive to the
PROCESS_LOCAL locality level.
But for the initial taskset (which loads off hdfs/database, etc.), no
tasks can be process local, since we do not have a way to specify
process-local preferences for such external input in spark (which, IMO,
is a limitation).

Given this, the requirement seemed to be to relax the locality level for
the initial load taskset - since not scheduling on rack-local or other
nodes hurts utilization and latency when no node-local executors are
available.
But for tasksets which do have process-local tasks, the user wants to
ensure that node/rack-local scheduling does not kick in prematurely
(based on the timeouts and perf numbers reported).

Hence my suggestion of setting the individual locality level timeouts -
of course, that suggestion was highly specific to the problem as stated
:-)
It is by no means a generalization, and I do agree we definitely need to
address the larger scheduling issue.
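
Concretely, the knobs I had in mind look something like this (values are
illustrative; note the documented config keys are the lowercase level
names, and if I recall correctly the values are interpreted as plain
milliseconds):

    import org.apache.spark.SparkConf

    // Keep waiting a long time for PROCESS_LOCAL slots, so tasksets that
    // do have process-local tasks hold out for them, but fall through to
    // node/rack/any quickly for tasksets (like the initial load) that
    // don't.
    val conf = new SparkConf()
      .set("spark.locality.wait.process", "1800000") // 30 minutes
      .set("spark.locality.wait.node", "1000")       // 1 second
      .set("spark.locality.wait.rack", "1000")       // 1 second

With these, the cached-data tasksets still hold out for process-local
slots, while the load taskset falls through to node/rack/any scheduling
quickly instead of leaving executors idle.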

Regards,
Mridul



On Fri, Nov 14, 2014 at 2:05 AM, Kay Ousterhout <k...@eecs.berkeley.edu> wrote:
> Hi Mridul,
>
> In the case Shivaram and I saw, and based on my understanding of Ma chong's
> description, I don't think that completely fixes the problem.
>
> To be very concrete, suppose your job has two tasks, t1 and t2, and they
> each have input data (in HDFS) on h1 and h2, respectively, and that h1 and
> h2 are on the same rack. Suppose your Spark job gets allocated two
> executors, one on h1 and another on h3 (a different host with no input
> data).  When the job gets submitted to the task set manager (TSM),
> TSM.computeValidLocalityLevels will determine that the valid levels are
> NODE_LOCAL (because t1 could be run on the NODE_LOCAL executor on h1),
> RACK_LOCAL, ANY.  As a result, the TSM will not schedule t2 until
> spark.locality.wait.NODE_LOCAL expires, even though t2 has no hope of being
> scheduled on a NODE_LOCAL machine (because the job wasn't given any
> executors on h2).  You could set spark.locality.wait.NODE_LOCAL to be low,
> but then it might cause t1 (or more generally, in a larger job, other tasks
> that have NODE_LOCAL executors where they can be scheduled) to get scheduled
> on h3 (and not on h1).
>
> Is there a way you were thinking of configuring things that avoids this
> problem?
>
> I'm pretty sure we could fix this problem by tracking more information about
> each task in the TSM -- for example, the TSM has enough information to know
> that there are no NODE_LOCAL executors where t2 could be scheduled in the
> above example (and that the best possible locality level for t2 is
> RACK_LOCAL), and could schedule t2 right away on a RACK_LOCAL machine.  Of
> course, this would add a bunch of complexity to the TSM, hence the earlier
> decision that the added complexity may not be worth it.
>
> -Kay
>
> On Thu, Nov 13, 2014 at 12:11 PM, Mridul Muralidharan <mri...@gmail.com>
> wrote:
>>
>> Instead of setting spark.locality.wait, try setting individual
>> locality waits specifically.
>>
>> Namely, set spark.locality.wait.PROCESS_LOCAL to a high value (so that
>> process local tasks are always scheduled in case the task set has
>> process local tasks).
>> Set spark.locality.wait.NODE_LOCAL and spark.locality.wait.RACK_LOCAL
>> to a low value - so that in case the task set has no process local
>> tasks, both node local and rack local tasks are scheduled asap.
>>
>> From your description, this will alleviate the problem you mentioned.
>>
>>
>> Kay's comment, IMO, is more general in nature - and I suspect that
>> unless we overhaul how preferred locality is specified, and allow for
>> taskset-specific scheduling hints, we can't resolve that.
>>
>>
>> Regards,
>> Mridul
>>
>>
>>
>> On Thu, Nov 13, 2014 at 1:25 PM, MaChong <machon...@sina.com> wrote:
>> > Hi,
>> >
>> > We are running a time-sensitive application with 70 partitions, each
>> > partition around 800MB. The application first loads data from a
>> > database in a different cluster, then applies a filter, caches the
>> > filtered data, then applies a map and a reduce, and finally collects
>> > the results.
>> > The application finishes in 20 seconds if we set spark.locality.wait
>> > to a large value (30 minutes), and it takes 100 seconds if we set
>> > spark.locality.wait to a small value (less than 10 seconds).
>> > We have analysed the driver log and found a lot of NODE_LOCAL and
>> > RACK_LOCAL level tasks; normally a PROCESS_LOCAL task only takes 15
>> > seconds, but a NODE_LOCAL or RACK_LOCAL task takes 70 seconds.
>> >
>> > So I think we'd better set spark.locality.wait to a large value (30
>> > minutes), but then we hit this problem:
>> >
>> > Now our application loads data from hdfs in the same spark cluster,
>> > so it gets NODE_LOCAL and RACK_LOCAL level tasks during the loading
>> > stage. If the tasks in the loading stage all have the same locality
>> > level, either NODE_LOCAL or RACK_LOCAL, it works fine.
>> > But if the tasks in the loading stage have mixed locality levels,
>> > such as 3 NODE_LOCAL tasks and 2 RACK_LOCAL tasks, then the
>> > TaskSetManager of the loading stage will submit the 3 NODE_LOCAL
>> > tasks as soon as resources are offered, then wait for
>> > spark.locality.wait.node, which was set to 30 minutes; the 2
>> > RACK_LOCAL tasks will wait 30 minutes even though resources are
>> > available.
>> >
>> >
>> > Does any one have met this problem? Do you have a nice solution?
>> >
>> >
>> > Thanks
>> >
>> >
>> >
>> >
>> > Ma chong
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

