This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f6a4a737ee [SPARK-38476][CORE] Use error class in org.apache.spark.storage
0f6a4a737ee is described below

commit 0f6a4a737ee9457a0b0c336b7d079cdd878d20e8
Author: Bo Zhang <bo.zh...@databricks.com>
AuthorDate: Tue Jul 11 13:06:52 2023 +0300

    [SPARK-38476][CORE] Use error class in org.apache.spark.storage
    
    ### What changes were proposed in this pull request?
    This PR changes exceptions created in the package org.apache.spark.storage to use error classes. It also adds an error class INTERNAL_ERROR_STORAGE and uses it for the internal errors in the package.
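    
    For illustration only, the pattern applied across the package looks roughly like the sketch below (the object and method names here are hypothetical; the real call sites are in the hunks that follow). `SparkException.internalError(..., category = "STORAGE")` raises the new INTERNAL_ERROR_STORAGE error class:
    
    ```scala
    import org.apache.spark.SparkException
    
    object StorageErrorExample {
      // Hypothetical call site, for illustration only.
      def removeBlockOrFail(blockId: String, holdsWriteLock: Boolean): Unit = {
        if (!holdsWriteLock) {
          // Previously: throw new IllegalStateException(...)
          // Now: a SparkException carrying the INTERNAL_ERROR_STORAGE error class
          // (an internal error with category "STORAGE").
          throw SparkException.internalError(
            s"called remove() on block $blockId without a write lock",
            category = "STORAGE")
        }
      }
    }
    ```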
    
    ### Why are the changes needed?
    This is to move exceptions created in the package org.apache.spark.storage to error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Updated existing tests.
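    
    For example, assertions that previously intercepted IllegalStateException now intercept SparkException. A minimal sketch of the updated pattern (the suite name is illustrative; the actual suites are in the test diffs below):
    
    ```scala
    import org.apache.spark.{SparkException, SparkFunSuite}
    
    class InternalErrorStorageSuite extends SparkFunSuite {
      test("internal storage errors surface as SparkException") {
        val e = intercept[SparkException] {
          throw SparkException.internalError("simulated storage failure", category = "STORAGE")
        }
        // Error class name per the new entry in error-classes.json.
        assert(e.getErrorClass === "INTERNAL_ERROR_STORAGE")
      }
    }
    ```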
    
    Closes #41923 from bozhang2820/spark-38476.
    
    Authored-by: Bo Zhang <bo.zh...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 common/utils/src/main/resources/error/error-classes.json |  6 ++++++
 .../org/apache/spark/storage/BlockInfoManager.scala      |  7 ++++---
 .../scala/org/apache/spark/storage/BlockManager.scala    |  4 ++--
 .../org/apache/spark/storage/DiskBlockManager.scala      | 10 +++++-----
 .../org/apache/spark/storage/DiskBlockObjectWriter.scala |  4 +++-
 .../main/scala/org/apache/spark/storage/DiskStore.scala  |  7 ++++---
 .../scala/org/apache/spark/storage/FallbackStorage.scala |  5 +++--
 .../spark/storage/ShuffleBlockFetcherIterator.scala      |  5 +++--
 .../org/apache/spark/storage/memory/MemoryStore.scala    | 16 ++++++++++------
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala |  4 ++--
 .../spark/storage/DiskBlockObjectWriterSuite.scala       |  4 ++--
 .../spark/storage/PartiallySerializedBlockSuite.scala    | 14 +++++++-------
 docs/sql-error-conditions.md                             |  6 ++++++
 13 files changed, 57 insertions(+), 35 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 66305c20112..347ce026476 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1089,6 +1089,12 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_STORAGE" : {
+    "message" : [
+      "<message>"
+    ],
+    "sqlState" : "XX000"
+  },
   "INTERVAL_ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<message>.<alternative>"
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index fb532dd0736..45ebb6eafa6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
 import com.google.common.collect.{ConcurrentHashMultiset, ImmutableMultiset}
 import com.google.common.util.concurrent.Striped
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 
@@ -543,8 +543,9 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     logTrace(s"Task $taskAttemptId trying to remove block $blockId")
     blockInfo(blockId) { (info, condition) =>
       if (info.writerTask != taskAttemptId) {
-        throw new IllegalStateException(
-          s"Task $taskAttemptId called remove() on block $blockId without a write lock")
+        throw SparkException.internalError(
+          s"Task $taskAttemptId called remove() on block $blockId without a write lock",
+          category = "STORAGE")
       } else {
         invisibleRDDBlocks.synchronized {
           blockInfoWrappers.remove(blockId)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b4453b4d35e..05d57c67576 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1171,8 +1171,8 @@ private[spark] class BlockManager(
         val buf = blockTransferService.fetchBlockSync(loc.host, loc.port, loc.executorId,
           blockId.toString, tempFileManager)
         if (blockSize > 0 && buf.size() == 0) {
-          throw new IllegalStateException("Empty buffer received for non empty block " +
-            s"when fetching remote block $blockId from $loc")
+          throw SparkException.internalError("Empty buffer received for non empty block " +
+            s"when fetching remote block $blockId from $loc", category = "STORAGE")
         }
         buf
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 0427fbd9b62..1e8289287cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.HashMap
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
@@ -139,15 +139,15 @@ private[spark] class DiskBlockManager(
       case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
         getMergedShuffleFile(mergedMetaBlockId.name, dirs)
       case _ =>
-        throw new IllegalArgumentException(
-          s"Only merged block ID is supported, but got $blockId")
+        throw SparkException.internalError(
+          s"Only merged block ID is supported, but got $blockId", category = "STORAGE")
     }
   }
 
   private def getMergedShuffleFile(filename: String, dirs: Option[Array[String]]): File = {
     if (!dirs.exists(_.nonEmpty)) {
-      throw new IllegalArgumentException(
-        s"Cannot read $filename because merged shuffle dirs is empty")
+      throw SparkException.internalError(
+        s"Cannot read $filename because merged shuffle dirs is empty", category = "STORAGE")
     }
     new File(ExecutorDiskUtils.getFilePath(dirs.get, subDirsPerLocalDir, filename))
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 3bdae2fe74f..f8bd73e6561 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -22,6 +22,7 @@ import java.nio.channels.{ClosedByInterruptException, FileChannel}
 import java.nio.file.Files
 import java.util.zip.Checksum
 
+import org.apache.spark.SparkException
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.MutableCheckedOutputStream
@@ -153,7 +154,8 @@ private[spark] class DiskBlockObjectWriter(
 
   def open(): DiskBlockObjectWriter = {
     if (hasBeenClosed) {
-      throw new IllegalStateException("Writer already closed. Cannot be 
reopened.")
+      throw SparkException.internalError(
+        "Writer already closed. Cannot be reopened.", category = "STORAGE")
     }
     if (!initialized) {
       initialize()
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index d45947db693..1cb5adef5f4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -29,7 +29,7 @@ import com.google.common.io.Closeables
 import io.netty.channel.DefaultFileRegion
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
@@ -67,8 +67,9 @@ private[spark] class DiskStore(
         diskManager.getFile(blockId).delete()
       } catch {
         case e: Exception =>
-          throw new IllegalStateException(
-            s"Block $blockId is already present in the disk store and could not delete it $e")
+          throw SparkException.internalError(
+            s"Block $blockId is already present in the disk store and could not delete it $e",
+            category = "STORAGE")
       }
     }
     logDebug(s"Attempting to put block $blockId")
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 5aa5c6eff7b..eb23fb4b1c8 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
@@ -170,7 +170,8 @@ private[spark] object FallbackStorage extends Logging {
       case batchId: ShuffleBlockBatchId =>
         (batchId.shuffleId, batchId.mapId, batchId.startReduceId, batchId.endReduceId)
       case _ =>
-        throw new IllegalArgumentException("unexpected shuffle block id format: " + blockId)
+        throw SparkException.internalError(
+          s"unexpected shuffle block id format: $blockId", category = "STORAGE")
     }
 
     val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 7cd408caaa5..b21a2aa1c17 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -33,7 +33,7 @@ import io.netty.util.internal.OutOfDirectMemoryError
 import org.apache.commons.io.IOUtils
 import org.roaringbitmap.RoaringBitmap
 
-import org.apache.spark.{MapOutputTracker, TaskContext}
+import org.apache.spark.{MapOutputTracker, SparkException, TaskContext}
 import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
@@ -1143,7 +1143,8 @@ final class ShuffleBlockFetcherIterator(
         logWarning(diagnosisResponse)
         diagnosisResponse
       case unexpected: BlockId =>
-        throw new IllegalArgumentException(s"Unexpected type of BlockId, $unexpected")
+        throw SparkException.internalError(
+          s"Unexpected type of BlockId, $unexpected", category = "STORAGE")
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index e62f3c126b1..489b6a53e47 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.{SparkConf, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{STORAGE_UNROLL_MEMORY_THRESHOLD, UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
@@ -371,7 +371,8 @@ private[spark] class MemoryStore(
     entry match {
       case null => None
       case _: DeserializedMemoryEntry[_] =>
-        throw new IllegalArgumentException("should only call getBytes on serialized blocks")
+        throw SparkException.internalError(
+          "should only call getBytes on serialized blocks", category = "STORAGE")
       case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
     }
   }
@@ -381,7 +382,8 @@ private[spark] class MemoryStore(
     entry match {
       case null => None
       case e: SerializedMemoryEntry[_] =>
-        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
+        throw SparkException.internalError(
+          "should only call getValues on deserialized blocks", category = "STORAGE")
       case DeserializedMemoryEntry(values, _, _, _) =>
         val x = Some(values)
         x.map(_.iterator)
@@ -862,11 +864,13 @@ private[storage] class PartiallySerializedBlock[T](
 
   private def verifyNotConsumedAndNotDiscarded(): Unit = {
     if (consumed) {
-      throw new IllegalStateException(
-        "Can only call one of finishWritingToStream() or valuesIterator() and can only call once.")
+      throw SparkException.internalError(
+        "Can only call one of finishWritingToStream() or valuesIterator() and can only call once.",
+        category = "STORAGE")
     }
     if (discarded) {
-      throw new IllegalStateException("Cannot call methods on a discarded 
PartiallySerializedBlock")
+      throw SparkException.internalError(
+        "Cannot call methods on a discarded PartiallySerializedBlock", category = "STORAGE")
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index a1cd1dbc9be..3708f0aa672 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -309,7 +309,7 @@ class BlockInfoManagerSuite extends SparkFunSuite {
     withTaskId(0) {
       assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
       blockInfoManager.unlock("block")
-      intercept[IllegalStateException] {
+      intercept[SparkException] {
         blockInfoManager.removeBlock("block")
       }
     }
@@ -320,7 +320,7 @@ class BlockInfoManagerSuite extends SparkFunSuite {
       assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
       blockInfoManager.unlock("block")
       assert(blockInfoManager.lockForReading("block").isDefined)
-      intercept[IllegalStateException] {
+      intercept[SparkException] {
         blockInfoManager.removeBlock("block")
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index e6bf01b4b65..70a57eed07a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.storage
 
 import java.io.File
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.util.Utils
@@ -96,7 +96,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite {
 
     writer.open()
     writer.close()
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       writer.open()
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index 9753b483153..5582524ffee 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -26,7 +26,7 @@ import org.mockito.Mockito.atLeastOnce
 import org.mockito.invocation.InvocationOnMock
 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
 
-import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext, TaskContextImpl}
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.serializer.{JavaSerializer, SerializationStream, SerializerManager}
 import org.apache.spark.storage.memory.{MemoryStore, PartiallySerializedBlock, RedirectableOutputStream}
@@ -97,10 +97,10 @@ class PartiallySerializedBlockSuite
   test("valuesIterator() and finishWritingToStream() cannot be called after 
discard() is called") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.discard()
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.finishWritingToStream(null)
     }
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.valuesIterator
     }
   }
@@ -114,7 +114,7 @@ class PartiallySerializedBlockSuite
   test("cannot call valuesIterator() more than once") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.valuesIterator
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.valuesIterator
     }
   }
@@ -122,7 +122,7 @@ class PartiallySerializedBlockSuite
   test("cannot call finishWritingToStream() more than once") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.finishWritingToStream(new ByteBufferOutputStream())
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.finishWritingToStream(new ByteBufferOutputStream())
     }
   }
@@ -130,7 +130,7 @@ class PartiallySerializedBlockSuite
   test("cannot call finishWritingToStream() after valuesIterator()") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.valuesIterator
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.finishWritingToStream(new ByteBufferOutputStream())
     }
   }
@@ -138,7 +138,7 @@ class PartiallySerializedBlockSuite
   test("cannot call valuesIterator() after finishWritingToStream()") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.finishWritingToStream(new ByteBufferOutputStream())
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.valuesIterator
     }
   }
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 1f8b09dba10..88bf68dd18d 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -750,6 +750,12 @@ For more details see [INSUFFICIENT_TABLE_PROPERTY](sql-error-conditions-insuffic
 
 `<message>`
 
+### INTERNAL_ERROR_STORAGE
+
+[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+`<message>`
+
 ### INTERVAL_ARITHMETIC_OVERFLOW
 
 [SQLSTATE: 22015](sql-error-conditions-sqlstates.html#class-22-data-exception)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
