[spark] branch branch-3.0 updated: [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 43b5116 [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT 43b5116 is described below commit 43b511644a0fd0d01f190dc137884fc1081c1933 Author: Takeshi Yamamuro AuthorDate: Sat Apr 18 13:37:12 2020 -0700 [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT ### What changes were proposed in this pull request? This PR intends to add an ExpressionInfo entry for EXTRACT for better documentation. This PR comes from the comment in https://github.com/apache/spark/pull/21479#discussion_r409900080 ### Why are the changes needed? To make the SQL documentation complete. ### Does this PR introduce any user-facing change? Yes, this PR updates the `Spark SQL, Built-in Functions` page. ### How was this patch tested? Run the example tests. Closes #28251 from maropu/AddExtractExpr. 
Authored-by: Takeshi Yamamuro Signed-off-by: Dongjoon Hyun (cherry picked from commit 74aed8cc8b94cb459ff12a6e8f1f15cb7aea8c40) Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../catalyst/expressions/datetimeExpressions.scala | 85 ++ .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../resources/sql-tests/results/extract.sql.out| 124 ++--- .../sql-tests/results/postgreSQL/date.sql.out | 52 - 5 files changed, 153 insertions(+), 111 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index c11186e..341a335 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -423,6 +423,7 @@ object FunctionRegistry { expression[MakeTimestamp]("make_timestamp"), expression[MakeInterval]("make_interval"), expression[DatePart]("date_part"), +expression[Extract]("extract"), // collection functions expression[CreateArray]("array"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 391e0d0..323aa3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -2103,6 +2103,31 @@ object DatePart { } } +object DatePartLike { + + def toEquivalentExpr(field: Expression, source: Expression): Expression = { +if (!field.foldable) { + throw new AnalysisException("The field parameter needs to be a foldable string value.") +} +val fieldEval = field.eval() +if (fieldEval == null) { + Literal(null, DoubleType) +} else { + val fieldStr = 
fieldEval.asInstanceOf[UTF8String].toString + val errMsg = s"Literals of type '$fieldStr' are currently not supported " + +s"for the ${source.dataType.catalogString} type." + if (source.dataType == CalendarIntervalType) { +ExtractIntervalPart.parseExtractField( + fieldStr, + source, + throw new AnalysisException(errMsg)) + } else { +DatePart.parseExtractField(fieldStr, source, throw new AnalysisException(errMsg)) + } +} + } +} + @ExpressionDescription( usage = "_FUNC_(field, source) - Extracts a part of the date/timestamp or interval source.", arguments = """ @@ -2156,28 +2181,8 @@ object DatePart { case class DatePart(field: Expression, source: Expression, child: Expression) extends RuntimeReplaceable { - def this(field: Expression, source: Expression) { -this(field, source, { - if (!field.foldable) { -throw new AnalysisException("The field parameter needs to be a foldable string value.") - } - val fieldEval = field.eval() - if (fieldEval == null) { -Literal(null, DoubleType) - } else { -val fieldStr = fieldEval.asInstanceOf[UTF8String].toString -val errMsg = s"Literals of type '$fieldStr' are currently not supported " + - s"for the ${source.dataType.catalogString} type." -if (source.dataType == CalendarIntervalType) { - ExtractIntervalPart.parseExtractField( -fieldStr, -source, -throw new AnalysisException(errMsg)) -} else { - DatePart.parseExtractField(fieldStr, source, throw new AnalysisException(errMsg)) -} - } -})
[spark] branch branch-3.0 updated: [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 43b5116 [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT 43b5116 is described below commit 43b511644a0fd0d01f190dc137884fc1081c1933 Author: Takeshi Yamamuro AuthorDate: Sat Apr 18 13:37:12 2020 -0700 [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT ### What changes were proposed in this pull request? This PR intends to add an ExpressionInfo entry for EXTRACT for better documentation. This PR comes from the comment in https://github.com/apache/spark/pull/21479#discussion_r409900080 ### Why are the changes needed? To make the SQL documentation complete. ### Does this PR introduce any user-facing change? Yes, this PR updates the `Spark SQL, Built-in Functions` page. ### How was this patch tested? Run the example tests. Closes #28251 from maropu/AddExtractExpr. 
Authored-by: Takeshi Yamamuro Signed-off-by: Dongjoon Hyun (cherry picked from commit 74aed8cc8b94cb459ff12a6e8f1f15cb7aea8c40) Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../catalyst/expressions/datetimeExpressions.scala | 85 ++ .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../resources/sql-tests/results/extract.sql.out| 124 ++--- .../sql-tests/results/postgreSQL/date.sql.out | 52 - 5 files changed, 153 insertions(+), 111 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index c11186e..341a335 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -423,6 +423,7 @@ object FunctionRegistry { expression[MakeTimestamp]("make_timestamp"), expression[MakeInterval]("make_interval"), expression[DatePart]("date_part"), +expression[Extract]("extract"), // collection functions expression[CreateArray]("array"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 391e0d0..323aa3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -2103,6 +2103,31 @@ object DatePart { } } +object DatePartLike { + + def toEquivalentExpr(field: Expression, source: Expression): Expression = { +if (!field.foldable) { + throw new AnalysisException("The field parameter needs to be a foldable string value.") +} +val fieldEval = field.eval() +if (fieldEval == null) { + Literal(null, DoubleType) +} else { + val fieldStr = 
fieldEval.asInstanceOf[UTF8String].toString + val errMsg = s"Literals of type '$fieldStr' are currently not supported " + +s"for the ${source.dataType.catalogString} type." + if (source.dataType == CalendarIntervalType) { +ExtractIntervalPart.parseExtractField( + fieldStr, + source, + throw new AnalysisException(errMsg)) + } else { +DatePart.parseExtractField(fieldStr, source, throw new AnalysisException(errMsg)) + } +} + } +} + @ExpressionDescription( usage = "_FUNC_(field, source) - Extracts a part of the date/timestamp or interval source.", arguments = """ @@ -2156,28 +2181,8 @@ object DatePart { case class DatePart(field: Expression, source: Expression, child: Expression) extends RuntimeReplaceable { - def this(field: Expression, source: Expression) { -this(field, source, { - if (!field.foldable) { -throw new AnalysisException("The field parameter needs to be a foldable string value.") - } - val fieldEval = field.eval() - if (fieldEval == null) { -Literal(null, DoubleType) - } else { -val fieldStr = fieldEval.asInstanceOf[UTF8String].toString -val errMsg = s"Literals of type '$fieldStr' are currently not supported " + - s"for the ${source.dataType.catalogString} type." -if (source.dataType == CalendarIntervalType) { - ExtractIntervalPart.parseExtractField( -fieldStr, -source, -throw new AnalysisException(errMsg)) -} else { - DatePart.parseExtractField(fieldStr, source, throw new AnalysisException(errMsg)) -} - } -})
[spark] branch master updated (6c2bf82 -> 74aed8c)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 6c2bf82 [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size add 74aed8c [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT No new revisions were added by this update. Summary of changes: .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../catalyst/expressions/datetimeExpressions.scala | 85 ++ .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../resources/sql-tests/results/extract.sql.out| 124 ++--- .../sql-tests/results/postgreSQL/date.sql.out | 52 - 5 files changed, 153 insertions(+), 111 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 74aed8c [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT 74aed8c is described below commit 74aed8cc8b94cb459ff12a6e8f1f15cb7aea8c40 Author: Takeshi Yamamuro AuthorDate: Sat Apr 18 13:37:12 2020 -0700 [SPARK-31476][SQL] Add an ExpressionInfo entry for EXTRACT ### What changes were proposed in this pull request? This PR intends to add an ExpressionInfo entry for EXTRACT for better documentation. This PR comes from the comment in https://github.com/apache/spark/pull/21479#discussion_r409900080 ### Why are the changes needed? To make the SQL documentation complete. ### Does this PR introduce any user-facing change? Yes, this PR updates the `Spark SQL, Built-in Functions` page. ### How was this patch tested? Run the example tests. Closes #28251 from maropu/AddExtractExpr. 
Authored-by: Takeshi Yamamuro Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../catalyst/expressions/datetimeExpressions.scala | 85 ++ .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../resources/sql-tests/results/extract.sql.out| 124 ++--- .../sql-tests/results/postgreSQL/date.sql.out | 52 - 5 files changed, 153 insertions(+), 111 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index aba755c..7f879c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -423,6 +423,7 @@ object FunctionRegistry { expression[MakeTimestamp]("make_timestamp"), expression[MakeInterval]("make_interval"), expression[DatePart]("date_part"), +expression[Extract]("extract"), // collection functions expression[CreateArray]("array"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 98536ca..da1152b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -2105,6 +2105,31 @@ object DatePart { } } +object DatePartLike { + + def toEquivalentExpr(field: Expression, source: Expression): Expression = { +if (!field.foldable) { + throw new AnalysisException("The field parameter needs to be a foldable string value.") +} +val fieldEval = field.eval() +if (fieldEval == null) { + Literal(null, DoubleType) +} else { + val fieldStr = fieldEval.asInstanceOf[UTF8String].toString + val errMsg = s"Literals of type '$fieldStr' are currently not 
supported " + +s"for the ${source.dataType.catalogString} type." + if (source.dataType == CalendarIntervalType) { +ExtractIntervalPart.parseExtractField( + fieldStr, + source, + throw new AnalysisException(errMsg)) + } else { +DatePart.parseExtractField(fieldStr, source, throw new AnalysisException(errMsg)) + } +} + } +} + @ExpressionDescription( usage = "_FUNC_(field, source) - Extracts a part of the date/timestamp or interval source.", arguments = """ @@ -2158,28 +2183,8 @@ object DatePart { case class DatePart(field: Expression, source: Expression, child: Expression) extends RuntimeReplaceable { - def this(field: Expression, source: Expression) { -this(field, source, { - if (!field.foldable) { -throw new AnalysisException("The field parameter needs to be a foldable string value.") - } - val fieldEval = field.eval() - if (fieldEval == null) { -Literal(null, DoubleType) - } else { -val fieldStr = fieldEval.asInstanceOf[UTF8String].toString -val errMsg = s"Literals of type '$fieldStr' are currently not supported " + - s"for the ${source.dataType.catalogString} type." -if (source.dataType == CalendarIntervalType) { - ExtractIntervalPart.parseExtractField( -fieldStr, -source, -throw new AnalysisException(errMsg)) -} else { - DatePart.parseExtractField(fieldStr, source, throw new AnalysisException(errMsg)) -} - } -}) + def this(field: Expression, source: Expression) = { +this(field, source,
[spark] branch branch-3.0 updated: [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 842fbed [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size 842fbed is described below commit 842fbed1d43f2ce5224d1367a7891f636243382e Author: ulysses AuthorDate: Sat Apr 18 09:27:44 2020 -0700 [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size ### What changes were proposed in this pull request? Minor change. Print shuffle id. ### Why are the changes needed? Make the log clearer. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Not needed. Closes #28211 from ulysses-you/print-shuffle-id. Authored-by: ulysses Signed-off-by: Dongjoon Hyun (cherry picked from commit 6c2bf8248a439951fe5157b34ad269cb31df0e16) Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index 208cc05..e10ed4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -68,7 +68,9 @@ object ShufflePartitionsUtil extends Logging { math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16) val targetSize = math.min(maxTargetSize, advisoryTargetSize) -logInfo(s"advisory target size: $advisoryTargetSize, actual target size $targetSize.") +val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ") +logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " + + s"actual target size 
$targetSize.") // Make sure these shuffles have the same number of partitions. val distinctNumShufflePartitions = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 842fbed [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size 842fbed is described below commit 842fbed1d43f2ce5224d1367a7891f636243382e Author: ulysses AuthorDate: Sat Apr 18 09:27:44 2020 -0700 [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size ### What changes were proposed in this pull request? Minor change. Print shuffle id. ### Why are the changes needed? Make the log clearer. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Not needed. Closes #28211 from ulysses-you/print-shuffle-id. Authored-by: ulysses Signed-off-by: Dongjoon Hyun (cherry picked from commit 6c2bf8248a439951fe5157b34ad269cb31df0e16) Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index 208cc05..e10ed4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -68,7 +68,9 @@ object ShufflePartitionsUtil extends Logging { math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16) val targetSize = math.min(maxTargetSize, advisoryTargetSize) -logInfo(s"advisory target size: $advisoryTargetSize, actual target size $targetSize.") +val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ") +logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " + + s"actual target size 
$targetSize.") // Make sure these shuffles have the same number of partitions. val distinctNumShufflePartitions = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (6bf5f01 -> 6c2bf82)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 6bf5f01 [SPARK-31477][SQL] Dump codegen and compile time in BenchmarkQueryTest add 6c2bf82 [SPARK-31442][SQL] Print shuffle id at coalesce partitions target size No new revisions were added by this update. Summary of changes: .../apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-2.4 updated: Apply appropriate RPC handler to receive, receiveStream when auth enabled
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 9416b7c Apply appropriate RPC handler to receive, receiveStream when auth enabled 9416b7c is described below commit 9416b7c54bdf5613c1a65e6d1779a87591c6c9bd Author: Sean Owen AuthorDate: Fri Apr 17 13:25:12 2020 -0500 Apply appropriate RPC handler to receive, receiveStream when auth enabled --- .../spark/network/crypto/AuthRpcHandler.java | 73 +++--- .../apache/spark/network/sasl/SaslRpcHandler.java | 60 +++- .../network/server/AbstractAuthRpcHandler.java | 107 + .../spark/network/crypto/AuthIntegrationSuite.java | 12 +-- .../apache/spark/network/sasl/SparkSaslSuite.java | 3 +- 5 files changed, 142 insertions(+), 113 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java index 821cc7a..dd31c95 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java @@ -29,12 +29,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.client.RpcResponseCallback; -import org.apache.spark.network.client.StreamCallbackWithID; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.sasl.SaslRpcHandler; +import org.apache.spark.network.server.AbstractAuthRpcHandler; import org.apache.spark.network.server.RpcHandler; -import org.apache.spark.network.server.StreamManager; import org.apache.spark.network.util.TransportConf; /** @@ -46,7 +45,7 @@ import org.apache.spark.network.util.TransportConf; * The delegate will only receive messages if 
the given connection has been successfully * authenticated. A connection may be authenticated at most once. */ -class AuthRpcHandler extends RpcHandler { +class AuthRpcHandler extends AbstractAuthRpcHandler { private static final Logger LOG = LoggerFactory.getLogger(AuthRpcHandler.class); /** Transport configuration. */ @@ -55,36 +54,31 @@ class AuthRpcHandler extends RpcHandler { /** The client channel. */ private final Channel channel; - /** - * RpcHandler we will delegate to for authenticated connections. When falling back to SASL - * this will be replaced with the SASL RPC handler. - */ - @VisibleForTesting - RpcHandler delegate; - /** Class which provides secret keys which are shared by server and client on a per-app basis. */ private final SecretKeyHolder secretKeyHolder; - /** Whether auth is done and future calls should be delegated. */ + /** RPC handler for auth handshake when falling back to SASL auth. */ @VisibleForTesting - boolean doDelegate; + SaslRpcHandler saslHandler; AuthRpcHandler( TransportConf conf, Channel channel, RpcHandler delegate, SecretKeyHolder secretKeyHolder) { +super(delegate); this.conf = conf; this.channel = channel; -this.delegate = delegate; this.secretKeyHolder = secretKeyHolder; } @Override - public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) { -if (doDelegate) { - delegate.receive(client, message, callback); - return; + protected boolean doAuthChallenge( + TransportClient client, + ByteBuffer message, + RpcResponseCallback callback) { +if (saslHandler != null) { + return saslHandler.doAuthChallenge(client, message, callback); } int position = message.position(); @@ -98,18 +92,17 @@ class AuthRpcHandler extends RpcHandler { if (conf.saslFallback()) { LOG.warn("Failed to parse new auth challenge, reverting to SASL for client {}.", channel.remoteAddress()); -delegate = new SaslRpcHandler(conf, channel, delegate, secretKeyHolder); +saslHandler = new SaslRpcHandler(conf, channel, null, 
secretKeyHolder); message.position(position); message.limit(limit); -delegate.receive(client, message, callback); -doDelegate = true; +return saslHandler.doAuthChallenge(client, message, callback); } else { LOG.debug("Unexpected challenge message from client {}, closing channel.", channel.remoteAddress()); callback.onFailure(new IllegalArgumentException("Unknown challenge message.")); channel.close(); } - return; + return false; } // Here we have the client challenge, so perform the new auth protocol and set up the channel. @@ -131,7 +124,7 @@ class AuthRpcHandler
[spark] branch branch-3.0 updated: Apply appropriate RPC handler to receive, receiveStream when auth enabled
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new c80d5f7 Apply appropriate RPC handler to receive, receiveStream when auth enabled c80d5f7 is described below commit c80d5f7aa0fd9e7b37e1bf4175204750098a44a6 Author: Sean Owen AuthorDate: Fri Apr 17 13:25:12 2020 -0500 Apply appropriate RPC handler to receive, receiveStream when auth enabled --- .../spark/network/crypto/AuthRpcHandler.java | 73 +++--- .../apache/spark/network/sasl/SaslRpcHandler.java | 60 +++- .../network/server/AbstractAuthRpcHandler.java | 107 + .../spark/network/crypto/AuthIntegrationSuite.java | 12 +-- .../apache/spark/network/sasl/SparkSaslSuite.java | 3 +- 5 files changed, 142 insertions(+), 113 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java index 821cc7a..dd31c95 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java @@ -29,12 +29,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.client.RpcResponseCallback; -import org.apache.spark.network.client.StreamCallbackWithID; import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.sasl.SaslRpcHandler; +import org.apache.spark.network.server.AbstractAuthRpcHandler; import org.apache.spark.network.server.RpcHandler; -import org.apache.spark.network.server.StreamManager; import org.apache.spark.network.util.TransportConf; /** @@ -46,7 +45,7 @@ import org.apache.spark.network.util.TransportConf; * The delegate will only receive messages if 
the given connection has been successfully * authenticated. A connection may be authenticated at most once. */ -class AuthRpcHandler extends RpcHandler { +class AuthRpcHandler extends AbstractAuthRpcHandler { private static final Logger LOG = LoggerFactory.getLogger(AuthRpcHandler.class); /** Transport configuration. */ @@ -55,36 +54,31 @@ class AuthRpcHandler extends RpcHandler { /** The client channel. */ private final Channel channel; - /** - * RpcHandler we will delegate to for authenticated connections. When falling back to SASL - * this will be replaced with the SASL RPC handler. - */ - @VisibleForTesting - RpcHandler delegate; - /** Class which provides secret keys which are shared by server and client on a per-app basis. */ private final SecretKeyHolder secretKeyHolder; - /** Whether auth is done and future calls should be delegated. */ + /** RPC handler for auth handshake when falling back to SASL auth. */ @VisibleForTesting - boolean doDelegate; + SaslRpcHandler saslHandler; AuthRpcHandler( TransportConf conf, Channel channel, RpcHandler delegate, SecretKeyHolder secretKeyHolder) { +super(delegate); this.conf = conf; this.channel = channel; -this.delegate = delegate; this.secretKeyHolder = secretKeyHolder; } @Override - public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) { -if (doDelegate) { - delegate.receive(client, message, callback); - return; + protected boolean doAuthChallenge( + TransportClient client, + ByteBuffer message, + RpcResponseCallback callback) { +if (saslHandler != null) { + return saslHandler.doAuthChallenge(client, message, callback); } int position = message.position(); @@ -98,18 +92,17 @@ class AuthRpcHandler extends RpcHandler { if (conf.saslFallback()) { LOG.warn("Failed to parse new auth challenge, reverting to SASL for client {}.", channel.remoteAddress()); -delegate = new SaslRpcHandler(conf, channel, delegate, secretKeyHolder); +saslHandler = new SaslRpcHandler(conf, channel, null, 
secretKeyHolder); message.position(position); message.limit(limit); -delegate.receive(client, message, callback); -doDelegate = true; +return saslHandler.doAuthChallenge(client, message, callback); } else { LOG.debug("Unexpected challenge message from client {}, closing channel.", channel.remoteAddress()); callback.onFailure(new IllegalArgumentException("Unknown challenge message.")); channel.close(); } - return; + return false; } // Here we have the client challenge, so perform the new auth protocol and set up the channel. @@ -131,7 +124,7 @@ class AuthRpcHandler
[spark] branch master updated: [SPARK-31477][SQL] Dump codegen and compile time in BenchmarkQueryTest
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6bf5f01 [SPARK-31477][SQL] Dump codegen and compile time in BenchmarkQueryTest 6bf5f01 is described below commit 6bf5f01a4a8b7708ce563e0a0e9a49e8ff89c71e Author: gatorsmile AuthorDate: Sat Apr 18 20:59:45 2020 +0900 [SPARK-31477][SQL] Dump codegen and compile time in BenchmarkQueryTest ### What changes were proposed in this pull request? This PR is to dump the codegen and compilation time for benchmark query tests. ### Why are the changes needed? Measure the codegen and compilation time costs in TPC-DS queries ### Does this PR introduce any user-facing change? No ### How was this patch tested? Manual test on my local laptop: ``` 23:13:12.845 WARN org.apache.spark.sql.TPCDSQuerySuite: === Metrics of Whole-stage Codegen === Total code generation time: 21.275102261 seconds Total compilation time: 12.223771828 seconds ``` Closes #28252 from gatorsmile/testMastercode. 
Authored-by: gatorsmile Signed-off-by: Takeshi Yamamuro --- .../sql/catalyst/expressions/codegen/CodeGenerator.scala| 2 +- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../scala/org/apache/spark/sql/BenchmarkQueryTest.scala | 13 + .../test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala | 13 +++-- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 3042a27..1cc7836 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1324,7 +1324,7 @@ object CodeGenerator extends Logging { // Reset compile time. // Visible for testing - def resetCompileTime: Unit = _compileTime.reset() + def resetCompileTime(): Unit = _compileTime.reset() /** * Compile the Java source code into a Java class, using Janino. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 9f6e4fc..0244542 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -586,7 +586,7 @@ object WholeStageCodegenExec { // Reset generation time of Java source code. 
// Visible for testing - def resetCodeGenTime: Unit = _codeGenTime.set(0L) + def resetCodeGenTime(): Unit = _codeGenTime.set(0L) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala index 07afd41..2c3b37a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeFormatter, CodeGenerator} import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.Utils @@ -36,7 +37,17 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSparkSession { protected override def afterAll(): Unit = { try { // For debugging dump some statistics about how much time was spent in various optimizer rules + // code generation, and compilation. logWarning(RuleExecutor.dumpTimeSpent()) + val codeGenTime = WholeStageCodegenExec.codeGenTime.toDouble / NANOS_PER_SECOND + val compileTime = CodeGenerator.compileTime.toDouble / NANOS_PER_SECOND + val codegenInfo = +s""" + |=== Metrics of Whole-stage Codegen === + |Total code generation time: $codeGenTime seconds + |Total compile time: $compileTime seconds + """.stripMargin + logWarning(codegenInfo) spark.sessionState.catalog.reset() } finally { super.afterAll() @@ -46,6 +57,8 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSparkSession { override def beforeAll(): Unit = { super.beforeAll() RuleExecutor.resetMetrics() +CodeGenerator.resetCompileTime() +
[spark] branch branch-3.0 updated: [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from ExtractBenchmark
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 29f59623 [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from ExtractBenchmark 29f59623 is described below commit 29f5962372a1ac1d2101d79bf33c53e9ece43f3b Author: Kent Yao AuthorDate: Sat Apr 18 00:32:42 2020 -0700 [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from ExtractBenchmark ### What changes were proposed in this pull request? In https://github.com/apache/spark/commit/697083c051304a5ac4f9c56a63274f2103caaef4, we removed the "MILLENNIUM", "CENTURY", "DECADE", "QUARTER", "MILLISECONDS", "MICROSECONDS", and "EPOCH" fields for the date_part and extract expressions; this PR fixes the related benchmark. ### Why are the changes needed? test fix. ### Does this PR introduce any user-facing change? no ### How was this patch tested? passing Jenkins Closes #28249 from yaooqinn/SPARK-31469-F. 
Authored-by: Kent Yao Signed-off-by: Dongjoon Hyun (cherry picked from commit 77cb7cde0d657967f619287a792a848d1f42adf5) Signed-off-by: Dongjoon Hyun --- .../benchmarks/ExtractBenchmark-jdk11-results.txt | 214 ++--- sql/core/benchmarks/ExtractBenchmark-results.txt | 214 ++--- .../sql/execution/benchmark/ExtractBenchmark.scala | 6 +- 3 files changed, 201 insertions(+), 233 deletions(-) diff --git a/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt b/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt index 8c56c0b..41bb42c 100644 --- a/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt @@ -1,138 +1,124 @@ -Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.3 +Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4 Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz Invoke extract for timestamp: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -cast to timestamp 311331 18 32.2 31.1 1.0X -MILLENNIUM of timestamp 873893 17 11.4 87.3 0.4X -CENTURY of timestamp869873 5 11.5 86.9 0.4X -DECADE of timestamp 851872 23 11.7 85.1 0.4X -YEAR of timestamp 841856 14 11.9 84.1 0.4X -ISOYEAR of timestamp927938 12 10.8 92.7 0.3X -QUARTER of timestamp959963 6 10.4 95.9 0.3X -MONTH of timestamp 852864 18 11.7 85.2 0.4X -WEEK of timestamp 1124 1252 112 8.9 112.4 0.3X -DAY of timestamp848867 19 11.8 84.8 0.4X -DAYOFWEEK of timestamp 977987 16 10.2 97.7 0.3X -DOW of timestamp945964 18 10.6 94.5 0.3X -ISODOW of timestamp 924929 5 10.8 92.4 0.3X -DOY of timestamp852906 67 11.7 85.2 0.4X -HOUR of timestamp 665671 5 15.0 66.5 0.5X -MINUTE of timestamp 655670 15 15.3 65.5 0.5X -SECOND of timestamp 757763 7 13.2 75.7 0.4X -MILLISECONDS of timestamp 745761 14 13.4 74.5 0.4X -MICROSECONDS of timestamp 691697 7 14.5 69.1 0.5X -EPOCH of timestamp 794806 12 12.6 79.4 0.4X +cast to timestamp 322327 4 31.1 32.2 1.0X +MILLENNIUM of timestamp 834841
[spark] branch master updated: [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from ExtractBenchmark
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 77cb7cd [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from ExtractBenchmark 77cb7cd is described below commit 77cb7cde0d657967f619287a792a848d1f42adf5 Author: Kent Yao AuthorDate: Sat Apr 18 00:32:42 2020 -0700 [SPARK-31469][SQL][TESTS][FOLLOWUP] Remove unsupported fields from ExtractBenchmark ### What changes were proposed in this pull request? In https://github.com/apache/spark/commit/697083c051304a5ac4f9c56a63274f2103caaef4, we removed the "MILLENNIUM", "CENTURY", "DECADE", "QUARTER", "MILLISECONDS", "MICROSECONDS", and "EPOCH" fields for the date_part and extract expressions; this PR fixes the related benchmark. ### Why are the changes needed? test fix. ### Does this PR introduce any user-facing change? no ### How was this patch tested? passing Jenkins Closes #28249 from yaooqinn/SPARK-31469-F. 
Authored-by: Kent Yao Signed-off-by: Dongjoon Hyun --- .../benchmarks/ExtractBenchmark-jdk11-results.txt | 214 ++--- sql/core/benchmarks/ExtractBenchmark-results.txt | 214 ++--- .../sql/execution/benchmark/ExtractBenchmark.scala | 6 +- 3 files changed, 201 insertions(+), 233 deletions(-) diff --git a/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt b/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt index 8c56c0b..41bb42c 100644 --- a/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/ExtractBenchmark-jdk11-results.txt @@ -1,138 +1,124 @@ -Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.3 +Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4 Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz Invoke extract for timestamp: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -cast to timestamp 311331 18 32.2 31.1 1.0X -MILLENNIUM of timestamp 873893 17 11.4 87.3 0.4X -CENTURY of timestamp869873 5 11.5 86.9 0.4X -DECADE of timestamp 851872 23 11.7 85.1 0.4X -YEAR of timestamp 841856 14 11.9 84.1 0.4X -ISOYEAR of timestamp927938 12 10.8 92.7 0.3X -QUARTER of timestamp959963 6 10.4 95.9 0.3X -MONTH of timestamp 852864 18 11.7 85.2 0.4X -WEEK of timestamp 1124 1252 112 8.9 112.4 0.3X -DAY of timestamp848867 19 11.8 84.8 0.4X -DAYOFWEEK of timestamp 977987 16 10.2 97.7 0.3X -DOW of timestamp945964 18 10.6 94.5 0.3X -ISODOW of timestamp 924929 5 10.8 92.4 0.3X -DOY of timestamp852906 67 11.7 85.2 0.4X -HOUR of timestamp 665671 5 15.0 66.5 0.5X -MINUTE of timestamp 655670 15 15.3 65.5 0.5X -SECOND of timestamp 757763 7 13.2 75.7 0.4X -MILLISECONDS of timestamp 745761 14 13.4 74.5 0.4X -MICROSECONDS of timestamp 691697 7 14.5 69.1 0.5X -EPOCH of timestamp 794806 12 12.6 79.4 0.4X +cast to timestamp 322327 4 31.1 32.2 1.0X +MILLENNIUM of timestamp 834841 8 12.0 83.4 0.4X +CENTURY of timestamp828841
[spark] branch branch-3.0 updated: [SPARK-31473][SQL] AQE should set active session during execution
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new e1ed80a [SPARK-31473][SQL] AQE should set active session during execution e1ed80a is described below commit e1ed80a134a2d613a9221a15847cad3274cd6f2b Author: Maryann Xue AuthorDate: Sat Apr 18 00:08:36 2020 -0700 [SPARK-31473][SQL] AQE should set active session during execution ### What changes were proposed in this pull request? AQE creates new SparkPlan nodes during execution. This PR makes sure that the active session is set correctly during this process and AQE execution is not disrupted by external session change. ### Why are the changes needed? To prevent potential errors. If not changed, the physical plans generated by AQE would have the wrong SparkSession or even null SparkSession, which could lead to NPE. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added UT. Closes #28247 from maryannxue/aqe-activesession. 
Authored-by: Maryann Xue Signed-off-by: Dongjoon Hyun (cherry picked from commit 6198f384054e7f86521891ceeb1a231f449a16a8) Signed-off-by: Dongjoon Hyun --- .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 9 +++-- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 - 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index b54a32f..2b46724 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -139,7 +139,12 @@ case class AdaptiveSparkPlanExec( } private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { -if (!isFinalPlan) { +if (isFinalPlan) return currentPhysicalPlan + +// In case of this adaptive plan being executed out of `withActive` scoped functions, e.g., +// `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be +// created in the middle of the execution. +context.session.withActive { // Subqueries do not have their own execution IDs and therefore rely on the main query to // update UI. 
val executionId = Option(context.session.sparkContext.getLocalProperty( @@ -225,8 +230,8 @@ case class AdaptiveSparkPlanExec( isFinalPlan = true executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan))) logOnLevel(s"Final plan: $currentPhysicalPlan") + currentPhysicalPlan } -currentPhysicalPlan } override def executeCollect(): Array[InternalRow] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 64dd9aa..6da510f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -23,13 +23,14 @@ import java.net.URI import org.apache.log4j.Level import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{QueryTest, Row, SparkSession} import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate +import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} @@ -805,4 +806,14 @@ class AdaptiveQueryExecSuite assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec]) } } + + test("AQE should set active session during 
execution") { +withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val df = spark.range(10).select(sum('id)) + assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec]) + SparkSession.setActiveSession(null) +
[spark] branch master updated (142f436 -> 6198f38)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 142f436 [SPARK-31390][SQL][DOCS] Document Window Function in SQL Syntax Section add 6198f38 [SPARK-31473][SQL] AQE should set active session during execution No new revisions were added by this update. Summary of changes: .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 9 +++-- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 - 2 files changed, 19 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-31473][SQL] AQE should set active session during execution
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new e1ed80a [SPARK-31473][SQL] AQE should set active session during execution e1ed80a is described below commit e1ed80a134a2d613a9221a15847cad3274cd6f2b Author: Maryann Xue AuthorDate: Sat Apr 18 00:08:36 2020 -0700 [SPARK-31473][SQL] AQE should set active session during execution ### What changes were proposed in this pull request? AQE creates new SparkPlan nodes during execution. This PR makes sure that the active session is set correctly during this process and AQE execution is not disrupted by external session change. ### Why are the changes needed? To prevent potential errors. If not changed, the physical plans generated by AQE would have the wrong SparkSession or even null SparkSession, which could lead to NPE. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added UT. Closes #28247 from maryannxue/aqe-activesession. 
Authored-by: Maryann Xue Signed-off-by: Dongjoon Hyun (cherry picked from commit 6198f384054e7f86521891ceeb1a231f449a16a8) Signed-off-by: Dongjoon Hyun --- .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 9 +++-- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 - 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index b54a32f..2b46724 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -139,7 +139,12 @@ case class AdaptiveSparkPlanExec( } private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { -if (!isFinalPlan) { +if (isFinalPlan) return currentPhysicalPlan + +// In case of this adaptive plan being executed out of `withActive` scoped functions, e.g., +// `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be +// created in the middle of the execution. +context.session.withActive { // Subqueries do not have their own execution IDs and therefore rely on the main query to // update UI. 
val executionId = Option(context.session.sparkContext.getLocalProperty( @@ -225,8 +230,8 @@ case class AdaptiveSparkPlanExec( isFinalPlan = true executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan))) logOnLevel(s"Final plan: $currentPhysicalPlan") + currentPhysicalPlan } -currentPhysicalPlan } override def executeCollect(): Array[InternalRow] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 64dd9aa..6da510f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -23,13 +23,14 @@ import java.net.URI import org.apache.log4j.Level import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{QueryTest, Row, SparkSession} import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate +import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} @@ -805,4 +806,14 @@ class AdaptiveQueryExecSuite assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec]) } } + + test("AQE should set active session during 
execution") { +withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val df = spark.range(10).select(sum('id)) + assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec]) + SparkSession.setActiveSession(null) +
[spark] branch master updated: [SPARK-31473][SQL] AQE should set active session during execution
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6198f38 [SPARK-31473][SQL] AQE should set active session during execution 6198f38 is described below commit 6198f384054e7f86521891ceeb1a231f449a16a8 Author: Maryann Xue AuthorDate: Sat Apr 18 00:08:36 2020 -0700 [SPARK-31473][SQL] AQE should set active session during execution ### What changes were proposed in this pull request? AQE creates new SparkPlan nodes during execution. This PR makes sure that the active session is set correctly during this process and AQE execution is not disrupted by external session change. ### Why are the changes needed? To prevent potential errors. If not changed, the physical plans generated by AQE would have the wrong SparkSession or even null SparkSession, which could lead to NPE. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added UT. Closes #28247 from maryannxue/aqe-activesession. 
Authored-by: Maryann Xue Signed-off-by: Dongjoon Hyun --- .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 9 +++-- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 13 - 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 217817e..3ac4ea5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -139,7 +139,12 @@ case class AdaptiveSparkPlanExec( } private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { -if (!isFinalPlan) { +if (isFinalPlan) return currentPhysicalPlan + +// In case of this adaptive plan being executed out of `withActive` scoped functions, e.g., +// `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be +// created in the middle of the execution. +context.session.withActive { // Subqueries do not have their own execution IDs and therefore rely on the main query to // update UI. 
val executionId = Option(context.session.sparkContext.getLocalProperty( @@ -225,8 +230,8 @@ case class AdaptiveSparkPlanExec( isFinalPlan = true executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan))) logOnLevel(s"Final plan: $currentPhysicalPlan") + currentPhysicalPlan } -currentPhysicalPlan } override def executeCollect(): Array[InternalRow] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 35dec44..694be98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -23,12 +23,13 @@ import java.net.URI import org.apache.log4j.Level import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{QueryTest, Row, SparkSession} import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate +import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} @@ -907,4 +908,14 @@ class AdaptiveQueryExecSuite assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec]) } } + + test("AQE should set active session during execution") { +withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val df = 
spark.range(10).select(sum('id)) + assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec]) + SparkSession.setActiveSession(null) + checkAnswer(df, Seq(Row(45))) + SparkSession.setActiveSession(spark) // recover the active session. +} + } } - To unsubscribe, e-mail: