This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6f377efd3f3 [SPARK-38469][CORE] Use error class in org.apache.spark.network 6f377efd3f3 is described below commit 6f377efd3f3b8db1909349a7c134929a2ec0bf60 Author: Bo Zhang <bo.zh...@databricks.com> AuthorDate: Tue May 16 19:13:20 2023 +0300 [SPARK-38469][CORE] Use error class in org.apache.spark.network ### What changes were proposed in this pull request? This PR aims to change exceptions created in package org.apache.spark.network to use error class. This also adds an error class INTERNAL_ERROR_NETWORK and uses that for the internal errors in the package. ### Why are the changes needed? This is to move exceptions created in package org.apache.spark.network to error class. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes #41140 from bozhang2820/spark-38469. Lead-authored-by: Bo Zhang <bo.zh...@databricks.com> Co-authored-by: Bo Zhang <bozhang2...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../main/scala/org/apache/spark/SparkException.scala | 3 ++- core/src/main/resources/error/error-classes.json | 6 ++++++ .../spark/network/netty/NettyBlockRpcServer.scala | 19 ++++++++++++------- .../network/netty/NettyBlockTransferService.scala | 2 +- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala index 4abf0fdf498..feb7bf5b66e 100644 --- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala +++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala @@ -118,7 +118,8 @@ private[spark] case class SparkUserAppException(exitCode: Int) * Exception thrown when the relative executor to access is dead. 
*/ private[spark] case class ExecutorDeadException(message: String) - extends SparkException(message) + extends SparkException(errorClass = "INTERNAL_ERROR_NETWORK", + messageParameters = Map("message" -> message), cause = null) /** * Exception thrown when Spark returns different result after upgrading to a new version. diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index edc5a5a66e5..24f972a5006 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -830,6 +830,12 @@ ], "sqlState" : "XX000" }, + "INTERNAL_ERROR_NETWORK" : { + "message" : [ + "<message>" + ], + "sqlState" : "XX000" + }, "INTERVAL_ARITHMETIC_OVERFLOW" : { "message" : [ "<message>.<alternative>" diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index f2a1fe49fcf..16ad848a326 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.network.BlockDataManager import org.apache.spark.network.buffer.NioManagedBuffer @@ -93,8 +94,8 @@ class NettyBlockRpcServer( } else { val startAndEndId = fetchShuffleBlocks.reduceIds(index) if (startAndEndId.length != 2) { - throw new IllegalStateException(s"Invalid shuffle fetch request when batch mode " + - s"is enabled: $fetchShuffleBlocks") + throw SparkException.internalError("Invalid shuffle fetch request when batch mode " + + s"is enabled: $fetchShuffleBlocks", category = "NETWORK") } Array(blockManager.getLocalBlockData( ShuffleBlockBatchId( @@ -125,8 +126,10 @@ class NettyBlockRpcServer( 
if (blockStored) { responseContext.onSuccess(ByteBuffer.allocate(0)) } else { - val exception = new Exception(s"Upload block for $blockId failed. This mostly happens " + - s"when there is not sufficient space available to store the block.") + val exception = SparkException.internalError( + s"Upload block for $blockId failed. This mostly happens " + + "when there is not sufficient space available to store the block.", + category = "NETWORK") responseContext.onFailure(exception) } @@ -137,13 +140,15 @@ class NettyBlockRpcServer( val errorMsg = "Invalid GetLocalDirsForExecutors request: " + s"${if (isIncorrectAppId) s"incorrect application id: ${getLocalDirs.appId};"}" + s"${if (execNum != 1) s"incorrect executor number: $execNum (expected 1);"}" - responseContext.onFailure(new IllegalStateException(errorMsg)) + responseContext.onFailure( + SparkException.internalError(errorMsg, category = "NETWORK")) } else { val expectedExecId = blockManager.asInstanceOf[BlockManager].executorId val actualExecId = getLocalDirs.execIds.head if (actualExecId != expectedExecId) { - responseContext.onFailure(new IllegalStateException( - s"Invalid executor id: $actualExecId, expected $expectedExecId.")) + responseContext.onFailure(SparkException.internalError( + s"Invalid executor id: $actualExecId, expected $expectedExecId.", + category = "NETWORK")) } else { responseContext.onSuccess(new LocalDirsForExecutors( Map(actualExecId -> blockManager.getLocalDiskDirs).asJava).toByteBuffer) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index d04d2eeef0b..2bd7be8ebd9 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -137,7 +137,7 @@ private[spark] class NettyBlockTransferService( 
driverEndPointRef.askSync[Boolean](IsExecutorAlive(execId)) } match { case Success(v) if v == false => - throw new ExecutorDeadException(s"The relative remote executor(Id: $execId)," + + throw ExecutorDeadException(s"The relative remote executor(Id: $execId)," + " which maintains the block data to fetch is dead.") case _ => throw e } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org