Repository: spark
Updated Branches:
  refs/heads/branch-1.0 777a9a5a1 -> d933c710f


[SPARK-1617] and [SPARK-1618] Improvements to streaming ui and bug fix to 
socket receiver

1617: These changes expose the receiver state (active or inactive) and the last 
error in the UI.
1618: If the socket receiver cannot connect on the first attempt, it should try 
to restart after a delay. That was broken because the thread that restarts (and 
hence stops) the receiver called Thread.join() on itself, so it blocked forever!

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #540 from tdas/streaming-ui-fix and squashes the following commits:

e469434 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' 
into streaming-ui-fix
dbddf75 [Tathagata Das] Style fix.
66df1a5 [Tathagata Das] Merge remote-tracking branch 'apache/master' into 
streaming-ui-fix
ad98bc9 [Tathagata Das] Refactored streaming listener to use ReceiverInfo.
d7f849c [Tathagata Das] Revert "Moved BatchInfo from streaming.scheduler to 
streaming.ui"
5c80919 [Tathagata Das] Moved BatchInfo from streaming.scheduler to streaming.ui
da244f6 [Tathagata Das] Fixed socket receiver as well as made receiver state 
and error visible in the streaming UI.

(cherry picked from commit cd12dd9bde91303d0341180e5f70d2a03d6b65b6)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d933c710
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d933c710
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d933c710

Branch: refs/heads/branch-1.0
Commit: d933c710f9b49f398446d51859fe6953d2e4b6f9
Parents: 777a9a5
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Thu Apr 24 21:34:37 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Apr 24 21:34:50 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/UIUtils.scala     |  2 +-
 .../streaming/dstream/SocketInputDStream.scala  | 49 +++++++----------
 .../streaming/receiver/ActorReceiver.scala      | 12 +++-
 .../spark/streaming/receiver/Receiver.scala     |  5 +-
 .../streaming/receiver/ReceiverMessage.scala    |  4 +-
 .../streaming/receiver/ReceiverSupervisor.scala | 58 ++++++++++++--------
 .../receiver/ReceiverSupervisorImpl.scala       | 24 +++++---
 .../spark/streaming/scheduler/BatchInfo.scala   |  3 +
 .../streaming/scheduler/ReceiverInfo.scala      | 37 +++++++++++++
 .../streaming/scheduler/ReceiverTracker.scala   | 40 +++++++++-----
 .../streaming/scheduler/StreamingListener.scala | 25 ++++++++-
 .../ui/StreamingJobProgressListener.scala       | 18 +++++-
 .../spark/streaming/ui/StreamingPage.scala      | 20 +++++--
 .../spark/streaming/NetworkReceiverSuite.scala  |  8 ++-
 .../streaming/StreamingListenerSuite.scala      | 15 ++---
 15 files changed, 217 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala 
b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index cf987a1..a3d6a18 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -122,7 +122,7 @@ private[spark] object UIUtils extends Logging {
       }
     }
     if (unit.isEmpty) {
-      "%d".formatLocal(Locale.US, value)
+      "%d".formatLocal(Locale.US, value.toInt)
     } else {
       "%.1f%s".formatLocal(Locale.US, value, unit)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 1e32727..8b72bcf 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -50,49 +50,42 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
-  var socket: Socket = null
-  var receivingThread: Thread = null
-
   def onStart() {
-    receivingThread = new Thread("Socket Receiver") {
-      override def run() {
-        connect()
-        receive()
-      }
-    }
-    receivingThread.start()
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      setDaemon(true)
+      override def run() { receive() }
+    }.start()
   }
 
   def onStop() {
-    if (socket != null) {
-      socket.close()
-    }
-    socket = null
-    if (receivingThread != null) {
-      receivingThread.join()
-    }
+    // There is not much to do here, as the thread calling receive()
+    // is designed to stop by itself once isStopped() returns true
   }
 
