This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 35c70e4 [SPARK-34853][SQL] Remove duplicated definition of output partitioning/ordering for limit operator 35c70e4 is described below commit 35c70e417d8c6e3958e0da8a4bec731f9e394a28 Author: Cheng Su <chen...@fb.com> AuthorDate: Wed Mar 24 23:06:35 2021 +0900 [SPARK-34853][SQL] Remove duplicated definition of output partitioning/ordering for limit operator ### What changes were proposed in this pull request? Both local limit and global limit define the output partitioning and output ordering in the same way and this is duplicated (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L159-L175 ). We can move the output partitioning and ordering into their parent trait - `BaseLimitExec`. This is doable as `BaseLimitExec` has no more other child class. This is a minor code refactoring. ### Why are the changes needed? Clean up the code a little bit. Better readability. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pure refactoring. Rely on existing unit tests. Closes #31950 from c21/limit-cleanup. Authored-by: Cheng Su <chen...@fb.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../main/scala/org/apache/spark/sql/execution/limit.scala | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index d8f67fb..e5a2995 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -113,6 +113,10 @@ object BaseLimitExec { trait BaseLimitExec extends LimitExec with CodegenSupport { override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.take(limit) } @@ -156,12 +160,7 @@ trait BaseLimitExec extends LimitExec with CodegenSupport { /** * Take the first `limit` elements of each child partition, but do not collect or shuffle them. */ -case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning -} +case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec /** * Take the first `limit` elements of the child's single output partition. @@ -169,10 +168,6 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec { override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil - - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org