Nadav Samet created SPARK-17766:
-----------------------------------

             Summary: Write ahead log corruption on a toy project
                 Key: SPARK-17766
                 URL: https://issues.apache.org/jira/browse/SPARK-17766
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 2.0.0
            Reporter: Nadav Samet


The write ahead log seems to get corrupted when the application is stopped abruptly 
(Ctrl-C or kill). After that, the application refuses to start, failing with the 
exception below (the unregistered class ID is presumably garbage read back from a 
partially written WAL record):

{code}
2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
...skipping...
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}
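
For reference, the receiver WAL segments can be inspected directly on disk. A minimal sketch, assuming receiver data is logged under {{<checkpointDir>/receivedData/<streamId>/log-*}} (the layout is an assumption on my part, not verified against the Spark source):

{code}
import java.io.File

// List the receiver WAL segments and their sizes; a short, truncated
// final segment would be consistent with a write cut off by the kill.
// The directory layout below is assumed, not confirmed.
val walDir = new File("/tmp/chkp/receivedData/0")
Option(walDir.listFiles()).getOrElse(Array.empty[File])
  .filter(_.getName.startsWith("log-"))
  .sortBy(_.getName)
  .foreach(f => println(s"${f.getName}: ${f.length} bytes"))
{code}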

Code:
{code}
import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.streaming._

object ProtoDemo {
  def createContext(dirName: String) = {
    val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    /*
    conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
    conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", 
"true")
    */

    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(dirName)
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    val runningCounts = wordCounts.updateStateByKey[Int] {
      (values: Seq[Int], oldValue: Option[Int]) =>
        // Add this batch's counts for each word to the running total.
        val s = values.sum
        Some(oldValue.fold(s)(_ + s))
    }

    // Print the first ten elements of each RDD generated in this DStream to the console
    runningCounts.print()
    ssc
  }

  def main(args: Array[String]) = {
    val hadoopConf = new Configuration()
    val dirName = "/tmp/chkp"
    val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
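
A graceful shutdown might avoid the truncated write, though I have not verified this. A minimal sketch against the {{conf}} and {{ssc}} from the code above, assuming the corruption comes from a record cut off mid-write:

{code}
// Option 1: let Spark stop the streaming context gracefully on JVM
// shutdown (handles Ctrl-C / SIGTERM, but not kill -9).
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

// Option 2: an explicit shutdown hook that drains the receivers and
// finishes queued batches before the process exits.
sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
{code}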

Steps to reproduce:
1. Clone the repository with the code: {{git clone https://github.com/thesamet/spark-issue}}
2. In one terminal: {{while true; do nc -l localhost 9999; done}}
3. In another terminal: {{sbt run}}
4. Type a few lines in the netcat terminal.
5. Kill the streaming application (Ctrl-C).
6. Go back to step 3 and run again, until you see the exception above.

I tried the above with the local filesystem and also with S3, and got the same 
result.
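
Wiping the checkpoint directory lets the application start fresh, at the cost of all accumulated state. A minimal sketch using the Hadoop FileSystem API, so the same code covers both the local filesystem and S3 cases:

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Recursively delete the checkpoint directory (WAL segments, checkpoint
// metadata, and all updateStateByKey state) so getOrCreate builds a
// fresh StreamingContext on the next run.
val checkpointPath = new Path("/tmp/chkp")
val fs = checkpointPath.getFileSystem(new Configuration())
if (fs.exists(checkpointPath)) {
  fs.delete(checkpointPath, true) // true = recursive
}
{code}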