-  def connect() {
+  /** Create a socket connection and receive data until receiver is stopped */
+  def receive() {
+    var socket: Socket = null
     try {
       logInfo("Connecting to " + host + ":" + port)
       socket = new Socket(host, port)
-    } catch {
-      case e: Exception =>
-        restart("Could not connect to " + host + ":" + port, e)
-    }
-  }
-
-  def receive() {
-    try {
       logInfo("Connected to " + host + ":" + port)
       val iterator = bytesToObjects(socket.getInputStream())
       while(!isStopped && iterator.hasNext) {
         store(iterator.next)
       }
+      logInfo("Stopped receiving")
+      restart("Retrying connecting to " + host + ":" + port)
     } catch {
-      case e: Exception =>
-        restart("Error receiving data from socket", e)
+      case e: java.net.ConnectException =>
+        restart("Error connecting to " + host + ":" + port, e)
+      case t: Throwable =>
+        restart("Error receiving data", t)
+    } finally {
+      if (socket != null) {
+        socket.close()
+        logInfo("Closed socket to " + host + ":" + port)
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index 821cf19..743be58 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -28,8 +28,13 @@ import akka.actor.SupervisorStrategy.{Escalate, Restart}
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.storage.StorageLevel
 import java.nio.ByteBuffer
+import org.apache.spark.annotation.DeveloperApi
 
-/** A helper with set of defaults for supervisor strategy */
+/**
+ * :: DeveloperApi ::
+ * A helper with set of defaults for supervisor strategy
+ */
+@DeveloperApi
 object ActorSupervisorStrategy {
 
   val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange 
=
@@ -40,6 +45,7 @@ object ActorSupervisorStrategy {
 }
 
 /**
+ * :: DeveloperApi ::
  * A receiver trait to be mixed in with your Actor to gain access to
  * the API for pushing received data into Spark Streaming for being processed.
  *
@@ -61,6 +67,7 @@ object ActorSupervisorStrategy {
  *       to ensure the type safety, i.e parametrized type of push block and 
InputDStream
  *       should be same.
  */
+@DeveloperApi
 trait ActorHelper {
 
   self: Actor => // to ensure that this can be added to Actor classes only
@@ -92,10 +99,12 @@ trait ActorHelper {
 }
 
 /**
+ * :: DeveloperApi ::
  * Statistics for querying the supervisor about state of workers. Used in
  * conjunction with `StreamingContext.actorStream` and
  * [[org.apache.spark.streaming.receiver.ActorHelper]].
  */
+@DeveloperApi
 case class Statistics(numberOfMsgs: Int,
   numberOfWorkers: Int,
   numberOfHiccups: Int,
@@ -188,4 +197,3 @@ private[streaming] class ActorReceiver[T: ClassTag](
     supervisor ! PoisonPill
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 44eecf1..524c1b8 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -23,8 +23,10 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.annotation.DeveloperApi
 
 /**
+ * :: DeveloperApi ::
  * Abstract class of a receiver that can be run on worker nodes to receive 
external data. A
  * custom receiver can be defined by defining the functions onStart() and 
onStop(). onStart()
  * should define the setup steps necessary to start receiving data,
@@ -51,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
  *  }
  * }}}
  */
+@DeveloperApi
 abstract class Receiver[T](val storageLevel: StorageLevel) extends 
Serializable {
 
   /**
@@ -198,7 +201,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) 
extends Serializable
 
   /** Check if receiver has been marked for stopping. */
   def isStopped(): Boolean = {
-    !executor.isReceiverStarted()
+    executor.isReceiverStopped()
   }
 
   /** Get unique identifier of this receiver. */

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index 6ab3ca6..bf39d1e 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver
 
 /** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait NetworkReceiverMessage
-private[streaming] object StopReceiver extends NetworkReceiverMessage
+private[streaming] sealed trait ReceiverMessage
+private[streaming] object StopReceiver extends ReceiverMessage
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 256b333..09be3a5 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -88,15 +88,29 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Report errors. */
   def reportError(message: String, throwable: Throwable)
 
-  /** Start the executor */
+  /** Called when supervisor is started */
+  protected def onStart() { }
+
+  /** Called when supervisor is stopped */
+  protected def onStop(message: String, error: Option[Throwable]) { }
+
+  /** Called when receiver is started */
+  protected def onReceiverStart() { }
+
+  /** Called when receiver is stopped */
+  protected def onReceiverStop(message: String, error: Option[Throwable]) { }
+
+  /** Start the supervisor */
   def start() {
+    onStart()
     startReceiver()
   }
 
-  /** Mark the executor and the receiver for stopping */
+  /** Mark the supervisor and the receiver for stopping */
   def stop(message: String, error: Option[Throwable]) {
     stoppingError = error.orNull
     stopReceiver(message, error)
+    onStop(message, error)
     stopLatch.countDown()
   }
 
@@ -104,6 +118,8 @@ private[streaming] abstract class ReceiverSupervisor(
   def startReceiver(): Unit = synchronized {
     try {
       logInfo("Starting receiver")
+      receiver.onStart()
+      logInfo("Called receiver onStart")
       onReceiverStart()
       receiverState = Started
     } catch {
@@ -115,7 +131,10 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Stop receiver */
   def stopReceiver(message: String, error: Option[Throwable]): Unit = 
synchronized {
     try {
+      logInfo("Stopping receiver with message: " + message + ": " + 
error.getOrElse(""))
       receiverState = Stopped
+      receiver.onStop()
+      logInfo("Called receiver onStop")
       onReceiverStop(message, error)
     } catch {
       case t: Throwable =>
@@ -130,41 +149,32 @@ private[streaming] abstract class ReceiverSupervisor(
 
   /** Restart receiver with delay */
   def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
-    logWarning("Restarting receiver with delay " + delay + " ms: " + message,
-      error.getOrElse(null))
-    stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, 
error)
-    future {
+    Future {
+      logWarning("Restarting receiver with delay " + delay + " ms: " + message,
+        error.getOrElse(null))
+      stopReceiver("Restarting receiver with delay " + delay + "ms: " + 
message, error)
       logDebug("Sleeping for " + delay)
       Thread.sleep(delay)
-      logDebug("Starting receiver again")
+      logInfo("Starting receiver again")
       startReceiver()
       logInfo("Receiver started again")
     }
   }
 
-  /** Called when the receiver needs to be started */
-  protected def onReceiverStart(): Unit = synchronized {
-    // Call user-defined onStart()
-    logInfo("Calling receiver onStart")
-    receiver.onStart()
-    logInfo("Called receiver onStart")
-  }
-
-  /** Called when the receiver needs to be stopped */
-  protected def onReceiverStop(message: String, error: Option[Throwable]): 
Unit = synchronized {
-    // Call user-defined onStop()
-    logInfo("Calling receiver onStop")
-    receiver.onStop()
-    logInfo("Called receiver onStop")
-  }
-
   /** Check if receiver has been marked for stopping */
   def isReceiverStarted() = {
     logDebug("state = " + receiverState)
     receiverState == Started
   }
 
-  /** Wait the thread until the executor is stopped */
+  /** Check if receiver has been marked for stopping */
+  def isReceiverStopped() = {
+    logDebug("state = " + receiverState)
+    receiverState == Stopped
+  }
+
+
+  /** Block the calling thread until the supervisor is stopped */
   def awaitTermination() {
     stopLatch.await()
     logInfo("Waiting for executor stop is over")

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 2a3521b..ce8316b 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -79,6 +79,8 @@ private[streaming] class ReceiverSupervisorImpl(
           logInfo("Received stop signal")
           stop("Stopped by driver", None)
       }
+
+      def ref = self
     }), "Receiver-" + streamId + "-" + System.currentTimeMillis())
 
   /** Unique block ids if one wants to add blocks directly */
@@ -154,14 +156,23 @@ private[streaming] class ReceiverSupervisorImpl(
     logWarning("Reported error " + message + " - " + error)
   }
 
-  override def onReceiverStart() {
+  override protected def onStart() {
     blockGenerator.start()
-    super.onReceiverStart()
   }
 
-  override def onReceiverStop(message: String, error: Option[Throwable]) {
-    super.onReceiverStop(message, error)
+  override protected def onStop(message: String, error: Option[Throwable]) {
     blockGenerator.stop()
+    env.actorSystem.stop(actor)
+  }
+
+  override protected def onReceiverStart() {
+    val msg = RegisterReceiver(
+      streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
+    val future = trackerActor.ask(msg)(askTimeout)
+    Await.result(future, askTimeout)
+  }
+
+  override protected def onReceiverStop(message: String, error: 
Option[Throwable]) {
     logInfo("Deregistering receiver " + streamId)
     val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
     val future = trackerActor.ask(
@@ -170,11 +181,6 @@ private[streaming] class ReceiverSupervisorImpl(
     logInfo("Stopped receiver " + streamId)
   }
 
-  override def stop(message: String, error: Option[Throwable]) {
-    super.stop(message, error)
-    env.actorSystem.stop(actor)
-  }
-
   /** Generate new block ID */
   private def nextBlockId = StreamBlockId(streamId, newBlockId.getAndIncrement)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 9c69a2a..a68aecb 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -18,8 +18,10 @@
 package org.apache.spark.streaming.scheduler
 
 import org.apache.spark.streaming.Time
+import org.apache.spark.annotation.DeveloperApi
 
 /**
+ * :: DeveloperApi ::
  * Class having information on completed batches.
  * @param batchTime   Time of the batch
  * @param submissionTime  Clock time of when jobs of this batch was submitted 
to
@@ -27,6 +29,7 @@ import org.apache.spark.streaming.Time
  * @param processingStartTime Clock time of when the first job of this batch 
started processing
  * @param processingEndTime Clock time of when the last job of this batch 
finished processing
  */
+@DeveloperApi
 case class BatchInfo(
     batchTime: Time,
     receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
new file mode 100644
index 0000000..d7e39c5
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import akka.actor.ActorRef
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Class having information about a receiver
+ */
+@DeveloperApi
+case class ReceiverInfo(
+    streamId: Int,
+    name: String,
+    private[streaming] val actor: ActorRef,
+    active: Boolean,
+    location: String,
+    lastErrorMessage: String = "",
+    lastError: String = ""
+   ) {
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 557e096..5307fe1 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -28,13 +28,8 @@ import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, 
StopReceiver}
 import org.apache.spark.util.AkkaUtils
 
-/** Information about receiver */
-case class ReceiverInfo(streamId: Int, typ: String, location: String) {
-  override def toString = s"$typ-$streamId"
-}
-
 /** Information about blocks received by the receiver */
-case class ReceivedBlockInfo(
+private[streaming] case class ReceivedBlockInfo(
     streamId: Int,
     blockId: StreamBlockId,
     numRecords: Long,
@@ -69,7 +64,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
   val receiverInputStreams = ssc.graph.getReceiverInputStreams()
   val receiverInputStreamMap = Map(receiverInputStreams.map(x => (x.id, x)): 
_*)
   val receiverExecutor = new ReceiverLauncher()
-  val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, 
ActorRef]
+  val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, 
ReceiverInfo]
   val receivedBlockInfo = new HashMap[Int, 
SynchronizedQueue[ReceivedBlockInfo]]
     with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
   val timeout = AkkaUtils.askTimeout(ssc.conf)
@@ -129,17 +124,23 @@ class ReceiverTracker(ssc: StreamingContext) extends 
Logging {
     if (!receiverInputStreamMap.contains(streamId)) {
       throw new Exception("Register received for unexpected id " + streamId)
     }
-    receiverInfo += ((streamId, receiverActor))
-    ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
-      ReceiverInfo(streamId, typ, host)
-    ))
+    receiverInfo(streamId) = ReceiverInfo(
+      streamId, s"${typ}-${streamId}", receiverActor, true, host)
+    
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
     logInfo("Registered receiver for stream " + streamId + " from " + 
sender.path.address)
   }
 
   /** Deregister a receiver */
   def deregisterReceiver(streamId: Int, message: String, error: String) {
-    receiverInfo -= streamId
-    ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(streamId, 
message, error))
+    val newReceiverInfo = receiverInfo.get(streamId) match {
+      case Some(oldInfo) =>
+        oldInfo.copy(actor = null, active = false, lastErrorMessage = message, 
lastError = error)
+      case None =>
+        logWarning("No prior receiver info")
+        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = 
message, lastError = error)
+    }
+    receiverInfo(streamId) = newReceiverInfo
+    
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
     } else {
@@ -157,7 +158,15 @@ class ReceiverTracker(ssc: StreamingContext) extends 
Logging {
 
   /** Report error sent by a receiver */
   def reportError(streamId: Int, message: String, error: String) {
-    ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(streamId, 
message, error))
+    val newReceiverInfo = receiverInfo.get(streamId) match {
+      case Some(oldInfo) =>
+        oldInfo.copy(lastErrorMessage = message, lastError = error)
+      case None =>
+        logWarning("No prior receiver info")
+        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = 
message, lastError = error)
+    }
+    receiverInfo(streamId) = newReceiverInfo
+    
ssc.scheduler.listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
     } else {
@@ -270,7 +279,8 @@ class ReceiverTracker(ssc: StreamingContext) extends 
Logging {
     /** Stops the receivers. */
     private def stopReceivers() {
       // Signal the receivers to stop
-      receiverInfo.values.foreach(_ ! StopReceiver)
+      receiverInfo.values.flatMap { info => Option(info.actor)}
+                         .foreach { _ ! StopReceiver }
       logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 9d6ec1f..ed1aa11 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -20,28 +20,45 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.mutable.Queue
 
 import org.apache.spark.util.Distribution
+import org.apache.spark.annotation.DeveloperApi
 
-/** Base trait for events related to StreamingListener */
+/**
+ * :: DeveloperApi ::
+ * Base trait for events related to StreamingListener
+ */
+@DeveloperApi
 sealed trait StreamingListenerEvent
 
+@DeveloperApi
 case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends 
StreamingListenerEvent
+
+@DeveloperApi
 case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends 
StreamingListenerEvent
+
+@DeveloperApi
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends 
StreamingListenerEvent
 
+@DeveloperApi
 case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
-case class StreamingListenerReceiverError(streamId: Int, message: String, 
error: String)
+
+@DeveloperApi
+case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
-case class StreamingListenerReceiverStopped(streamId: Int, message: String, 
error: String)
+
+@DeveloperApi
+case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */
 private[scheduler] case object StreamingListenerShutdown extends 
StreamingListenerEvent
 
 /**
+ * :: DeveloperApi ::
  * A listener interface for receiving information about an ongoing streaming
  * computation.
  */
+@DeveloperApi
 trait StreamingListener {
 
   /** Called when a receiver has been started */
@@ -65,9 +82,11 @@ trait StreamingListener {
 
 
 /**
+ * :: DeveloperApi ::
  * A simple StreamingListener that logs summary statistics across Spark 
Streaming batches
  * @param numBatchInfos Number of last batches to consider for generating 
statistics (default: 10)
  */
+@DeveloperApi
 class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
   // Queue containing latest completed batches
   val batchInfos = new Queue[BatchInfo]()

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 14c33c7..f61069b 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -23,9 +23,9 @@ import scala.collection.mutable.{Queue, HashMap}
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
 import org.apache.spark.streaming.scheduler.BatchInfo
-import org.apache.spark.streaming.scheduler.ReceiverInfo
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
 import org.apache.spark.util.Distribution
+import org.apache.spark.Logging
 
 
 private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -40,9 +40,21 @@ private[streaming] class StreamingJobProgressListener(ssc: 
StreamingContext)
 
   val batchDuration = ssc.graph.batchDuration.milliseconds
 
-  override def onReceiverStarted(receiverStarted: 
StreamingListenerReceiverStarted) = {
+  override def onReceiverStarted(receiverStarted: 
StreamingListenerReceiverStarted) {
     synchronized {
-      receiverInfos.put(receiverStarted.receiverInfo.streamId, 
receiverStarted.receiverInfo)
+      receiverInfos(receiverStarted.receiverInfo.streamId) = 
receiverStarted.receiverInfo
+    }
+  }
+
+  override def onReceiverError(receiverError: StreamingListenerReceiverError) {
+    synchronized {
+      receiverInfos(receiverError.receiverInfo.streamId) = 
receiverError.receiverInfo
+    }
+  }
+
+  override def onReceiverStopped(receiverStopped: 
StreamingListenerReceiverStopped) {
+    synchronized {
+      receiverInfos(receiverStopped.receiverInfo.streamId) = 
receiverStopped.receiverInfo
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 8fe1219..451b23e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -78,25 +78,33 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val table = if (receivedRecordDistributions.size > 0) {
       val headerRow = Seq(
         "Receiver",
+        "Status",
         "Location",
         "Records in last batch\n[" + 
formatDate(Calendar.getInstance().getTime()) + "]",
         "Minimum rate\n[records/sec]",
-        "25th percentile rate\n[records/sec]",
         "Median rate\n[records/sec]",
-        "75th percentile rate\n[records/sec]",
-        "Maximum rate\n[records/sec]"
+        "Maximum rate\n[records/sec]",
+        "Last Error"
       )
       val dataRows = (0 until listener.numReceivers).map { receiverId =>
         val receiverInfo = listener.receiverInfo(receiverId)
-        val receiverName = 
receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
+        val receiverName = 
receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
+        val receiverActive = receiverInfo.map { info =>
+          if (info.active) "ACTIVE" else "INACTIVE"
+        }.getOrElse(emptyCell)
         val receiverLocation = 
receiverInfo.map(_.location).getOrElse(emptyCell)
         val receiverLastBatchRecords = 
formatNumber(lastBatchReceivedRecord(receiverId))
         val receivedRecordStats = receivedRecordDistributions(receiverId).map 
{ d =>
-          d.getQuantiles().map(r => formatNumber(r.toLong))
+          d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong))
         }.getOrElse {
           Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
         }
-        Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ 
receivedRecordStats
+        val receiverLastError = listener.receiverInfo(receiverId).map { info =>
+          val msg = s"${info.lastErrorMessage} - ${info.lastError}"
+          if (msg.size > 100) msg.take(97) + "..." else msg
+        }.getOrElse(emptyCell)
+        Seq(receiverName, receiverActive, receiverLocation, 
receiverLastBatchRecords) ++
+          receivedRecordStats ++ Seq(receiverLastError)
       }
       Some(listingTable(headerRow, dataRows))
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index ff3619a..303d149 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -94,9 +94,13 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
 
     // Verify restarting actually stops and starts the receiver
     receiver.restart("restarting", null, 100)
-    assert(receiver.isStopped)
-    assert(receiver.onStopCalled)
+    eventually(timeout(50 millis), interval(10 millis)) {
+      // receiver will be stopped async
+      assert(receiver.isStopped)
+      assert(receiver.onStopCalled)
+    }
     eventually(timeout(1000 millis), interval(100 millis)) {
+      // receiver will be started async
       assert(receiver.onStartCalled)
       assert(executor.isReceiverStarted)
       assert(receiver.isStarted)

http://git-wip-us.apache.org/repos/asf/spark/blob/d933c710/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 458dd3a..ef0efa5 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -66,7 +66,7 @@ class StreamingListenerSuite extends TestSuiteBase with 
ShouldMatchers {
 
   test("receiver info reporting") {
     val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
-    val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
     inputStream.foreachRDD(_.count)
 
     val collector = new ReceiverInfoCollector
@@ -75,8 +75,8 @@ class StreamingListenerSuite extends TestSuiteBase with 
ShouldMatchers {
     ssc.start()
     try {
       eventually(timeout(1000 millis), interval(20 millis)) {
-        collector.startedReceiverInfo should have size 1
-        collector.startedReceiverInfo(0).streamId should equal (0)
+        collector.startedReceiverStreamIds.size should be >= 1
+        collector.startedReceiverStreamIds(0) should equal (0)
         collector.stoppedReceiverStreamIds should have size 1
         collector.stoppedReceiverStreamIds(0) should equal (0)
         collector.receiverErrors should have size 1
@@ -108,20 +108,21 @@ class BatchInfoCollector extends StreamingListener {
 
 /** Listener that collects information on processed batches */
 class ReceiverInfoCollector extends StreamingListener {
-  val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
+  val startedReceiverStreamIds = new ArrayBuffer[Int]
   val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
   val receiverErrors = new ArrayBuffer[(Int, String, String)]()
 
   override def onReceiverStarted(receiverStarted: 
StreamingListenerReceiverStarted) {
-    startedReceiverInfo += receiverStarted.receiverInfo
+    startedReceiverStreamIds += receiverStarted.receiverInfo.streamId
   }
 
   override def onReceiverStopped(receiverStopped: 
StreamingListenerReceiverStopped) {
-    stoppedReceiverStreamIds += receiverStopped.streamId
+    stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId
   }
 
   override def onReceiverError(receiverError: StreamingListenerReceiverError) {
-    receiverErrors += ((receiverError.streamId, receiverError.message, 
receiverError.error))
+    receiverErrors += ((receiverError.receiverInfo.streamId,
+      receiverError.receiverInfo.lastErrorMessage, 
receiverError.receiverInfo.lastError))
   }
 }
 

Reply via email to