I have a similar experience. Using 32 machines, I can see that the number of tasks (partitions) assigned to executors (machines) is not even. Moreover, the distribution changes every stage (iteration).
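For concreteness, a minimal sketch (not from the original thread) of one way to tally task placements per host with a SparkListener; the class name and report format below are assumptions:

    import scala.collection.mutable
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

    // Tally how many tasks of each stage are launched on each host.
    class HostTaskCounter extends SparkListener {
      private val counts = mutable.Map.empty[(Int, String), Int].withDefaultValue(0)

      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
        counts((taskStart.stageId, taskStart.taskInfo.host)) += 1
      }

      // Print one line per (stage, host) pair, e.g. "stage 7: (himrod-2,64)".
      def report(): Unit = synchronized {
        counts.toSeq.sorted.foreach { case ((stage, host), n) =>
          println(s"stage $stage: ($host,$n)")
        }
      }
    }

    // Usage on the driver:
    //   val counter = new HostTaskCounter
    //   sc.addSparkListener(counter)
    //   ... run the iterative job ...
    //   counter.report()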
I wonder why Spark needs to move partitions around anyway; shouldn't the scheduler reduce network (and other I/O) overhead by minimizing such relocation?

Thanks,
-Khaled

On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers <ko...@tresata.com> wrote:
> can you try:
> spark.shuffle.reduceLocality.enabled=false
>
> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Dear all,
>>
>> Thank you for your responses.
>>
>> Michael Slavitch:
>> > Just to be sure: Has spark-env.sh and spark-defaults.conf been
>> > correctly propagated to all nodes? Are they identical?
>> Yes; these files are stored on a shared memory directory accessible to
>> all nodes.
>>
>> Koert Kuipers:
>> > we ran into similar issues and it seems related to the new memory
>> > management. can you try:
>> > spark.memory.useLegacyMode = true
>> I reran the exact same code with a restarted cluster using this
>> modification, and did not observe any difference. The partitioning is
>> still imbalanced.
>>
>> Ted Yu:
>> > If the changes can be ported over to 1.6.1, do you mind reproducing the
>> > issue there?
>> Since the spark.memory.useLegacyMode setting did not impact my code's
>> execution, I will have to change the Spark dependency back to earlier
>> versions to see whether the issue persists and get back to you.
>>
>> Meanwhile, if anyone else has any other ideas or experience, please let
>> me know.
>>
>> Mike
>>
>> On 4/4/16, Koert Kuipers <ko...@tresata.com> wrote:
>> > we ran into similar issues and it seems related to the new memory
>> > management. can you try:
>> > spark.memory.useLegacyMode = true
>> >
>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> [ CC'ing the dev list since nearly identical questions have occurred in
>> >> the user list recently w/o resolution; c.f.:
>> >> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>> >> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>> >> ]
>> >>
>> >> Hello,
>> >>
>> >> In short, I'm reporting a problem concerning load imbalance of RDD
>> >> partitions across a standalone cluster. Though there are 16 cores
>> >> available per node, certain nodes will have >16 partitions, and some
>> >> will correspondingly have <16 (and even 0).
>> >>
>> >> In more detail: I am running some scalability/performance tests for
>> >> vector-type operations. The RDDs I'm considering are simple block
>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>> >> are generated with a fixed number of elements given by some multiple
>> >> of the available cores, and subsequently hash-partitioned by their
>> >> integer block index.
>> >>
>> >> I have verified that the hash-partitioning key distribution, as well
>> >> as the keys themselves, are both correct; the problem is truly that
>> >> the partitions are *not* evenly distributed across the nodes.
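A minimal sketch of a block-vector construction like the one described above; the block count, block size, and use of Breeze's DenseVector are assumptions, since the actual DummyVector.scala code is not shown in the thread:

    import breeze.linalg.DenseVector
    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical construction: one (blockIndex, vector) pair per block,
    // hash-partitioned on the integer block index.
    def blockVector(sc: SparkContext, numBlocks: Int, blockSize: Int): RDD[(Int, DenseVector[Double])] = {
      sc.parallelize(0 until numBlocks, numBlocks)
        .map(i => (i, DenseVector.fill(blockSize)(math.random)))
        .partitionBy(new HashPartitioner(numBlocks))
    }

    // e.g. 64 blocks for 32 cores (a multiple of the available cores), as in the test below:
    // val v = blockVector(sc, numBlocks = 64, blockSize = 1000000)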
>> >>
>> >> For instance, here is a representative output for some stages and
>> >> tasks in an iterative program. This is a very simple test with 2
>> >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>> >> example stages from the stderr log are stages 7 and 9:
>> >> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
>> >> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
>> >>
>> >> When counting the locations of the partitions on the compute nodes from
>> >> the stderr logs, however, you can clearly see the imbalance. Example
>> >> lines are:
>> >> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
>> >> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>> >> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
>> >> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>> >> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
>> >> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>> >>
>> >> Grep'ing the full set of the above lines for each hostname, himrod-?,
>> >> shows that the problem occurs in each stage. Below is the output, where
>> >> the number of partitions stored on each node is given alongside its
>> >> hostname as in (himrod-?,num_partitions):
>> >> Stage 7: (himrod-1,0) (himrod-2,64)
>> >> Stage 9: (himrod-1,16) (himrod-2,48)
>> >> Stage 12: (himrod-1,0) (himrod-2,64)
>> >> Stage 14: (himrod-1,16) (himrod-2,48)
>> >> The imbalance is also visible when the executor ID is used to count
>> >> the partitions operated on by each executor.
>> >>
>> >> I am working off a fairly recent modification of the 2.0.0-SNAPSHOT
>> >> branch (but the modifications do not touch the scheduler and are
>> >> irrelevant to these particular tests). Has something changed radically
>> >> in 1.6+ that would make a previously (<=1.5) correct configuration go
>> >> haywire? Have new configuration settings been added of which I'm
>> >> unaware that could lead to this problem?
>> >>
>> >> Please let me know if others in the community have observed this, and
>> >> thank you for your time,
>> >> Mike
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>>
>> --
>> Thanks,
>> Mike
>
--
Thanks,
-Khaled
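For reference, a minimal sketch of how the two settings suggested in this thread could be applied when building the SparkContext; the app name and master URL are placeholders, and the same settings can equally be passed with --conf on spark-submit or set in spark-defaults.conf:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("block-vector-test")                        // placeholder app name
      .setMaster("spark://master:7077")                       // placeholder standalone master URL
      .set("spark.shuffle.reduceLocality.enabled", "false")   // suggested above by Koert Kuipers
      .set("spark.memory.useLegacyMode", "true")              // falls back to pre-1.6 memory management
    val sc = new SparkContext(conf)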