Repository: spark
Updated Branches:
  refs/heads/master 5cdb8a23d -> 5c27b0d4f
[SPARK-19355][SQL][FOLLOWUP] Remove the child.outputOrdering check in global limit

## What changes were proposed in this pull request?

This is based on the discussion at
https://github.com/apache/spark/pull/16677/files#r212805327. As the SQL
standard doesn't mandate that a nested ORDER BY followed by a LIMIT has to
respect that ordering clause, this patch removes the `child.outputOrdering`
check.

## How was this patch tested?

Unit tests.

Closes #22239 from viirya/improve-global-limit-parallelism-followup.

Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c27b0d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c27b0d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c27b0d4

Branch: refs/heads/master
Commit: 5c27b0d4f8d378bd7889d26fb358f478479b9996
Parents: 5cdb8a2
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Mon Aug 27 14:02:50 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Aug 27 14:02:50 2018 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/execution/limit.scala | 10 +++++-----
 .../spark/sql/execution/TakeOrderedAndProjectSuite.scala  | 10 ++++++++++
 2 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/5c27b0d4/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 392ca13..fb46970 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -122,11 +122,11 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
       Nil
     }
 
-    // During global limit, try to evenly distribute limited rows across data
-    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
-    // Besides, if child output has certain ordering, we can't evenly pick up rows from
-    // each parititon.
-    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
+    // This is an optimization to evenly distribute limited rows across all partitions.
+    // When enabled, Spark goes to take rows at each partition repeatedly until reaching
+    // limit number. When disabled, Spark takes all rows at first partition, then rows
+    // at second partition ..., until reaching limit number.
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit
 
     val shuffled = new ShuffledRowRDD(shuffleDependency)
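For readers skimming the diff, a minimal sketch of the two strategies the new
comment describes may help. This is not the GlobalLimitExec/ShuffledRowRDD
implementation; it is a toy model in which each inner Seq stands for one data
partition, and the object and method names are made up for illustration:

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of the two global-limit strategies: each inner Seq is one
// "partition" of rows. Not Spark internals; illustration only.
object GlobalLimitSketch {

  // Flat global limit ON: take rows from each partition in turn (round-robin)
  // until `limit` rows have been collected.
  def flatLimit(partitions: Seq[Seq[Int]], limit: Int): Seq[Int] = {
    val iters = partitions.map(_.iterator)
    val out = ArrayBuffer.empty[Int]
    while (out.size < limit && iters.exists(_.hasNext)) {
      iters.foreach(it => if (out.size < limit && it.hasNext) out += it.next())
    }
    out.toList
  }

  // Flat global limit OFF: drain partition 0, then partition 1, and so on,
  // until `limit` rows have been collected.
  def sequentialLimit(partitions: Seq[Seq[Int]], limit: Int): Seq[Int] =
    partitions.flatten.take(limit)

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
    println(flatLimit(parts, 4))       // List(1, 4, 7, 2)
    println(sequentialLimit(parts, 4)) // List(1, 2, 3, 4)
  }
}
```

With an ordered child, the round-robin pick-up interleaves rows from different
partitions and so breaks the ordering, which is what the removed
`child.outputOrdering == Nil` guard protected against. The commit's point is
that the SQL standard does not require a nested ORDER BY followed by a LIMIT
to preserve that ordering, so the guard is unnecessary.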
http://git-wip-us.apache.org/repos/asf/spark/blob/5c27b0d4/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 7e317a4..0a1c94c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -31,10 +32,19 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
   private var rand: Random = _
   private var seed: Long = 0
 
+  private val originalLimitFlatGlobalLimit = SQLConf.get.getConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT)
+
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     seed = System.currentTimeMillis()
     rand = new Random(seed)
+
+    // Disable the optimization to make Sort-Limit match `TakeOrderedAndProject` semantics.
+    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
+  }
+
+  protected override def afterAll() = {
+    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
   }
 
   private def generateRandomInputData(): DataFrame = {
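As a usage note, the save/set/restore pattern the suite adds in
beforeAll/afterAll can be condensed into a loan-style helper. A minimal
sketch, using only the SQLConf calls that appear in the diff above (the
helper name withSequentialGlobalLimit is made up, and like the suite it is
meant for Spark's own test code):

```scala
import org.apache.spark.sql.internal.SQLConf

// Run `block` with the flat-global-limit optimization disabled, then restore
// the previous value even if `block` throws. Same pattern as the suite's
// beforeAll/afterAll, condensed into one helper.
def withSequentialGlobalLimit[T](block: => T): T = {
  val saved = SQLConf.get.getConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT)
  SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
  try block finally SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, saved)
}
```

Disabling the flag makes a Sort followed by a global limit return the first
rows of the sorted output, matching `TakeOrderedAndProject`, which is why the
suite turns it off before comparing results.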