spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
Repository: spark
Updated Branches:
  refs/heads/master c4f0b4f33 -> fe6efacc0

[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable

Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle bug that only occurs when running on a real cluster:

If you attempt to send a fire-and-forget message to a remote Akka actor and that message cannot be serialized, then this seems to lead to more-or-less silent failures. As an optimization, Akka skips message serialization for messages sent within the same JVM. As a result, Spark's unit tests will never fail due to non-serializable Akka messages, but these will cause mostly-silent failures when running on a real cluster.

Before this patch, here was the code for ReceiverMessage:

```
/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage
```

Since ReceiverMessage does not extend Serializable and StopReceiver is a regular `object`, not a `case object`, StopReceiver will throw serialization errors. As a result, graceful receiver shutdown is broken on real clusters (and local-cluster mode) but works in local modes.
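The difference between a plain `object`, a `case object`, and an `object` whose trait extends Serializable can be checked without Akka or a cluster by running the objects through plain Java serialization. The following is only a minimal sketch: the names `PlainMessage`, `PlainStop`, `CaseStop`, `SerializableMessage`, `SerializableStop`, and `SerializationCheck` are hypothetical stand-ins and are not part of Spark, and it assumes that the remote transport uses Java serialization for these messages.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for the pre-patch shape: the trait is not Serializable.
sealed trait PlainMessage
object PlainStop extends PlainMessage      // regular object: not serializable
case object CaseStop extends PlainMessage  // case objects are Serializable by default

// Hypothetical stand-in for the post-patch shape: the trait extends Serializable.
sealed trait SerializableMessage extends Serializable
object SerializableStop extends SerializableMessage

object SerializationCheck {
  /** Attempts Java serialization of `value` and captures any failure. */
  def trySerialize(value: AnyRef): Try[Array[Byte]] = Try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Fails with java.io.NotSerializableException.
    println(s"PlainStop serializable: ${trySerialize(PlainStop).isSuccess}")
    // Both of these succeed.
    println(s"CaseStop serializable: ${trySerialize(CaseStop).isSuccess}")
    println(s"SerializableStop serializable: ${trySerialize(SerializableStop).isSuccess}")
  }
}
```

A check of this kind in a unit test would catch the problem even though messages sent within one JVM are never serialized.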
If you want to reproduce this, try running the word count example from the Streaming Programming Guide in the Spark shell:

```
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(10))
// Create a DStream that will connect to hostname:port
val port = 9999 // hypothetical port; the value is missing from this message
val lines = ssc.socketTextStream("localhost", port)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()
Thread.sleep(1)
// stopSparkContext = true, stopGracefully = true
ssc.stop(true, true)
```

Prior to this patch, this would work correctly in local mode but fail when running against a real cluster (it would report that some receivers were not shut down).

Author: Josh Rosen <joshro...@databricks.com>

Closes #3857 from JoshRosen/SPARK-5035 and squashes the following commits:

71d0eae [Josh Rosen] [SPARK-5035] ReceiverMessage trait should extend Serializable.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe6efacc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe6efacc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe6efacc

Branch: refs/heads/master
Commit: fe6efacc0b865e9e827a1565877077000e63976e
Parents: c4f0b4f
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Dec 31 16:02:47 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/fe6efacc/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index bf39d1e..ab9fa19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver

 /** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait ReceiverMessage
+private[streaming] sealed trait ReceiverMessage extends Serializable
 private[streaming] object StopReceiver extends ReceiverMessage

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 14dbd8312 -> 434ea009c

[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
(cherry picked from commit fe6efacc0b865e9e827a1565877077000e63976e)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/434ea009
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/434ea009
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/434ea009

Branch: refs/heads/branch-1.2
Commit: 434ea009cd7efb2c29e88a889e87f501647a7fa6
Parents: 14dbd83
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Dec 31 16:03:03 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/434ea009/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index bf39d1e..ab9fa19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver

 /** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait ReceiverMessage
+private[streaming] sealed trait ReceiverMessage extends Serializable
 private[streaming] object StopReceiver extends ReceiverMessage
spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 1034707c7 -> 61eb9be4b

[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
(cherry picked from commit fe6efacc0b865e9e827a1565877077000e63976e)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61eb9be4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61eb9be4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61eb9be4

Branch: refs/heads/branch-1.1
Commit: 61eb9be4b1a6597666d59baf1f53541b47633234
Parents: 1034707
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Dec 31 16:03:24 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/61eb9be4/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index bf39d1e..ab9fa19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver

 /** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait ReceiverMessage
+private[streaming] sealed trait ReceiverMessage extends Serializable
 private[streaming] object StopReceiver extends ReceiverMessage
spark git commit: [SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 64cd91dca -> 5cf94775e

[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
(cherry picked from commit fe6efacc0b865e9e827a1565877077000e63976e)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cf94775
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cf94775
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cf94775

Branch: refs/heads/branch-1.0
Commit: 5cf94775e3a72247585f6912457c559f69e01fa9
Parents: 64cd91d
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Dec 31 16:02:47 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Dec 31 16:03:38 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/receiver/ReceiverMessage.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/5cf94775/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index bf39d1e..ab9fa19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver

 /** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait ReceiverMessage
+private[streaming] sealed trait ReceiverMessage extends Serializable
 private[streaming] object StopReceiver extends ReceiverMessage