[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-04 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-168620786
  
@guoxu1231 actually that causes a style check failure, as does your new 
comment. Please try running the style checker locally. This will need to be 
fixed before merge.

```
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala:76:4: Insert a space after the start of the comment
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala:21:23: No space after token ,
[error] (streaming/compile:scalastyle) errors exist
[error] Total time: 10 s, completed Jan 3, 2016 3:59:01 AM
[error] running /home/jenkins/workspace/NewSparkPullRequestBuilder/dev/lint-scala ; received return code 1
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-168676470
  
**[Test build #2311 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2311/consoleFull)**
 for PR 10464 at commit 
[`0ebe531`](https://github.com/apache/spark/commit/0ebe53102de57ee334b68c2eec2840b8ebab3044).



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-168689318
  
**[Test build #2311 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2311/consoleFull)**
 for PR 10464 at commit 
[`0ebe531`](https://github.com/apache/spark/commit/0ebe53102de57ee334b68c2eec2840b8ebab3044).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-04 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-168689648
  
Merged to master



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/10464



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-04 Thread guoxu1231
Github user guoxu1231 commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-168676151
  
@srowen Thanks for the reminder. I just fixed the style-check failures; please help trigger the test build.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-03 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48692018
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import java.io._
-import java.net.{Socket, UnknownHostException}
+import java.net.{Socket,ConnectException}
--- End diff --

LGTM, except the imports are technically out of order, but I could fix that on merge.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-168491717
  
**[Test build #2302 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2302/consoleFull)**
 for PR 10464 at commit 
[`2c1d8ec`](https://github.com/apache/spark/commit/2c1d8ece488987ef9c963e6a5bfcefa7f5ef217b).



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-168491769
  
**[Test build #2302 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2302/consoleFull)**
 for PR 10464 at commit 
[`2c1d8ec`](https://github.com/apache/spark/commit/2c1d8ece488987ef9c963e6a5bfcefa7f5ef217b).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-02 Thread guoxu1231
Github user guoxu1231 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48684054
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -51,29 +51,44 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-// Start the thread that receives data over a connection
-new Thread("Socket Receiver") {
-  setDaemon(true)
-  override def run() { receive() }
-}.start()
+try {
+  logInfo(s"Connecting to $host:$port")
+  socket = new Socket(host, port)
+  logInfo(s"Connected to $host:$port")
+} catch {
+  case NonFatal(e) =>
+restart(s"Error connecting to $host:$port", e)
+}
+
+if (socket != null && socket.isConnected) {
+  // Start the thread that receives data over a connection
+  new Thread("Socket Receiver") {
+setDaemon(true)
+override def run() { receive() }
+  }.start()
+}
   }
 
   def onStop() {
-// There is nothing much to do as the thread calling receive()
-// is designed to stop by itself isStopped() returns false
+//in case restart thread close it twice
+synchronized {
+  if (socket != null) {
+socket.close()
+socket = null
+logInfo(s"Closed socket to $host:$port")
+  }
+}
   }
 
   /** Create a socket connection and receive data until receiver is stopped */
   def receive() {
-var socket: Socket = null
 try {
-  logInfo("Connecting to " + host + ":" + port)
-  socket = new Socket(host, port)
-  logInfo("Connected to " + host + ":" + port)
-  val iterator = bytesToObjects(socket.getInputStream())
+  val iterator = bytesToObjects(socket.getInputStream)
--- End diff --

Done.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-02 Thread guoxu1231
Github user guoxu1231 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48684056
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -51,29 +51,44 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-// Start the thread that receives data over a connection
-new Thread("Socket Receiver") {
-  setDaemon(true)
-  override def run() { receive() }
-}.start()
+try {
+  logInfo(s"Connecting to $host:$port")
+  socket = new Socket(host, port)
+  logInfo(s"Connected to $host:$port")
+} catch {
+  case NonFatal(e) =>
--- End diff --

Reverted to catching only `ConnectException`.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48678510
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -51,29 +51,44 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-// Start the thread that receives data over a connection
-new Thread("Socket Receiver") {
-  setDaemon(true)
-  override def run() { receive() }
-}.start()
+try {
+  logInfo(s"Connecting to $host:$port")
+  socket = new Socket(host, port)
+  logInfo(s"Connected to $host:$port")
+} catch {
+  case NonFatal(e) =>
--- End diff --

Hm, I suppose now you're restarting here on any non-fatal error in connecting, not just `ConnectException`. Maybe that's OK. It's simpler to return here than to then have to check whether the socket was initialized. Also, the info logs don't seem to belong in the try-catch; they can't fail.
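A minimal sketch of the early-return shape suggested above, assuming a hypothetical `MiniReceiver` base class that only records `restart` calls (standing in for Spark's `Receiver`) and an injected, hypothetical `connect` function so it runs without a server. It is not the merged Spark code; it only shows that returning from the `catch` removes the need for a later `socket != null` check, and that the log calls can sit outside the `try`.

```scala
import java.net.{ConnectException, Socket}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's Receiver: it only records restart requests.
abstract class MiniReceiver {
  val restarts = ArrayBuffer.empty[String]
  def restart(message: String, error: Throwable): Unit = restarts += message
}

// `connect` is injected (hypothetical) so the sketch can run without a real server.
class EarlyReturnReceiver(host: String, port: Int, connect: (String, Int) => Socket)
    extends MiniReceiver {
  private var socket: Socket = _
  var readerStarted = false

  def onStart(): Unit = {
    // Logging cannot fail, so it stays outside the try/catch.
    println(s"Connecting to $host:$port")
    try {
      socket = connect(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return // no need to check later whether `socket` was set
    }
    println(s"Connected to $host:$port")
    // The real receiver would start its "Socket Receiver" thread here.
    readerStarted = true
  }
}
```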



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2016-01-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48678511
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -51,29 +51,44 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-// Start the thread that receives data over a connection
-new Thread("Socket Receiver") {
-  setDaemon(true)
-  override def run() { receive() }
-}.start()
+try {
+  logInfo(s"Connecting to $host:$port")
+  socket = new Socket(host, port)
+  logInfo(s"Connected to $host:$port")
+} catch {
+  case NonFatal(e) =>
+restart(s"Error connecting to $host:$port", e)
+}
+
+if (socket != null && socket.isConnected) {
+  // Start the thread that receives data over a connection
+  new Thread("Socket Receiver") {
+setDaemon(true)
+override def run() { receive() }
+  }.start()
+}
   }
 
   def onStop() {
-// There is nothing much to do as the thread calling receive()
-// is designed to stop by itself isStopped() returns false
+//in case restart thread close it twice
+synchronized {
+  if (socket != null) {
+socket.close()
+socket = null
+logInfo(s"Closed socket to $host:$port")
+  }
+}
   }
 
   /** Create a socket connection and receive data until receiver is stopped */
   def receive() {
-var socket: Socket = null
 try {
-  logInfo("Connecting to " + host + ":" + port)
-  socket = new Socket(host, port)
-  logInfo("Connected to " + host + ":" + port)
-  val iterator = bytesToObjects(socket.getInputStream())
+  val iterator = bytesToObjects(socket.getInputStream)
--- End diff --

Nit: the empty parens were correct here; `getInputStream()` has a side effect.
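The convention behind this nit, as commonly followed in Scala style guides: methods that perform side effects are declared and called with `()`, while pure accessors omit them. Scala lets you call a Java method such as `getInputStream()` either way, so the parens act as documentation for the reader. A small illustration with a hypothetical `Counter` class:

```scala
// Hypothetical class illustrating the parens convention.
class Counter {
  private var n = 0

  // Pure accessor: no side effect, so no parens.
  def count: Int = n

  // Mutates state, so it is declared (and called) with ().
  def increment(): Unit = n += 1
}
```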



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-31 Thread guoxu1231
Github user guoxu1231 commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-168287331
  
@srowen Please help review the latest code changes. I have tested different cases. (I just set up the entire environment; this is my first PR, so thanks for the patient review.)



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-31 Thread guoxu1231
Github user guoxu1231 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48676000
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-// Start the thread that receives data over a connection
-new Thread("Socket Receiver") {
-  setDaemon(true)
-  override def run() { receive() }
-}.start()
+
+try {
+  logInfo(s"Connecting to $host:$port")
+  socket = new Socket(host, port)
+  logInfo(s"Connected to $host:$port")
+
+  // Start the thread that receives data over a connection
+  new Thread("Socket Receiver") {
+setDaemon(true)
+override def run() { receive() }
+  }.start()
+} catch {
+  case e: ConnectException =>
+restart(s"Error connecting to $host:$port", e)
+  case NonFatal(e) =>
--- End diff --

"This is now in the wrong place. It was correct where it was."
Sorry, I didn't catch this comment. I thought you meant the "new Thread" block, so I moved it outside the try/catch block, but it looks like that would cause an NPE. Could you give more information about which block is now in the wrong place?




[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-30 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48596197
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-// Start the thread that receives data over a connection
-new Thread("Socket Receiver") {
-  setDaemon(true)
-  override def run() { receive() }
-}.start()
+
+try {
+  logInfo(s"Connecting to $host:$port")
+  socket = new Socket(host, port)
+  logInfo(s"Connected to $host:$port")
+
+  // Start the thread that receives data over a connection
+  new Thread("Socket Receiver") {
+setDaemon(true)
+override def run() { receive() }
+  }.start()
+} catch {
+  case e: ConnectException =>
--- End diff --

Good question. If it fails to connect, it seems like that should be a fatal error and not attempt to restart. So I suppose you could avoid catching anything in `onStart`, as failures there are more fatal. Later, it still makes sense to catch general exceptions (not `ConnectException`) and maybe try to restart.
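One way to read the second half of this suggestion, as a sketch only: errors raised while reading are caught broadly with `NonFatal` and turned into a restart request, and the stream is closed in `finally`. `ReceiveLoopSketch` and its `restart` method are hypothetical stand-ins, and the input is a plain `InputStream` so no socket is needed.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for the receiving side of a socket receiver.
class ReceiveLoopSketch(input: InputStream) {
  val stored = ArrayBuffer.empty[String]
  val restarts = ArrayBuffer.empty[String]

  // Stand-in for Receiver.restart: just records the request.
  def restart(message: String, error: Throwable): Unit = restarts += message

  def receive(): Unit = {
    val reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
    try {
      var line = reader.readLine()
      while (line != null) {
        stored += line // the real receiver would call store(...) here
        line = reader.readLine()
      }
    } catch {
      // Any non-fatal error while reading triggers a restart attempt.
      case NonFatal(e) => restart("Error receiving data", e)
    } finally {
      reader.close()
    }
  }
}
```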



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-30 Thread guoxu1231
Github user guoxu1231 commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167957676
  
Testing the new changes in the YARN cluster.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread guoxu1231
Github user guoxu1231 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48588257
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-// Start the thread that receives data over a connection
-new Thread("Socket Receiver") {
-  setDaemon(true)
-  override def run() { receive() }
-}.start()
+
+try {
+  logInfo(s"Connecting to $host:$port")
+  socket = new Socket(host, port)
+  logInfo(s"Connected to $host:$port")
+
+  // Start the thread that receives data over a connection
+  new Thread("Socket Receiver") {
+setDaemon(true)
+override def run() { receive() }
+  }.start()
+} catch {
+  case e: ConnectException =>
--- End diff --

Should we give up on catching IOException? In the original logic, an IOException (thrown from `socket = new Socket(host, port)`) was also caught and triggered the restart logic.
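Some context for this question, taken from the JDK class hierarchy: `ConnectException` is a subclass of `SocketException`, which extends `IOException`, while `UnknownHostException` extends `IOException` directly. So a clause matching only `ConnectException` no longer catches, for example, an unknown host reported by `new Socket(host, port)`. The helper names below are hypothetical.

```scala
import java.io.IOException
import java.net.{ConnectException, SocketException, UnknownHostException}

// Hypothetical helpers mirroring the two catch clauses under discussion.
object CatchScopeSketch {
  // What `case e: ConnectException =>` would handle.
  def caughtByConnectOnly(e: Throwable): Boolean = e match {
    case _: ConnectException => true
    case _                   => false
  }

  // What a broader `case e: IOException =>` would handle.
  def caughtByIOException(e: Throwable): Boolean = e match {
    case _: IOException => true
    case _              => false
  }
}
```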



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread guoxu1231
Github user guoxu1231 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48588447
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-// Start the thread that receives data over a connection
-new Thread("Socket Receiver") {
-  setDaemon(true)
-  override def run() { receive() }
-}.start()
+
+try {
+  logInfo(s"Connecting to $host:$port")
+  socket = new Socket(host, port)
+  logInfo(s"Connected to $host:$port")
+
+  // Start the thread that receives data over a connection
+  new Thread("Socket Receiver") {
+setDaemon(true)
+override def run() { receive() }
+  }.start()
+} catch {
+  case e: ConnectException =>
+restart(s"Error connecting to $host:$port", e)
+  case NonFatal(e) =>
+logWarning("Error receiving data", e)
+restart("Error receiving data", e)
+} finally {
+  onStop()
+}
   }
 
   def onStop() {
-// There is nothing much to do as the thread calling receive()
-// is designed to stop by itself isStopped() returns false
+//in case restart thread close it twice
+if (socket != null) {
+  socket.close()
--- End diff --

Yes, you are right; I must have been drunk yesterday.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48527650
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
+
   def onStart() {
+logInfo("Connecting to " + host + ":" + port)
--- End diff --

Nit: use string interpolation
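For reference, the interpolated form of the log message in this diff produces the same string as the concatenation; the `host` and `port` values in the usage are only example inputs, and `InterpolationSketch` is a hypothetical wrapper.

```scala
object InterpolationSketch {
  // Concatenation, as in the original diff.
  def concatenated(host: String, port: Int): String =
    "Connecting to " + host + ":" + port

  // The s-interpolator substitutes $host and $port into the literal.
  def interpolated(host: String, port: Int): String =
    s"Connecting to $host:$port"
}
```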



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48527684
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
--- End diff --

Probably better as `private var socket: Socket = _`
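What `= _` means here, as a small sketch: in Scala 2 a class-level `var` initialized with `_` gets the type's default value, which is `null` for a reference type like `Socket` and `0` for `Int`, and `private` keeps other code from touching the socket directly. This form is only allowed for member fields, not local variables, and newer Scala 3 versions move toward `scala.compiletime.uninitialized` instead. `DefaultInitSketch` is a hypothetical class.

```scala
import java.net.Socket

// Hypothetical class showing default initialization of member vars.
class DefaultInitSketch {
  // `= _` assigns the default value for the type: null for a reference type.
  private var socket: Socket = _
  // For a numeric type the default is 0.
  private var attempts: Int = _

  def socketIsUnset: Boolean = socket == null
  def attemptCount: Int = attempts
}
```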



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48527726
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
+
   def onStart() {
+logInfo("Connecting to " + host + ":" + port)
+socket = new Socket(host, port)
+logInfo("Connected to " + host + ":" + port)
+
 // Start the thread that receives data over a connection
 new Thread("Socket Receiver") {
   setDaemon(true)
-  override def run() { receive() }
+  override def run() { receive(socket) }
 }.start()
   }
 
   def onStop() {
-// There is nothing much to do as the thread calling receive()
-// is designed to stop by itself isStopped() returns false
+if (socket != null) {
+  socket.close()
--- End diff --

The finally block was left below, though. It will cause the socket to be closed twice. It seems like it should be closed in one place, or at least nulled in the finally block. But then you need to synchronize too.
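A minimal sketch of closing in one place with synchronization, as suggested above: every caller (for example `onStop()` and the receive loop's `finally`) would go through one method that closes and nulls the reference under a lock, so any second call is a no-op. `SingleCloseSketch` is hypothetical and takes any `java.io.Closeable` so it can be exercised without a network connection.

```scala
import java.io.Closeable

// Hypothetical holder that owns one resource and closes it at most once.
class SingleCloseSketch(resource: Closeable) {
  private var current: Closeable = resource

  // Returns true only for the call that actually closed the resource.
  def closeOnce(): Boolean = synchronized {
    if (current != null) {
      current.close()
      current = null // later callers (on any thread) see null and do nothing
      true
    } else {
      false
    }
  }
}
```

An `AtomicReference` with `getAndSet(null)` would give the same at-most-once behavior without a lock; `synchronized` is shown because it matches what the thread discusses.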



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread guoxu1231
Github user guoxu1231 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48527970
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
+
   def onStart() {
+logInfo("Connecting to " + host + ":" + port)
+socket = new Socket(host, port)
+logInfo("Connected to " + host + ":" + port)
+
 // Start the thread that receives data over a connection
 new Thread("Socket Receiver") {
   setDaemon(true)
-  override def run() { receive() }
+  override def run() { receive(socket) }
 }.start()
   }
 
   def onStop() {
-// There is nothing much to do as the thread calling receive()
-// is designed to stop by itself isStopped() returns false
+if (socket != null) {
+  socket.close()
--- End diff --

Thanks for the great review comments! I will revise it now.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread guoxu1231
Github user guoxu1231 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48530083
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala ---
@@ -52,34 +52,40 @@ class SocketReceiver[T: ClassTag](
 storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  var socket: Socket = null
--- End diff --

Thanks for the great review comments! I will revise it now.



[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread guoxu1231
Github user guoxu1231 commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167760455
  
@srowen, thanks for the review. I combined the resource release logic from the finally block and onStop(), and used a synchronized block to avoid possible release conflicts with the restart thread (almost impossible, but just in case of some very rare circumstance).
Receivers usually keep running for a very long time, so we must be very careful about resource creation and release. Thanks for the patient review, srowen :)





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48544697
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
 ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
+        restart(s"Error connecting to $host:$port", e)
+      case NonFatal(e) =>
+        logWarning("Error receiving data", e)
+        restart("Error receiving data", e)
+    } finally {
+      onStop()
+    }
   }
 
   def onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
+    //in case restart thread close it twice
+    if (socket != null) {
+      socket.close()
--- End diff --

This is now incorrect, since you can have two threads enter this block and 
both close the socket, or else close and null it before the other does. The 
synchronization of close() is irrelevant.
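The point here is that the null check, the `close()` call and the reset to null have to happen as one atomic step; `Socket.close()` being synchronized only protects the close itself. A minimal sketch of that idea, assuming a hypothetical `GuardedReceiver` class (not Spark's `Receiver`) and a hypothetical `CountingResource` standing in for `java.net.Socket`, guards the whole check-close-null sequence with a single lock:

```scala
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for java.net.Socket that counts close() calls.
final class CountingResource extends Closeable {
  val closes = new AtomicInteger(0)
  override def close(): Unit = { closes.incrementAndGet(); () }
}

// Hypothetical receiver-like class, not Spark's Receiver. The null check,
// close() and the reset to null all run under the same lock, so concurrent
// onStop() calls cannot both see a non-null resource, and no caller can
// observe it half-reset.
final class GuardedReceiver(open: () => Closeable) {
  private var resource: Closeable = _

  def onStart(): Unit = synchronized {
    resource = open()
  }

  def onStop(): Unit = synchronized {
    if (resource != null) {
      resource.close()
      resource = null
    }
  }
}
```

With this structure, calling `onStop()` from several threads at once closes the resource exactly once.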





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48544636
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
 ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
--- End diff --

This catch should only pertain to creating the socket, not the rest.
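One way to follow this suggestion, sketched under stated assumptions: `ConnectingReceiver` is a hypothetical class, and its `connect`, `restart` and `receive` function parameters stand in for `new Socket(host, port)`, Spark's `Receiver.restart` and the existing `receive()` loop. Only the connection attempt sits inside the `try`; starting the reader thread does not:

```scala
import java.net.{ConnectException, Socket}

// Hypothetical sketch, not Spark's SocketReceiver: `connect`, `restart` and
// `receive` are injected stand-ins for new Socket(host, port),
// Receiver.restart(message, error) and the existing receive() loop.
final class ConnectingReceiver(
    host: String,
    port: Int,
    connect: (String, Int) => Socket,
    restart: (String, Throwable) => Unit,
    receive: Socket => Unit) {

  def onStart(): Unit = {
    // Only the connection attempt is covered by the catch.
    val connected: Option[Socket] =
      try Some(connect(host, port))
      catch {
        case e: ConnectException =>
          restart(s"Error connecting to $host:$port", e)
          None
      }

    // Starting the reader thread happens outside the try/catch.
    connected.foreach { s =>
      val reader = new Thread("Socket Receiver") {
        override def run(): Unit = receive(s)
      }
      reader.setDaemon(true)
      reader.start()
    }
  }
}
```

Errors raised while reading (the `NonFatal` case) would still be handled inside `receive()`, as in the original code.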





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread guoxu1231
Github user guoxu1231 commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167802563
  
Thanks @srowen. It looks like the previous receive() method contained a lot
of logic (creating the socket, releasing the socket, restarting on a non-fatal
exception); I almost missed the restart on ConnectException. Thanks for the
review.
I removed the synchronized block as well, because I found that socket.close()
is a synchronized method.
I also applied some minor fixes suggested by the IDE.

I'm new to Scala development; thanks for the detailed review suggestions.
:+1: 





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48534997
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
 ---
@@ -52,7 +52,13 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = null
+
   def onStart() {
+    logInfo(s"Connecting to $host:$port")
+    socket = new Socket(host, port)
+    logInfo(s"Connecting to $host:$port")
--- End diff --

The log message is wrong; it's connected now. You could omit this message
anyway. Don't you need to move handling of `ConnectException` here? And
finally, a nit: I think initializing the socket value to `_` is more
canonical in Scala, if you initialize it at all.
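For reference, `= _` on a class member `var` initializes it to the type's default value: `null` for reference types such as `Socket`, `0` for `Int`, `false` for `Boolean`. A small illustration with a hypothetical `Holder` class:

```scala
import java.net.Socket

// Hypothetical holder class. `= _` is only allowed on member vars (not on
// local vars or vals) and assigns the type's default value.
class Holder {
  var socket: Socket = _ // null
  var count: Int = _     // 0
  var open: Boolean = _  // false
}
```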





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/10464#discussion_r48544647
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
 ---
@@ -52,46 +52,60 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      setDaemon(true)
-      override def run() { receive() }
-    }.start()
+
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+
+      // Start the thread that receives data over a connection
+      new Thread("Socket Receiver") {
+        setDaemon(true)
+        override def run() { receive() }
+      }.start()
+    } catch {
+      case e: ConnectException =>
+        restart(s"Error connecting to $host:$port", e)
+      case NonFatal(e) =>
--- End diff --

This is now in the wrong place. It was correct where it was.





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-28 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167661660
  
There looks to be a race condition between `restart` and
`finally { ... socket.stop() ...}`. `restart` starts a new thread that calls
`receiver.onStart`, so `receiver.onStart` may run before `socket.stop()`.

However, it looks unlikely, since it sleeps 2 seconds before calling
`startReceiver()`.





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-28 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167541966
  
Does this really solve the problem? The current code appears to clean up
the socket on stopping already, so I wonder why this would fix it. Did you test
it?
Wouldn't it make more sense to open the socket in onStart if you close it in
onStop?





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-28 Thread guoxu1231
Github user guoxu1231 commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167711102
  
@srowen, thanks for the reply. I tested it in my YARN environment and it
looks more stable now.

In the current implementation, restart starts a new socket in a new thread
after a fixed sleep interval, and the old socket is released in the finally
block of the old thread. In some rare circumstances the old socket may not be
released properly (not sure why this happens; it occurred after 10 hours, or
sometimes 36 hours, of continuous running).

In this PR, I suggest moving the socket release to Receiver.onStop(). It is
invoked explicitly and releases the socket before a new socket connection is
started, which should be safer and keeps the same behaviour as other
receivers, which release their resources in onStop().

@zsxwing, @jerryshao 
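The ordering argument can be illustrated with a simplified model. The sketch below assumes, as a simplification of Spark's receiver supervisor, that a restart runs stop, sleep and start in sequence on one background thread; `SimpleReceiver`, `SimpleSupervisor` and `RecordingReceiver` are hypothetical names, not Spark classes. Under that assumption, anything released in `onStop()` is released before `onStart()` opens the next connection:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical receiver interface, loosely modelled on onStart/onStop.
trait SimpleReceiver {
  def onStart(): Unit
  def onStop(): Unit
}

// Hypothetical, simplified supervisor: restart runs stop -> sleep -> start
// on a single background thread, so the three steps are strictly ordered.
final class SimpleSupervisor(receiver: SimpleReceiver, delayMs: Long) {
  def restart(): Thread = {
    val t = new Thread(() => {
      receiver.onStop()
      Thread.sleep(delayMs)
      receiver.onStart()
    })
    t.start()
    t
  }
}

// Records open/close events; stands in for opening/closing a Socket.
final class RecordingReceiver extends SimpleReceiver {
  private val events = ArrayBuffer.empty[String]
  private var generation = 0

  def onStart(): Unit = events.synchronized {
    generation += 1
    events += s"open-$generation"
  }

  def onStop(): Unit = events.synchronized {
    events += s"close-$generation"
  }

  def log: List[String] = events.synchronized(events.toList)
}
```

After one restart, the recorded order is `open-1`, `close-1`, `open-2`: the old connection is closed before the new one is opened, regardless of how long the sleep is.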





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-28 Thread guoxu1231
Github user guoxu1231 commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167714955
  
@zsxwing, regarding "However, it looks unlikely since it sleeps 2 seconds
before calling startReceiver()": could this be a thread pool capacity issue,
so that the 2-second sleep is not guaranteed?





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-28 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167683269
  
I guess this might be caused by TCP delaying the release of the port after
disconnection: normally the kernel retains the port for a while after
disconnection (TIME_WAIT), and the port cannot be reused during that period.





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-27 Thread jerryshao
Github user jerryshao commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167469967
  
No, you don't. It's OK to have 2 commits. There's some duplication 
[here](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L91);
 I think you could simplify it.






[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167036016
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-23 Thread guoxu1231
Github user guoxu1231 commented on the pull request:

https://github.com/apache/spark/pull/10464#issuecomment-167038464
  
Not sure why there are 2 commits in this PR. Do I need to re-submit it?





[GitHub] spark pull request: [SPARK-12513] [Streaming] SocketReceiver hang ...

2015-12-23 Thread guoxu1231
GitHub user guoxu1231 opened a pull request:

https://github.com/apache/spark/pull/10464

[SPARK-12513] [Streaming] SocketReceiver hang in Netcat example

Close client side socket connection before restart socket receiver.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guoxu1231/spark SPARK-12513

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/10464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #10464


commit a07169cf57ea2c952ca36009299af07137422581
Author: Shawn Guo 
Date:   2015-12-22T06:42:53Z

Merge pull request #1 from apache/master

Update from original

commit 228da1ff117c3bbe1f6b5056824c68b36c41a365
Author: guoxu1231 
Date:   2015-12-24T03:11:00Z

Close client side socket connection before restart socket receiver.



