Repository: spark
Updated Branches:
  refs/heads/master 1f0f14efe -> 6502944f3


[SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI

Expose executorId in `ReceiverInfo` and the UI, since it's helpful when there are
multiple executors running on the same host. Screenshot:

<img width="1058" alt="screen shot 2015-11-02 at 10 52 19 am"
src="https://cloud.githubusercontent.com/assets/1000778/10890968/2e2f5512-8150-11e5-8d9d-746e826b69e8.png">

Author: Shixiong Zhu <shixi...@databricks.com>
Author: zsxwing <zsxw...@gmail.com>

Closes #9418 from zsxwing/SPARK-11333.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6502944f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6502944f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6502944f

Branch: refs/heads/master
Commit: 6502944f39893b9dfb472f8406d5f3a02a316eff
Parents: 1f0f14e
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Nov 9 18:13:37 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 9 18:13:37 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/api/java/JavaStreamingListener.scala     | 1 +
 .../streaming/api/java/JavaStreamingListenerWrapper.scala    | 1 +
 .../org/apache/spark/streaming/scheduler/ReceiverInfo.scala  | 1 +
 .../spark/streaming/scheduler/ReceiverTrackingInfo.scala     | 1 +
 .../scala/org/apache/spark/streaming/ui/StreamingPage.scala  | 8 ++++++--
 .../spark/streaming/JavaStreamingListenerAPISuite.java       | 3 +++
 .../api/java/JavaStreamingListenerWrapperSuite.scala         | 8 ++++++--
 .../streaming/ui/StreamingJobProgressListenerSuite.scala     | 6 +++---
 8 files changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index c86c710..3442907 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -140,6 +140,7 @@ private[streaming] case class JavaReceiverInfo(
     name: String,
     active: Boolean,
     location: String,
+    executorId: String,
     lastErrorMessage: String,
     lastError: String,
     lastErrorTime: Long)

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index 2c60b39..b109b9f 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -33,6 +33,7 @@ private[streaming] class 
JavaStreamingListenerWrapper(javaStreamingListener: Jav
       receiverInfo.name,
       receiverInfo.active,
       receiverInfo.location,
+      receiverInfo.executorId,
       receiverInfo.lastErrorMessage,
       receiverInfo.lastError,
       receiverInfo.lastErrorTime

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index 59df892..3b35964 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -30,6 +30,7 @@ case class ReceiverInfo(
     name: String,
     active: Boolean,
     location: String,
+    executorId: String,
     lastErrorMessage: String = "",
     lastError: String = "",
     lastErrorTime: Long = -1L

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
index ab0a84f..4dc5bb9 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
@@ -49,6 +49,7 @@ private[streaming] case class ReceiverTrackingInfo(
     name.getOrElse(""),
     state == ReceiverState.ACTIVE,
     location = runningExecutor.map(_.host).getOrElse(""),
+    executorId = runningExecutor.map(_.executorId).getOrElse(""),
     lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""),
     lastError = errorInfo.map(_.lastError).getOrElse(""),
     lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L)

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 96d943e..4588b21 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -402,7 +402,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
         <tr>
           <th style="width: 151px;"></th>
           <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 
8px 0 8px">Status</div></th>
-          <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 
8px 0 8px">Location</div></th>
+          <th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 
8px 0 8px">Executor ID / Host</div></th>
           <th style="width: 166px; padding: 8px 0 8px 0"><div style="margin: 0 
8px 0 8px">Last Error Time</div></th>
           <th>Last Error Message</th>
         </tr>
@@ -430,7 +430,11 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val receiverActive = receiverInfo.map { info =>
       if (info.active) "ACTIVE" else "INACTIVE"
     }.getOrElse(emptyCell)
-    val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
+    val receiverLocation = receiverInfo.map { info =>
+      val executorId = if (info.executorId.isEmpty) emptyCell else 
info.executorId
+      val location = if (info.location.isEmpty) emptyCell else info.location
+      s"$executorId / $location"
+    }.getOrElse(emptyCell)
     val receiverLastError = receiverInfo.map { info =>
       val msg = s"${info.lastErrorMessage} - ${info.lastError}"
       if (msg.size > 100) msg.take(97) + "..." else msg

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
 
b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
index 8cc285a..67b2a07 100644
--- 
a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -29,6 +29,7 @@ public class JavaStreamingListenerAPISuite extends 
JavaStreamingListener {
     receiverInfo.name();
     receiverInfo.active();
     receiverInfo.location();
+    receiverInfo.executorId();
     receiverInfo.lastErrorMessage();
     receiverInfo.lastError();
     receiverInfo.lastErrorTime();
@@ -41,6 +42,7 @@ public class JavaStreamingListenerAPISuite extends 
JavaStreamingListener {
     receiverInfo.name();
     receiverInfo.active();
     receiverInfo.location();
+    receiverInfo.executorId();
     receiverInfo.lastErrorMessage();
     receiverInfo.lastError();
     receiverInfo.lastErrorTime();
@@ -53,6 +55,7 @@ public class JavaStreamingListenerAPISuite extends 
JavaStreamingListener {
     receiverInfo.name();
     receiverInfo.active();
     receiverInfo.location();
+    receiverInfo.executorId();
     receiverInfo.lastErrorMessage();
     receiverInfo.lastError();
     receiverInfo.lastErrorTime();

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
 
b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
index 6d6d61e..0295e05 100644
--- 
a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -33,7 +33,8 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite 
{
       streamId = 2,
       name = "test",
       active = true,
-      location = "localhost"
+      location = "localhost",
+      executorId = "1"
     ))
     listenerWrapper.onReceiverStarted(receiverStarted)
     assertReceiverInfo(listener.receiverStarted.receiverInfo, 
receiverStarted.receiverInfo)
@@ -42,7 +43,8 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite 
{
       streamId = 2,
       name = "test",
       active = false,
-      location = "localhost"
+      location = "localhost",
+      executorId = "1"
     ))
     listenerWrapper.onReceiverStopped(receiverStopped)
     assertReceiverInfo(listener.receiverStopped.receiverInfo, 
receiverStopped.receiverInfo)
@@ -52,6 +54,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite 
{
       name = "test",
       active = false,
       location = "localhost",
+      executorId = "1",
       lastErrorMessage = "failed",
       lastError = "failed",
       lastErrorTime = System.currentTimeMillis()
@@ -197,6 +200,7 @@ class JavaStreamingListenerWrapperSuite extends 
SparkFunSuite {
     assert(javaReceiverInfo.name === receiverInfo.name)
     assert(javaReceiverInfo.active === receiverInfo.active)
     assert(javaReceiverInfo.location === receiverInfo.location)
+    assert(javaReceiverInfo.executorId === receiverInfo.executorId)
     assert(javaReceiverInfo.lastErrorMessage === receiverInfo.lastErrorMessage)
     assert(javaReceiverInfo.lastError === receiverInfo.lastError)
     assert(javaReceiverInfo.lastErrorTime === receiverInfo.lastErrorTime)

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index af4718b..34cd743 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -130,20 +130,20 @@ class StreamingJobProgressListenerSuite extends 
TestSuiteBase with Matchers {
     listener.numTotalReceivedRecords should be (600)
 
     // onReceiverStarted
-    val receiverInfoStarted = ReceiverInfo(0, "test", true, "localhost")
+    val receiverInfoStarted = ReceiverInfo(0, "test", true, "localhost", "0")
     
listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted))
     listener.receiverInfo(0) should be (Some(receiverInfoStarted))
     listener.receiverInfo(1) should be (None)
 
     // onReceiverError
-    val receiverInfoError = ReceiverInfo(1, "test", true, "localhost")
+    val receiverInfoError = ReceiverInfo(1, "test", true, "localhost", "1")
     listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError))
     listener.receiverInfo(0) should be (Some(receiverInfoStarted))
     listener.receiverInfo(1) should be (Some(receiverInfoError))
     listener.receiverInfo(2) should be (None)
 
     // onReceiverStopped
-    val receiverInfoStopped = ReceiverInfo(2, "test", true, "localhost")
+    val receiverInfoStopped = ReceiverInfo(2, "test", true, "localhost", "2")
     
listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped))
     listener.receiverInfo(0) should be (Some(receiverInfoStarted))
     listener.receiverInfo(1) should be (Some(receiverInfoError))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to