I am seeing skewed execution times. As far as I can tell, they are attributable to differences in data locality - tasks with locality PROCESS_LOCAL run fast; NODE_LOCAL, slower; and ANY, slowest.
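(As an aside: to see whether delay scheduling is involved in that fallback
to ANY, I've been experimenting with Spark's locality-wait settings - a
rough sketch of what I'm trying, with illustrative millisecond values, not
a recommendation:)

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("locality-test")                 // name is illustrative
      .set("spark.locality.wait", "10000")         // base wait before dropping a locality level
      .set("spark.locality.wait.process", "10000") // wait for PROCESS_LOCAL before NODE_LOCAL
      .set("spark.locality.wait.node", "10000")    // wait for NODE_LOCAL before RACK_LOCAL/ANY
    val sc = new SparkContext(conf)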
That ordering seems entirely as it should be - the question is why there
are different locality levels at all.

I am seeing skewed caching, as I mentioned before - in the case I isolated,
with 4 nodes, the cached data was distributed at about 42%, 31%, 20%, and
6% across the nodes. However, the total amount was significantly less than
the memory of any single node, so I don't think any of them could have
overpopulated their cache.

I am occasionally seeing task failures, but they re-execute themselves and
work fine the next time. Yet I'm still seeing incomplete caching (from 65%
cached up to 100%, depending on the run).

I shouldn't have much variance in task time - this is simply a foreach over
the data, adding to an accumulator, and the data is completely randomly
distributed, so it should be pretty even overall.

I am seeing GC regressions occasionally - they slow a request from about 2
seconds to about 5 seconds. The 8-minute slowdown seems to be solely
attributable to the data locality issue, as far as I can tell.

There was some further confusion in the times I mentioned, though: the list
I gave (3.1 min, 2 seconds, ... 8 min) was not a set of different runs with
different cache percentages; those were iterations within a single run with
100% caching.

-Nathan

On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson <ilike...@gmail.com> wrote:

> Spark's scheduling is pretty simple: it will allocate tasks to open cores
> on executors, preferring ones where the data is local. It even performs
> "delay scheduling", which means waiting a bit to see if an executor where
> the data resides locally becomes available.
>
> Are your tasks seeing very skewed execution times? If some tasks are
> taking a very long time and using all the resources on a node, perhaps
> the other nodes are quickly finishing many tasks, and actually
> overpopulating their caches. If no machine is overpopulating its cache,
> and there are no failures, then you should see 100% cached after the
> first run.
>
> It's also strange that running totally uncached takes 3.1 minutes, but
> running 80-90% cached may take 8 minutes. Does your workload produce
> nondeterministic variance in task times? Was it a single straggler, or
> many tasks, that kept the job from finishing? It's not too uncommon to
> see occasional performance regressions while caching due to GC, though 2
> seconds to 8 minutes is a bit extreme.
>
> On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> Sorry, I think I was not clear in what I meant.
>> I didn't mean it went down within a run, with the same instance.
>>
>> I meant I'd run the whole app, and one time it would cache 100%, and
>> the next run it might cache only 83%.
>>
>> Within a run, it doesn't change.
>>
>> On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson <ilike...@gmail.com>
>> wrote:
>>
>>> The fact that the caching percentage went down is highly suspicious.
>>> It should generally not decrease unless other cached data took its
>>> place, or unless executors were dying. Do you know if either of these
>>> was the case?
>>>
>>> On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld <
>>> nkronenf...@oculusinfo.com> wrote:
>>>
>>>> Can anyone point me to a good primer on how Spark decides where to
>>>> send what task, how it distributes them, and how it determines data
>>>> locality?
>>>>
>>>> I'm trying a pretty simple task - it's doing a foreach over cached
>>>> data, accumulating some (relatively complex) values.
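>>>>
>>>> For concreteness, the job is roughly the following - a simplified
>>>> sketch, where the input path, names, and the trivial count are
>>>> illustrative (the real code accumulates much more complex values):
>>>>
>>>>   val data = sc.textFile("hdfs://...").cache()
>>>>   data.count()                   // materialize the cache
>>>>   val acc = sc.accumulator(0L)   // real accumulator holds complex values
>>>>   data.foreach(record => acc += 1L)
>>>>   println(acc.value)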
>>>>
>>>> So I see several inconsistencies I don't understand:
>>>>
>>>> (1) If I run it a couple of times, as separate applications (i.e.,
>>>> reloading, recaching, etc.), I get a different percentage cached each
>>>> time. I've got about 5x as much memory as I need overall, so it isn't
>>>> running out. But one time, 100% of the data will be cached; the next,
>>>> 83%; the next, 92%; etc.
>>>>
>>>> (2) Also, the data is very unevenly distributed. I've got 400
>>>> partitions and 4 workers (with, I believe, 3x replication), and on my
>>>> last run, my distribution was 165/139/25/71. Is there any way to get
>>>> Spark to distribute the tasks more evenly?
>>>>
>>>> (3) If I run the problem several times in the same execution (to take
>>>> advantage of caching etc.), I get very inconsistent results. On my
>>>> latest try, I got:
>>>>
>>>> - 1st run: 3.1 min
>>>> - 2nd run: 2 seconds
>>>> - 3rd run: 8 minutes
>>>> - 4th run: 2 seconds
>>>> - 5th run: 2 seconds
>>>> - 6th run: 6.9 minutes
>>>> - 7th run: 2 seconds
>>>> - 8th run: 2 seconds
>>>> - 9th run: 3.9 minutes
>>>> - 10th run: 8 seconds
>>>>
>>>> I understand the difference for the first run; it was caching that
>>>> time. Later, when it manages to finish in 2 seconds, it's because all
>>>> the tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of
>>>> the tasks end up with locality level ANY. Why would that change when
>>>> running the exact same task twice in a row on cached data?
>>>>
>>>> Any help or pointers that I could get would be much appreciated.
>>>>
>>>> Thanks,
>>>> -Nathan
>>>
>>
>

--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone: +1-416-203-3003 x 238
Email: nkronenf...@oculusinfo.com
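P.S. One thing I'm planning to try for the uneven distribution in (2) is
forcing an even reshuffle before caching, and persisting with disk
spillover so partitions that don't fit in memory are spilled rather than
dropped. A rough, untested sketch - rawData and the partition count are
illustrative:

    import org.apache.spark.storage.StorageLevel

    val data = rawData                        // hypothetical source RDD
      .repartition(400)                       // force an even shuffle across executors
      .persist(StorageLevel.MEMORY_AND_DISK)  // spill to disk instead of dropping partitions
    data.count()                              // materialize the cache up front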