Nadav Samet created SPARK-17766:
-----------------------------------

             Summary: Write ahead corruption on a toy project
                 Key: SPARK-17766
                 URL: https://issues.apache.org/jira/browse/SPARK-17766
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 2.0.0
            Reporter: Nadav Samet
The write-ahead log seems to get corrupted when the application is stopped abruptly (Ctrl-C or kill). After that, the application refuses to start and fails with this exception:

{code}
2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
...skipping...
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

Code:

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.streaming._

object ProtoDemo {
  def createContext(dirName: String) = {
    val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
    // Receiver-side write-ahead log: received blocks are also persisted under the checkpoint directory
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    /*
    conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
    conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
    */
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(dirName)

    // Word count over lines read from a TCP socket
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Running totals across batches (stateful, so checkpointing is required)
    val runningCounts = wordCounts.updateStateByKey[Int] {
      (values: Seq[Int], oldValue: Option[Int]) =>
        val s = values.sum
        Some(oldValue.fold(s)(_ + s))
    }

    // Print the first ten elements of each RDD generated in this DStream to the console
    runningCounts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val dirName = "/tmp/chkp"
    // Recover from the checkpoint in /tmp/chkp if one exists, otherwise build a new context
    val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

Steps to reproduce:
1. I put the code in a repository: {{git clone https://github.com/thesamet/spark-issue}}
2. In one terminal, run: {{while true; do nc -l localhost 9999; done}}
3. In another terminal, run {{sbt run}}.
4. Type a few lines in the netcat terminal.
5. Kill the streaming project (Ctrl-C).
6. Repeat from step 2 until you see the exception above.

I tried this with both the local filesystem and S3 and got the same result.
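To make the kill/restart loop in steps 2 to 6 easier to repeat, netcat can be swapped for a small Scala program that feeds the receiver automatically. This is a minimal, untested sketch and not part of the reporter's repository: {{LineFeeder}}, {{sampleLines}} and {{writeLines}} are hypothetical names, and it sends a few fixed lines instead of typed input. It assumes the demo's {{socketTextStream("127.0.0.1", 9999)}} endpoint and, like each {{nc -l}} iteration, serves one connection at a time until the client goes away.

```scala
import java.io.{IOException, OutputStream, OutputStreamWriter, PrintWriter}
import java.net.{InetAddress, ServerSocket}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for `while true; do nc -l localhost 9999; done`.
object LineFeeder {
  // Hypothetical sample input; any text works, since the demo just counts words.
  val sampleLines: Seq[String] = Seq("hello world", "hello spark", "write ahead log")

  // Writes each line terminated by '\n'; socketTextStream reads its input line by line.
  def writeLines(out: OutputStream, lines: Seq[String]): Unit = {
    val writer = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
    lines.foreach { line =>
      writer.print(line)
      writer.print('\n')
    }
    writer.flush() // not close(): closing would also close the client socket
  }

  def main(args: Array[String]): Unit = {
    // Bind to the loopback address only, matching 127.0.0.1:9999 in the demo.
    val server = new ServerSocket(9999, 50, InetAddress.getLoopbackAddress)
    try {
      while (true) {
        val client = server.accept() // blocks until the Spark receiver connects
        try {
          writeLines(client.getOutputStream, sampleLines)
          // Like nc, hold the connection open until the client disconnects
          // (e.g. the streaming app is killed); read() returns -1 at EOF.
          while (client.getInputStream.read() != -1) {}
        } catch {
          case _: IOException => () // connection reset by a killed client
        } finally {
          client.close()
        }
      }
    } finally {
      server.close()
    }
  }
}
```

Run it in a separate terminal in place of step 2, then start and kill {{sbt run}} as in steps 3 and 5; each restart of the streaming app gets a fresh connection with the same sample lines.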