Repository: spark
Updated Branches:
  refs/heads/master db9513789 -> e714ecf27


[SPARK-7931] [STREAMING] Do not restart receiver when stopped

Attempts to restart the socket receiver when it is supposed to be stopped
cause undesirable error messages.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #6483 from tdas/SPARK-7931 and squashes the following commits:

09aeee1 [Tathagata Das] Do not restart receiver when stopped


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e714ecf2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e714ecf2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e714ecf2

Branch: refs/heads/master
Commit: e714ecf277a7412ea8263662977fe3ad1f794975
Parents: db95137
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Thu May 28 22:39:21 2015 -0700
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Thu May 28 22:39:25 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/dstream/SocketInputDStream.scala     | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e714ecf2/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 8b72bcf..96e0a9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.dstream
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NextIterator
@@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag](
       while(!isStopped && iterator.hasNext) {
         store(iterator.next)
       }
+      if (!isStopped()) {
+        restart("Socket data stream had no more data")
+      }
       logInfo("Stopped receiving")
-      restart("Retrying connecting to " + host + ":" + port)
     } catch {
       case e: java.net.ConnectException =>
         restart("Error connecting to " + host + ":" + port, e)
-      case t: Throwable =>
-        restart("Error receiving data", t)
+      case NonFatal(e) =>
+        logWarning("Error receiving data", e)
+        restart("Error receiving data", e)
     } finally {
       if (socket != null) {
         socket.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to