[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-168620786 @guoxu1231 actually that causes a style check failure, as does your new comment. Please try running the style checker locally. This will need to be fixed before merge.
```
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala:76:4: Insert a space after the start of the comment
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala:21:23: No space after token ,
[error] (streaming/compile:scalastyle) errors exist
[error] Total time: 10 s, completed Jan 3, 2016 3:59:01 AM
[error] running /home/jenkins/workspace/NewSparkPullRequestBuilder/dev/lint-scala ; received return code 1
```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
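As a hedged illustration of what these two scalastyle rules accept, here is a minimal sketch with a space after the comma in the import selector (the `21:23` error) and a space after `//` (the `76:4` error). The selector is also listed alphabetically, which is assumed to satisfy the import-order check as well. `StyleCheckedExample` and its methods are hypothetical and not part of the PR.

```scala
import java.net.{ConnectException, Socket}

// Hypothetical helper object; only the import and comment formatting matter here.
object StyleCheckedExample {
  // in case the restart thread tries to close it twice
  def isConnectFailure(e: Throwable): Boolean = e.isInstanceOf[ConnectException]

  // Reports whether the given socket is connected.
  def describe(s: Socket): String = s"connected=${s.isConnected}"
}
```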
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-168676470 **[Test build #2311 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2311/consoleFull)** for PR 10464 at commit [`0ebe531`](https://github.com/apache/spark/commit/0ebe53102de57ee334b68c2eec2840b8ebab3044).
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-168689318 **[Test build #2311 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2311/consoleFull)** for PR 10464 at commit [`0ebe531`](https://github.com/apache/spark/commit/0ebe53102de57ee334b68c2eec2840b8ebab3044).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-168689648 Merged to master
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/10464
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-168676151 @srowen thanks for the reminder. I just fixed the style-check failures; please help trigger the test build.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48692018 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream
 import java.io._
-import java.net.{Socket, UnknownHostException}
+import java.net.{Socket,ConnectException}
```
--- End diff -- LGTM, except the imports are technically out of order, but I could fix that on merge.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-168491717 **[Test build #2302 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2302/consoleFull)** for PR 10464 at commit [`2c1d8ec`](https://github.com/apache/spark/commit/2c1d8ece488987ef9c963e6a5bfcefa7f5ef217b).
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-168491769 **[Test build #2302 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2302/consoleFull)** for PR 10464 at commit [`2c1d8ec`](https://github.com/apache/spark/commit/2c1d8ece488987ef9c963e6a5bfcefa7f5ef217b).

* This patch **fails Scala style tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48684054 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -51,29 +51,44 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+    } catch {
+      case NonFatal(e) =>
+        restart(s"Error connecting to $host:$port", e)
+    }
+
+    if (socket != null && socket.isConnected) {
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    }
   }
 
   def onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
+    //in case restart thread close it twice
+    synchronized {
+      if (socket != null) {
+        socket.close()
+        socket = null
+        logInfo(s"Closed socket to $host:$port")
+      }
+    }
   }
 
   /** Create a socket connection and receive data until receiver is stopped */
   def receive() {
-    var socket: Socket = null
     try {
-      logInfo("Connecting to " + host + ":" + port)
-      socket = new Socket(host, port)
-      logInfo("Connected to " + host + ":" + port)
-      val iterator = bytesToObjects(socket.getInputStream())
+      val iterator = bytesToObjects(socket.getInputStream)
```
--- End diff -- Done.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48684056 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -51,29 +51,44 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+    } catch {
+      case NonFatal(e) =>
```
--- End diff -- Reverted to catching only `ConnectException`.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48678510 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -51,29 +51,44 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+    } catch {
+      case NonFatal(e) =>
```
--- End diff -- Hm, I suppose now you're restarting here on any non-fatal error in connecting, not just `ConnectException`. Maybe that's OK. It's simpler to return here than to have to check afterwards whether the socket was initialized. Also, the info logs don't seem to belong in the try-catch; they can't fail.
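A minimal sketch of the simpler structure suggested here, under stated assumptions: the try wraps only the connection attempt, the log lines sit outside it, and the method returns early on failure so no later null check is needed. `OnStartSketch`, its `connect` parameter (standing in for `new Socket(host, port)`), and the recording `restart` stand-in for `Receiver.restart` are all hypothetical, so the control flow can run without a network or Spark.

```scala
import java.io.Closeable
import java.net.ConnectException
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a receiver; `connect` plays the role of
// `new Socket(host, port)` so the control flow runs without a network.
class OnStartSketch(host: String, port: Int, connect: () => Closeable) {
  val log = ArrayBuffer.empty[String]
  var readerStarted = false
  private var conn: Closeable = _

  // Stand-in for Receiver.restart: only records the request.
  def restart(message: String, e: Throwable): Unit = {
    log += s"restart: $message"
  }

  def isConnected: Boolean = conn != null

  def onStart(): Unit = {
    // Logging cannot fail, so it stays outside the try.
    log += s"Connecting to $host:$port"
    try {
      conn = connect()
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return // nothing to read from; no null check needed afterwards
    }
    log += s"Connected to $host:$port"
    // A real receiver would start its reading thread here.
    readerStarted = true
  }
}
```

Catching only `ConnectException` follows the narrower catch discussed in this thread; swapping in `case NonFatal(e)` would also restart on other connection failures such as `UnknownHostException`.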
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48678511 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -51,29 +51,44 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+    } catch {
+      case NonFatal(e) =>
+        restart(s"Error connecting to $host:$port", e)
+    }
+
+    if (socket != null && socket.isConnected) {
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    }
   }
 
   def onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
+    //in case restart thread close it twice
+    synchronized {
+      if (socket != null) {
+        socket.close()
+        socket = null
+        logInfo(s"Closed socket to $host:$port")
+      }
+    }
   }
 
   /** Create a socket connection and receive data until receiver is stopped */
   def receive() {
-    var socket: Socket = null
     try {
-      logInfo("Connecting to " + host + ":" + port)
-      socket = new Socket(host, port)
-      logInfo("Connected to " + host + ":" + port)
-      val iterator = bytesToObjects(socket.getInputStream())
+      val iterator = bytesToObjects(socket.getInputStream)
```
--- End diff -- Nit: the empty parens were correct here; it has a side effect.
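The nit refers to the Scala convention that a method with side effects is declared and called with empty parentheses, while a pure accessor omits them. A small sketch of that convention; `CountingSource` is a hypothetical class, not Spark code.

```scala
import java.io.ByteArrayInputStream

// Hypothetical class illustrating the convention: side-effecting methods
// are declared (and called) with `()`, pure accessors without.
class CountingSource(data: Array[Byte]) {
  private val in = new ByteArrayInputStream(data)
  private var consumed = 0

  // Side effect: advances the underlying stream, so it takes `()`.
  def readByte(): Int = {
    val b = in.read()
    if (b >= 0) consumed += 1
    b
  }

  // Pure accessor: no parentheses.
  def bytesConsumed: Int = consumed
}
```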
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-168287331 @srowen please help review the latest code changes. I have tested different cases. (I just set up the entire environment; this is my first PR. Thanks for the patient review.)
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48676000 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
+        restart(s"Error connecting to $host:$port", e)
+      case NonFatal(e) =>
```
--- End diff -- "This is now in the wrong place. It was correct where it was." Sorry, I didn't follow this comment. I thought you meant the "new Thread" block, so I moved the "new Thread" block outside the try/catch block, but it looks like that would cause an NPE. Could you give more information about which block is now in the wrong place?
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48596197 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
```
--- End diff -- Good question. If it fails to connect, it seems like that should be a fatal error and not attempt to restart. So I suppose you could not catch anything in `onStart`, as failures here are more fatal. Later, it makes sense to still catch general exceptions (not `ConnectException`) and maybe try to restart.
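To make the split concrete: connection failures are handled (or deliberately not) in `onStart`, while the reading loop catches any non-fatal error and asks for a restart. A minimal sketch under those assumptions, reading newline-delimited text; `ReceiveLoopSketch`, its `isStopped` function and the recording `restart` are hypothetical stand-ins for the receiver's own methods, and the restart messages are made up.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical sketch: errors while reading trigger a restart request.
class ReceiveLoopSketch(isStopped: () => Boolean) {
  val stored = ArrayBuffer.empty[String]
  var restartReason: Option[String] = None

  // Stand-in for Receiver.restart: records the reason only.
  def restart(message: String): Unit = {
    restartReason = Some(message)
  }

  def receive(in: InputStream): Unit = {
    try {
      val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        stored += line
        line = reader.readLine()
      }
      // The stream ended without a stop request: ask to try again.
      if (!isStopped()) restart("stream ended")
    } catch {
      case NonFatal(e) => restart(s"Error receiving data: ${e.getMessage}")
    }
  }
}
```

`NonFatal` deliberately does not match errors such as `VirtualMachineError` or `InterruptedException`, so those still propagate out of the loop.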
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167957676 Testing the new changes in the YARN cluster.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48588257 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
```
--- End diff -- Should we give up on catching IOException? In the original logic, an IOException (thrown from `socket = new Socket(host, port)`) was also caught and triggered the restart logic.
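Some background for this question: `ConnectException` extends `SocketException`, which extends `IOException`, while `UnknownHostException` extends `IOException` directly. `new Socket(host, port)` is documented to throw both `UnknownHostException` and `IOException`, so narrowing the catch to `ConnectException` lets an unresolvable host propagate. The sketch below checks that hierarchy; `CatchCoverage` is a hypothetical helper.

```scala
import java.io.IOException
import java.net.{ConnectException, SocketException, UnknownHostException}

// Hypothetical helper: which failures a given catch pattern would cover.
object CatchCoverage {
  def coveredByConnectException(e: Throwable): Boolean = e match {
    case _: ConnectException => true
    case _                   => false
  }

  def coveredByIOException(e: Throwable): Boolean = e match {
    case _: IOException => true
    case _              => false
  }
}
```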
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48588447 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
+        restart(s"Error connecting to $host:$port", e)
+      case NonFatal(e) =>
+        logWarning("Error receiving data", e)
+        restart("Error receiving data", e)
+    } finally {
+      onStop()
+    }
   }
 
   def onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
+    //in case restart thread close it twice
+    if (socket != null) {
+      socket.close()
```
--- End diff -- Yes, you are right; I must have been drunk yesterday.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48527650 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
+
   def onStart() {
+    logInfo("Connecting to " + host + ":" + port)
```
--- End diff -- Nit: use string interpolation
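For reference, the `s` interpolator produces the same string as the concatenation in the diff. A minimal sketch; `InterpolationExample` and its method names are hypothetical.

```scala
// Hypothetical object comparing the two ways of building the log message.
object InterpolationExample {
  def concatenated(host: String, port: Int): String =
    "Connecting to " + host + ":" + port

  // `$host` and `$port` are substituted; the ':' between them is literal text.
  def interpolated(host: String, port: Int): String =
    s"Connecting to $host:$port"
}
```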
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48527684 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
```
--- End diff -- Probably better as `private var socket: Socket = _`
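In Scala, `= _` on a `var` field initializes it to the type's default value, which for a reference type like `Socket` is `null`; this form is only allowed for fields, not local variables. A minimal sketch with a hypothetical `SocketHolder` class:

```scala
import java.net.Socket

// Hypothetical holder showing the suggested field declaration.
class SocketHolder {
  // `= _` gives the field its default value (null for a reference type);
  // `private` keeps it out of the class's public API.
  private var socket: Socket = _

  def isSet: Boolean = socket != null

  def set(s: Socket): Unit = {
    socket = s
  }
}
```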
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48527726 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
+
   def onStart() {
+    logInfo("Connecting to " + host + ":" + port)
+    socket = new Socket(host, port)
+    logInfo("Connected to " + host + ":" + port)
+
     // Start the thread that receives data over a connection
     new Thread("Socket Receiver") {
       setDaemon(true)
-      override def run() { receive() }
+      override def run() { receive(socket) }
     }.start()
   }
 
   def onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
+    if (socket != null) {
+      socket.close()
```
--- End diff -- The finally block was left below, though. It will cause the socket to be closed twice. It seems like it should be closed in one place, or at least nulled in the finally block. But then you need to synchronize too.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48527970 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
+
   def onStart() {
+    logInfo("Connecting to " + host + ":" + port)
+    socket = new Socket(host, port)
+    logInfo("Connected to " + host + ":" + port)
+
     // Start the thread that receives data over a connection
     new Thread("Socket Receiver") {
       setDaemon(true)
-      override def run() { receive() }
+      override def run() { receive(socket) }
     }.start()
   }
 
   def onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
+    if (socket != null) {
+      socket.close()
```
--- End diff -- Thanks for the great review comments! It's great. Will revise it now.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48530083 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
```
--- End diff -- Thanks for the great review comments! Will revise it now.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167760455 @srowen, thanks for the review. I combined the resource-release logic from the finally block and onStop(), and used a synchronized block to avoid possible release conflicts with the restart thread (almost impossible, but just in case of some very rare circumstance). A receiver usually keeps running for a very long time, so we must be very careful about resource creation and release. Thanks for the patient review, srowen :)
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48544697 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
```diff
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
+        restart(s"Error connecting to $host:$port", e)
+      case NonFatal(e) =>
+        logWarning("Error receiving data", e)
+        restart("Error receiving data", e)
+    } finally {
+      onStop()
+    }
   }
 
   def onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
+    //in case restart thread close it twice
+    if (socket != null) {
+      socket.close()
```
--- End diff -- This is now incorrect, since you can have two threads enter this block and both close the socket, or else close and null it before the other does. The synchronization of close() is irrelevant.
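The point here is that the null check, the `close()` call and the assignment to `null` must happen under one lock; a synchronized `close()` only serializes the close itself, not the check before it. A minimal sketch of that check-then-act pattern, assuming a plain `Closeable` in place of the socket; `ConnectionHolder` is hypothetical.

```scala
import java.io.Closeable

// Hypothetical holder standing in for the receiver's `socket` field.
class ConnectionHolder(initial: Closeable) {
  private var conn: Closeable = initial

  // The null check, close() and the null assignment all happen under the
  // same lock, so two callers (e.g. onStop and a restart path) cannot both
  // observe a non-null reference and close it twice.
  def closeOnce(): Unit = synchronized {
    if (conn != null) {
      conn.close()
      conn = null
    }
  }
}
```

This is the same shape as the `synchronized { if (socket != null) ... }` block in the later revision of `onStop()`.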
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48544636

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
--- End diff --

This catch should only pertain to creating the socket, not the rest.
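One way to scope the catch to socket creation only, sketched without Spark under stated assumptions: `NarrowTryReceiver` is a hypothetical stand-in, the `connect` function replaces `new Socket(host, port)` so it can run without a server, and `restart` merely records its message instead of rescheduling `onStart()` as Spark's `Receiver.restart` does.

```scala
import java.net.{ConnectException, Socket}

// Hypothetical, Spark-free stand-in for a receiver's onStart().
class NarrowTryReceiver(host: String, port: Int, connect: () => Socket) {
  @volatile var restartMessage: Option[String] = None
  @volatile var reader: Option[Thread] = None
  @volatile private var socket: Socket = _

  // Stand-in for Receiver.restart: only records why a restart was requested.
  private def restart(message: String, e: Throwable): Unit =
    restartMessage = Some(message)

  def onStart(): Unit = {
    // The try/catch covers socket creation and nothing else.
    val connected: Option[Socket] =
      try Some(connect())
      catch {
        case e: ConnectException =>
          restart(s"Error connecting to $host:$port", e)
          None
      }

    // Reached only after a successful connection; read errors are left to
    // the reader thread's own handling.
    connected.foreach { s =>
      socket = s
      val t = new Thread("Socket Receiver") {
        setDaemon(true)
        override def run(): Unit = () // placeholder for the receive() loop
      }
      reader = Some(t)
      t.start()
    }
  }
}
```

With this shape, a `NonFatal` catch for errors raised while reading would live in the reader thread's loop rather than in `onStart()`.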
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167802563

Thanks @srowen. It looks like the previous receive() method contained a lot of logic (creating the socket, releasing the socket, restarting on a non-fatal exception), and I almost missed the restart on ConnectException. Thanks for the review. I also removed the synchronized block, because I found that socket.close() is a synchronized method, and applied some minor fixes suggested by the IDE. I'm new to Scala development, so thanks for the detailed review suggestions. :+1:
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48534997

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,7 +52,13 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = null
+
   def onStart() {
+    logInfo(s"Connecting to $host:$port")
+    socket = new Socket(host, port)
+    logInfo(s"Connecting to $host:$port")
--- End diff --

The log message is wrong: it's connected now. You could omit this message anyway. Don't you need to move the handling of `ConnectException` here? And finally, a nit: I think initializing the socket value to `_` is more canonical in Scala, if you initialize it at all.
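For reference on the `_` nit, a small sketch (the `Defaults` class is hypothetical) showing that `= _` on a member `var` assigns the type's default value, so `private var socket: Socket = _` starts out as `null` just like the explicit `= null`.

```scala
import java.net.Socket

// `= _` gives a member var its type's default value: null for reference
// types, 0 for numeric types, false for Boolean. It is only allowed for
// member vars (for example, in a class), not for local vars in a method.
class Defaults {
  private var socket: Socket = _
  var count: Int = _
  var enabled: Boolean = _

  def socketIsNull: Boolean = socket == null
}

object DefaultsDemo {
  def main(args: Array[String]): Unit = {
    val d = new Defaults
    println(s"${d.socketIsNull} ${d.count} ${d.enabled}") // prints "true 0 false"
  }
}
```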
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/10464#discussion_r48544647

--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
+        restart(s"Error connecting to $host:$port", e)
+      case NonFatal(e) =>
--- End diff --

This is now in the wrong place. It was correct where it was.
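A hedged, Spark-free sketch of where a `NonFatal` handler naturally sits: in the loop that actually reads records, since that is where errors such as a reset connection are thrown. `ReadLoop` is hypothetical; `records` stands in for the iterator built from the socket's input stream, and `restart` only records its message.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical read loop: errors thrown while reading are caught here,
// in the thread that reads, rather than in onStart().
class ReadLoop[T](records: Iterator[T], isStopped: () => Boolean) {
  val stored = ArrayBuffer.empty[T]
  @volatile var restartMessage: Option[String] = None

  // Stand-in for Receiver.restart: only records why a restart was requested.
  private def restart(message: String, e: Throwable = null): Unit =
    restartMessage = Some(message)

  def receive(): Unit = {
    try {
      while (!isStopped() && records.hasNext) {
        stored += records.next()
      }
      // The stream ended without a stop request, so ask for a restart.
      if (!isStopped()) restart("Socket data stream had no more data")
    } catch {
      case NonFatal(e) => restart("Error receiving data", e)
    }
  }
}
```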
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user zsxwing commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167661660

This looks like a race condition between `restart` and `finally { ... socket.stop() ... }`. `restart` starts a new thread that calls `receiver.onStart`, so `receiver.onStart` may run before `socket.stop()`. However, that seems unlikely, since it sleeps 2 seconds before calling `startReceiver()`.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167541966

Does this really solve the problem? The current code appears to clean up the socket on stopping already, so I wonder why this would fix it. Did you test it? Wouldn't it make more sense to open the socket in onStart if you close it in onStop?
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167711102

@srowen, thanks for the reply. I tested it in my YARN environment and it looks more stable now. In the current implementation, restart opens a new socket in a new thread after a fixed sleep interval, while the old socket is released in the finally block of the old thread. In some rare circumstances the old socket may not be released properly (I'm not sure why; it happened after 10 hours, or sometimes 36 hours, of continuous running). In this PR, I suggest moving the socket release to Receiver.onStop(). It is invoked explicitly and releases the socket before the new socket connection is opened, which should be safer and consistent with other Receivers, which release their resources in onStop(). @zsxwing, @jerryshao
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167714955

@zsxwing, regarding "However, it looks unlikely since it sleeps 2 seconds before calling startReceiver()": could a thread pool capacity issue mean that the 2-second sleep is not guaranteed?
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167683269

I guess this might be caused by TCP's delay in releasing the port after disconnecting: normally the kernel retains the port for a while after the connection is closed (TIME_WAIT), and the port cannot be reused during that period.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167469967

No, you don't. It's OK to have 2 commits. There's some duplication [here](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L91); I think you could simplify it.
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167036016

Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
Github user guoxu1231 commented on the pull request: https://github.com/apache/spark/pull/10464#issuecomment-167038464

I'm not sure why there are 2 commits in this PR. Do I need to re-submit it?
[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...
GitHub user guoxu1231 opened a pull request:

    https://github.com/apache/spark/pull/10464

    [SPARK-12513] [Streaming] SocketReceiver hang in Netcat example

    Close client side socket connection before restart socket receiver.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guoxu1231/spark SPARK-12513

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10464.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #10464

commit a07169cf57ea2c952ca36009299af07137422581
Author: Shawn Guo
Date:   2015-12-22T06:42:53Z

    Merge pull request #1 from apache/master

    Update from original

commit 228da1ff117c3bbe1f6b5056824c68b36c41a365
Author: guoxu1231
Date:   2015-12-24T03:11:00Z

    Close client side socket connection before restart socket receiver.