[spark] branch branch-3.0 updated: [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 43b5116  [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT
43b5116 is described below

commit 43b511644a0fd0d01f190dc137884fc1081c1933
Author: Takeshi Yamamuro 
AuthorDate: Sat Apr 18 13:37:12 2020 -0700

[SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT

### What changes were proposed in this pull request?

This PR intends to add an ExpressionInfo entry for EXTRACT for better documentation.
This PR comes from the comment in 
https://github.com/apache/spark/pull/21479#discussion_r409900080

### Why are the changes needed?

To make the SQL documentation complete.

### Does this PR introduce any user-facing change?

Yes, this PR updates the `Spark SQL, Built-in Functions` page.

### How was this patch tested?

Ran the example tests.
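
For context, a minimal usage sketch one could paste into spark-shell once the function is registered; the timestamp literal and expected output below are illustrative only, not taken from the patch:

```scala
import org.apache.spark.sql.SparkSession

// Hedged sketch: call EXTRACT through SQL after it is registered as a function.
val spark = SparkSession.builder().master("local[*]").appName("extract-sketch").getOrCreate()
spark.sql("SELECT extract(YEAR FROM TIMESTAMP '2020-04-18 13:37:12') AS y").show()
// +----+
// |   y|
// +----+
// |2020|
// +----+
spark.stop()
```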

Closes #28251 from maropu/AddExtractExpr.

Authored-by: Takeshi Yamamuro 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 74aed8cc8b94cb459ff12a6e8f1f15cb7aea8c40)
Signed-off-by: Dongjoon Hyun 
---
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../catalyst/expressions/datetimeExpressions.scala |  85 ++
 .../spark/sql/catalyst/parser/AstBuilder.scala |   2 +-
 .../resources/sql-tests/results/extract.sql.out| 124 ++---
 .../sql-tests/results/postgreSQL/date.sql.out  |  52 -
 5 files changed, 153 insertions(+), 111 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index c11186e..341a335 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -423,6 +423,7 @@ object FunctionRegistry {
 expression[MakeTimestamp]("make_timestamp"),
 expression[MakeInterval]("make_interval"),
 expression[DatePart]("date_part"),
+expression[Extract]("extract"),
 
 // collection functions
 expression[CreateArray]("array"),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 391e0d0..323aa3b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -2103,6 +2103,31 @@ object DatePart {
   }
 }
 
+object DatePartLike {
+
+  def toEquivalentExpr(field: Expression, source: Expression): Expression = {
+if (!field.foldable) {
+  throw new AnalysisException("The field parameter needs to be a foldable 
string value.")
+}
+val fieldEval = field.eval()
+if (fieldEval == null) {
+  Literal(null, DoubleType)
+} else {
+  val fieldStr = fieldEval.asInstanceOf[UTF8String].toString
+  val errMsg = s"Literals of type '$fieldStr' are currently not supported 
" +
+s"for the ${source.dataType.catalogString} type."
+  if (source.dataType == CalendarIntervalType) {
+ExtractIntervalPart.parseExtractField(
+  fieldStr,
+  source,
+  throw new AnalysisException(errMsg))
+  } else {
+DatePart.parseExtractField(fieldStr, source, throw new 
AnalysisException(errMsg))
+  }
+}
+  }
+}
+
 @ExpressionDescription(
   usage = "_FUNC_(field, source) - Extracts a part of the date/timestamp or 
interval source.",
   arguments = """
@@ -2156,28 +2181,8 @@ object DatePart {
 case class DatePart(field: Expression, source: Expression, child: Expression)
   extends RuntimeReplaceable {
 
-  def this(field: Expression, source: Expression) {
-this(field, source, {
-  if (!field.foldable) {
-throw new AnalysisException("The field parameter needs to be a 
foldable string value.")
-  }
-  val fieldEval = field.eval()
-  if (fieldEval == null) {
-Literal(null, DoubleType)
-  } else {
-val fieldStr = fieldEval.asInstanceOf[UTF8String].toString
-val errMsg = s"Literals of type '$fieldStr' are currently not 
supported " +
-  s"for the ${source.dataType.catalogString} type."
-if (source.dataType == CalendarIntervalType) {
-  ExtractIntervalPart.parseExtractField(
-fieldStr,
-source,
-throw new AnalysisException(errMsg))
-} else {
-  DatePart.parseExtractField(fieldStr, source, throw new 
AnalysisException(errMsg))
-}
-  }
-})

[spark] branch master updated (6c2bf82 -> 74aed8c)

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 6c2bf82  [SPARK-31442][SQL] Print shuffle id at coalesce partitions 
target size
 add 74aed8c  [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../catalyst/expressions/datetimeExpressions.scala |  85 ++
 .../spark/sql/catalyst/parser/AstBuilder.scala |   2 +-
 .../resources/sql-tests/results/extract.sql.out| 124 ++---
 .../sql-tests/results/postgreSQL/date.sql.out  |  52 -
 5 files changed, 153 insertions(+), 111 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 74aed8c  [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT
74aed8c is described below

commit 74aed8cc8b94cb459ff12a6e8f1f15cb7aea8c40
Author: Takeshi Yamamuro 
AuthorDate: Sat Apr 18 13:37:12 2020 -0700

[SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT

### What changes were proposed in this pull request?

This PR intends to add an ExpressionInfo entry for EXTRACT for better documentation.
This PR comes from the comment in 
https://github.com/apache/spark/pull/21479#discussion_r409900080

### Why are the changes needed?

To make the SQL documentation complete.

### Does this PR introduce any user-facing change?

Yes, this PR updates the `Spark SQL, Built-in Functions` page.

### How was this patch tested?

Ran the example tests.
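
For reference, a way to inspect the new entry from spark-shell; this is a usage sketch, and the actual usage/examples text shown comes from the @ExpressionDescription annotation in the diff below, not from here:

```scala
// Hedged sketch: the ExpressionInfo registered for "extract" is what backs
// DESCRIBE FUNCTION output and the generated "Spark SQL, Built-in Functions" page.
spark.sql("DESCRIBE FUNCTION EXTENDED extract").show(truncate = false)
```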

Closes #28251 from maropu/AddExtractExpr.

Authored-by: Takeshi Yamamuro 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   1 +
 .../catalyst/expressions/datetimeExpressions.scala |  85 ++
 .../spark/sql/catalyst/parser/AstBuilder.scala |   2 +-
 .../resources/sql-tests/results/extract.sql.out| 124 ++---
 .../sql-tests/results/postgreSQL/date.sql.out  |  52 -
 5 files changed, 153 insertions(+), 111 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index aba755c..7f879c6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -423,6 +423,7 @@ object FunctionRegistry {
 expression[MakeTimestamp]("make_timestamp"),
 expression[MakeInterval]("make_interval"),
 expression[DatePart]("date_part"),
+expression[Extract]("extract"),
 
 // collection functions
 expression[CreateArray]("array"),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 98536ca..da1152b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -2105,6 +2105,31 @@ object DatePart {
   }
 }
 
+object DatePartLike {
+
+  def toEquivalentExpr(field: Expression, source: Expression): Expression = {
+if (!field.foldable) {
+  throw new AnalysisException("The field parameter needs to be a foldable 
string value.")
+}
+val fieldEval = field.eval()
+if (fieldEval == null) {
+  Literal(null, DoubleType)
+} else {
+  val fieldStr = fieldEval.asInstanceOf[UTF8String].toString
+  val errMsg = s"Literals of type '$fieldStr' are currently not supported 
" +
+s"for the ${source.dataType.catalogString} type."
+  if (source.dataType == CalendarIntervalType) {
+ExtractIntervalPart.parseExtractField(
+  fieldStr,
+  source,
+  throw new AnalysisException(errMsg))
+  } else {
+DatePart.parseExtractField(fieldStr, source, throw new 
AnalysisException(errMsg))
+  }
+}
+  }
+}
+
 @ExpressionDescription(
   usage = "_FUNC_(field, source) - Extracts a part of the date/timestamp or 
interval source.",
   arguments = """
@@ -2158,28 +2183,8 @@ object DatePart {
 case class DatePart(field: Expression, source: Expression, child: Expression)
   extends RuntimeReplaceable {
 
-  def this(field: Expression, source: Expression) {
-this(field, source, {
-  if (!field.foldable) {
-throw new AnalysisException("The field parameter needs to be a 
foldable string value.")
-  }
-  val fieldEval = field.eval()
-  if (fieldEval == null) {
-Literal(null, DoubleType)
-  } else {
-val fieldStr = fieldEval.asInstanceOf[UTF8String].toString
-val errMsg = s"Literals of type '$fieldStr' are currently not 
supported " +
-  s"for the ${source.dataType.catalogString} type."
-if (source.dataType == CalendarIntervalType) {
-  ExtractIntervalPart.parseExtractField(
-fieldStr,
-source,
-throw new AnalysisException(errMsg))
-} else {
-  DatePart.parseExtractField(fieldStr, source, throw new 
AnalysisException(errMsg))
-}
-  }
-})
+  def this(field: Expression, source: Expression) = {
+this(field, source, 
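
The message is truncated above, mid-way through the new auxiliary constructor. Based on the DatePartLike.toEquivalentExpr helper added earlier in this diff, the constructor plausibly delegates to that helper; the following is a self-contained sketch of that delegation pattern with simplified stand-in types, not the verbatim committed code:

```scala
// Simplified, self-contained sketch of the delegation pattern: the auxiliary
// constructor computes the replacement child once via a shared helper, mirroring
// how DatePart (and, plausibly, the new Extract) delegate to DatePartLike.toEquivalentExpr.
// All types here are stand-ins, not Spark's catalyst classes.
sealed trait Expr
final case class Lit(value: Any) extends Expr
final case class ExtractField(field: String, source: Expr) extends Expr

object DatePartLikeSketch {
  // Stand-in for DatePartLike.toEquivalentExpr(field, source) in the patch.
  def toEquivalentExpr(field: String, source: Expr): Expr =
    ExtractField(field.toUpperCase, source)
}

final case class DatePartSketch(field: String, source: Expr, child: Expr) {
  // Mirrors `def this(field, source) = this(field, source, <shared helper>)`.
  def this(field: String, source: Expr) =
    this(field, source, DatePartLikeSketch.toEquivalentExpr(field, source))
}

object DelegationDemo extends App {
  val dp = new DatePartSketch("year", Lit("2020-04-18"))
  println(dp.child) // ExtractField(YEAR,Lit(2020-04-18))
}
```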

[spark] branch branch-3.0 updated: [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 842fbed  [SPARK-31442][SQL] Print shuffle id at coalesce partitions 
target size
842fbed is described below

commit 842fbed1d43f2ce5224d1367a7891f636243382e
Author: ulysses 
AuthorDate: Sat Apr 18 09:27:44 2020 -0700

[SPARK-31442][SQL] Print shuffle id at coalesce partitions target size

### What changes were proposed in this pull request?

Minor change: print the shuffle id.

### Why are the changes needed?

Make the log clearer.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Not needed.
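
For illustration only, the shuffle ids and sizes below are made up; this sketch just shows the message format produced by the patched logInfo call in the diff below:

```scala
// Sketch of the patched log message format, with assumed shuffle ids and sizes.
val shuffleIds = Seq(0, 1).mkString(", ")
val advisoryTargetSize = 67108864L
val targetSize = 1048576L
println(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
  s"actual target size $targetSize.")
// For shuffle(0, 1), advisory target size: 67108864, actual target size 1048576.
```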

Closes #28211 from ulysses-you/print-shuffle-id.

Authored-by: ulysses 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 6c2bf8248a439951fe5157b34ad269cb31df0e16)
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 208cc05..e10ed4f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -68,7 +68,9 @@ object ShufflePartitionsUtil extends Logging {
   math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 
16)
 val targetSize = math.min(maxTargetSize, advisoryTargetSize)
 
-logInfo(s"advisory target size: $advisoryTargetSize, actual target size 
$targetSize.")
+val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
+logInfo(s"For shuffle($shuffleIds), advisory target size: 
$advisoryTargetSize, " +
+  s"actual target size $targetSize.")
 
 // Make sure these shuffles have the same number of partitions.
 val distinctNumShufflePartitions =


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (6bf5f01 -> 6c2bf82)

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 6bf5f01  [SPARK-31477][SQL] Dump codegen and compile time in 
BenchmarkQueryTest
 add 6c2bf82  [SPARK-31442][SQL] Print shuffle id at coalesce partitions 
target size

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: Apply appropriate RPC handler to receive, receiveStream when auth enabled

2020-04-18 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 9416b7c  Apply appropriate RPC handler to receive, receiveStream when 
auth enabled
9416b7c is described below

commit 9416b7c54bdf5613c1a65e6d1779a87591c6c9bd
Author: Sean Owen 
AuthorDate: Fri Apr 17 13:25:12 2020 -0500

Apply appropriate RPC handler to receive, receiveStream when auth enabled
---
 .../spark/network/crypto/AuthRpcHandler.java   |  73 +++---
 .../apache/spark/network/sasl/SaslRpcHandler.java  |  60 +++-
 .../network/server/AbstractAuthRpcHandler.java | 107 +
 .../spark/network/crypto/AuthIntegrationSuite.java |  12 +--
 .../apache/spark/network/sasl/SparkSaslSuite.java  |   3 +-
 5 files changed, 142 insertions(+), 113 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
index 821cc7a..dd31c95 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
@@ -29,12 +29,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.StreamCallbackWithID;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.server.AbstractAuthRpcHandler;
 import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.util.TransportConf;
 
 /**
@@ -46,7 +45,7 @@ import org.apache.spark.network.util.TransportConf;
  * The delegate will only receive messages if the given connection has been 
successfully
  * authenticated. A connection may be authenticated at most once.
  */
-class AuthRpcHandler extends RpcHandler {
+class AuthRpcHandler extends AbstractAuthRpcHandler {
   private static final Logger LOG = 
LoggerFactory.getLogger(AuthRpcHandler.class);
 
   /** Transport configuration. */
@@ -55,36 +54,31 @@ class AuthRpcHandler extends RpcHandler {
   /** The client channel. */
   private final Channel channel;
 
-  /**
-   * RpcHandler we will delegate to for authenticated connections. When 
falling back to SASL
-   * this will be replaced with the SASL RPC handler.
-   */
-  @VisibleForTesting
-  RpcHandler delegate;
-
   /** Class which provides secret keys which are shared by server and client 
on a per-app basis. */
   private final SecretKeyHolder secretKeyHolder;
 
-  /** Whether auth is done and future calls should be delegated. */
+  /** RPC handler for auth handshake when falling back to SASL auth. */
   @VisibleForTesting
-  boolean doDelegate;
+  SaslRpcHandler saslHandler;
 
   AuthRpcHandler(
   TransportConf conf,
   Channel channel,
   RpcHandler delegate,
   SecretKeyHolder secretKeyHolder) {
+super(delegate);
 this.conf = conf;
 this.channel = channel;
-this.delegate = delegate;
 this.secretKeyHolder = secretKeyHolder;
   }
 
   @Override
-  public void receive(TransportClient client, ByteBuffer message, 
RpcResponseCallback callback) {
-if (doDelegate) {
-  delegate.receive(client, message, callback);
-  return;
+  protected boolean doAuthChallenge(
+  TransportClient client,
+  ByteBuffer message,
+  RpcResponseCallback callback) {
+if (saslHandler != null) {
+  return saslHandler.doAuthChallenge(client, message, callback);
 }
 
 int position = message.position();
@@ -98,18 +92,17 @@ class AuthRpcHandler extends RpcHandler {
   if (conf.saslFallback()) {
 LOG.warn("Failed to parse new auth challenge, reverting to SASL for 
client {}.",
   channel.remoteAddress());
-delegate = new SaslRpcHandler(conf, channel, delegate, 
secretKeyHolder);
+saslHandler = new SaslRpcHandler(conf, channel, null, secretKeyHolder);
 message.position(position);
 message.limit(limit);
-delegate.receive(client, message, callback);
-doDelegate = true;
+return saslHandler.doAuthChallenge(client, message, callback);
   } else {
 LOG.debug("Unexpected challenge message from client {}, closing 
channel.",
   channel.remoteAddress());
 callback.onFailure(new IllegalArgumentException("Unknown challenge 
message."));
 channel.close();
   }
-  return;
+  return false;
 }
 
 // Here we have the client challenge, so perform the new auth protocol and 
set up the channel.
@@ -131,7 +124,7 @@ class AuthRpcHandler 

[spark] branch branch-3.0 updated: Apply appropriate RPC handler to receive, receiveStream when auth enabled

2020-04-18 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new c80d5f7  Apply appropriate RPC handler to receive, receiveStream when 
auth enabled
c80d5f7 is described below

commit c80d5f7aa0fd9e7b37e1bf4175204750098a44a6
Author: Sean Owen 
AuthorDate: Fri Apr 17 13:25:12 2020 -0500

Apply appropriate RPC handler to receive, receiveStream when auth enabled
---
 .../spark/network/crypto/AuthRpcHandler.java   |  73 +++---
 .../apache/spark/network/sasl/SaslRpcHandler.java  |  60 +++-
 .../network/server/AbstractAuthRpcHandler.java | 107 +
 .../spark/network/crypto/AuthIntegrationSuite.java |  12 +--
 .../apache/spark/network/sasl/SparkSaslSuite.java  |   3 +-
 5 files changed, 142 insertions(+), 113 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
index 821cc7a..dd31c95 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
@@ -29,12 +29,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.StreamCallbackWithID;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.sasl.SaslRpcHandler;
+import org.apache.spark.network.server.AbstractAuthRpcHandler;
 import org.apache.spark.network.server.RpcHandler;
-import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.util.TransportConf;
 
 /**
@@ -46,7 +45,7 @@ import org.apache.spark.network.util.TransportConf;
  * The delegate will only receive messages if the given connection has been 
successfully
  * authenticated. A connection may be authenticated at most once.
  */
-class AuthRpcHandler extends RpcHandler {
+class AuthRpcHandler extends AbstractAuthRpcHandler {
   private static final Logger LOG = 
LoggerFactory.getLogger(AuthRpcHandler.class);
 
   /** Transport configuration. */
@@ -55,36 +54,31 @@ class AuthRpcHandler extends RpcHandler {
   /** The client channel. */
   private final Channel channel;
 
-  /**
-   * RpcHandler we will delegate to for authenticated connections. When 
falling back to SASL
-   * this will be replaced with the SASL RPC handler.
-   */
-  @VisibleForTesting
-  RpcHandler delegate;
-
   /** Class which provides secret keys which are shared by server and client 
on a per-app basis. */
   private final SecretKeyHolder secretKeyHolder;
 
-  /** Whether auth is done and future calls should be delegated. */
+  /** RPC handler for auth handshake when falling back to SASL auth. */
   @VisibleForTesting
-  boolean doDelegate;
+  SaslRpcHandler saslHandler;
 
   AuthRpcHandler(
   TransportConf conf,
   Channel channel,
   RpcHandler delegate,
   SecretKeyHolder secretKeyHolder) {
+super(delegate);
 this.conf = conf;
 this.channel = channel;
-this.delegate = delegate;
 this.secretKeyHolder = secretKeyHolder;
   }
 
   @Override
-  public void receive(TransportClient client, ByteBuffer message, 
RpcResponseCallback callback) {
-if (doDelegate) {
-  delegate.receive(client, message, callback);
-  return;
+  protected boolean doAuthChallenge(
+  TransportClient client,
+  ByteBuffer message,
+  RpcResponseCallback callback) {
+if (saslHandler != null) {
+  return saslHandler.doAuthChallenge(client, message, callback);
 }
 
 int position = message.position();
@@ -98,18 +92,17 @@ class AuthRpcHandler extends RpcHandler {
   if (conf.saslFallback()) {
 LOG.warn("Failed to parse new auth challenge, reverting to SASL for 
client {}.",
   channel.remoteAddress());
-delegate = new SaslRpcHandler(conf, channel, delegate, 
secretKeyHolder);
+saslHandler = new SaslRpcHandler(conf, channel, null, secretKeyHolder);
 message.position(position);
 message.limit(limit);
-delegate.receive(client, message, callback);
-doDelegate = true;
+return saslHandler.doAuthChallenge(client, message, callback);
   } else {
 LOG.debug("Unexpected challenge message from client {}, closing 
channel.",
   channel.remoteAddress());
 callback.onFailure(new IllegalArgumentException("Unknown challenge 
message."));
 channel.close();
   }
-  return;
+  return false;
 }
 
 // Here we have the client challenge, so perform the new auth protocol and 
set up the channel.
@@ -131,7 +124,7 @@ class AuthRpcHandler 

[spark] branch master updated: [SPARK-31477][SQL] Dump codegen and compile time in BenchmarkQueryTest

2020-04-18 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6bf5f01  [SPARK-31477][SQL] Dump codegen and compile time in 
BenchmarkQueryTest
6bf5f01 is described below

commit 6bf5f01a4a8b7708ce563e0a0e9a49e8ff89c71e
Author: gatorsmile 
AuthorDate: Sat Apr 18 20:59:45 2020 +0900

[SPARK-31477][SQL] Dump codegen and compile time in BenchmarkQueryTest

### What changes were proposed in this pull request?
This PR dumps the codegen and compilation time for benchmark query tests.

### Why are the changes needed?
Measure the codegen and compilation time costs in TPC-DS queries.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Manual test on my local laptop:
```
23:13:12.845 WARN org.apache.spark.sql.TPCDSQuerySuite:
=== Metrics of Whole-stage Codegen ===
Total code generation time: 21.275102261 seconds
Total compilation time: 12.223771828 seconds
```
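
The seconds in the excerpt above come from nanosecond counters divided by NANOS_PER_SECOND (see the BenchmarkQueryTest hunk below); a minimal sketch of that conversion, with the raw counter value assumed to match the log line:

```scala
// Sketch: turning a nanosecond counter into the seconds figure logged above.
// NANOS_PER_SECOND mirrors org.apache.spark.sql.catalyst.util.DateTimeConstants.
val NANOS_PER_SECOND = 1000000000L
val codeGenTimeNanos = 21275102261L // assumed raw value behind "21.275102261 seconds"
val codeGenTime = codeGenTimeNanos.toDouble / NANOS_PER_SECOND
println(f"Total code generation time: $codeGenTime%.9f seconds")
// Total code generation time: 21.275102261 seconds
```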

Closes #28252 from gatorsmile/testMastercode.

Authored-by: gatorsmile 
Signed-off-by: Takeshi Yamamuro 
---
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala|  2 +-
 .../apache/spark/sql/execution/WholeStageCodegenExec.scala  |  2 +-
 .../scala/org/apache/spark/sql/BenchmarkQueryTest.scala | 13 +
 .../test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 13 +++--
 4 files changed, 22 insertions(+), 8 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 3042a27..1cc7836 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1324,7 +1324,7 @@ object CodeGenerator extends Logging {
 
   // Reset compile time.
   // Visible for testing
-  def resetCompileTime: Unit = _compileTime.reset()
+  def resetCompileTime(): Unit = _compileTime.reset()
 
   /**
* Compile the Java source code into a Java class, using Janino.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 9f6e4fc..0244542 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -586,7 +586,7 @@ object WholeStageCodegenExec {
 
   // Reset generation time of Java source code.
   // Visible for testing
-  def resetCodeGenTime: Unit = _codeGenTime.set(0L)
+  def resetCodeGenTime(): Unit = _codeGenTime.set(0L)
 }
 
 /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
index 07afd41..2c3b37a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, 
CodeFormatter, CodeGenerator}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
 import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.Utils
@@ -36,7 +37,17 @@ abstract class BenchmarkQueryTest extends QueryTest with 
SharedSparkSession {
   protected override def afterAll(): Unit = {
 try {
   // For debugging dump some statistics about how much time was spent in 
various optimizer rules
+  // code generation, and compilation.
   logWarning(RuleExecutor.dumpTimeSpent())
+  val codeGenTime = WholeStageCodegenExec.codeGenTime.toDouble / 
NANOS_PER_SECOND
+  val compileTime = CodeGenerator.compileTime.toDouble / NANOS_PER_SECOND
+  val codegenInfo =
+s"""
+   |=== Metrics of Whole-stage Codegen ===
+   |Total code generation time: $codeGenTime seconds
+   |Total compile time: $compileTime seconds
+ """.stripMargin
+  logWarning(codegenInfo)
   spark.sessionState.catalog.reset()
 } finally {
   super.afterAll()
@@ -46,6 +57,8 @@ abstract class BenchmarkQueryTest extends QueryTest with 
SharedSparkSession {
   override def beforeAll(): Unit = {
 super.beforeAll()
 RuleExecutor.resetMetrics()
+CodeGenerator.resetCompileTime()
+

[spark] branch branch-3.0 updated: [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from ExtractBenchmark

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 29f59623 [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields 
from ExtractBenchmark
29f59623 is described below

commit 29f5962372a1ac1d2101d79bf33c53e9ece43f3b
Author: Kent Yao 
AuthorDate: Sat Apr 18 00:32:42 2020 -0700

[SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from 
ExtractBenchmark

### What changes were proposed in this pull request?

In
https://github.com/apache/spark/commit/697083c051304a5ac4f9c56a63274f2103caaef4,
we removed the "MILLENNIUM", "CENTURY", "DECADE", "QUARTER", "MILLISECONDS",
"MICROSECONDS", and "EPOCH" fields from the date_part and extract expressions;
this PR fixes the related benchmark.

### Why are the changes needed?

Test fix.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Passing Jenkins.
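
For context, ExtractBenchmark measures EXTRACT/date_part calls like the rows in the results diff below. A rough, self-contained sketch of timing one still-supported field (not the actual benchmark code, which uses Spark's benchmark harness):

```scala
import org.apache.spark.sql.SparkSession

// Rough sketch, not ExtractBenchmark itself: time one EXTRACT field over generated
// rows, in the spirit of the "YEAR of timestamp" rows in the result files below.
object ExtractTimingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("extract-timing-sketch").getOrCreate()
    val start = System.nanoTime()
    spark.range(10L * 1000 * 1000)
      .selectExpr("extract(YEAR FROM CAST(id AS TIMESTAMP)) AS y")
      .selectExpr("sum(y)") // force every row to be evaluated
      .collect()
    println(s"YEAR of timestamp: ${(System.nanoTime() - start) / 1e6} ms")
    spark.stop()
  }
}
```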

Closes #28249 from yaooqinn/SPARK-31469-F.

Authored-by: Kent Yao 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 77cb7cde0d657967f619287a792a848d1f42adf5)
Signed-off-by: Dongjoon Hyun 
---
 .../benchmarks/ExtractBenchmark-jdk11-results.txt  | 214 ++---
 sql/core/benchmarks/ExtractBenchmark-results.txt   | 214 ++---
 .../sql/execution/benchmark/ExtractBenchmark.scala |   6 +-
 3 files changed, 201 insertions(+), 233 deletions(-)

diff --git a/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt 
b/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt
index 8c56c0b..41bb42c 100644
--- a/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt
+++ b/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt
@@ -1,138 +1,124 @@
-Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.3
+Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4
 Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
 Invoke extract for timestamp: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-cast to timestamp   311331 
 18 32.2  31.1   1.0X
-MILLENNIUM of timestamp 873893 
 17 11.4  87.3   0.4X
-CENTURY of timestamp869873 
  5 11.5  86.9   0.4X
-DECADE of timestamp 851872 
 23 11.7  85.1   0.4X
-YEAR of timestamp   841856 
 14 11.9  84.1   0.4X
-ISOYEAR of timestamp927938 
 12 10.8  92.7   0.3X
-QUARTER of timestamp959963 
  6 10.4  95.9   0.3X
-MONTH of timestamp  852864 
 18 11.7  85.2   0.4X
-WEEK of timestamp  1124   1252 
112  8.9 112.4   0.3X
-DAY of timestamp848867 
 19 11.8  84.8   0.4X
-DAYOFWEEK of timestamp  977987 
 16 10.2  97.7   0.3X
-DOW of timestamp945964 
 18 10.6  94.5   0.3X
-ISODOW of timestamp 924929 
  5 10.8  92.4   0.3X
-DOY of timestamp852906 
 67 11.7  85.2   0.4X
-HOUR of timestamp   665671 
  5 15.0  66.5   0.5X
-MINUTE of timestamp 655670 
 15 15.3  65.5   0.5X
-SECOND of timestamp 757763 
  7 13.2  75.7   0.4X
-MILLISECONDS of timestamp   745761 
 14 13.4  74.5   0.4X
-MICROSECONDS of timestamp   691697 
  7 14.5  69.1   0.5X
-EPOCH of timestamp  794806 
 12 12.6  79.4   0.4X
+cast to timestamp   322327 
  4 31.1  32.2   1.0X
+MILLENNIUM of timestamp 834841 

[spark] branch master updated: [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from ExtractBenchmark

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 77cb7cd  [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields 
from ExtractBenchmark
77cb7cd is described below

commit 77cb7cde0d657967f619287a792a848d1f42adf5
Author: Kent Yao 
AuthorDate: Sat Apr 18 00:32:42 2020 -0700

[SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from 
ExtractBenchmark

### What changes were proposed in this pull request?

In
https://github.com/apache/spark/commit/697083c051304a5ac4f9c56a63274f2103caaef4,
we removed the "MILLENNIUM", "CENTURY", "DECADE", "QUARTER", "MILLISECONDS",
"MICROSECONDS", and "EPOCH" fields from the date_part and extract expressions;
this PR fixes the related benchmark.

### Why are the changes needed?

Test fix.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Passing Jenkins.

Closes #28249 from yaooqinn/SPARK-31469-F.

Authored-by: Kent Yao 
Signed-off-by: Dongjoon Hyun 
---
 .../benchmarks/ExtractBenchmark-jdk11-results.txt  | 214 ++---
 sql/core/benchmarks/ExtractBenchmark-results.txt   | 214 ++---
 .../sql/execution/benchmark/ExtractBenchmark.scala |   6 +-
 3 files changed, 201 insertions(+), 233 deletions(-)

diff --git a/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt 
b/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt
index 8c56c0b..41bb42c 100644
--- a/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt
+++ b/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt
@@ -1,138 +1,124 @@
-Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.3
+Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4
 Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
 Invoke extract for timestamp: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-cast to timestamp   311331 
 18 32.2  31.1   1.0X
-MILLENNIUM of timestamp 873893 
 17 11.4  87.3   0.4X
-CENTURY of timestamp869873 
  5 11.5  86.9   0.4X
-DECADE of timestamp 851872 
 23 11.7  85.1   0.4X
-YEAR of timestamp   841856 
 14 11.9  84.1   0.4X
-ISOYEAR of timestamp927938 
 12 10.8  92.7   0.3X
-QUARTER of timestamp959963 
  6 10.4  95.9   0.3X
-MONTH of timestamp  852864 
 18 11.7  85.2   0.4X
-WEEK of timestamp  1124   1252 
112  8.9 112.4   0.3X
-DAY of timestamp848867 
 19 11.8  84.8   0.4X
-DAYOFWEEK of timestamp  977987 
 16 10.2  97.7   0.3X
-DOW of timestamp945964 
 18 10.6  94.5   0.3X
-ISODOW of timestamp 924929 
  5 10.8  92.4   0.3X
-DOY of timestamp852906 
 67 11.7  85.2   0.4X
-HOUR of timestamp   665671 
  5 15.0  66.5   0.5X
-MINUTE of timestamp 655670 
 15 15.3  65.5   0.5X
-SECOND of timestamp 757763 
  7 13.2  75.7   0.4X
-MILLISECONDS of timestamp   745761 
 14 13.4  74.5   0.4X
-MICROSECONDS of timestamp   691697 
  7 14.5  69.1   0.5X
-EPOCH of timestamp  794806 
 12 12.6  79.4   0.4X
+cast to timestamp   322327 
  4 31.1  32.2   1.0X
+MILLENNIUM of timestamp 834841 
  8 12.0  83.4   0.4X
+CENTURY of timestamp828841   

[spark] branch branch-3.0 updated: [SPARK-31473][SQL] AQE should set active session during execution

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new e1ed80a  [SPARK-31473][SQL] AQE should set active session during 
execution
e1ed80a is described below

commit e1ed80a134a2d613a9221a15847cad3274cd6f2b
Author: Maryann Xue 
AuthorDate: Sat Apr 18 00:08:36 2020 -0700

[SPARK-31473][SQL] AQE should set active session during execution

### What changes were proposed in this pull request?

AQE creates new SparkPlan nodes during execution. This PR makes sure that
the active session is set correctly during this process and AQE execution is
not disrupted by external session changes.

### Why are the changes needed?

To prevent potential errors. If not changed, the physical plans generated
by AQE would have the wrong SparkSession or even a null SparkSession, which
could lead to an NPE.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UT.
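
The fix wraps plan (re)optimization in the session's withActive helper (visible in the diff below). In terms of public APIs, that guard amounts to roughly this sketch:

```scala
import org.apache.spark.sql.SparkSession

object ActiveSessionGuardSketch {
  // Sketch of the "withActive" idiom: make `session` the active one while `block` runs,
  // then restore whatever was active before. This version uses only the public
  // get/set/clearActiveSession APIs; the patch itself calls SparkSession's own helper.
  def withActiveSketch[T](session: SparkSession)(block: => T): T = {
    val previous = SparkSession.getActiveSession
    SparkSession.setActiveSession(session)
    try block
    finally previous match {
      case Some(s) => SparkSession.setActiveSession(s)
      case None    => SparkSession.clearActiveSession()
    }
  }
}
```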

Closes #28247 from maryannxue/aqe-activesession.

Authored-by: Maryann Xue 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 6198f384054e7f86521891ceeb1a231f449a16a8)
Signed-off-by: Dongjoon Hyun 
---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala  |  9 +++--
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 -
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index b54a32f..2b46724 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -139,7 +139,12 @@ case class AdaptiveSparkPlanExec(
   }
 
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
-if (!isFinalPlan) {
+if (isFinalPlan) return currentPhysicalPlan
+
+// In case of this adaptive plan being executed out of `withActive` scoped 
functions, e.g.,
+// `plan.queryExecution.rdd`, we need to set active session here as new 
plan nodes can be
+// created in the middle of the execution.
+context.session.withActive {
   // Subqueries do not have their own execution IDs and therefore rely on 
the main query to
   // update UI.
   val executionId = Option(context.session.sparkContext.getLocalProperty(
@@ -225,8 +230,8 @@ case class AdaptiveSparkPlanExec(
   isFinalPlan = true
   executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
   logOnLevel(s"Final plan: $currentPhysicalPlan")
+  currentPhysicalPlan
 }
-currentPhysicalPlan
   }
 
   override def executeCollect(): Array[InternalRow] = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 64dd9aa..6da510f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,13 +23,14 @@ import java.net.URI
 import org.apache.log4j.Level
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerJobStart}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, 
SparkPlan}
 import 
org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BuildRight, SortMergeJoinExec}
 import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -805,4 +806,14 @@ class AdaptiveQueryExecSuite
   
assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
 }
   }
+
+  test("AQE should set active session during execution") {
+withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+  val df = spark.range(10).select(sum('id))
+  
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+  SparkSession.setActiveSession(null)
+  

[spark] branch master updated (142f436 -> 6198f38)

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 142f436  [SPARK-31390][SQL][DOCS] Document Window Function in SQL 
Syntax Section
 add 6198f38  [SPARK-31473][SQL] AQE should set active session during 
execution

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala  |  9 +++--
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 -
 2 files changed, 19 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-31473][SQL] AQE should set active session during execution

2020-04-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6198f38  [SPARK-31473][SQL] AQE should set active session during 
execution
6198f38 is described below

commit 6198f384054e7f86521891ceeb1a231f449a16a8
Author: Maryann Xue 
AuthorDate: Sat Apr 18 00:08:36 2020 -0700

[SPARK-31473][SQL] AQE should set active session during execution

### What changes were proposed in this pull request?

AQE creates new SparkPlan nodes during execution. This PR makes sure that
the active session is set correctly during this process and AQE execution is
not disrupted by external session changes.

### Why are the changes needed?

To prevent potential errors. If not changed, the physical plans generated
by AQE would have the wrong SparkSession or even a null SparkSession, which
could lead to an NPE.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UT.

Closes #28247 from maryannxue/aqe-activesession.

Authored-by: Maryann Xue 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala  |  9 +++--
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 -
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 217817e..3ac4ea5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -139,7 +139,12 @@ case class AdaptiveSparkPlanExec(
   }
 
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
-if (!isFinalPlan) {
+if (isFinalPlan) return currentPhysicalPlan
+
+// In case of this adaptive plan being executed out of `withActive` scoped 
functions, e.g.,
+// `plan.queryExecution.rdd`, we need to set active session here as new 
plan nodes can be
+// created in the middle of the execution.
+context.session.withActive {
   // Subqueries do not have their own execution IDs and therefore rely on 
the main query to
   // update UI.
   val executionId = Option(context.session.sparkContext.getLocalProperty(
@@ -225,8 +230,8 @@ case class AdaptiveSparkPlanExec(
   isFinalPlan = true
   executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
   logOnLevel(s"Final plan: $currentPhysicalPlan")
+  currentPhysicalPlan
 }
-currentPhysicalPlan
   }
 
   override def executeCollect(): Array[InternalRow] = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 35dec44..694be98 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,12 +23,13 @@ import java.net.URI
 import org.apache.log4j.Level
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerJobStart}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, 
SparkPlan}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, 
BuildRight, SortMergeJoinExec}
 import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -907,4 +908,14 @@ class AdaptiveQueryExecSuite
   
assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
 }
   }
+
+  test("AQE should set active session during execution") {
+withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+  val df = spark.range(10).select(sum('id))
+  
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+  SparkSession.setActiveSession(null)
+  checkAnswer(df, Seq(Row(45)))
+  SparkSession.setActiveSession(spark) // recover the active session.
+}
+  }
 }


-
To unsubscribe, e-mail: