Repository: spark Updated Branches: refs/heads/master 3c104c79d -> 2053d793c
Improve MapOutputTracker error logging. Author: Reynold Xin <r...@apache.org> Closes #1258 from rxin/mapOutputTracker and squashes the following commits: a7c95b6 [Reynold Xin] Improve MapOutputTracker error logging. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2053d793 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2053d793 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2053d793 Branch: refs/heads/master Commit: 2053d793cc2e8e5f5776e6576ddc6f8e6168e60c Parents: 3c104c7 Author: Reynold Xin <r...@apache.org> Authored: Sat Jun 28 21:05:03 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Sat Jun 28 21:05:03 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/MapOutputTracker.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2053d793/core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 182abac..8940917 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -26,10 +26,10 @@ import scala.concurrent.Await import akka.actor._ import akka.pattern.ask -import org.apache.spark.util._ import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util._ private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int) @@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging Await.result(future, timeout) } catch { case e: Exception => + logError("Error communicating with MapOutputTracker", e) throw new SparkException("Error communicating with MapOutputTracker", e) } } /** Send a one-way message to the trackerActor, to which we expect it to reply with true. */ protected def sendTracker(message: Any) { - if (askTracker(message) != true) { - throw new SparkException("Error reply received from MapOutputTracker") + val response = askTracker(message) + if (response != true) { + throw new SparkException( + "Error reply received from MapOutputTracker. Expecting true, got " + response.toString) } } @@ -366,9 +369,9 @@ private[spark] object MapOutputTracker { // any of the statuses is null (indicating a missing location due to a failed mapper), // throw a FetchFailedException. private def convertMapStatuses( - shuffleId: Int, - reduceId: Int, - statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { + shuffleId: Int, + reduceId: Int, + statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = { assert (statuses != null) statuses.map { status => @@ -403,7 +406,7 @@ private[spark] object MapOutputTracker { if (compressedSize == 0) { 0 } else { - math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong + math.pow(LOG_BASE, compressedSize & 0xFF).toLong } } }