Repository: spark Updated Branches: refs/heads/master db9513789 -> e714ecf27
[SPARK-7931] [STREAMING] Do not restart receiver when stopped Attempts to restart the socket receiver when it is supposed to be stopped causes undesirable error messages. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #6483 from tdas/SPARK-7931 and squashes the following commits: 09aeee1 [Tathagata Das] Do not restart receiver when stopped Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e714ecf2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e714ecf2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e714ecf2 Branch: refs/heads/master Commit: e714ecf277a7412ea8263662977fe3ad1f794975 Parents: db95137 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Thu May 28 22:39:21 2015 -0700 Committer: Patrick Wendell <patr...@databricks.com> Committed: Thu May 28 22:39:25 2015 -0700 ---------------------------------------------------------------------- .../spark/streaming/dstream/SocketInputDStream.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e714ecf2/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 8b72bcf..96e0a9c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.dstream +import scala.util.control.NonFatal + import org.apache.spark.streaming.StreamingContext import org.apache.spark.storage.StorageLevel import org.apache.spark.util.NextIterator @@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag]( while(!isStopped && iterator.hasNext) { store(iterator.next) } + if (!isStopped()) { + restart("Socket data stream had no more data") + } logInfo("Stopped receiving") - restart("Retrying connecting to " + host + ":" + port) } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) - case t: Throwable => - restart("Error receiving data", t) + case NonFatal(e) => + logWarning("Error receiving data", e) + restart("Error receiving data", e) } finally { if (socket != null) { socket.close() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org