Re: data locality, task distribution

2014-11-13 Thread Nathan Kronenfeld
I am seeing skewed execution times.  As far as I can tell, they are
attributable to differences in data locality - tasks with locality
PROCESS_LOCAL run fast, NODE_LOCAL, slower, and ANY, slowest.

This seems entirely as it should be - the question is, why the different
locality levels?

I am seeing skewed caching, as I mentioned before - in the case I isolated,
with 4 nodes, they were distributed at about 42%, 31%, 20%, and 6%.
However, the total amount was significantly less than the memory of any
single node, so I don't think they could have overpopulated their cache.  I
am occasionally seeing task failures, but they re-execute themselves, and
work fine the next time.  Yet I'm still seeing incomplete caching (from 65%
cached up to 100%, depending on the run).

I shouldn't have much variance in task time - this is simply a foreach over
the data, adding to an accumulator, and the data is completely randomly
distributed, so the work per task should be pretty even overall.
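
For reference, the job is roughly this shape - a minimal sketch with a toy
dataset and placeholder names, not our actual code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // implicit AccumulatorParam for Long on older Spark

    // Sketch of the job described above: cache an RDD, then foreach over it,
    // folding each element into an accumulator.  Dataset and names are placeholders.
    object AccumulatorForeachSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("accumulator-foreach-sketch"))

        // Stand-in for the real (much larger, randomly distributed) dataset,
        // split into 400 partitions like the job in this thread.
        val data = sc.parallelize(1 to 1000000, 400).cache()
        data.count()  // force the RDD to be computed and cached

        // The real accumulator is more complex; a Long total keeps the sketch simple.
        val total = sc.accumulator(0L)
        data.foreach(x => total += x.toLong)

        println("accumulated total: " + total.value)
        sc.stop()
      }
    }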

I am seeing GC regressions occasionally - they slow a request from about 2
seconds to about 5 seconds.  The 8-minute slowdown seems to be solely
attributable to the data locality issue, as far as I can tell.  There was
some further confusion though in the times I mentioned - the list I gave
(3.1 min, 2 seconds, ... 8 min) were not different runs with different
cache %s, they were iterations within a single run with 100% caching.

   -Nathan



On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson ilike...@gmail.com wrote:

 Spark's scheduling is pretty simple: it will allocate tasks to open cores
 on executors, preferring ones where the data is local. It even performs
 delay scheduling, which means waiting a bit to see if an executor where
 the data resides locally becomes available.

 Are your tasks seeing very skewed execution times? If some tasks are
 taking a very long time and using all the resources on a node, perhaps the
 other nodes are quickly finishing many tasks, and actually overpopulating
 their caches. If a particular machine were not overpopulating its cache,
 and there were no failures, then you should see 100% cached after the first
 run.

 It's also strange that running totally uncached takes 3.1 minutes, but
 running 80-90% cached may take 8 minutes. Does your workload produce
 nondeterministic variance in task times? Was it a single straggler, or many
 tasks, that kept the job from finishing? It's not too uncommon to
 see occasional performance regressions while caching due to GC, though 2
 seconds to 8 minutes is a bit extreme.

 On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Sorry, I think I was not clear in what I meant.
 I didn't mean it went down within a run, with the same instance.

 I meant I'd run the whole app, and one time, it would cache 100%, and the
 next run, it might cache only 83%

 Within a run, it doesn't change.

 On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 The fact that the caching percentage went down is highly suspicious. It
 should generally not decrease unless other cached data took its place, or
 unless executors were dying. Do you know if either of these were the
 case?

 On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Can anyone point me to a good primer on how spark decides where to send
 what task, how it distributes them, and how it determines data locality?

 I'm trying a pretty simple task - it's doing a foreach over cached
 data, accumulating some (relatively complex) values.

 So I see several inconsistencies I don't understand:

 (1) If I run it a couple times, as separate applications (i.e.,
 reloading, recaching, etc), I will get different %'s cached each time.
 I've got about 5x as much memory as I need overall, so it isn't running
 out.  But one time, 100% of the data will be cached; the next, 83%, the
 next, 92%, etc.

 (2) Also, the data is very unevenly distributed. I've got 400
 partitions, and 4 workers (with, I believe, 3x replication), and on my last
 run, my distribution was 165/139/25/71.  Is there any way to get spark to
 distribute the tasks more evenly?

 (3) If I run the problem several times in the same execution (to take
 advantage of caching etc.), I get very inconsistent results.  My latest
 try, I get:

- 1st run: 3.1 min
- 2nd run: 2 seconds
- 3rd run: 8 minutes
- 4th run: 2 seconds
- 5th run: 2 seconds
- 6th run: 6.9 minutes
- 7th run: 2 seconds
- 8th run: 2 seconds
- 9th run: 3.9 minutes
- 10th run: 8 seconds

 I understand the difference for the first run; it was caching that
 time.  Later times, when it manages to work in 2 seconds, it's because all
 the tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
 tasks end up with locality level ANY.  Why would that change when running
 the exact same task twice in a row on cached data?

 Any help or pointers that I could get would be much appreciated.

Re: data locality, task distribution

2014-11-13 Thread Aaron Davidson
You mentioned that the 3.1 min run was the one that did the actual caching,
so did that run before any data was cached, or after?

I would recommend checking the Storage tab of the UI, and clicking on the
RDD, to see how full the executors' storage memory is (which may be
significantly less than the instance's memory). When a task completes over
data that should be cached, it will try to cache it, so it's pretty weird
that you're seeing less than 100% cached with memory to spare. It's possible
that some partitions are significantly larger than others, which may cause us
to not attempt to cache them (defined by spark.storage.unrollFraction).
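
If it's easier than clicking through the UI, roughly the same numbers are
available programmatically in spark-shell (where sc already exists).
getRDDStorageInfo and getExecutorStorageStatus are developer APIs, so treat
this as a sketch and expect the exact fields to vary a bit by version:

    // Per-RDD cache coverage - roughly what the Storage tab shows.
    sc.getRDDStorageInfo.foreach { rdd =>
      println(s"RDD ${rdd.id} '${rdd.name}': " +
        s"${rdd.numCachedPartitions}/${rdd.numPartitions} partitions cached, " +
        s"${rdd.memSize} bytes in memory, ${rdd.diskSize} bytes on disk")
    }

    // Per-executor storage memory - how full each executor's cache actually is
    // (maxMem is the memory available to caching, not the whole JVM heap).
    sc.getExecutorStorageStatus.foreach { status =>
      println(s"${status.blockManagerId.host}: " +
        s"${status.memUsed} of ${status.maxMem} bytes of storage memory used")
    }

If a few partitions are much larger than the rest, the unroll cap can also be
raised when building the SparkConf, e.g. conf.set("spark.storage.unrollFraction",
"0.3") - the default is 0.2, if I remember right.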

You can also try increasing the spark.locality.wait flag to ensure that
Spark will wait longer for tasks to complete before running them
non-locally. One possible situation is that a node hits GC for a few
seconds, a task is scheduled non-locally, and then attempting to read from
the other executors' cache is much more expensive than computing the data
initially. Increasing the locality wait beyond the GC time would avoid this.

On Thu, Nov 13, 2014 at 9:08 AM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 I am seeing skewed execution times.  As far as I can tell, they are
 attributable to differences in data locality - tasks with locality
 PROCESS_LOCAL run fast, NODE_LOCAL, slower, and ANY, slowest.

 This seems entirely as it should be - the question is, why the different
 locality levels?

 I am seeing skewed caching, as I mentioned before - in the case I
 isolated, with 4 nodes, they were distributed at about 42%, 31%, 20%, and
 6%.  However, the total amount was significantly less than the memory of
 any single node, so I don't think they could have overpopulated their
 cache.  I am occasionally seeing task failures, but they re-execute
 themselves, and work fine the next time.  Yet I'm still seeing incomplete
 caching (from 65% cached up to 100%, depending on the run).

 I shouldn't have much variance in task time - this is simply a foreach
 over the data, adding to an accumulator, and the data is completely
 randomly distributed, so the work per task should be pretty even overall.

 I am seeing GC regressions occasionally - they slow a request from about 2
 seconds to about 5 seconds.  The 8-minute slowdown seems to be solely
 attributable to the data locality issue, as far as I can tell.  There was
 some further confusion though in the times I mentioned - the list I gave
 (3.1 min, 2 seconds, ... 8 min) were not different runs with different
 cache %s, they were iterations within a single run with 100% caching.

-Nathan



 On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 Spark's scheduling is pretty simple: it will allocate tasks to open cores
 on executors, preferring ones where the data is local. It even performs
 delay scheduling, which means waiting a bit to see if an executor where
 the data resides locally becomes available.

 Are your tasks seeing very skewed execution times? If some tasks are
 taking a very long time and using all the resources on a node, perhaps the
 other nodes are quickly finishing many tasks, and actually overpopulating
 their caches. If a particular machine were not overpopulating its cache,
 and there were no failures, then you should see 100% cached after the first
 run.

 It's also strange that running totally uncached takes 3.1 minutes, but
 running 80-90% cached may take 8 minutes. Does your workload produce
 nondeterministic variance in task times? Was it a single straggler, or many
 tasks, that kept the job from finishing? It's not too uncommon to
 see occasional performance regressions while caching due to GC, though 2
 seconds to 8 minutes is a bit extreme.

 On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Sorry, I think I was not clear in what I meant.
 I didn't mean it went down within a run, with the same instance.

 I meant I'd run the whole app, and one time, it would cache 100%, and
 the next run, it might cache only 83%

 Within a run, it doesn't change.

 On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 The fact that the caching percentage went down is highly suspicious. It
 should generally not decrease unless other cached data took its place, or
 unless executors were dying. Do you know if either of these were the
 case?

 On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Can anyone point me to a good primer on how spark decides where to
 send what task, how it distributes them, and how it determines data
 locality?

 I'm trying a pretty simple task - it's doing a foreach over cached
 data, accumulating some (relatively complex) values.

 So I see several inconsistencies I don't understand:

 (1) If I run it a couple times, as separate applications (i.e.,
 reloading, recaching, etc), I will get different %'s cached each time.
 I've got about 5x as much memory as I need overall, so it isn't running out.

Re: data locality, task distribution

2014-11-12 Thread Aaron Davidson
The fact that the caching percentage went down is highly suspicious. It
should generally not decrease unless other cached data took its place, or
unless executors were dying. Do you know if either of these were the
case?

On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 Can anyone point me to a good primer on how spark decides where to send
 what task, how it distributes them, and how it determines data locality?

 I'm trying a pretty simple task - it's doing a foreach over cached data,
 accumulating some (relatively complex) values.

 So I see several inconsistencies I don't understand:

 (1) If I run it a couple times, as separate applications (i.e., reloading,
 recaching, etc), I will get different %'s cached each time.  I've got about
 5x as much memory as I need overall, so it isn't running out.  But one
 time, 100% of the data will be cached; the next, 83%, the next, 92%, etc.

 (2) Also, the data is very unevenly distributed. I've got 400 partitions,
 and 4 workers (with, I believe, 3x replication), and on my last run, my
 distribution was 165/139/25/71.  Is there any way to get spark to
 distribute the tasks more evenly?

 (3) If I run the problem several times in the same execution (to take
 advantage of caching etc.), I get very inconsistent results.  My latest
 try, I get:

- 1st run: 3.1 min
- 2nd run: 2 seconds
- 3rd run: 8 minutes
- 4th run: 2 seconds
- 5th run: 2 seconds
- 6th run: 6.9 minutes
- 7th run: 2 seconds
- 8th run: 2 seconds
- 9th run: 3.9 minutes
- 10th run: 8 seconds

 I understand the difference for the first run; it was caching that time.
 Later times, when it manages to work in 2 seconds, it's because all the
 tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
 tasks end up with locality level ANY.  Why would that change when running
 the exact same task twice in a row on cached data?

 Any help or pointers that I could get would be much appreciated.


 Thanks,

  -Nathan



 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com



Re: data locality, task distribution

2014-11-12 Thread Nathan Kronenfeld
Sorry, I think I was not clear in what I meant.
I didn't mean it went down within a run, with the same instance.

I meant I'd run the whole app, and one time, it would cache 100%, and the
next run, it might cache only 83%

Within a run, it doesn't change.

On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com wrote:

 The fact that the caching percentage went down is highly suspicious. It
 should generally not decrease unless other cached data took its place, or
 unless executors were dying. Do you know if either of these were the
 case?

 On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Can anyone point me to a good primer on how spark decides where to send
 what task, how it distributes them, and how it determines data locality?

 I'm trying a pretty simple task - it's doing a foreach over cached data,
 accumulating some (relatively complex) values.

 So I see several inconsistencies I don't understand:

 (1) If I run it a couple times, as separate applications (i.e.,
 reloading, recaching, etc), I will get different %'s cached each time.
 I've got about 5x as much memory as I need overall, so it isn't running
 out.  But one time, 100% of the data will be cached; the next, 83%, the
 next, 92%, etc.

 (2) Also, the data is very unevenly distributed. I've got 400 partitions,
 and 4 workers (with, I believe, 3x replication), and on my last run, my
 distribution was 165/139/25/71.  Is there any way to get spark to
 distribute the tasks more evenly?

 (3) If I run the problem several times in the same execution (to take
 advantage of caching etc.), I get very inconsistent results.  My latest
 try, I get:

- 1st run: 3.1 min
- 2nd run: 2 seconds
- 3rd run: 8 minutes
- 4th run: 2 seconds
- 5th run: 2 seconds
- 6th run: 6.9 minutes
- 7th run: 2 seconds
- 8th run: 2 seconds
- 9th run: 3.9 minutes
- 10th run: 8 seconds

 I understand the difference for the first run; it was caching that time.
 Later times, when it manages to work in 2 seconds, it's because all the
 tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
 tasks end up with locality level ANY.  Why would that change when running
 the exact same task twice in a row on cached data?

 Any help or pointers that I could get would be much appreciated.


 Thanks,

  -Nathan



 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com





-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: data locality, task distribution

2014-11-12 Thread Aaron Davidson
Spark's scheduling is pretty simple: it will allocate tasks to open cores
on executors, preferring ones where the data is local. It even performs
delay scheduling, which means waiting a bit to see if an executor where
the data resides locally becomes available.

Are your tasks seeing very skewed execution times? If some tasks are
taking a very long time and using all the resources on a node, perhaps the
other nodes are quickly finishing many tasks, and actually overpopulating
their caches. If a particular machine were not overpopulating its cache,
and there were no failures, then you should see 100% cached after the first
run.

It's also strange that running totally uncached takes 3.1 minutes, but
running 80-90% cached may take 8 minutes. Does your workload produce
nondeterministic variance in task times? Was it a single straggler, or many
tasks, that kept the job from finishing? It's not too uncommon to
see occasional performance regressions while caching due to GC, though 2
seconds to 8 minutes is a bit extreme.

On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld 
nkronenf...@oculusinfo.com wrote:

 Sorry, I think I was not clear in what I meant.
 I didn't mean it went down within a run, with the same instance.

 I meant I'd run the whole app, and one time, it would cache 100%, and the
 next run, it might cache only 83%

 Within a run, it doesn't change.

 On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 The fact that the caching percentage went down is highly suspicious. It
 should generally not decrease unless other cached data took its place, or
 unless executors were dying. Do you know if either of these were the
 case?

 On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld 
 nkronenf...@oculusinfo.com wrote:

 Can anyone point me to a good primer on how spark decides where to send
 what task, how it distributes them, and how it determines data locality?

 I'm trying a pretty simple task - it's doing a foreach over cached data,
 accumulating some (relatively complex) values.

 So I see several inconsistencies I don't understand:

 (1) If I run it a couple times, as separate applications (i.e.,
 reloading, recaching, etc), I will get different %'s cached each time.
 I've got about 5x as much memory as I need overall, so it isn't running
 out.  But one time, 100% of the data will be cached; the next, 83%, the
 next, 92%, etc.

 (2) Also, the data is very unevenly distributed. I've got 400
 partitions, and 4 workers (with, I believe, 3x replication), and on my last
 run, my distribution was 165/139/25/71.  Is there any way to get spark to
 distribute the tasks more evenly?

 (3) If I run the problem several times in the same execution (to take
 advantage of caching etc.), I get very inconsistent results.  My latest
 try, I get:

- 1st run: 3.1 min
- 2nd run: 2 seconds
- 3rd run: 8 minutes
- 4th run: 2 seconds
- 5th run: 2 seconds
- 6th run: 6.9 minutes
- 7th run: 2 seconds
- 8th run: 2 seconds
- 9th run: 3.9 minutes
- 10th run: 8 seconds

 I understand the difference for the first run; it was caching that
 time.  Later times, when it manages to work in 2 seconds, it's because all
 the tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
 tasks end up with locality level ANY.  Why would that change when running
 the exact same task twice in a row on cached data?

 Any help or pointers that I could get would be much appreciated.


 Thanks,

  -Nathan



 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com





 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com



data locality, task distribution

2014-11-11 Thread Nathan Kronenfeld
Can anyone point me to a good primer on how spark decides where to send
what task, how it distributes them, and how it determines data locality?

I'm trying a pretty simple task - it's doing a foreach over cached data,
accumulating some (relatively complex) values.

So I see several inconsistencies I don't understand:

(1) If I run it a couple times, as separate applications (i.e., reloading,
recaching, etc), I will get different %'s cached each time.  I've got about
5x as much memory as I need overall, so it isn't running out.  But one
time, 100% of the data will be cached; the next, 83%, the next, 92%, etc.

(2) Also, the data is very unevenly distributed. I've got 400 partitions,
and 4 workers (with, I believe, 3x replication), and on my last run, my
distribution was 165/139/25/71.  Is there any way to get spark to
distribute the tasks more evenly?

(3) If I run the problem several times in the same execution (to take
advantage of caching etc.), I get very inconsistent results.  My latest
try, I get:

   - 1st run: 3.1 min
   - 2nd run: 2 seconds
   - 3rd run: 8 minutes
   - 4th run: 2 seconds
   - 5th run: 2 seconds
   - 6th run: 6.9 minutes
   - 7th run: 2 seconds
   - 8th run: 2 seconds
   - 9th run: 3.9 minutes
   - 10th run: 8 seconds

I understand the difference for the first run; it was caching that time.
Later times, when it manages to work in 2 seconds, it's because all the
tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the
tasks end up with locality level ANY.  Why would that change when running
the exact same task twice in a row on cached data?

Any help or pointers that I could get would be much appreciated.


Thanks,

 -Nathan



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com