[ https://issues.apache.org/jira/browse/SPARK-24193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102285#comment-17102285 ]
Xianjin YE commented on SPARK-24193:
------------------------------------

Hi, [~jinxing6...@126.com] [~cloud_fan], the fallback config has a correctness issue; we may need to revert this change in Spark 2.4 and Spark 3.0.

Way to reproduce:
{code:scala}
val spark = SparkSession
  .builder
  .appName("Spark TopK test")
  .master("local-cluster[8, 1, 1024]")
  .getOrCreate()

val data = spark.range(100000, 0, -1, 10).toDF("id").selectExpr("id + 1 as id")

spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, 100)
val topKInSort = data.orderBy("id").limit(200).rdd.collect()

spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, Int.MaxValue)
val topKInMemory = data.orderBy("id").limit(200).rdd.collect()

println(topKInMemory.map(_.getLong(0)).mkString("[", ",", "]"))
println(topKInSort.map(_.getLong(0)).mkString("[", ",", "]"))
assert(topKInMemory sameElements topKInSort)
{code}

The issue: `CollectLimitExec`'s core idea is `sortedRDD.mapPartitionsInternal(_.take(limit)).repartition(1).mapPartitionsInternal(_.take(limit))`, which does not guarantee the ordering semantics, so we cannot simply fall back to `CollectLimitExec`.

Proposals to fix:
# Revert the fallback logic.
# Implement `CollectLimitExec` similarly to `CollectTailExec`; however, this may not be suitable if the k in TopK is large enough.
# Do a calculation similar to `CollectTailExec`'s, but collect only the record count of each partition; by leveraging that information we can take the exact number of records from each partition.
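To see why the per-partition take plus `repartition(1)` idea breaks ordering, here is a minimal Python simulation (not Spark code; `collect_limit` and the explicit `shuffle_order` parameter are illustrative assumptions standing in for the nondeterministic shuffle): after a global sort, each partition holds a contiguous value range, but the single-partition shuffle gives no guarantee about the order in which those partition blocks arrive.

```python
def collect_limit(partitions, limit, shuffle_order):
    """Simulate take-per-partition -> repartition(1) -> take.

    partitions: range-partitioned, each internally sorted (as after a sort).
    shuffle_order: the order in which the shuffle delivers partition blocks,
    which Spark does not guarantee.
    """
    # Step 1: take `limit` rows from each sorted partition.
    taken = [partitions[i][:limit] for i in shuffle_order]
    # Step 2: concatenate in shuffle-arrival order ("repartition(1)").
    merged = [row for block in taken for row in block]
    # Step 3: take `limit` from the single merged partition.
    return merged[:limit]


# Globally sorted data split into 10 range partitions of 10 rows each.
partitions = [list(range(i, i + 10)) for i in range(0, 100, 10)]
limit = 20
expected = list(range(limit))  # the true smallest 20 values

# If the shuffle happens to deliver partitions in range order, it is correct:
print(collect_limit(partitions, limit, range(10)) == expected)  # True

# Any other arrival order silently returns the wrong rows:
print(collect_limit(partitions, limit, reversed(range(10))) == expected)  # False
```

The top-K operator avoids this by merging partitions with an ordering-aware reduce, which is exactly the guarantee the fallback path loses.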
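Proposal 3 amounts to a small driver-side computation: given the record counts of the range-ordered partitions, decide exactly how many rows to read from each so that precisely `limit` rows are collected, in order. A minimal sketch, assuming the partitions are range-partitioned and internally sorted (`rows_to_take` is a hypothetical helper, not a Spark API):

```python
def rows_to_take(counts, k):
    """How many rows to read from each range-ordered partition so that
    exactly k rows are collected, taken from the front partitions first."""
    takes = []
    remaining = k
    for c in counts:
        t = min(c, remaining)  # drain this partition, up to what is left of k
        takes.append(t)
        remaining -= t
    return takes


# Four partitions of 10 rows each, limit = 25:
print(rows_to_take([10, 10, 10, 10], 25))  # [10, 10, 5, 0]
```

Because each partition is asked for an exact, positionally determined count, the result is independent of shuffle arrival order, at the cost of one extra round to gather the per-partition counts.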
> Sort by disk when number of limit is big in TakeOrderedAndProjectExec
> ---------------------------------------------------------------------
>
>                 Key: SPARK-24193
>                 URL: https://issues.apache.org/jira/browse/SPARK-24193
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Jin Xing
>            Assignee: Jin Xing
>            Priority: Major
>             Fix For: 2.4.0
>
>
> Physical plan of "_select colA from t order by colB limit M_" is _TakeOrderedAndProject_;
> Currently _TakeOrderedAndProject_ sorts data in memory, see
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L158
> Shall we add a config -- if the number of limit (M) is too big, we can sort
> by disk? Thus the memory issue can be resolved.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)