[ https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16253554#comment-16253554 ]
Sean Owen commented on SPARK-19371:
-----------------------------------

Yes, I get why you want to spread the partitions. It doesn't happen naturally because other factors can get in the way: imbalanced hashing, or locality. Spark won't bother to move data around preemptively, as it's not obvious the effort is worth it; a task that was scheduled on node A because it was local to some data produced a cached partition on A, and that's about the best available guess about where work on that data should live.

The idea behind setting the locality wait to 0 is that tasks shouldn't 'clump' together on the few nodes that hold the data in the first place, and so shouldn't produce clumped cached partitions. It's a crude tool, though, because it means setting the wait to 0 globally. It's more of a diagnostic, although lowering it is often a good idea.

Usually the answer is to force a shuffle. The thread linked above ended by saying there wasn't a shuffle option, but Dataset.repartition() should shuffle; you don't even have to change the number of partitions. If that is still producing an imbalance, then I'm surprised, and would next have to figure out just what it's hashing the rows on, and whether that's the issue. Or else, why the tasks performing the shuffle aren't spread out, as they should be wide dependencies where locality isn't going to help.
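To make that suggestion concrete, here is a minimal sketch (not from the comment itself; the variable names are placeholders, the partition count is left unchanged on purpose, and the input path is simply reused from the report quoted below, assuming a Spark 1.6-style sqlContext):

{code}
// Force a shuffle without changing the partition count: repartition() with an
// explicit count always inserts a shuffle stage, so the cached blocks are
// placed by the shuffle rather than by the locality of the original read tasks.
val df = sqlContext.read.parquet("/data/folder_to_load")   // path reused from the report below

val spread = df
  .repartition(df.rdd.partitions.length)   // same number of partitions, new placement
  .persist()

spread.count()   // materialize the cache so the blocks are actually stored

// The cruder, global knob mentioned above would be lowering the locality wait
// for the whole application, e.g. --conf spark.locality.wait=0s
{code}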
> Cannot spread cached partitions evenly across executors
> --------------------------------------------------------
>
>                 Key: SPARK-19371
>                 URL: https://issues.apache.org/jira/browse/SPARK-19371
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.1
>            Reporter: Thunder Stumpges
>         Attachments: RDD Block Distribution on two executors.png, Unbalanced RDD Blocks, and resulting task imbalance.png, Unbalanced RDD Blocks, and resulting task imbalance.png, execution timeline.png
>
> Before running an intensive iterative job (in this case a distributed topic model training), we need to load a dataset and persist it across executors. After loading from HDFS and persisting, the partitions are spread unevenly across executors (based on the initial scheduling of the reads, which are not data locality sensitive). The partition sizes are even, just not their distribution over executors. We currently have no way to force the partitions to spread evenly, and as the iterative algorithm begins, tasks are distributed to executors based on this initial load, forcing some very unbalanced work.
> This has been mentioned a [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059] of [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html] in [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html] user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here are examples of things I have tried. All resulted in partitions in memory that were NOT evenly distributed to executors, causing future tasks to be imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48*16
> val df = sqlContext.read.
>   parquet("/data/folder_to_load").
>   repartition(numPartitions).
>   persist
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to desired partitions*
> {code}
> val numPartitions = 48*16
> val df2 = sqlContext.read.
>   parquet("/data/folder_to_load").
>   repartition(numPartitions*2)
> val df = df2.repartition(numPartitions).
>   persist
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request that those partitions be stored evenly across executors in preparation for future tasks.
> I'm not sure if this is a more general issue (i.e. not just involving persisting RDDs), but for the persisted in-memory case it can make a HUGE difference in the overall running time of the remaining work.
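As a side note for anyone reproducing the imbalance described above, the resulting block placement can be inspected directly from the driver. This is a sketch against the Spark 1.x storage-status API; the output format is illustrative only:

{code}
// Print the number of cached blocks and memory used per executor after the
// persist + count. getExecutorStorageStatus is the Spark 1.x API (it also
// returns an entry for the driver, and was deprecated in later releases).
sc.getExecutorStorageStatus.foreach { status =>
  val id = status.blockManagerId
  println(s"${id.executorId} @ ${id.host}: " +
    s"${status.numBlocks} blocks, ${status.memUsed / (1024 * 1024)} MB used")
}
{code}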