Repository: spark
Updated Branches:
  refs/heads/branch-1.4 68559423a -> e419821c3


[SPARK-7931] [STREAMING] Do not restart receiver when stopped

Attempts to restart the socket receiver when it is supposed to be stopped 
causes undesirable error messages.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #6483 from tdas/SPARK-7931 and squashes the following commits:

09aeee1 [Tathagata Das] Do not restart receiver when stopped


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a52fdf2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a52fdf2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a52fdf2

Branch: refs/heads/branch-1.4
Commit: 7a52fdf25f8d635ba05796abb0c491454d7869cf
Parents: 6855942
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Thu May 28 22:39:21 2015 -0700
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Thu May 28 22:48:23 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/dstream/SocketInputDStream.scala     | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a52fdf2/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 8b72bcf..96e0a9c 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.dstream
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NextIterator
@@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag](
       while(!isStopped && iterator.hasNext) {
         store(iterator.next)
       }
+      if (!isStopped()) {
+        restart("Socket data stream had no more data")
+      }
       logInfo("Stopped receiving")
-      restart("Retrying connecting to " + host + ":" + port)
     } catch {
       case e: java.net.ConnectException =>
         restart("Error connecting to " + host + ":" + port, e)
-      case t: Throwable =>
-        restart("Error receiving data", t)
+      case NonFatal(e) =>
+        logWarning("Error receiving data", e)
+        restart("Error receiving data", e)
     } finally {
       if (socket != null) {
         socket.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to