spark git commit: [SPARK-24168][SQL] WindowExec should not access SQLConf at executor side
Repository: spark Updated Branches: refs/heads/branch-2.3 8509284e1 -> d35eb2f9b [SPARK-24168][SQL] WindowExec should not access SQLConf at executor side ## What changes were proposed in this pull request? This PR is extracted from #21190 , to make it easier to backport. `WindowExec#createBoundOrdering` is called on executor side, so we can't use `conf.sessionLocalTimezone` there. ## How was this patch tested? tested in #21190 Author: Wenchen Fan Closes #21225 from cloud-fan/minor3. (cherry picked from commit e646ae67f2e793204bc819ab2b90815214c2bbf3) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d35eb2f9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d35eb2f9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d35eb2f9 Branch: refs/heads/branch-2.3 Commit: d35eb2f9b0af1a625749ca8b7f12d8eceed28766 Parents: 8509284 Author: Wenchen Fan Authored: Thu May 3 17:27:13 2018 -0700 Committer: gatorsmile Committed: Thu May 3 17:27:23 2018 -0700 -- .../spark/sql/execution/window/WindowExec.scala | 15 +-- 1 file changed, 9 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d35eb2f9/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index 800a2ea..626f39d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -112,9 +112,11 @@ case class WindowExec( * * @param frame to evaluate. This can either be a Row or Range frame. * @param bound with respect to the row. + * @param timeZone the session local timezone for time related calculations. * @return a bound ordering object. */ - private[this] def createBoundOrdering(frame: FrameType, bound: Expression): BoundOrdering = { + private[this] def createBoundOrdering( + frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = { (frame, bound) match { case (RowFrame, CurrentRow) => RowBoundOrdering(0) @@ -144,7 +146,7 @@ case class WindowExec( val boundExpr = (expr.dataType, boundOffset.dataType) match { case (DateType, IntegerType) => DateAdd(expr, boundOffset) case (TimestampType, CalendarIntervalType) => -TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone)) +TimeAdd(expr, boundOffset, Some(timeZone)) case (a, b) if a== b => Add(expr, boundOffset) } val bound = newMutableProjection(boundExpr :: Nil, child.output) @@ -197,6 +199,7 @@ case class WindowExec( // Map the groups to a (unbound) expression and frame factory pair. var numExpressions = 0 +val timeZone = conf.sessionLocalTimeZone framedFunctions.toSeq.map { case (key, (expressions, functionSeq)) => val ordinal = numExpressions @@ -237,7 +240,7 @@ case class WindowExec( new UnboundedPrecedingWindowFunctionFrame( target, processor, -createBoundOrdering(frameType, upper)) +createBoundOrdering(frameType, upper, timeZone)) } // Shrinking Frame. @@ -246,7 +249,7 @@ case class WindowExec( new UnboundedFollowingWindowFunctionFrame( target, processor, -createBoundOrdering(frameType, lower)) +createBoundOrdering(frameType, lower, timeZone)) } // Moving Frame. @@ -255,8 +258,8 @@ case class WindowExec( new SlidingWindowFunctionFrame( target, processor, -createBoundOrdering(frameType, lower), -createBoundOrdering(frameType, upper)) +createBoundOrdering(frameType, lower, timeZone), +createBoundOrdering(frameType, upper, timeZone)) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24168][SQL] WindowExec should not access SQLConf at executor side
Repository: spark Updated Branches: refs/heads/master e3201e165 -> e646ae67f [SPARK-24168][SQL] WindowExec should not access SQLConf at executor side ## What changes were proposed in this pull request? This PR is extracted from #21190 , to make it easier to backport. `WindowExec#createBoundOrdering` is called on executor side, so we can't use `conf.sessionLocalTimezone` there. ## How was this patch tested? tested in #21190 Author: Wenchen Fan Closes #21225 from cloud-fan/minor3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e646ae67 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e646ae67 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e646ae67 Branch: refs/heads/master Commit: e646ae67f2e793204bc819ab2b90815214c2bbf3 Parents: e3201e1 Author: Wenchen Fan Authored: Thu May 3 17:27:13 2018 -0700 Committer: gatorsmile Committed: Thu May 3 17:27:13 2018 -0700 -- .../spark/sql/execution/window/WindowExec.scala | 15 +-- 1 file changed, 9 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e646ae67/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index 800a2ea..626f39d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -112,9 +112,11 @@ case class WindowExec( * * @param frame to evaluate. This can either be a Row or Range frame. * @param bound with respect to the row. + * @param timeZone the session local timezone for time related calculations. * @return a bound ordering object. */ - private[this] def createBoundOrdering(frame: FrameType, bound: Expression): BoundOrdering = { + private[this] def createBoundOrdering( + frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = { (frame, bound) match { case (RowFrame, CurrentRow) => RowBoundOrdering(0) @@ -144,7 +146,7 @@ case class WindowExec( val boundExpr = (expr.dataType, boundOffset.dataType) match { case (DateType, IntegerType) => DateAdd(expr, boundOffset) case (TimestampType, CalendarIntervalType) => -TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone)) +TimeAdd(expr, boundOffset, Some(timeZone)) case (a, b) if a== b => Add(expr, boundOffset) } val bound = newMutableProjection(boundExpr :: Nil, child.output) @@ -197,6 +199,7 @@ case class WindowExec( // Map the groups to a (unbound) expression and frame factory pair. var numExpressions = 0 +val timeZone = conf.sessionLocalTimeZone framedFunctions.toSeq.map { case (key, (expressions, functionSeq)) => val ordinal = numExpressions @@ -237,7 +240,7 @@ case class WindowExec( new UnboundedPrecedingWindowFunctionFrame( target, processor, -createBoundOrdering(frameType, upper)) +createBoundOrdering(frameType, upper, timeZone)) } // Shrinking Frame. @@ -246,7 +249,7 @@ case class WindowExec( new UnboundedFollowingWindowFunctionFrame( target, processor, -createBoundOrdering(frameType, lower)) +createBoundOrdering(frameType, lower, timeZone)) } // Moving Frame. @@ -255,8 +258,8 @@ case class WindowExec( new SlidingWindowFunctionFrame( target, processor, -createBoundOrdering(frameType, lower), -createBoundOrdering(frameType, upper)) +createBoundOrdering(frameType, lower, timeZone), +createBoundOrdering(frameType, upper, timeZone)) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org