Repository: spark
Updated Branches:
  refs/heads/master 3c104c79d -> 2053d793c


Improve MapOutputTracker error logging.

Author: Reynold Xin <r...@apache.org>

Closes #1258 from rxin/mapOutputTracker and squashes the following commits:

a7c95b6 [Reynold Xin] Improve MapOutputTracker error logging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2053d793
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2053d793
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2053d793

Branch: refs/heads/master
Commit: 2053d793cc2e8e5f5776e6576ddc6f8e6168e60c
Parents: 3c104c7
Author: Reynold Xin <r...@apache.org>
Authored: Sat Jun 28 21:05:03 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Sat Jun 28 21:05:03 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/MapOutputTracker.scala  | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2053d793/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 182abac..8940917 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -26,10 +26,10 @@ import scala.concurrent.Await
 import akka.actor._
 import akka.pattern.ask
 
-import org.apache.spark.util._
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util._
 
 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -107,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: 
SparkConf) extends Logging
       Await.result(future, timeout)
     } catch {
       case e: Exception =>
+        logError("Error communicating with MapOutputTracker", e)
         throw new SparkException("Error communicating with MapOutputTracker", 
e)
     }
   }
 
   /** Send a one-way message to the trackerActor, to which we expect it to 
reply with true. */
   protected def sendTracker(message: Any) {
-    if (askTracker(message) != true) {
-      throw new SparkException("Error reply received from MapOutputTracker")
+    val response = askTracker(message)
+    if (response != true) {
+      throw new SparkException(
+        "Error reply received from MapOutputTracker. Expecting true, got " + 
response.toString)
     }
   }
 
@@ -366,9 +369,9 @@ private[spark] object MapOutputTracker {
   // any of the statuses is null (indicating a missing location due to a 
failed mapper),
   // throw a FetchFailedException.
   private def convertMapStatuses(
-        shuffleId: Int,
-        reduceId: Int,
-        statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+      shuffleId: Int,
+      reduceId: Int,
+      statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
     assert (statuses != null)
     statuses.map {
       status =>
@@ -403,7 +406,7 @@ private[spark] object MapOutputTracker {
     if (compressedSize == 0) {
       0
     } else {
-      math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
+      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
     }
   }
 }

Reply via email to