This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f377efd3f3 [SPARK-38469][CORE] Use error class in org.apache.spark.network
6f377efd3f3 is described below

commit 6f377efd3f3b8db1909349a7c134929a2ec0bf60
Author: Bo Zhang <bo.zh...@databricks.com>
AuthorDate: Tue May 16 19:13:20 2023 +0300

    [SPARK-38469][CORE] Use error class in org.apache.spark.network
    
    ### What changes were proposed in this pull request?
    This PR aims to change exceptions created in package org.apache.spark.network to use error classes. This also adds an error class INTERNAL_ERROR_NETWORK and uses it for the internal errors in the package.
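
    A rough sketch of the mechanism (not part of this diff): the `internalError` helper with a `category` argument is assumed to build an `INTERNAL_ERROR_<category>` error class, consistent with the call sites in the diff below:

    ```scala
    // Hedged sketch of the assumed helper shape; the exact definition lives
    // in SparkException.scala and may differ in detail.
    def internalError(msg: String, category: String): SparkException = {
      new SparkException(
        errorClass = "INTERNAL_ERROR_" + category, // e.g. INTERNAL_ERROR_NETWORK
        messageParameters = Map("message" -> msg),
        cause = null)
    }
    ```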
    
    ### Why are the changes needed?
    This is to move exceptions created in package org.apache.spark.network to error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #41140 from bozhang2820/spark-38469.
    
    Lead-authored-by: Bo Zhang <bo.zh...@databricks.com>
    Co-authored-by: Bo Zhang <bozhang2...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../main/scala/org/apache/spark/SparkException.scala  |  3 ++-
 core/src/main/resources/error/error-classes.json      |  6 ++++++
 .../spark/network/netty/NettyBlockRpcServer.scala     | 19 ++++++++++++-------
 .../network/netty/NettyBlockTransferService.scala     |  2 +-
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 4abf0fdf498..feb7bf5b66e 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -118,7 +118,8 @@ private[spark] case class SparkUserAppException(exitCode: Int)
  * Exception thrown when the relative executor to access is dead.
  */
 private[spark] case class ExecutorDeadException(message: String)
-  extends SparkException(message)
+  extends SparkException(errorClass = "INTERNAL_ERROR_NETWORK",
+    messageParameters = Map("message" -> message), cause = null)
 
 /**
 * Exception thrown when Spark returns different result after upgrading to a new version.
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index edc5a5a66e5..24f972a5006 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -830,6 +830,12 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_NETWORK" : {
+    "message" : [
+      "<message>"
+    ],
+    "sqlState" : "XX000"
+  },
   "INTERVAL_ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<message>.<alternative>"
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index f2a1fe49fcf..16ad848a326 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.buffer.NioManagedBuffer
@@ -93,8 +94,8 @@ class NettyBlockRpcServer(
           } else {
             val startAndEndId = fetchShuffleBlocks.reduceIds(index)
             if (startAndEndId.length != 2) {
-              throw new IllegalStateException(s"Invalid shuffle fetch request when batch mode " +
-                s"is enabled: $fetchShuffleBlocks")
+              throw SparkException.internalError("Invalid shuffle fetch request when batch mode " +
+                s"is enabled: $fetchShuffleBlocks", category = "NETWORK")
             }
             Array(blockManager.getLocalBlockData(
               ShuffleBlockBatchId(
@@ -125,8 +126,10 @@ class NettyBlockRpcServer(
         if (blockStored) {
           responseContext.onSuccess(ByteBuffer.allocate(0))
         } else {
-          val exception = new Exception(s"Upload block for $blockId failed. This mostly happens " +
-            s"when there is not sufficient space available to store the block.")
+          val exception = SparkException.internalError(
+            s"Upload block for $blockId failed. This mostly happens " +
+            "when there is not sufficient space available to store the block.",
+            category = "NETWORK")
           responseContext.onFailure(exception)
         }
 
@@ -137,13 +140,15 @@ class NettyBlockRpcServer(
           val errorMsg = "Invalid GetLocalDirsForExecutors request: " +
             s"${if (isIncorrectAppId) s"incorrect application id: 
${getLocalDirs.appId};"}" +
             s"${if (execNum != 1) s"incorrect executor number: $execNum 
(expected 1);"}"
-          responseContext.onFailure(new IllegalStateException(errorMsg))
+          responseContext.onFailure(
+            SparkException.internalError(errorMsg, category = "NETWORK"))
         } else {
          val expectedExecId = blockManager.asInstanceOf[BlockManager].executorId
           val actualExecId = getLocalDirs.execIds.head
           if (actualExecId != expectedExecId) {
-            responseContext.onFailure(new IllegalStateException(
-              s"Invalid executor id: $actualExecId, expected $expectedExecId."))
+            responseContext.onFailure(SparkException.internalError(
+              s"Invalid executor id: $actualExecId, expected $expectedExecId.",
+              category = "NETWORK"))
           } else {
             responseContext.onSuccess(new LocalDirsForExecutors(
              Map(actualExecId -> blockManager.getLocalDiskDirs).asJava).toByteBuffer)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index d04d2eeef0b..2bd7be8ebd9 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -137,7 +137,7 @@ private[spark] class NettyBlockTransferService(
                 driverEndPointRef.askSync[Boolean](IsExecutorAlive(execId))
               } match {
                 case Success(v) if v == false =>
-                  throw new ExecutorDeadException(s"The relative remote executor(Id: $execId)," +
+                  throw ExecutorDeadException(s"The relative remote executor(Id: $execId)," +
                     " which maintains the block data to fetch is dead.")
                 case _ => throw e
               }
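
With this change, the network-layer errors above should surface the INTERNAL_ERROR_NETWORK error class through the SparkThrowable interface. A minimal, hypothetical sketch of inspecting it (assuming the getErrorClass/getSqlState accessors, and Spark-internal access since some of these helpers are private[spark]):

```scala
import org.apache.spark.{SparkException, SparkThrowable}

// Hypothetical usage sketch; may only compile inside Spark internals.
try {
  throw SparkException.internalError("remote executor is dead", category = "NETWORK")
} catch {
  case e: SparkThrowable =>
    // Expected: INTERNAL_ERROR_NETWORK / XX000, per the
    // error-classes.json entry added in this commit.
    println(s"${e.getErrorClass} / ${e.getSqlState}")
}
```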

