spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable

2014-12-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master c4f0b4f33 -> fe6efacc0


[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable

Spark Streaming's ReceiverMessage trait should extend Serializable in order to 
fix a subtle bug that only occurs when running on a real cluster:

If you attempt to send a fire-and-forget message to a remote Akka actor and
that message cannot be serialized, the send appears to fail more or less
silently. As an optimization, Akka skips message serialization for messages
sent within the same JVM. As a result, Spark's unit tests never fail because
of non-serializable Akka messages, but such messages cause mostly-silent
failures when running on a real cluster.
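To make the local-versus-remote difference concrete, here is a minimal Scala sketch that approximates the remote hop with a plain Java serialization round trip. This is an assumption for illustration: Akka remoting picks a serializer from its configured bindings, so the exact error it reports will differ. `PlainMessage`, `SafeMessage`, `DeliverySketch`, `deliverLocally` and `deliverRemotely` are hypothetical names, not Spark or Akka APIs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical message types for illustration; these are not Spark or Akka classes.
sealed trait PlainMessage
object PlainStop extends PlainMessage // not Serializable, like the old StopReceiver

sealed trait SafeMessage extends Serializable
object SafeStop extends SafeMessage // Serializable through its parent trait

object DeliverySketch {
  // Same-JVM delivery: the receiver gets the very same reference; nothing is serialized.
  def deliverLocally[T <: AnyRef](msg: T): T = msg

  // Cross-JVM delivery, approximated by a Java serialization round trip.
  def deliverRemotely[T <: AnyRef](msg: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(msg) // throws NotSerializableException for PlainStop
    finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Local delivery of a non-serializable message succeeds without complaint.
    println(deliverLocally(PlainStop) eq PlainStop)
    // The same message fails as soon as it has to cross a JVM boundary.
    try deliverRemotely(PlainStop)
    catch { case e: NotSerializableException => println(s"remote delivery failed: $e") }
    // A Serializable message survives the round trip.
    println(deliverRemotely(SafeStop).isInstanceOf[SafeMessage])
  }
}
```

This is why a test suite that only ever delivers messages inside one JVM cannot notice the problem: the failing step is never executed.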

Before this patch, the code for ReceiverMessage was:

```
/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage
```
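The difference between `object` and `case object` can be checked directly. Case objects automatically extend `Serializable`. A plain `object` is only serializable if one of its parents is. Below is a minimal sketch with hypothetical stand-ins: `OldReceiverMessage`, `OldStopReceiver`, `CaseStopReceiver` and `SerializabilityCheck` are illustrative names, and `private[streaming]` is dropped so the code compiles outside Spark.

```scala
// Hypothetical stand-ins for the pre-patch definitions (private[streaming] dropped).
sealed trait OldReceiverMessage
object OldStopReceiver extends OldReceiverMessage       // plain object, as before the patch
case object CaseStopReceiver extends OldReceiverMessage // case object variant, for comparison

object SerializabilityCheck {
  // True if Java serialization would accept the value's class.
  def isJavaSerializable(x: AnyRef): Boolean = x.isInstanceOf[java.io.Serializable]

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(OldStopReceiver))  // false: nothing in its hierarchy is Serializable
    println(isJavaSerializable(CaseStopReceiver)) // true: case objects are Serializable automatically
  }
}
```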

Since ReceiverMessage does not extend Serializable and StopReceiver is a
regular `object` rather than a `case object` (case objects are automatically
Serializable), any attempt to serialize StopReceiver fails. As a result,
graceful receiver shutdown is broken on real clusters (and in local-cluster
mode) but works in local mode. To reproduce this, run the word count example
from the Streaming Programming Guide in the Spark shell:

```
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val ssc = new StreamingContext(sc, Seconds(10))
// Create a DStream that will connect to hostname:port, like localhost:9999
// (9999 is the port used in the Streaming Programming Guide, fed by `nc -lk 9999`)
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()
Thread.sleep(1)
// stopSparkContext = true, stopGracefully = true
ssc.stop(true, true)
```

Prior to this patch, this would work correctly in local mode but fail when 
running against a real cluster (it would report that some receivers were not 
shut down).

Author: Josh Rosen joshro...@databricks.com

Closes #3857 from JoshRosen/SPARK-5035 and squashes the following commits:

71d0eae [Josh Rosen] [SPARK-5035] ReceiverMessage trait should extend Serializable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe6efacc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe6efacc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe6efacc

Branch: refs/heads/master
Commit: fe6efacc0b865e9e827a1565877077000e63976e
Parents: c4f0b4f
Author: Josh Rosen joshro...@databricks.com
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Wed Dec 31 16:02:47 2014 -0800

--
 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe6efacc/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index bf39d1e..ab9fa19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver
 
 /** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait ReceiverMessage
+private[streaming] sealed trait ReceiverMessage extends Serializable
 private[streaming] object StopReceiver extends ReceiverMessage
 
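The patched definitions can be checked with a sketch along the following lines. It rests on two assumptions. First, the `private[streaming]` qualifier is dropped so the code compiles outside the `org.apache.spark.streaming` package. Second, `PatchedRoundTrip` and `roundTrip` are hypothetical helpers that stand in for the remote hop with a Java serialization round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// The patched definitions, minus private[streaming] so this compiles outside Spark.
sealed trait ReceiverMessage extends Serializable
object StopReceiver extends ReceiverMessage

object PatchedRoundTrip {
  // Hypothetical helper: serialize and deserialize with plain Java serialization,
  // roughly what happens when a message crosses a JVM boundary.
  def roundTrip(msg: ReceiverMessage): ReceiverMessage = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(msg)
    finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[ReceiverMessage]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(StopReceiver)
    // The copy deserializes without error and has the same runtime class as StopReceiver.
    println(copy.getClass == StopReceiver.getClass)
  }
}
```

A unit test could run a helper like `roundTrip` over every `ReceiverMessage`. That would catch this class of bug even though local Akka sends skip serialization. Akka's `akka.actor.serialize-messages` setting offers another route: it forces serialization of local messages, which would surface the bug in tests.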


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable

2014-12-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 14dbd8312 -> 434ea009c



(cherry picked from commit fe6efacc0b865e9e827a1565877077000e63976e)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/434ea009
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/434ea009
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/434ea009

Branch: refs/heads/branch-1.2
Commit: 434ea009cd7efb2c29e88a889e87f501647a7fa6
Parents: 14dbd83
Author: Josh Rosen joshro...@databricks.com
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Wed Dec 31 16:03:03 2014 -0800

--
 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/434ea009/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala





spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable

2014-12-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 1034707c7 -> 61eb9be4b



(cherry picked from commit fe6efacc0b865e9e827a1565877077000e63976e)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61eb9be4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61eb9be4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61eb9be4

Branch: refs/heads/branch-1.1
Commit: 61eb9be4b1a6597666d59baf1f53541b47633234
Parents: 1034707
Author: Josh Rosen joshro...@databricks.com
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Wed Dec 31 16:03:24 2014 -0800

--
 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/61eb9be4/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala





spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable

2014-12-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 64cd91dca -> 5cf94775e



(cherry picked from commit fe6efacc0b865e9e827a1565877077000e63976e)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cf94775
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cf94775
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cf94775

Branch: refs/heads/branch-1.0
Commit: 5cf94775e3a72247585f6912457c559f69e01fa9
Parents: 64cd91d
Author: Josh Rosen joshro...@databricks.com
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Wed Dec 31 16:03:38 2014 -0800

--
 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5cf94775/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala

