[jira] [Commented] (SPARK-18737) Serialization setting "spark.serializer" ignored in Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-18737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15741466#comment-15741466 ] Dr. Michael Menzel commented on SPARK-18737: We actually want to use Kryo, but want to deactivate it for debugging purposes. We see random Kryo exceptions that occur even before any of our business logic comes into play. It seems some initial cluster communication has issues from time to time (not constantly). We would like to investigate the actual issue, but the Kryo serializer masks it behind a serialization exception. We hoped that by turning off Kryo serialization we could get to the root cause. Alternatively, enabling Kryo logging may help, but we already tried that by setting the MinLog level to debug and saw hardly any output at all. After going through the Kryo code we figured there are hardly any log statements made :( Any idea what could give us a means to debug the issue? We are on Spark 2.0.1 and 2.0.2, using Spark Streaming to consume messages from SQS. All works fine and stable on Spark 1.6 and below. In Spark 2.x the cluster runs unstable and errors in task communication with workers increase over time. Kryo exceptions are thrown with every communication error. A short sketch of the two settings we are toggling follows after the quoted summary.

> Serialization setting "spark.serializer" ignored in Spark 2.x
> --------------------------------------------------------------
>
> Key: SPARK-18737
> URL: https://issues.apache.org/jira/browse/SPARK-18737
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Dr. Michael Menzel
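A minimal sketch (not a confirmed fix) of what we are toggling: the JavaSerializer is forced on the SparkConf before the context is created, and Kryo's MinLog level is raised programmatically. The application name is a placeholder, and Log.set only affects the JVM it runs in, so the executors would need the same call.

{code}
// Sketch only: force the Java serializer and turn up Kryo's MinLog output.
// Both must be applied before the SparkContext / StreamingContext is created.
import com.esotericsoftware.minlog.Log
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("sqs-stream-debug") // placeholder name
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

// Raise Kryo's MinLog output to TRACE. This affects only the current JVM,
// so executors would need the same call (e.g. at the start of a task).
Log.set(Log.LEVEL_TRACE)
{code}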
[jira] [Commented] (SPARK-18737) Serialization setting "spark.serializer" ignored in Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-18737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15729036#comment-15729036 ] Dr. Michael Menzel commented on SPARK-18737: No, we have not registered any classes with Kryo (registerKryoClasses). We set conf.set("spark.kryo.registrationRequired", false). A sketch of how explicit registration would look is shown below the quoted summary.

> Serialization setting "spark.serializer" ignored in Spark 2.x
> --------------------------------------------------------------
>
> Key: SPARK-18737
> URL: https://issues.apache.org/jira/browse/SPARK-18737
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Dr. Michael Menzel
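For completeness, this is roughly how explicit Kryo registration would look if we did register classes. MyEvent and MyRecord are placeholders, not classes from our job, and SparkConf.set expects string values, hence "false" as a string.

{code}
// Sketch only: explicit Kryo class registration (placeholder classes).
import org.apache.spark.SparkConf

case class MyEvent(id: Long)        // placeholder
case class MyRecord(body: String)   // placeholder

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // SparkConf.set takes string values, hence "false" rather than false
  .set("spark.kryo.registrationRequired", "false")
  .registerKryoClasses(Array(classOf[MyEvent], classOf[MyRecord]))
{code}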
[jira] [Updated] (SPARK-18737) Serialization setting "spark.serializer" ignored in Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-18737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dr. Michael Menzel updated SPARK-18737:
---------------------------------------
Priority: Blocker (was: Major)

> Serialization setting "spark.serializer" ignored in Spark 2.x
> --------------------------------------------------------------
>
> Key: SPARK-18737
> URL: https://issues.apache.org/jira/browse/SPARK-18737
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Dr. Michael Menzel
> Priority: Blocker
[jira] [Created] (SPARK-18737) Serialization setting "spark.serializer" ignored in Spark 2.x
Dr. Michael Menzel created SPARK-18737:
---------------------------------------
Summary: Serialization setting "spark.serializer" ignored in Spark 2.x
Key: SPARK-18737
URL: https://issues.apache.org/jira/browse/SPARK-18737
Project: Spark
Issue Type: Bug
Affects Versions: 2.0.1, 2.0.0
Reporter: Dr. Michael Menzel

The following exception occurs although the JavaSerializer has been activated:

16/11/22 10:49:24 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 77, ip-10-121-14-147.eu-central-1.compute.internal, partition 1, RACK_LOCAL, 5621 bytes)
16/11/22 10:49:24 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching task 77 on executor id: 2 hostname: ip-10-121-14-147.eu-central-1.compute.internal.
16/11/22 10:49:24 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on ip-10-121-14-147.eu-central-1.compute.internal:45059 (size: 879.0 B, free: 410.4 MB)
16/11/22 10:49:24 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 77, ip-10-121-14-147.eu-central-1.compute.internal): com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.util.NextIterator.to(NextIterator.scala:21)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.util.NextIterator.toBuffer(NextIterator.scala:21)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.util.NextIterator.toArray(NextIterator.scala:21)
        at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:927)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The code runs perfectly with Spark 1.6.0. Since we moved to 2.0.0 and now 2.0.1, we see the Kryo deserialization exception, and over time the Spark streaming job stops processing since too many tasks have failed.
Our action was to use conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") and to disable mandatory Kryo class registration with conf.set("spark.kryo.registrationRequired", false). We hoped to identify the root cause of the exception. However, setting the serializer to JavaSerializer is obviously ignored by the Spark internals: despite the setting, we still see the exception printed in the log and tasks fail. The occurrence seems to be non-deterministic, but to become more frequent over time.

Several questions we could not answer during our troubleshooting:
1. How can the debug log for Kryo be enabled? -- We tried following the MinLog documentation, but no output can be found.
2. Is the serializer setting effective for Spark-internal serialization? How can the JavaSerializer be forced on internal serialization for worker-to-driver communication?
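For reference, the sketch below shows how we set the serializer before creating the streaming context; the application name and streaming details are placeholders. The same property can also be set in spark-defaults.conf or passed on the spark-submit command line. Whether this setting also governs Spark-internal serialization is exactly what question 2 asks.

{code}
// Sketch of our driver-side configuration (app name is a placeholder).
// The same property can also be passed on submission, e.g.
//   spark-submit --conf spark.serializer=org.apache.spark.serializer.JavaSerializer ...
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("sqs-consumer") // placeholder
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  .set("spark.kryo.registrationRequired", "false")

val ssc = new StreamingContext(conf, Seconds(1))
{code}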
[jira] [Commented] (SPARK-17766) Write ahead log exception on a toy project
[ https://issues.apache.org/jira/browse/SPARK-17766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686835#comment-15686835 ] Dr. Michael Menzel commented on SPARK-17766: We have the exact same issue when consuming messages from an AWS SQS stream. The code runs perfectly with Spark 1.6.0. Since we moved to 2.0.0 and now 2.0.1 on AWS EMR, we see the Kryo deserialization exception, and over time the job stops processing the stream since too many tasks have failed. We used conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") and disabled mandatory Kryo class registration with conf.set("spark.kryo.registrationRequired", false). We still see the exceptions in the log and tasks fail. It seems to be non-deterministic, but to build up over time. In addition, setting the serializer to JavaSerializer is ignored by the Spark internals. How can the debug log for Kryo be enabled? We tried with the MinLog documentation, but no output can be found. Any hints for how to debug the issue? Will the serializer settings become effective for Spark-internal serialization?

> Write ahead log exception on a toy project
> -------------------------------------------
>
> Key: SPARK-17766
> URL: https://issues.apache.org/jira/browse/SPARK-17766
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.0
> Reporter: Nadav Samet
> Priority: Minor
>
> The write ahead log seems to get corrupted when the application is stopped abruptly (Ctrl-C, or kill). Then the application refuses to run due to this exception:
> {code}
> 2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
> ...skipping...
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Code:
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.spark._
> import org.apache.spark.streaming._
>
> object ProtoDemo {
>   def createContext(dirName: String) = {
>     val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>     /*
>     conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
>     conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
>     */
>     val ssc = new StreamingContext(conf, Seconds(1))
>     ssc.checkpoint(dirName)
>     val lines = ssc.socketTextStream("127.0.0.1", )
>     val words = lines.flatMap(_.split(" "))
>     val pairs = words.map(word => (word, 1))
>     val wordCounts = pairs.reduceByKey(_ + _)
>     val runningCounts = wordCounts.updateStateByKey[Int] {
>       (values: Seq[Int], oldValue: Option[Int]) =>
>         val s = values.sum
>         Some(oldValue.fold(s)(_ + s))
>     }
>     // Print the first ten elements of each RDD generated in this DStream to the console
>     runningCounts.print()
>     ssc
>   }
>
>   def main(args: Array[String]) = {
>     val hadoopConf = new Configuration()
>     val dirName = "/tmp/chkp"
>     val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> Steps to reproduce:
> 1. I put the code in a repository: git clone https://github.com/thesamet/spark-issue
> 2. In one terminal: {{ while true; do nc -l localhost ; done}}
> 3. Start a new terminal
> 4. Run "sbt run".
> 5. Type a few lines in the netcat terminal.
> 6. Kill the streaming application (Ctrl-C or kill).