[ https://issues.apache.org/jira/browse/SPARK-17766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu resolved SPARK-17766.
----------------------------------
    Resolution: Duplicate

This has been fixed in SPARK-18617.

> Write ahead log exception on a toy project
> ------------------------------------------
>
>                 Key: SPARK-17766
>                 URL: https://issues.apache.org/jira/browse/SPARK-17766
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>            Reporter: Nadav Samet
>            Priority: Minor
>
> The write ahead log seems to get corrupted when the application is stopped abruptly (Ctrl-C or kill). The application then refuses to start, failing with this exception:
> {code}
> 2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
> ...skipping...
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Code:
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.spark._
> import org.apache.spark.streaming._
>
> object ProtoDemo {
>   def createContext(dirName: String) = {
>     val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>     /*
>     conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
>     conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
>     */
>     val ssc = new StreamingContext(conf, Seconds(1))
>     ssc.checkpoint(dirName)
>     val lines = ssc.socketTextStream("127.0.0.1", 9999)
>     val words = lines.flatMap(_.split(" "))
>     val pairs = words.map(word => (word, 1))
>     val wordCounts = pairs.reduceByKey(_ + _)
>     val runningCounts = wordCounts.updateStateByKey[Int] {
>       (values: Seq[Int], oldValue: Option[Int]) =>
>         val s = values.sum
>         Some(oldValue.fold(s)(_ + s))
>     }
>     // Print the first ten elements of each RDD generated in this DStream to the console
>     runningCounts.print()
>     ssc
>   }
>
>   def main(args: Array[String]) = {
>     val hadoopConf = new Configuration()
>     val dirName = "/tmp/chkp"
>     val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
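> A possible workaround sketch for builds without the SPARK-18617 fix (not part of the project above; {{stop(stopSparkContext, stopGracefully)}} is the standard StreamingContext API, while the shutdown-hook wiring here is only illustrative): stopping the context gracefully on Ctrl-C gives the receiver a chance to flush and close the current WAL segment before the JVM exits. Replacing {{main}} above with:
> {code}
>   def main(args: Array[String]) = {
>     val hadoopConf = new Configuration()
>     val dirName = "/tmp/chkp"
>     val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
>     // Illustrative: stop gracefully when the JVM shuts down. Ctrl-C and
>     // SIGTERM run shutdown hooks; kill -9 still bypasses them.
>     sys.ShutdownHookThread {
>       ssc.stop(stopSparkContext = true, stopGracefully = true)
>     }
>     ssc.start()
>     ssc.awaitTermination()
>   }
> {code}
> Setting {{spark.streaming.stopGracefullyOnShutdown}} to {{true}} in the SparkConf before the context is created should have the same effect.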
> Steps to reproduce:
> 1. I put the code in a repository: {{git clone https://github.com/thesamet/spark-issue}}
> 2. In one terminal: {{while true; do nc -l localhost 9999; done}}
> 3. Start a new terminal.
> 4. Run {{sbt run}}.
> 5. Type a few lines in the netcat terminal.
> 6. Kill the streaming application (Ctrl-C).
> 7. Go back to step 4 until you see the exception above.
> I tried the above with the local filesystem and also with S3, and got the same result in both cases.
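> Once a run has hit the exception, deleting the checkpoint directory (which also holds the WAL segments) gets the job back to a clean start, at the cost of losing the running counts. A minimal sketch, assuming the {{/tmp/chkp}} path from the example above (the {{ResetCheckpoint}} object is an illustrative helper, not part of the project); it goes through the Hadoop FileSystem API so the same code covers the S3 case:
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
>
> object ResetCheckpoint {
>   def main(args: Array[String]): Unit = {
>     val hadoopConf = new Configuration()
>     // Same directory the streaming job checkpoints to; use an s3a:// URI for S3.
>     val checkpointDir = new Path("/tmp/chkp")
>     val fs = checkpointDir.getFileSystem(hadoopConf)
>     // Recursive delete: removes the corrupted WAL files together with all
>     // checkpointed streaming state.
>     fs.delete(checkpointDir, true)
>   }
> }
> {code}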