spark git commit: [SPARK-24168][SQL] WindowExec should not access SQLConf at executor side

2018-05-03 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8509284e1 -> d35eb2f9b


[SPARK-24168][SQL] WindowExec should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from #21190 , to make it easier to backport.

`WindowExec#createBoundOrdering` is called on executor side, so we can't use 
`conf.sessionLocalTimezone` there.

## How was this patch tested?

tested in #21190

Author: Wenchen Fan 

Closes #21225 from cloud-fan/minor3.

(cherry picked from commit e646ae67f2e793204bc819ab2b90815214c2bbf3)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d35eb2f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d35eb2f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d35eb2f9

Branch: refs/heads/branch-2.3
Commit: d35eb2f9b0af1a625749ca8b7f12d8eceed28766
Parents: 8509284
Author: Wenchen Fan 
Authored: Thu May 3 17:27:13 2018 -0700
Committer: gatorsmile 
Committed: Thu May 3 17:27:23 2018 -0700

--
 .../spark/sql/execution/window/WindowExec.scala  | 15 +--
 1 file changed, 9 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d35eb2f9/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 800a2ea..626f39d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -112,9 +112,11 @@ case class WindowExec(
*
* @param frame to evaluate. This can either be a Row or Range frame.
* @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
* @return a bound ordering object.
*/
-  private[this] def createBoundOrdering(frame: FrameType, bound: Expression): 
BoundOrdering = {
+  private[this] def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
 (frame, bound) match {
   case (RowFrame, CurrentRow) =>
 RowBoundOrdering(0)
@@ -144,7 +146,7 @@ case class WindowExec(
 val boundExpr = (expr.dataType, boundOffset.dataType) match {
   case (DateType, IntegerType) => DateAdd(expr, boundOffset)
   case (TimestampType, CalendarIntervalType) =>
-TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
+TimeAdd(expr, boundOffset, Some(timeZone))
   case (a, b) if a== b => Add(expr, boundOffset)
 }
 val bound = newMutableProjection(boundExpr :: Nil, child.output)
@@ -197,6 +199,7 @@ case class WindowExec(
 
 // Map the groups to a (unbound) expression and frame factory pair.
 var numExpressions = 0
+val timeZone = conf.sessionLocalTimeZone
 framedFunctions.toSeq.map {
   case (key, (expressions, functionSeq)) =>
 val ordinal = numExpressions
@@ -237,7 +240,7 @@ case class WindowExec(
   new UnboundedPrecedingWindowFunctionFrame(
 target,
 processor,
-createBoundOrdering(frameType, upper))
+createBoundOrdering(frameType, upper, timeZone))
 }
 
   // Shrinking Frame.
@@ -246,7 +249,7 @@ case class WindowExec(
   new UnboundedFollowingWindowFunctionFrame(
 target,
 processor,
-createBoundOrdering(frameType, lower))
+createBoundOrdering(frameType, lower, timeZone))
 }
 
   // Moving Frame.
@@ -255,8 +258,8 @@ case class WindowExec(
   new SlidingWindowFunctionFrame(
 target,
 processor,
-createBoundOrdering(frameType, lower),
-createBoundOrdering(frameType, upper))
+createBoundOrdering(frameType, lower, timeZone),
+createBoundOrdering(frameType, upper, timeZone))
 }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24168][SQL] WindowExec should not access SQLConf at executor side

2018-05-03 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master e3201e165 -> e646ae67f


[SPARK-24168][SQL] WindowExec should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from #21190 , to make it easier to backport.

`WindowExec#createBoundOrdering` is called on executor side, so we can't use 
`conf.sessionLocalTimezone` there.

## How was this patch tested?

tested in #21190

Author: Wenchen Fan 

Closes #21225 from cloud-fan/minor3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e646ae67
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e646ae67
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e646ae67

Branch: refs/heads/master
Commit: e646ae67f2e793204bc819ab2b90815214c2bbf3
Parents: e3201e1
Author: Wenchen Fan 
Authored: Thu May 3 17:27:13 2018 -0700
Committer: gatorsmile 
Committed: Thu May 3 17:27:13 2018 -0700

--
 .../spark/sql/execution/window/WindowExec.scala  | 15 +--
 1 file changed, 9 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e646ae67/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 800a2ea..626f39d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -112,9 +112,11 @@ case class WindowExec(
*
* @param frame to evaluate. This can either be a Row or Range frame.
* @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
* @return a bound ordering object.
*/
-  private[this] def createBoundOrdering(frame: FrameType, bound: Expression): 
BoundOrdering = {
+  private[this] def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
 (frame, bound) match {
   case (RowFrame, CurrentRow) =>
 RowBoundOrdering(0)
@@ -144,7 +146,7 @@ case class WindowExec(
 val boundExpr = (expr.dataType, boundOffset.dataType) match {
   case (DateType, IntegerType) => DateAdd(expr, boundOffset)
   case (TimestampType, CalendarIntervalType) =>
-TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
+TimeAdd(expr, boundOffset, Some(timeZone))
   case (a, b) if a== b => Add(expr, boundOffset)
 }
 val bound = newMutableProjection(boundExpr :: Nil, child.output)
@@ -197,6 +199,7 @@ case class WindowExec(
 
 // Map the groups to a (unbound) expression and frame factory pair.
 var numExpressions = 0
+val timeZone = conf.sessionLocalTimeZone
 framedFunctions.toSeq.map {
   case (key, (expressions, functionSeq)) =>
 val ordinal = numExpressions
@@ -237,7 +240,7 @@ case class WindowExec(
   new UnboundedPrecedingWindowFunctionFrame(
 target,
 processor,
-createBoundOrdering(frameType, upper))
+createBoundOrdering(frameType, upper, timeZone))
 }
 
   // Shrinking Frame.
@@ -246,7 +249,7 @@ case class WindowExec(
   new UnboundedFollowingWindowFunctionFrame(
 target,
 processor,
-createBoundOrdering(frameType, lower))
+createBoundOrdering(frameType, lower, timeZone))
 }
 
   // Moving Frame.
@@ -255,8 +258,8 @@ case class WindowExec(
   new SlidingWindowFunctionFrame(
 target,
 processor,
-createBoundOrdering(frameType, lower),
-createBoundOrdering(frameType, upper))
+createBoundOrdering(frameType, lower, timeZone),
+createBoundOrdering(frameType, upper, timeZone))
 }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org