This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0f6a4a737ee [SPARK-38476][CORE] Use error class in org.apache.spark.storage
0f6a4a737ee is described below

commit 0f6a4a737ee9457a0b0c336b7d079cdd878d20e8
Author: Bo Zhang <bo.zh...@databricks.com>
AuthorDate: Tue Jul 11 13:06:52 2023 +0300

    [SPARK-38476][CORE] Use error class in org.apache.spark.storage

    ### What changes were proposed in this pull request?
    This PR changes the exceptions created in package org.apache.spark.storage to use error classes. It also adds an error class INTERNAL_ERROR_STORAGE and uses it for the internal errors in the package (a usage sketch follows below the commit message).

    ### Why are the changes needed?
    This is to move the exceptions created in package org.apache.spark.storage to error classes.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Updated existing tests.

    Closes #41923 from bozhang2820/spark-38476.

    Authored-by: Bo Zhang <bo.zh...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
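For illustration, here is a minimal, self-contained Scala sketch of the pattern the diff below adopts. The `SparkException` class, the `UsageSketch` object and the `removeBlock` call site are stand-ins written for this sketch, not the actual Spark sources; the only behavior carried over from the diff is the assumption that `SparkException.internalError(msg, category = "STORAGE")` attaches the INTERNAL_ERROR_STORAGE error class to the thrown exception.

```scala
// Illustrative stand-ins, not the real org.apache.spark classes.
class SparkException(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends Exception(s"[$errorClass] ${messageParameters.getOrElse("message", "")}")

object SparkException {
  // Assumption carried over from the diff: the category suffix selects INTERNAL_ERROR_<category>.
  def internalError(msg: String, category: String): SparkException =
    new SparkException(
      errorClass = s"INTERNAL_ERROR_$category",
      messageParameters = Map("message" -> msg))
}

object UsageSketch {
  // Hypothetical call site modelled on BlockInfoManager.removeBlock: before this change
  // it would have thrown a plain IllegalStateException with the same message.
  def removeBlock(taskAttemptId: Long, blockId: String, holdsWriteLock: Boolean): Unit = {
    if (!holdsWriteLock) {
      throw SparkException.internalError(
        s"Task $taskAttemptId called remove() on block $blockId without a write lock",
        category = "STORAGE")
    }
  }

  def main(args: Array[String]): Unit = {
    try removeBlock(taskAttemptId = 0L, blockId = "rdd_0_0", holdsWriteLock = false)
    catch {
      case e: SparkException =>
        // Prints: [INTERNAL_ERROR_STORAGE] Task 0 called remove() on block rdd_0_0 without a write lock
        println(e.getMessage)
    }
  }
}
```

With the error class attached to a `SparkException`, the updated test suites in the diff can intercept `SparkException` instead of a generic `IllegalStateException` or `IllegalArgumentException`.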
---
 common/utils/src/main/resources/error/error-classes.json |  6 ++++++
 .../org/apache/spark/storage/BlockInfoManager.scala      |  7 ++++---
 .../scala/org/apache/spark/storage/BlockManager.scala    |  4 ++--
 .../org/apache/spark/storage/DiskBlockManager.scala      | 10 +++++-----
 .../org/apache/spark/storage/DiskBlockObjectWriter.scala |  4 +++-
 .../main/scala/org/apache/spark/storage/DiskStore.scala  |  7 ++++---
 .../scala/org/apache/spark/storage/FallbackStorage.scala |  5 +++--
 .../spark/storage/ShuffleBlockFetcherIterator.scala      |  5 +++--
 .../org/apache/spark/storage/memory/MemoryStore.scala    | 16 ++++++++++------
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala |  4 ++--
 .../spark/storage/DiskBlockObjectWriterSuite.scala       |  4 ++--
 .../spark/storage/PartiallySerializedBlockSuite.scala    | 14 +++++++-------
 docs/sql-error-conditions.md                             |  6 ++++++
 13 files changed, 57 insertions(+), 35 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 66305c20112..347ce026476 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1089,6 +1089,12 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_STORAGE" : {
+    "message" : [
+      "<message>"
+    ],
+    "sqlState" : "XX000"
+  },
   "INTERVAL_ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<message>.<alternative>"
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index fb532dd0736..45ebb6eafa6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
 import com.google.common.collect.{ConcurrentHashMultiset, ImmutableMultiset}
 import com.google.common.util.concurrent.Striped
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 
@@ -543,8 +543,9 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     logTrace(s"Task $taskAttemptId trying to remove block $blockId")
     blockInfo(blockId) { (info, condition) =>
       if (info.writerTask != taskAttemptId) {
-        throw new IllegalStateException(
-          s"Task $taskAttemptId called remove() on block $blockId without a write lock")
+        throw SparkException.internalError(
+          s"Task $taskAttemptId called remove() on block $blockId without a write lock",
+          category = "STORAGE")
       } else {
         invisibleRDDBlocks.synchronized {
           blockInfoWrappers.remove(blockId)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b4453b4d35e..05d57c67576 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1171,8 +1171,8 @@ private[spark] class BlockManager(
         val buf = blockTransferService.fetchBlockSync(loc.host, loc.port, loc.executorId,
           blockId.toString, tempFileManager)
         if (blockSize > 0 && buf.size() == 0) {
-          throw new IllegalStateException("Empty buffer received for non empty block " +
-            s"when fetching remote block $blockId from $loc")
+          throw SparkException.internalError("Empty buffer received for non empty block " +
+            s"when fetching remote block $blockId from $loc", category = "STORAGE")
         }
         buf
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 0427fbd9b62..1e8289287cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable.HashMap
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
@@ -139,15 +139,15 @@ private[spark] class DiskBlockManager(
       case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
         getMergedShuffleFile(mergedMetaBlockId.name, dirs)
       case _ =>
-        throw new IllegalArgumentException(
-          s"Only merged block ID is supported, but got $blockId")
+        throw SparkException.internalError(
+          s"Only merged block ID is supported, but got $blockId", category = "STORAGE")
     }
   }
 
   private def getMergedShuffleFile(filename: String, dirs: Option[Array[String]]): File = {
     if (!dirs.exists(_.nonEmpty)) {
-      throw new IllegalArgumentException(
-        s"Cannot read $filename because merged shuffle dirs is empty")
+      throw SparkException.internalError(
+        s"Cannot read $filename because merged shuffle dirs is empty", category = "STORAGE")
     }
     new File(ExecutorDiskUtils.getFilePath(dirs.get, subDirsPerLocalDir, filename))
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 3bdae2fe74f..f8bd73e6561 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -22,6 +22,7 @@ import java.nio.channels.{ClosedByInterruptException, FileChannel}
 import java.nio.file.Files
 import java.util.zip.Checksum
 
+import org.apache.spark.SparkException
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.MutableCheckedOutputStream
@@ -153,7 +154,8 @@ private[spark] class DiskBlockObjectWriter(
 
   def open(): DiskBlockObjectWriter = {
     if (hasBeenClosed) {
-      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
+      throw SparkException.internalError(
+        "Writer already closed. Cannot be reopened.", category = "STORAGE")
Cannot be reopened.", category = "STORAGE") } if (!initialized) { initialize() diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index d45947db693..1cb5adef5f4 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -29,7 +29,7 @@ import com.google.common.io.Closeables import io.netty.channel.DefaultFileRegion import org.apache.commons.io.FileUtils -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.internal.{config, Logging} import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils} @@ -67,8 +67,9 @@ private[spark] class DiskStore( diskManager.getFile(blockId).delete() } catch { case e: Exception => - throw new IllegalStateException( - s"Block $blockId is already present in the disk store and could not delete it $e") + throw SparkException.internalError( + s"Block $blockId is already present in the disk store and could not delete it $e", + category = "STORAGE") } } logDebug(s"Attempting to put block $blockId") diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala index 5aa5c6eff7b..eb23fb4b1c8 100644 --- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala +++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH} @@ -170,7 +170,8 @@ private[spark] object FallbackStorage extends Logging { case batchId: ShuffleBlockBatchId => (batchId.shuffleId, batchId.mapId, batchId.startReduceId, batchId.endReduceId) case _ => - throw new IllegalArgumentException("unexpected shuffle block id format: " + blockId) + throw SparkException.internalError( + s"unexpected shuffle block id format: $blockId", category = "STORAGE") } val name = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 7cd408caaa5..b21a2aa1c17 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -33,7 +33,7 @@ import io.netty.util.internal.OutOfDirectMemoryError import org.apache.commons.io.IOUtils import org.roaringbitmap.RoaringBitmap -import org.apache.spark.{MapOutputTracker, TaskContext} +import org.apache.spark.{MapOutputTracker, SparkException, TaskContext} import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.Logging @@ -1143,7 +1143,8 @@ final class ShuffleBlockFetcherIterator( logWarning(diagnosisResponse) diagnosisResponse case unexpected: BlockId => - throw new IllegalArgumentException(s"Unexpected type of BlockId, $unexpected") + throw 
+          s"Unexpected type of BlockId, $unexpected", category = "STORAGE")
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index e62f3c126b1..489b6a53e47 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.{SparkConf, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{STORAGE_UNROLL_MEMORY_THRESHOLD, UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
@@ -371,7 +371,8 @@ private[spark] class MemoryStore(
     entry match {
       case null => None
       case _: DeserializedMemoryEntry[_] =>
-        throw new IllegalArgumentException("should only call getBytes on serialized blocks")
+        throw SparkException.internalError(
+          "should only call getBytes on serialized blocks", category = "STORAGE")
       case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
     }
   }
@@ -381,7 +382,8 @@ private[spark] class MemoryStore(
     entry match {
       case null => None
       case e: SerializedMemoryEntry[_] =>
-        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
+        throw SparkException.internalError(
+          "should only call getValues on deserialized blocks", category = "STORAGE")
       case DeserializedMemoryEntry(values, _, _, _) =>
         val x = Some(values)
         x.map(_.iterator)
@@ -862,11 +864,13 @@ private[storage] class PartiallySerializedBlock[T](
 
   private def verifyNotConsumedAndNotDiscarded(): Unit = {
     if (consumed) {
-      throw new IllegalStateException(
-        "Can only call one of finishWritingToStream() or valuesIterator() and can only call once.")
+      throw SparkException.internalError(
+        "Can only call one of finishWritingToStream() or valuesIterator() and can only call once.",
+        category = "STORAGE")
     }
     if (discarded) {
-      throw new IllegalStateException("Cannot call methods on a discarded PartiallySerializedBlock")
+      throw SparkException.internalError(
+        "Cannot call methods on a discarded PartiallySerializedBlock", category = "STORAGE")
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index a1cd1dbc9be..3708f0aa672 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -309,7 +309,7 @@ class BlockInfoManagerSuite extends SparkFunSuite {
     withTaskId(0) {
       assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
       blockInfoManager.unlock("block")
-      intercept[IllegalStateException] {
+      intercept[SparkException] {
        blockInfoManager.removeBlock("block")
      }
    }
@@ -320,7 +320,7 @@ class BlockInfoManagerSuite extends SparkFunSuite {
      assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
      blockInfoManager.unlock("block")
      assert(blockInfoManager.lockForReading("block").isDefined)
-      intercept[IllegalStateException] {
+      intercept[SparkException] {
        blockInfoManager.removeBlock("block")
      }
    }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index e6bf01b4b65..70a57eed07a 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.storage
 
 import java.io.File
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.util.Utils
@@ -96,7 +96,7 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite {
 
     writer.open()
     writer.close()
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
      writer.open()
    }
  }
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index 9753b483153..5582524ffee 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -26,7 +26,7 @@ import org.mockito.Mockito.atLeastOnce
 import org.mockito.invocation.InvocationOnMock
 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
 
-import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TaskContext, TaskContextImpl}
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.serializer.{JavaSerializer, SerializationStream, SerializerManager}
 import org.apache.spark.storage.memory.{MemoryStore, PartiallySerializedBlock, RedirectableOutputStream}
@@ -97,10 +97,10 @@ class PartiallySerializedBlockSuite
   test("valuesIterator() and finishWritingToStream() cannot be called after discard() is called") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.discard()
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.finishWritingToStream(null)
     }
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.valuesIterator
     }
   }
@@ -114,7 +114,7 @@ class PartiallySerializedBlockSuite
   test("cannot call valuesIterator() more than once") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.valuesIterator
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.valuesIterator
     }
   }
@@ -122,7 +122,7 @@ class PartiallySerializedBlockSuite
   test("cannot call finishWritingToStream() more than once") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.finishWritingToStream(new ByteBufferOutputStream())
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.finishWritingToStream(new ByteBufferOutputStream())
     }
   }
@@ -130,7 +130,7 @@ class PartiallySerializedBlockSuite
   test("cannot call finishWritingToStream() after valuesIterator()") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.valuesIterator
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.finishWritingToStream(new ByteBufferOutputStream())
     }
   }
@@ -138,7 +138,7 @@ class PartiallySerializedBlockSuite
   test("cannot call valuesIterator() after finishWritingToStream()") {
     val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
     partiallySerializedBlock.finishWritingToStream(new ByteBufferOutputStream())
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       partiallySerializedBlock.valuesIterator
     }
   }
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 1f8b09dba10..88bf68dd18d 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -750,6 +750,12 @@ For more details see [INSUFFICIENT_TABLE_PROPERTY](sql-error-conditions-insuffic
 
 `<message>`
 
+### INTERNAL_ERROR_STORAGE
+
+[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+`<message>`
+
 ### INTERVAL_ARITHMETIC_OVERFLOW
 
 [SQLSTATE: 22015](sql-error-conditions-sqlstates.html#class-22-data-exception)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org