[ 
https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17147653#comment-17147653
 ] 

Chen Zhang commented on SPARK-31635:
------------------------------------

In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_
 The execution logic of _RDD.sortBy().take()_ is the reservoir sampling + 
global bucket Sorting, and the required number of data is returned after the 
global sorting result is obtained.All major computs are performed in the 
executor process.
 The execution logic of _RDD.takeOrdered()_ is to compute TOPK(by 
PriorityQueue) in each RDD partition in the executor process, and then return 
each TOPK result to the driver process for merging.
 To get the same result, it is obvious that the second method based on 
PriorityQueue has better performance.

 

I think that the implementation of _RDD.takeOrdered()_ can be improved, using a 
configurable option to decide whether the TOPK data merge process occurs in the 
driver process or the executor process. If it occurs in the driver process, it 
can reduce the time for waiting for calculation. If it occurs in the executor 
process, it can reduce the memory pressure of the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.take.ordered.driver.merge", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}

> Spark SQL Sort fails when sorting big data points
> -------------------------------------------------
>
>                 Key: SPARK-31635
>                 URL: https://issues.apache.org/jira/browse/SPARK-31635
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2
>            Reporter: George George
>            Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](250000)(Point(1,2)))), 100)
> test.toDF().as[Nested].sort("a").take(1)
> {code}
>  *Sorting* big data objects using *Spark Dataframe* is failing with following 
> exception: 
> {code:java}
> 2020-05-04 08:01:00 ERROR TaskSetManager:70 - Total size of serialized 
> results of 14 tasks (107.8 MB) is bigger than spark.driver.maxResultSize 
> (100.0 MB)
> [Stage 0:======>                                                 (12 + 3) / 
> 100]org.apache.spark.SparkException: Job aborted due to stage failure: Total 
> size of serialized results of 13 tasks (100.1 MB) is bigger than 
> spark.driver.maxResu
> {code}
> However using the *RDD API* is working and no exception is thrown: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](250000)(Point(1,2)))), 100)
> test.sortBy(_.a).take(1)
> {code}
> For both code snippets we started the spark shell with exactly the same 
> arguments:
> {code:java}
> spark-shell --driver-memory 6G --conf "spark.driver.maxResultSize=100MB"
> {code}
> Even if we increase the spark.driver.maxResultSize, the executors still get 
> killed for our use case. The interesting thing is that when using the RDD API 
> directly the problem is not there. *Looks like there is a bug in dataframe 
> sort because is shuffling too much data to the driver?* 
> Note: this is a small example and I reduced the spark.driver.maxResultSize to 
> a smaller size, but in our application I've tried setting it to 8GB but as 
> mentioned above the job was killed. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to