Repository: spark Updated Branches: refs/heads/branch-1.4 68559423a -> e419821c3
[SPARK-7931] [STREAMING] Do not restart receiver when stopped Attempts to restart the socket receiver when it is supposed to be stopped cause undesirable error messages. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #6483 from tdas/SPARK-7931 and squashes the following commits: 09aeee1 [Tathagata Das] Do not restart receiver when stopped Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a52fdf2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a52fdf2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a52fdf2 Branch: refs/heads/branch-1.4 Commit: 7a52fdf25f8d635ba05796abb0c491454d7869cf Parents: 6855942 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Thu May 28 22:39:21 2015 -0700 Committer: Patrick Wendell <patr...@databricks.com> Committed: Thu May 28 22:48:23 2015 -0700 ---------------------------------------------------------------------- .../spark/streaming/dstream/SocketInputDStream.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7a52fdf2/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 8b72bcf..96e0a9c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.dstream +import scala.util.control.NonFatal + import org.apache.spark.streaming.StreamingContext import org.apache.spark.storage.StorageLevel import
org.apache.spark.util.NextIterator @@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag]( while(!isStopped && iterator.hasNext) { store(iterator.next) } + if (!isStopped()) { + restart("Socket data stream had no more data") + } logInfo("Stopped receiving") - restart("Retrying connecting to " + host + ":" + port) } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) - case t: Throwable => - restart("Error receiving data", t) + case NonFatal(e) => + logWarning("Error receiving data", e) + restart("Error receiving data", e) } finally { if (socket != null) { socket.close() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org