[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7513
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122717523

I am merging it to master.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122716747

Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122716729

[Test build #37782 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37782/console) for PR 7513 at commit [`4e69d08`](https://github.com/apache/spark/commit/4e69d08ab0292220a3cf67d3011947f80f931ed7).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class DayOfYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes`
  * `case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCastInputTypes`
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122709323

LGTM. Will merge it to master once it passes jenkins.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122708899

[Test build #37782 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37782/consoleFull) for PR 7513 at commit [`4e69d08`](https://github.com/apache/spark/commit/4e69d08ab0292220a3cf67d3011947f80f931ed7).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122708823

Merged build triggered.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122708829

Merged build started.
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122708770

ah, I guess it is because I replied through email. Let's see if jenkins gets it this time.
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122708749

ok to test
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122707719

@yhuai Don't think jenkins picked up your OK.
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122689642

ok to test

> On Sun, Jul 19, 2015 at 9:02 AM -0700, "UCB AMPLab" wrote:
> Can one of the admins verify this patch?
> Reply to this email directly or view it on GitHub.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122674562

Can one of the admins verify this patch?
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122674518

Can one of the admins verify this patch?
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7513#issuecomment-122674419

cc @yhuai
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/7513

[SPARK-8638] [SQL] Window Function Performance Improvements - Cleanup

This PR contains a few clean-ups that are part of SPARK-8638: a few style issues were fixed and a few tests were moved. The Git commit message is wrong, BTW :(...

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hvanhovell/spark SPARK-8638-cleanup

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7513.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #7513

commit 4e69d08ab0292220a3cf67d3011947f80f931ed7
Author: Herman van Hovell
Date: 2015-07-19T15:52:28Z

    Fixed Perfomance Regression for Shrinking Window Frames (+Rebase)
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r3498

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---

```diff
@@ -38,443 +84,667 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single " +
+        "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    // The required child ordering has two parts.
-    // The first part is the expressions in the partition specification.
-    // We add these expressions to the required ordering to make sure input rows are grouped
-    // based on the partition specification. So, we only need to process a single partition
-    // at a time.
-    // The second part is the expressions specified in the ORDER BY cluase.
-    // Basically, we first use sort to group rows based on partition specifications and then sort
-    // Rows in a group based on the order specification.
-    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
+    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-    unbound: WindowExpression,
-    windowFunction: WindowFunction,
-    resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
-    window.collect {
-      case w: WindowExpression =>
-        ComputedWindow(
-          w,
-          BindReferences.bindReference(w.windowFunction, child.output),
-          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+   * used to determine which input row lies within the frame boundaries of an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
+    frameType match {
+      case RangeFrame =>
+        val (exprs, current, bound) = if (offset == 0) {
+          // Use the entire order expression when the offset is 0.
+          val exprs = windowSpec.orderSpec.map(_.child)
+          val projection = newMutableProjection(exprs, child.output)
+          (windowSpec.orderSpec, projection(), projection())
+        }
+        else if (windowSpec.orderSpec.size == 1) {
+          // Use only the first order expression when the offset is non-null.
+          val sortExpr = windowSpec.orderSpec.head
+          val expr = sortExpr.child
+          // Create the projection which returns the current 'value'.
+          val current = newMutableProjection(expr :: Nil, child.output)()
+          // Flip the sign of the offset when processing the order is descending
+          val boundOffset = if (sortExpr.direction == Descending) -offset
+          else offset
+          // Create the projection which returns the current 'value' modified by adding the offset.
+          val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType
```
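The hunk above centers on `createBoundOrdering`. As a reading aid, here is a minimal sketch of the idea behind a range-based bound ordering, reduced to plain `Int`s; `RangeBound` and its fields are illustrative names, not the PR's classes, and the real `BoundOrdering` compares projected rows rather than integers:

```scala
// A minimal sketch, assuming a single integer ORDER BY key.
case class RangeBound(offset: Int, descending: Boolean = false) {
  // Flip the sign of the offset for a descending order (the fix that is
  // discussed further down in this thread).
  private val boundOffset = if (descending) -offset else offset

  // Negative: the input value lies before the shifted bound of the current
  // row; zero: exactly on it; positive: past it.
  def compare(inputValue: Int, currentValue: Int): Int =
    inputValue - (currentValue + boundOffset)
}

// A lower bound of "2000 preceding" on an ascending order: revenue 5500 is
// inside the frame of a current row with revenue 6500 (5500 >= 6500 - 2000).
val lower = RangeBound(-2000)
assert(lower.compare(5500, 6500) >= 0)
```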
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r3496

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---

```diff
@@ -38,443 +84,667 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single " +
+        "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    // The required child ordering has two parts.
-    // The first part is the expressions in the partition specification.
-    // We add these expressions to the required ordering to make sure input rows are grouped
-    // based on the partition specification. So, we only need to process a single partition
-    // at a time.
-    // The second part is the expressions specified in the ORDER BY cluase.
-    // Basically, we first use sort to group rows based on partition specifications and then sort
-    // Rows in a group based on the order specification.
-    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
+    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-    unbound: WindowExpression,
-    windowFunction: WindowFunction,
-    resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
-    window.collect {
-      case w: WindowExpression =>
-        ComputedWindow(
-          w,
-          BindReferences.bindReference(w.windowFunction, child.output),
-          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+   * used to determine which input row lies within the frame boundaries of an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
+    frameType match {
+      case RangeFrame =>
+        val (exprs, current, bound) = if (offset == 0) {
+          // Use the entire order expression when the offset is 0.
+          val exprs = windowSpec.orderSpec.map(_.child)
+          val projection = newMutableProjection(exprs, child.output)
+          (windowSpec.orderSpec, projection(), projection())
+        }
+        else if (windowSpec.orderSpec.size == 1) {
+          // Use only the first order expression when the offset is non-null.
+          val sortExpr = windowSpec.orderSpec.head
+          val expr = sortExpr.child
+          // Create the projection which returns the current 'value'.
+          val current = newMutableProjection(expr :: Nil, child.output)()
+          // Flip the sign of the offset when processing the order is descending
+          val boundOffset = if (sortExpr.direction == Descending) -offset
+          else offset
```

--- End diff --

```scala
val boundOffset = if (sortExpr.direction == Descending) -offset else offset
```
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r3497

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---

```diff
@@ -38,443 +84,667 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single " +
+        "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    // The required child ordering has two parts.
-    // The first part is the expressions in the partition specification.
-    // We add these expressions to the required ordering to make sure input rows are grouped
-    // based on the partition specification. So, we only need to process a single partition
-    // at a time.
-    // The second part is the expressions specified in the ORDER BY cluase.
-    // Basically, we first use sort to group rows based on partition specifications and then sort
-    // Rows in a group based on the order specification.
-    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
+    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-    unbound: WindowExpression,
-    windowFunction: WindowFunction,
-    resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
-    window.collect {
-      case w: WindowExpression =>
-        ComputedWindow(
-          w,
-          BindReferences.bindReference(w.windowFunction, child.output),
-          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+   * used to determine which input row lies within the frame boundaries of an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
+    frameType match {
+      case RangeFrame =>
+        val (exprs, current, bound) = if (offset == 0) {
+          // Use the entire order expression when the offset is 0.
+          val exprs = windowSpec.orderSpec.map(_.child)
+          val projection = newMutableProjection(exprs, child.output)
+          (windowSpec.orderSpec, projection(), projection())
+        }
+        else if (windowSpec.orderSpec.size == 1) {
```

--- End diff --

`} else if`
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34955549

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala ---

```diff
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
```

--- End diff --

Seems we do not need to create a new suite, right? We can just use `HiveDataFrameWindowSuite`.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7057
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122633626

@hvanhovell Overall looks good. I am merging it to master. I will leave a few comments for minor changes. Can you submit a follow-up PR to address them?
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122611251

Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122611245

[Test build #37743 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37743/console) for PR 7057 at commit [`3bfdc49`](https://github.com/apache/spark/commit/3bfdc491a3c1cad3cf07585388cc448b04bacd98).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122607711

[Test build #37743 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37743/consoleFull) for PR 7057 at commit [`3bfdc49`](https://github.com/apache/spark/commit/3bfdc491a3c1cad3cf07585388cc448b04bacd98).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122607604

Merged build triggered.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122607610

Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122161494

Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122161458

[Test build #37567 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37567/console) for PR 7057 at commit [`7207ef5`](https://github.com/apache/spark/commit/7207ef58ded5feb9fcd8651650153e1e82416971).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122158569

@yhuai the benchmarking results are attached. It might be interesting to see how the operator performs on different datasets.
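The attachments themselves are not part of this archive. For readers who want to run a similar comparison, here is a rough sketch of a 1.4/1.5-era micro-benchmark harness; the dataset shape, the sizes, and the `sqlContext` in scope are all assumptions, not the PR's actual benchmark script:

```scala
// Illustrative harness only; the PR's real benchmark script is an attachment
// on the GitHub thread. Assumes a Spark 1.4/1.5-era sqlContext in scope.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import sqlContext.implicits._

def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(f"$label: ${(System.nanoTime() - start) / 1e9}%.2f s")
  result
}

// Hypothetical dataset: 10M rows spread over 1000 partition keys.
val data = sqlContext.range(0L, 10000000L).selectExpr("id % 1000 AS k", "id AS v")
time("moving frame") {
  data.select(avg($"v").over(
    Window.partitionBy($"k").orderBy($"v").rowsBetween(-1000, 1000))).count()
}
```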
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122151112

[Test build #37567 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37567/consoleFull) for PR 7057 at commit [`7207ef5`](https://github.com/apache/spark/commit/7207ef58ded5feb9fcd8651650153e1e82416971).
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34857571

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala ---

```diff
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
+
+  test("reverse sliding range frame") {
+    val df = Seq(
+      (1, "Thin", "Cell Phone", 6000),
+      (2, "Normal", "Tablet", 1500),
+      (3, "Mini", "Tablet", 5500),
+      (4, "Ultra thin", "Cell Phone", 5500),
+      (5, "Very thin", "Cell Phone", 6000),
+      (6, "Big", "Tablet", 2500),
+      (7, "Bendable", "Cell Phone", 3000),
+      (8, "Foldable", "Cell Phone", 3000),
+      (9, "Pro", "Tablet", 4500),
+      (10, "Pro2", "Tablet", 6500)).
+      toDF("id", "product", "category", "revenue")
+    checkAnswer(
+      df.select(
+        $"id",
+        avg($"revenue").over(Window.
+          partitionBy($"category").
+          orderBy($"revenue".desc).
+          rangeBetween(-2000L, 1000L)).
+          cast("int")),
+      Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+        Row(4, 5833) :: Row(5, 5833) :: Row(6, 2000) ::
+        Row(7, 3000) :: Row(8, 3000) :: Row(9, 4166) ::
+        Row(10, 5500) :: Nil)
```

--- End diff --

Yeah, that was a mistake on my behalf. I didn't realise that the offsets should be flipped when the order is descending (I swapped the high and low offsets). I pushed a fix.
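To make the flip concrete: under a descending ORDER BY the frame offsets change sign, so the high and low offsets also swap roles. A small sketch of that invariant; `normalizedRange` is a made-up helper for illustration, not part of the PR:

```scala
// Normalize a range frame's offsets for the sort direction, so the lower
// bound always maps to the smallest in-frame ORDER BY value and the upper
// bound to the largest.
def normalizedRange(low: Long, high: Long, descending: Boolean): (Long, Long) =
  if (descending) (-high, -low) else (low, high)

// Ascending:  rangeBetween(-2000, 1000) covers values in [v - 2000, v + 1000].
// Descending: the same frame covers values in [v - 1000, v + 2000].
assert(normalizedRange(-2000L, 1000L, descending = true) == (-1000L, 2000L))
```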
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122150655

Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122150637

Merged build triggered.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122132838

Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122132788

[Test build #37545 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37545/console) for PR 7057 at commit [`c3e4287`](https://github.com/apache/spark/commit/c3e428719140a21a84ad88294b94778d48f7d768).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122130906

@hvanhovell I remember you have some benchmarking results. Can you add the results to the description? Also, does your benchmark include tests for all four kinds of frames (entire partition, growing frame, shrinking frame, and moving frame)? It would be good to have results for all of these kinds of frames so we can make sure there is no performance regression (I think it is unlikely that we introduced a regression, but it is still good to have benchmarking results for the different cases).
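For readers who want the four frame shapes spelled out, here is a hedged sketch in the DataFrame API of that era; `Long.MinValue`/`Long.MaxValue` stand in for UNBOUNDED PRECEDING/FOLLOWING, and the columns reuse the test data quoted elsewhere in this thread:

```scala
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"category").orderBy($"revenue")

// Entire partition: the frame never moves.
val entirePartition = w.rowsBetween(Long.MinValue, Long.MaxValue)
// Growing frame: UNBOUNDED PRECEDING up to the current row.
val growing = w.rowsBetween(Long.MinValue, 0)
// Shrinking frame: the current row down to UNBOUNDED FOLLOWING.
val shrinking = w.rowsBetween(0, Long.MaxValue)
// Moving frame: both bounds slide with the current row.
val moving = w.rowsBetween(-1, 2)
```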
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34848738

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala ---

```diff
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
+
+  test("reverse sliding range frame") {
+    val df = Seq(
+      (1, "Thin", "Cell Phone", 6000),
+      (2, "Normal", "Tablet", 1500),
+      (3, "Mini", "Tablet", 5500),
+      (4, "Ultra thin", "Cell Phone", 5500),
+      (5, "Very thin", "Cell Phone", 6000),
+      (6, "Big", "Tablet", 2500),
+      (7, "Bendable", "Cell Phone", 3000),
+      (8, "Foldable", "Cell Phone", 3000),
+      (9, "Pro", "Tablet", 4500),
+      (10, "Pro2", "Tablet", 6500)).
+      toDF("id", "product", "category", "revenue")
+    checkAnswer(
+      df.select(
+        $"id",
+        avg($"revenue").over(Window.
+          partitionBy($"category").
+          orderBy($"revenue".desc).
+          rangeBetween(-2000L, 1000L)).
+          cast("int")),
+      Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+        Row(4, 5833) :: Row(5, 5833) :: Row(6, 2000) ::
+        Row(7, 3000) :: Row(8, 3000) :: Row(9, 4166) ::
+        Row(10, 5500) :: Nil)
```

--- End diff --

Actually, this result seems not correct. For example, when we process `(10, "Pro2", "Tablet", 6500)`, the range of revenue for the frame will be [5500, 8500]. Since the ordering direction is `desc`, `2000 preceding` means `8500` and `1000 following` means `5500`. So, the avg value will be `6000`. The result I got with Spark 1.4 and Hive is:

```
+---+----------+----------+-------+----+
| id|   product|  category|revenue| avg|
+---+----------+----------+-------+----+
| 10|      Pro2|    Tablet|   6500|6000|
|  3|      Mini|    Tablet|   5500|5500|
|  9|       Pro|    Tablet|   4500|5500|
|  6|       Big|    Tablet|   2500|2833|
|  2|    Normal|    Tablet|   1500|2000|
|  1|      Thin|Cell Phone|   6000|5833|
|  5| Very thin|Cell Phone|   6000|5833|
|  4|Ultra thin|Cell Phone|   5500|5833|
|  7|  Bendable|Cell Phone|   3000|3000|
|  8|  Foldable|Cell Phone|   3000|3000|
+---+----------+----------+-------+----+
```
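The arithmetic behind the first row of that table, as a self-contained check (not PR code):

```scala
// Frame for row (10, "Pro2", "Tablet", 6500) under orderBy($"revenue".desc)
// with rangeBetween(-2000L, 1000L): descending order means "2000 preceding"
// reaches larger revenues and "1000 following" reaches smaller ones.
val tabletRevenues = Seq(1500, 2500, 4500, 5500, 6500)
val current = 6500
val frame = tabletRevenues.filter(r => r >= current - 1000 && r <= current + 2000)
assert(frame == Seq(5500, 6500))        // the [5500, 8500] revenue interval
assert(frame.sum / frame.size == 6000)  // matches the avg in the table above
```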
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34847515

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowSuite.scala ---

```diff
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+/**
+ * Window expressions are tested extensively by the following test suites:
+ * [[org.apache.spark.sql.hive.HiveDataFrameWindowSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryWithoutCodeGenSuite]]
+ * [[org.apache.spark.sql.hive.execution.HiveWindowFunctionQueryFileWithoutCodeGenSuite]]
+ * However these suites do not cover all possible (i.e. more exotic) settings. This suite fill
+ * this gap.
+ *
+ * TODO Move this class to the sql/core project when we move to Native Spark UDAFs.
+ */
+class WindowSuite extends QueryTest {
+
+  test("reverse sliding range frame") {
+    val df = Seq(
+      (1, "Thin", "Cell Phone", 6000),
+      (2, "Normal", "Tablet", 1500),
+      (3, "Mini", "Tablet", 5500),
+      (4, "Ultra thin", "Cell Phone", 5500),
+      (5, "Very thin", "Cell Phone", 6000),
+      (6, "Big", "Tablet", 2500),
+      (7, "Bendable", "Cell Phone", 3000),
+      (8, "Foldable", "Cell Phone", 3000),
+      (9, "Pro", "Tablet", 4500),
+      (10, "Pro2", "Tablet", 6500)).
+      toDF("id", "product", "category", "revenue")
+    checkAnswer(
+      df.select(
+        $"id",
+        avg($"revenue").over(Window.
+          partitionBy($"category").
+          orderBy($"revenue".desc).
+          rangeBetween(-2000L, 1000L)).
+          cast("int")),
```

--- End diff --

How about we format it like:

```scala
val window = Window
  .partitionBy($"category")
  .orderBy($"revenue".desc)
  .rangeBetween(-2000L, 1000L)

checkAnswer(
  df.select($"id", avg($"revenue").over(window).cast("int")),
  ...
```
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34847128

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---

```diff
@@ -38,443 +84,661 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single " +
+        "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    // The required child ordering has two parts.
-    // The first part is the expressions in the partition specification.
-    // We add these expressions to the required ordering to make sure input rows are grouped
-    // based on the partition specification. So, we only need to process a single partition
-    // at a time.
-    // The second part is the expressions specified in the ORDER BY cluase.
-    // Basically, we first use sort to group rows based on partition specifications and then sort
-    // Rows in a group based on the order specification.
-    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
+    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }
 
-  // Since window functions basically add columns to input rows, this operator
-  // will not change the ordering of input rows.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  case class ComputedWindow(
-    unbound: WindowExpression,
-    windowFunction: WindowFunction,
-    resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
-    window.collect {
-      case w: WindowExpression =>
-        ComputedWindow(
-          w,
-          BindReferences.bindReference(w.windowFunction, child.output),
-          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+   * used to determine which input row lies within the frame boundaries of an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
+    frameType match {
+      case RangeFrame =>
+        val (exprs, current, bound) = if (offset == 0) {
+          // Use the entire order expression when the offset is 0.
+          val exprs = windowSpec.orderSpec.map(_.child)
+          val projection = newMutableProjection(exprs, child.output)
+          (windowSpec.orderSpec, projection(), projection())
+        }
+        else if (windowSpec.orderSpec.size == 1) {
+          // Use only the first order expression when the offset is non-null.
+          val sortExpr = windowSpec.orderSpec.head
+          val expr = sortExpr.child
+          // Create the projection which returns the current 'value'.
+          val current = newMutableProjection(expr :: Nil, child.output)()
+          // Create the projection which returns the current 'value' modified by adding the offset.
+          val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType))
+          val bound = newMutableProjection(boundExpr :: Nil, child.output)()
+          (sortExpr :: Nil, current, bound)
+        }
+        else {
+          sys.error("No
```
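Note that this revision of the hunk still computes the bound as the ORDER BY value plus the offset, with no regard for sort direction. A one-line sketch (illustrative, not PR code) of why that breaks descending frames:

```scala
// Pre-fix bound computation, reduced to Ints.
def boundNoFlip(orderValue: Int, offset: Int): Int = orderValue + offset

// For revenue 6500 under a descending order, "2000 preceding" should reach
// up to 8500, but the unflipped bound lands at 4500 instead.
assert(boundNoFlip(6500, -2000) == 4500)
```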
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122120479

[Test build #37545 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37545/consoleFull) for PR 7057 at commit [`c3e4287`](https://github.com/apache/spark/commit/c3e428719140a21a84ad88294b94778d48f7d768).
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34844502

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +68,645 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {

-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)

-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    // The required child ordering has two parts.
-    // The first part is the expressions in the partition specification.
-    // We add these expressions to the required ordering to make sure input rows are grouped
-    // based on the partition specification. So, we only need to process a single partition
-    // at a time.
-    // The second part is the expressions specified in the ORDER BY clause.
-    // Basically, we first use sort to group rows based on partition specifications and then sort
-    // rows in a group based on the order specification.
-    (windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec) :: Nil
+    } else ClusteredDistribution(windowSpec.partitionSpec) :: Nil
   }

-  // Since window functions basically add columns to input rows, this operator
-  // will not change the ordering of input rows.
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(windowSpec.partitionSpec.map(SortOrder(_, Ascending)) ++ windowSpec.orderSpec)

-  case class ComputedWindow(
-      unbound: WindowExpression,
-      windowFunction: WindowFunction,
-      resultAttribute: AttributeReference)
-
-  // A list of window functions that need to be computed for each group.
-  private[this] val computedWindowExpressions = windowExpression.flatMap { window =>
-    window.collect {
-      case w: WindowExpression =>
-        ComputedWindow(
-          w,
-          BindReferences.bindReference(w.windowFunction, child.output),
-          AttributeReference(s"windowResult:$w", w.dataType, w.nullable)())
-    }
-  }.toArray
-
-  private[this] val windowFrame =
-    windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

-  // Create window functions.
-  private[this] def windowFunctions(): Array[WindowFunction] = {
-    val functions = new Array[WindowFunction](computedWindowExpressions.length)
-    var i = 0
-    while (i < computedWindowExpressions.length) {
-      functions(i) = computedWindowExpressions(i).windowFunction.newInstance()
-      functions(i).init()
-      i += 1
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+   * used to determine which input row lies within the frame boundaries of an output row. There
+   * are two types of boundaries that can be evaluated:
+   * - Row Based: A row based boundary is based on the position of the row within the partition.
+   *   The offset indicates the number of rows above or below the current row at which the frame
+   *   for the current row starts or ends. For instance, given a row based sliding frame with a
+   *   lower bound offset of -1 and an upper bound offset of +2, the frame for the row with
+   *   index 5 would range from index 4 to index 7.
+   * - Range based: A range based boundary is based on the actual value of the ORDER BY
+   *   expression(s). The offset is used to alter the value of the ORDER BY expression; for
+   *   instance, if the current order by expression has a value of 10 and the lower bound offset
+   *   is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts
+   *   a number of constraints on the ORDER BY …
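To make the two boundary modes in that doc comment concrete, here is a small standalone sketch. It is plain Scala written for this archive, not code from the PR; `rowFrame` and `inRangeFrame` are made-up names, and ordering values are simplified to `Int`:

```
// Row-based: purely positional. For the row at index i and offsets (lower, upper),
// the frame covers indices [i + lower, i + upper].
def rowFrame(i: Int, lower: Int, upper: Int): Range.Inclusive =
  (i + lower) to (i + upper)

// Range-based: value-based. A row whose ORDER BY value is v falls inside the frame
// of a current row with ORDER BY value c iff c + lower <= v <= c + upper.
def inRangeFrame(v: Int, c: Int, lower: Int, upper: Int): Boolean =
  v >= c + lower && v <= c + upper

// The examples from the doc comment:
assert(rowFrame(5, -1, +2) == (4 to 7))  // frame for row index 5 spans indices 4..7
assert(inRangeFrame(7, 10, -3, 0))       // value 10 with lower offset -3 admits 7
```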
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34844383

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -38,443 +84,661 @@ case class Window(
[opens with the same changes to output, requiredChildDistribution, requiredChildOrdering, and outputOrdering reproduced in full above, then continues with the new createBoundOrdering helper:]

+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+   * used to determine which input row lies within the frame boundaries of an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
+    frameType match {
+      case RangeFrame =>
+        val (exprs, current, bound) = if (offset == 0) {
+          // Use the entire order expression when the offset is 0.
+          val exprs = windowSpec.orderSpec.map(_.child)
+          val projection = newMutableProjection(exprs, child.output)
+          (windowSpec.orderSpec, projection(), projection())
+        } else if (windowSpec.orderSpec.size == 1) {
+          // Use only the first order expression when the offset is non-zero.
+          val sortExpr = windowSpec.orderSpec.head
+          val expr = sortExpr.child
+          // Create the projection which returns the current 'value'.
+          val current = newMutableProjection(expr :: Nil, child.output)()
+          // Create the projection which returns the current 'value' modified by adding
+          // the offset.
+          val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType))
+          val bound = newMutableProjection(boundExpr :: Nil, child.output)()
+          (sortExpr :: Nil, current, bound)
+        } else {
+          sys.erro…
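The range case above builds two projections from the single ORDER BY expression: one returning the current row's ordering value, and one returning that value shifted by the frame offset (the `Add`/`Cast` expression). A rough plain-Scala analogue of that split, with made-up types standing in for Catalyst rows and generated projections:

```
// A "projection" here is just a function from a row to its ordering value; the PR
// builds the real ones with newMutableProjection over Catalyst expressions.
final case class InputRow(orderValue: Long)

val current: InputRow => Long = _.orderValue
def bound(offset: Int): InputRow => Long = row => row.orderValue + offset

// Deciding whether an input row lies above the lower bound of the current row's
// frame then reduces to comparing the two projected values.
val currentValue = current(InputRow(10L))    // 10
val lowerBound   = bound(-3)(InputRow(10L))  // 10 + (-3) = 7
assert(currentValue >= lowerBound)
```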
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122119926 Merged build triggered.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-122119941 Merged build started.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34618909 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34618831 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34617761 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34616373 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34616177 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34613697 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34613255 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34612767 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34612470 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the same @@ -38,443 +68,645 @@ hunk reproduced in full above; the reviewer's comment was cut off in this archive]
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-121348737 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-121348609 [Test build #37244 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37244/console) for PR 7057 at commit [`480bb05`](https://github.com/apache/spark/commit/480bb0521be005f937e2098333c4ac42ff65bd71).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34604350 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the @@ -37,443 +59,622 @@ hunk, ending at the removed line `override def outputPartitioning: Partitioning = child.outputPartitioning`] --- End diff --

ah, I see. Sorry for missing it. No, we do not need to add this to the Window operator.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34603151 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the @@ -37,443 +59,622 @@ hunk, ending at the removed line `override def outputPartitioning: Partitioning = child.outputPartitioning`] --- End diff --

Tiny follow-up. Now I know why I removed it: `org.apache.spark.sql.execution.UnaryNode`, the parent class of `Window`, already implements it in exactly the same way:

```
private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] {
  self: Product =>

  override def outputPartitioning: Partitioning = child.outputPartitioning
}
```

So do I still need to add this to the `Window` class?
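The point raised here is ordinary Scala inheritance: a member implemented in a mixed-in trait does not need to be restated in the subclass. A minimal, self-contained analogue with toy types (none of these are Spark's real classes):

```
trait Partitioning
case object SomePartitioning extends Partitioning

trait Plan { def outputPartitioning: Partitioning }

// The trait supplies the default, exactly as UnaryNode does for Window.
trait UnaryLike extends Plan {
  def child: Plan
  override def outputPartitioning: Partitioning = child.outputPartitioning
}

case class Leaf() extends Plan {
  def outputPartitioning: Partitioning = SomePartitioning
}
case class WindowLike(child: Plan) extends UnaryLike // no override needed

assert(WindowLike(Leaf()).outputPartitioning == SomePartitioning)
```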
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34602077 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the @@ -37,443 +59,622 @@ hunk, ending at the removed line `override def outputPartitioning: Partitioning = child.outputPartitioning`] --- End diff --

Ahhh... Now I understand. I got confused with `outputOrdering`, sorry about that. I'll add it.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34599617 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala --- [quotes the @@ -37,443 +59,622 @@ hunk, ending at the removed line `override def outputPartitioning: Partitioning = child.outputPartitioning`] --- End diff --

Seems we are still missing `override def outputPartitioning: Partitioning = child.outputPartitioning`?
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34599594

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala ---
@@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
       df.select(
         $"key",
         last("value").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(1, Long.MaxValue))
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0))
--- End diff --

I think the test dataset is not a useful one. How about we change it to `val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2")).toDF("key", "value1")` and, for last_value's frame, use `rangeBetween(-2, -1)`? Although `last_value` is not a deterministic function in general, if we apply it to the order by expression, the result will be deterministic.
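A sketch of the suggested rework, assuming the `org.apache.spark.sql.expressions.Window` DataFrame API of that era; the `sqlContext` and its implicits are assumed to be in scope as in the suite, and the column names follow the comment above:

```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last
import sqlContext.implicits._  // assumed in scope, as in HiveDataFrameWindowSuite

val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2"))
  .toDF("key", "value1")

// Apply last over the ORDER BY column itself with a trailing range frame, so the
// result is deterministic even though last is order-sensitive in general.
val w = Window.partitionBy($"value1").orderBy($"key").rangeBetween(-2, -1)
df.select($"key", $"value1", last($"key").over(w)).show()
```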
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-121323095 [Test build #37244 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37244/consoleFull) for PR 7057 at commit [`480bb05`](https://github.com/apache/spark/commit/480bb0521be005f937e2098333c4ac42ff65bd71).
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-121322206 Merged build triggered.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-121322231 Merged build started.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-121321874 ok to test
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-121071373 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-119763356 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-119763236 [Test build #36850 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36850/console) for PR 7057 at commit [`480bb05`](https://github.com/apache/spark/commit/480bb0521be005f937e2098333c4ac42ff65bd71).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-119752352 @yhuai I have updated the PR. As for the documentation: I will add another section to the general class documentation, explaining the inner workings of the operator. Let me know what else needs more documentation.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-119750074 [Test build #36850 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36850/consoleFull) for PR 7057 at commit [`480bb05`](https://github.com/apache/spark/commit/480bb0521be005f937e2098333c4ac42ff65bd71).
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-119749448 Merged build triggered.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-119749475 Merged build started.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34205943
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala ---
@@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
       df.select(
         $"key",
         last("value").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(1, Long.MaxValue))
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0))
--- End diff --
So I have been looking at this, and I am not too sure what is being tested here. It is currently trivial, because we are trying to get the LAST_VALUE of a column that is used in the partitioning statement - the result is a constant within a partition. @chenghao-intel I think you wrote this test; can you perhaps tell me what is supposed to be tested here?
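A minimal sketch of the triviality being described, with an illustrative dataset: every row in a partition shares the same `value`, so `last("value")` can only ever return that partition's own value (or null, when the frame is empty), regardless of the frame bounds.

```scala
// Illustrative data: partitioning column "value" is constant per partition.
val df = Seq((1, "1"), (2, "2"), (3, "2")).toDF("key", "value")
df.select(
  $"key",
  last("value").over(
    Window.partitionBy($"value").orderBy($"key")
      .rangeBetween(Long.MinValue, Long.MaxValue))
).show()
// Partition "1" yields "1" for its row, partition "2" yields "2" for all of
// its rows. An empty frame could yield null, but never a different value.
```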
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34182441
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
...
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+    // Helper method for creating bound ordering objects.
+    def createBoundOrdering(frameType: FrameType, offset: Int) = frameType match {
--- End diff --
I'll look into it.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34182012
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
...
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
--- End diff --
Let's move `createBoundOrdering` into a separate method. This is actually what scalac already does.
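A sketch of the proposed extraction, transcribing the helper body from the diff quoted earlier in this thread into a private method; the explicit `BoundOrdering` return type is an assumption inferred from the surrounding code.

```scala
// Hypothetical refactoring: the helper pulled out of the lazy val so the
// (factories, projection, columns) block stays short. Body transcribed
// from the diff under review.
private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering =
  frameType match {
    case RowFrame => RowBoundOrdering(offset)
    case RangeFrame =>
      // Use the entire order expression when the offset is 0.
      val (exprs, current, bound) = if (offset == 0) {
        val exprs = windowSpec.orderSpec.map(_.child)
        val projection = newMutableProjection(exprs, child.output)
        (windowSpec.orderSpec, projection(), projection())
      } else {
        // Use only the first order expression when the offset is non-zero.
        val sortExpr = windowSpec.orderSpec.head
        val expr = sortExpr.child
        val boundExpr = Add(expr, Cast(Literal.create(offset, IntegerType), expr.dataType))
        val current = newMutableProjection(expr :: Nil, child.output)()
        val bound = newMutableProjection(boundExpr :: Nil, child.output)()
        (sortExpr :: Nil, current, bound)
      }
      // Construct the ordering over the projected order-by columns.
      val (sortExprs, schema) = exprs.zipWithIndex.map { case (e, i) =>
        val ref = AttributeReference(s"c_$i", e.dataType, e.nullable)()
        (SortOrder(ref, e.direction), ref)
      }.unzip
      RangeBoundOrdering(newOrdering(sortExprs, schema), current, bound)
  }
```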
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-119689504 @hvanhovell I have finished my first round. Sorry for taking so long. I think I understand the new workflow of the operator, and it looks pretty good. I think it will be great if we can have more comments to explain how it works (especially for some important methods like `createBoundOrdering`). I will focus more on the readability of the code in my next round.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34180871
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
...
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
+    // Helper method for creating bound ordering objects.
+    def createBoundOrdering(frameType: FrameType, offset: Int) = frameType match {
--- End diff --
Let's add some examples to demonstrate how it works for different types of frames, different frame boundaries, and different ordering directions.
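The requested examples might look something like the following; the concrete behavior is inferred from the diff quoted above, so the details are an assumption rather than the final doc comment.

```scala
// ROWS frames compare physical row positions, so the offset is used directly:
//   ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
//     lower bound: createBoundOrdering(RowFrame, -1)
//     upper bound: createBoundOrdering(RowFrame, 1)
//
// RANGE frames compare order-by values. With ORDER BY key ASC:
//   RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
//     lower bound: createBoundOrdering(RangeFrame, -2)
//       -> matches rows with key >= current.key - 2 (bound expr: key + (-2))
//     upper bound: createBoundOrdering(RangeFrame, 0)
//       -> uses the full order expression; matches rows with key <= current.key
//
// With ORDER BY key DESC, the same bound expression key + offset is built,
// but comparisons use the descending SortOrder direction.
```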
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34180730
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +67,615 @@ case class Window(
...
+  @transient
+  private[this] lazy val (factories, projection, columns) = {
--- End diff --
This code block is pretty long; can we break it into multiple parts? Or can we take `createBoundOrdering` out?
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34180206 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34180117 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34179282 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34178681 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34175570 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34174597 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34169654
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
...
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --
The line has moved after the documentation updates :(... it is now 84. My bad. This hasn't changed with respect to the current Window implementation: the method only got moved, and the comments got removed.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34168611
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -37,443 +59,622 @@ case class Window(
...
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --
Seems line 77 is for `requiredChildDistribution`? Here we have two concepts. `requiredChildDistribution` specifies the required distribution of this operator's input rows (e.g. input rows need to be grouped by column `a`). `outputPartitioning` indicates the actual partitioning scheme of the output rows of this physical operator (e.g. the output rows of this operator are grouped by column `a` through a hash partitioning scheme with 20 partitions).
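A minimal sketch of the distinction, restating the two overrides from the diff under review; the `outputPartitioning` line is the one being asked for.

```scala
// What this operator requires from its input: rows clustered by the
// partition spec (or all rows in a single partition when there is no spec).
override def requiredChildDistribution: Seq[Distribution] =
  if (windowSpec.partitionSpec.isEmpty) AllTuples :: Nil
  else ClusteredDistribution(windowSpec.partitionSpec) :: Nil

// What this operator actually produces: window functions only append
// columns to the input rows, so the child's partitioning is preserved.
override def outputPartitioning: Partitioning = child.outputPartitioning
```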
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34168240
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---
@@ -19,17 +19,47 @@ package org.apache.spark.sql.execution

 import java.util

-import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.collection.CompactBuffer
+import scala.collection.mutable

 /**
  * :: DeveloperApi ::
- * For every row, evaluates `windowExpression` containing Window Functions and attaches
- * the results with other regular expressions (presented by `projectList`).
- * Evert operator handles a single Window Specification, `windowSpec`.
+ * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
+ * partition. The aggregates are calculated for each row in the group. Special processing
+ * instructions, frames, are used to calculate these aggregates. Frames are processed in the order
+ * specified in the window specification (the ORDER BY ... clause). There are four different frame
+ * types:
+ * - Entire partition: The frame is the entire partition, i.e.
+ *   UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. For this case, window function will take all
+ *   rows as inputs and be evaluated once.
+ * - Growing frame: We only add new rows into the frame, i.e. UNBOUNDED PRECEDING AND
+ *   Every time we move to a new row to process, we add some rows to the frame. We do not remove
+ *   rows from this frame.
+ * - Shrinking frame: We only remove rows from the frame, i.e. ... AND UNBOUNDED FOLLOWING.
+ *   Every time we move to a new row to process, we remove some rows from the frame. We do not add
+ *   rows to this frame. The frame will originally contain all rows of the partition.
--- End diff --
I made a mistake here. When we have `n FOLLOWING AND UNBOUNDED FOLLOWING`, the frame will not contain all rows of the partition at the beginning. So, we can remove `The frame will originally contain all rows of the partition.`.
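Expressed through the DataFrame API, the frame types read roughly as follows. This is a sketch: the fourth (moving) frame type is implied by the four-type list in the doc above, and the `Long.MinValue`/`Long.MaxValue` unbounded markers follow the convention used elsewhere in this thread.

```scala
// Entire partition: UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, Long.MaxValue)

// Growing frame: UNBOUNDED PRECEDING AND CURRENT ROW; rows are only added.
Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)

// Shrinking frame: CURRENT ROW AND UNBOUNDED FOLLOWING; rows are only removed.
Window.partitionBy($"value").orderBy($"key").rowsBetween(0, Long.MaxValue)

// Moving frame: e.g. 1 PRECEDING AND 1 FOLLOWING; rows are added and removed.
Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 1)
```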
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user yhuai commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r34163185

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala ---

```diff
@@ -189,7 +189,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
       df.select(
         $"key",
         last("value").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(1, Long.MaxValue))
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 0))
```

--- End diff --

Yea, the test is wrong. The test data for this one is bad (`val df = Seq((1, "1"), (2, "2"), (2, "2"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")`): within each partition of `value`, every row is identical, so `last("value")` returns the same result no matter what frame is used. This dataset cannot test `last_value`. Can you change the dataset? Thank you for pointing out the problem!
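For illustration, a dataset along these lines would actually exercise `last` over a range frame, because rows within a partition differ and the frame boundaries therefore matter. This is a hedged sketch, not the fix that eventually landed; the data is hypothetical and it assumes a SQLContext/SparkSession with its `implicits` imported for `$` and `toDF`:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

// Rows within a partition now carry distinct payloads, so `last` depends
// on the frame. (Hypothetical test data.)
val df = Seq((1, "a", "x"), (2, "a", "y"), (3, "a", "z"))
  .toDF("key", "part", "payload")

// RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING: for key = 1 the frame covers
// keys {1, 2}, so last("payload") is "y", not the current row's "x".
val w = Window.partitionBy($"part").orderBy($"key").rangeBetween(0, 1)
df.select($"key", last("payload").over(w)).show()
```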
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117739502

Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117739311

[Test build #36267 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36267/console) for PR 7057 at commit [`34f63f8`](https://github.com/apache/spark/commit/34f63f8b3a9f96bd0eb32dee1ec6ee328c63ab5d).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117703601

[Test build #36267 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36267/consoleFull) for PR 7057 at commit [`34f63f8`](https://github.com/apache/spark/commit/34f63f8b3a9f96bd0eb32dee1ec6ee328c63ab5d).
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117702439

Merged build started.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117702352

Merged build triggered.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117461905

Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117461635

**[Test build #36225 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36225/console)** for PR 7057 at commit [`27b0329`](https://github.com/apache/spark/commit/27b03293c51ab20b572958fa207071360ed2418c) after a configured wait of `175m`.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117426446

[Test build #36225 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36225/consoleFull) for PR 7057 at commit [`27b0329`](https://github.com/apache/spark/commit/27b03293c51ab20b572958fa207071360ed2418c).
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117426229

Merged build started.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7057#issuecomment-117426218

Merged build triggered.
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r33642464

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---

```diff
@@ -19,17 +19,39 @@ package org.apache.spark.sql.execution
 
 import java.util
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.collection.CompactBuffer
+import scala.collection.mutable
 
 /**
  * :: DeveloperApi ::
- * For every row, evaluates `windowExpression` containing Window Functions and attaches
- * the results with other regular expressions (presented by `projectList`).
- * Evert operator handles a single Window Specification, `windowSpec`.
+ * This class calculates and outputs (windowed) aggregates over the rows in a single sorted group.
+ * The aggregates are calculated for each row in the group. An aggregate can take a few forms:
+ * - Global: The aggregate is calculated for the entire group. Every row has the same value.
+ * - Rows: The aggregate is calculated based on a subset of the window, and is unique for each
+ *   row and depends on the position of the given row within the window. The group must be sorted
+ *   for this to produce sensible output. Examples are moving averages, running sums and row
+ *   numbers.
+ * - Range: The aggregate is calculated based on a subset of the window, and is unique for each
+ *   value of the order by clause and depends on its ordering. The group must be sorted for this to
+ *   produce sensible output.
+ * - Shifted: The aggregate is a displaced value relative to the position of the given row.
+ *   Examples are Lead and Lag.
```

--- End diff --

The PR also optimizes the processing of Moving and Shrinking frames:

* For moving-frame processing, the number of comparisons is reduced. This didn't look like the most rewarding improvement, but I was surprised to find that it improved performance by quite a margin.
* Shrinking frames are indeed processed in reverse order, which makes building them as fast as the growing case (it uses more memory, though).

I share your concerns, and solving this at the root (the function itself) would indeed be best. I'll revert this for now and file a JIRA request for future reference.
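As a sketch of the reverse-order idea (illustrative Scala, not the PR's actual implementation): a shrinking frame such as CURRENT ROW AND UNBOUNDED FOLLOWING becomes a growing frame when the partition is scanned backwards, so each row costs O(1) extra work instead of re-aggregating its whole suffix; the buffered output array corresponds to the extra memory mentioned above.

```scala
// For each row i, compute the sum over the shrinking frame
// CURRENT ROW AND UNBOUNDED FOLLOWING, i.e. the sum of rows i..end.
def shrinkingSums(partition: Array[Long]): Array[Long] = {
  val out = new Array[Long](partition.length)
  var acc = 0L
  var i = partition.length - 1
  while (i >= 0) {        // scan in reverse: the frame only ever grows
    acc += partition(i)   // add the newly entered row in O(1)
    out(i) = acc          // results are buffered so they can be emitted
    i -= 1                // in forward row order afterwards
  }
  out
}

// shrinkingSums(Array(1L, 2L, 3L, 4L)) == Array(10L, 9L, 7L, 4L)
```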
[GitHub] spark pull request: [SPARK-8638] [SQL] Window Function Performance...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/7057#discussion_r33642234

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---

```diff
@@ -37,443 +59,622 @@ case class Window(
     child: SparkPlan)
   extends UnaryNode {
 
-  override def output: Seq[Attribute] =
-    (projectList ++ windowExpression).map(_.toAttribute)
+  override def output: Seq[Attribute] = projectList ++ windowExpression.map(_.toAttribute)
 
-  override def requiredChildDistribution: Seq[Distribution] =
+  override def requiredChildDistribution: Seq[Distribution] = {
     if (windowSpec.partitionSpec.isEmpty) {
-      // This operator will be very expensive.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
       AllTuples :: Nil
-    } else {
-      ClusteredDistribution(windowSpec.partitionSpec) :: Nil
-    }
-
-  // Since window functions are adding columns to the input rows, the child's outputPartitioning
-  // is preserved.
-  override def outputPartitioning: Partitioning = child.outputPartitioning
```

--- End diff --

It is on line 77 in the new version.