Repository: spark Updated Branches: refs/heads/master 981bde9b0 -> bf578deaf
Removed throwable field from FetchFailedException and added MetadataFetchFailedException. FetchFailedException used to have a Throwable field, but in reality we never propagate any of the throwables/exceptions back to the driver because Executor explicitly looks for FetchFailedException and then sends FetchFailed as the TaskEndReason. This pull request removes the throwable and adds a MetadataFetchFailedException that extends FetchFailedException (so now MapOutputTracker throws MetadataFetchFailedException instead). Author: Reynold Xin <r...@apache.org> Closes #1227 from rxin/metadataFetchException and squashes the following commits: 5cb1e0a [Reynold Xin] MetadataFetchFailedException extends FetchFailedException. 8861ee2 [Reynold Xin] Throw MetadataFetchFailedException in MapOutputTracker. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf578dea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf578dea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf578dea Branch: refs/heads/master Commit: bf578deaf2493081ceeb78dfd7617def5699a06e Parents: 981bde9 Author: Reynold Xin <r...@apache.org> Authored: Thu Jun 26 21:12:16 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Thu Jun 26 21:12:16 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/FetchFailedException.scala | 45 ----------------- .../org/apache/spark/MapOutputTracker.scala | 12 +++-- .../scala/org/apache/spark/TaskEndReason.scala | 2 +- .../org/apache/spark/executor/Executor.scala | 2 +- .../spark/scheduler/TaskDescription.scala | 4 ++ .../spark/shuffle/FetchFailedException.scala | 52 ++++++++++++++++++++ .../shuffle/hash/BlockStoreShuffleFetcher.scala | 5 +- .../apache/spark/MapOutputTrackerSuite.scala | 1 + 8 files changed, 69 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/FetchFailedException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/FetchFailedException.scala deleted file mode 100644 index 8eaa26b..0000000 --- a/core/src/main/scala/org/apache/spark/FetchFailedException.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark - -import org.apache.spark.storage.BlockManagerId - -private[spark] class FetchFailedException( - taskEndReason: TaskEndReason, - message: String, - cause: Throwable) - extends Exception { - - def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, - cause: Throwable) = - this(FetchFailed(bmAddress, shuffleId, mapId, reduceId), - "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId), - cause) - - def this (shuffleId: Int, reduceId: Int, cause: Throwable) = - this(FetchFailed(null, shuffleId, -1, reduceId), - "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause) - - override def getMessage(): String = message - - - override def getCause(): Throwable = cause - - def toTaskEndReason: TaskEndReason = taskEndReason - -} http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ee82d9f..182abac 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -25,9 +25,11 @@ import scala.concurrent.Await import akka.actor._ import akka.pattern.ask + +import org.apache.spark.util._ import org.apache.spark.scheduler.MapStatus +import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util._ private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int) @@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses) } } else { - throw new FetchFailedException(null, shuffleId, -1, 
reduceId, - new Exception("Missing all output locations for shuffle " + shuffleId)) + throw new MetadataFetchFailedException( + shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId) } } else { statuses.synchronized { @@ -371,8 +373,8 @@ private[spark] object MapOutputTracker { statuses.map { status => if (status == null) { - throw new FetchFailedException(null, shuffleId, -1, reduceId, - new Exception("Missing an output location for shuffle " + shuffleId)) + throw new MetadataFetchFailedException( + shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId) } else { (status.location, decompressSize(status.compressedSizes(reduceId))) } http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/TaskEndReason.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 5e8bd8c..df42d67 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason { */ @DeveloperApi case class FetchFailed( - bmAddress: BlockManagerId, + bmAddress: BlockManagerId, // Note that bmAddress can be null shuffleId: Int, mapId: Int, reduceId: Int) http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 557b9a3..4d3ba11 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -26,8 +26,8 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap 
import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util.{AkkaUtils, Utils} http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 1481d70..4c96b9e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -21,6 +21,10 @@ import java.nio.ByteBuffer import org.apache.spark.util.SerializableBuffer +/** + * Description of a task that gets passed onto executors to be executed, usually created by + * [[TaskSetManager.resourceOffer]]. + */ private[spark] class TaskDescription( val taskId: Long, val executorId: String, http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala new file mode 100644 index 0000000..71c08e9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.{FetchFailed, TaskEndReason} + +/** + * Failed to fetch a shuffle block. The executor catches this exception and propagates it + * back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage. + * + * Note that bmAddress can be null. + */ +private[spark] class FetchFailedException( + bmAddress: BlockManagerId, + shuffleId: Int, + mapId: Int, + reduceId: Int) + extends Exception { + + override def getMessage: String = + "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId) + + def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId) +} + +/** + * Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]]. 
+ */ +private[spark] class MetadataFetchFailedException( + shuffleId: Int, + reduceId: Int, + message: String) + extends FetchFailedException(null, shuffleId, -1, reduceId) { + + override def getMessage: String = message +} http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index b05b6ea..a932455 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap +import org.apache.spark._ import org.apache.spark.executor.ShuffleReadMetrics import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId} import org.apache.spark.util.CompletionIterator -import org.apache.spark._ private[hash] object BlockStoreShuffleFetcher extends Logging { def fetch[T]( @@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { blockId match { case ShuffleBlockId(shufId, mapId, _) => val address = statuses(mapId.toInt)._1 - throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null) + throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId) case _ => throw new SparkException( "Failed to get block " + blockId + ", which is not a shuffle block") http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 95ba273..9702838 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -24,6 +24,7 @@ import akka.testkit.TestActorRef import org.scalatest.FunSuite import org.apache.spark.scheduler.MapStatus +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AkkaUtils