[ https://issues.apache.org/jira/browse/SPARK-39771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuanjian Li resolved SPARK-39771.
---------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

Issue resolved by pull request 45266
[https://github.com/apache/spark/pull/45266]

> If spark.default.parallelism is unset, RDD defaultPartitioner may pick a
> value that is too large to successfully run
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39771
>                 URL: https://issues.apache.org/jira/browse/SPARK-39771
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.0.0
>            Reporter: Josh Rosen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
> [According to its docs|https://github.com/apache/spark/blob/899f6c90eb2de5b46a36710a131d7417010ce4b3/core/src/main/scala/org/apache/spark/Partitioner.scala#L45-L65],
> {{Partitioner.defaultPartitioner}} uses the maximum number of partitions among the
> upstream RDDs as its partition count when {{spark.default.parallelism}} is not set.
> If that upstream partition count is very large, the resulting shuffle has
> {{numMappers * numReducers = numMappers^2}} shuffle blocks, which can cause various
> problems that prevent the job from running successfully.
> To help users identify when they have run into this problem, I think we should add
> warning logs to Spark.
> As an example of the problem, let's say that I have an RDD with 100,000 partitions
> and then do a {{reduceByKey}} on it without specifying an explicit partitioner or
> partition count. In this case, Spark will plan a reduce stage with 100,000
> partitions:
> {code:java}
> scala> sc.parallelize(1 to 100000, 100000).map(x => (x, x)).reduceByKey(_ + _).toDebugString
> res7: String =
> (100000) ShuffledRDD[21] at reduceByKey at <console>:25 []
>  +-(100000) MapPartitionsRDD[20] at map at <console>:25 []
>  |  ParallelCollectionRDD[19] at parallelize at <console>:25 []
> {code}
> This results in the creation of 10 billion shuffle blocks, so even if this job
> _does_ run it is likely to be extremely slow. However, it's more likely that the
> driver will crash when serializing map output statuses: even if we could use one
> bit per mapper/reducer pair (which is probably overly optimistic in terms of
> compressibility), the map statuses would be ~1.25 gigabytes (and the actual size
> is probably much larger)!
> I don't think that users are likely to wind up in this scenario intentionally:
> it's more likely that either (a) their job depends on
> {{spark.default.parallelism}} being set but was run in an environment lacking a
> value for that config, or (b) their input data grew significantly in size. These
> scenarios may be rare, but they can be frustrating to debug (especially if the
> failure occurs midway through a long-running job).
> I think we should do something to handle this scenario.
> A good starting point might be for {{Partitioner.defaultPartitioner}} to log a
> warning when the default partition count exceeds some threshold.
> In addition, I think it might be a good idea to log a similar warning in
> {{MapOutputTrackerMaster}} right before we start trying to serialize map statuses:
> in a real-world situation where this problem cropped up, the map stage ran
> successfully but the driver crashed when serializing map statuses. Putting a
> warning about partition counts here makes it more likely that users will spot that
> error in the logs and be able to identify the source of the problem (compared to a
> warning that appears much earlier in the job and therefore much farther from the
> likely site of a crash).
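>
> For illustration only (this sketch is not part of the original report or the
> linked pull request), the blow-up in the example above can be avoided either by
> setting {{spark.default.parallelism}} or by passing an explicit partition count
> to {{reduceByKey}}; the value of 200 below is an arbitrary example:
> {code:scala}
> // Sketch: two ways to keep the reduce stage from inheriting the 100,000
> // upstream partitions in the example above.
> import org.apache.spark.{SparkConf, SparkContext}
>
> object DefaultParallelismExample {
>   def main(args: Array[String]): Unit = {
>     // Option (1): set spark.default.parallelism so defaultPartitioner uses it
>     // instead of the maximum upstream partition count.
>     val conf = new SparkConf()
>       .setAppName("default-parallelism-example")
>       .setMaster("local[*]") // local master just to make the sketch self-contained
>       .set("spark.default.parallelism", "200")
>     val sc = new SparkContext(conf)
>
>     val wide = sc.parallelize(1 to 100000, 100000).map(x => (x, x))
>
>     // With the config above, this reduce stage gets 200 partitions, not 100,000.
>     val viaConfig = wide.reduceByKey(_ + _)
>
>     // Option (2): pass an explicit numPartitions, which always takes precedence.
>     val viaArgument = wide.reduceByKey(_ + _, 200)
>
>     println(viaConfig.toDebugString)
>     println(viaArgument.toDebugString)
>     sc.stop()
>   }
> }
> {code}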
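>
> As a back-of-the-envelope check of the sizes quoted above (again only an
> illustration, not Spark code):
> {code:scala}
> // 100,000 mappers x 100,000 reducers => 10 billion shuffle blocks; at one bit
> // per mapper/reducer pair the map statuses alone are ~1.25 GB.
> val mappers  = 100000L
> val reducers = 100000L
> val shuffleBlocks  = mappers * reducers   // 10,000,000,000
> val mapStatusBytes = shuffleBlocks / 8    // 1,250,000,000 bytes = 1.25 GB
> println(s"$shuffleBlocks blocks, ~${mapStatusBytes / 1e9} GB of map statuses")
> {code}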
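>
> And a rough sketch of the kind of warning proposed above; it is hypothetical
> (the object name, helper name, and the 10,000 threshold are invented for
> illustration) and is not the change that was merged in the pull request above:
> {code:scala}
> import org.apache.spark.rdd.RDD
> import org.slf4j.LoggerFactory
>
> object DefaultPartitionCountCheck {
>   private val log = LoggerFactory.getLogger(getClass)
>
>   // Invented threshold: warn when the fallback partition count looks suspiciously large.
>   private val warnThreshold = 10000
>
>   /** Warn if defaultPartitioner would fall back to a very large upstream partition count. */
>   def warnIfTooLarge(rdd: RDD[_], others: RDD[_]*): Unit = {
>     // Mirrors the documented fallback: with spark.default.parallelism unset,
>     // defaultPartitioner uses the largest partition count among the upstream RDDs.
>     val fallback = (rdd +: others).map(_.getNumPartitions).max
>     if (!rdd.context.getConf.contains("spark.default.parallelism") && fallback > warnThreshold) {
>       log.warn(
>         s"spark.default.parallelism is not set, so the default partitioner will use " +
>         s"$fallback partitions (the largest upstream partition count); this can create " +
>         s"an enormous number of shuffle blocks. Consider setting spark.default.parallelism " +
>         s"or passing an explicit partition count.")
>     }
>   }
> }
> {code}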
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org