[ https://issues.apache.org/jira/browse/SPARK-17766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686835#comment-15686835 ]

Dr. Michael Menzel commented on SPARK-17766:
--------------------------------------------

We have the exact same issue when consuming messages from an AWS SQS stream. 
The code runs perfectly with Spark 1.6.0. Since we moved to 2.0.0, and now 2.0.1 
on AWS EMR, we see the Kryo deserialization exception, and over time the job 
stops processing the stream because too many tasks have failed. 

We used conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer") 
and disabled Kryo class registration with 
conf.set("spark.kryo.registrationRequired", "false"); a sketch of the 
configuration is below. We still see the exceptions in the log and tasks fail. 
The failures appear to be non-deterministic, but they build up over time. In 
addition, setting the serializer to JavaSerializer seems to be ignored by the 
Spark internals.
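
For reference, a minimal sketch of how we configure the job (the app name is a 
placeholder; the serializer and registration settings are the ones quoted above):
{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: force Java serialization and drop the Kryo registration
// requirement. Despite this, KryoException still shows up in the executor logs.
val conf = new SparkConf()
  .setAppName("sqs-consumer") // placeholder name
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  .set("spark.kryo.registrationRequired", "false")
  // write-ahead log enabled, as in the reporter's example below
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
{code}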

How can the debug log for Kryo be enabled? We tried following the MinLog 
documentation, but we cannot find any output.
Any hints on how to debug the issue? 
Will the serializer settings take effect for Spark-internal 
serialization? 
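
For completeness, this is roughly what we tried in order to turn on Kryo's 
logging, following the MinLog documentation (assuming Kryo's bundled 
com.esotericsoftware.minlog.Log is on the classpath); it produced no output 
for us:
{code}
import com.esotericsoftware.minlog.Log

// MinLog is Kryo's logging backend; raise it to TRACE as early as possible
// (before the first Kryo instance is created), on the driver and on executors.
Log.set(Log.LEVEL_TRACE)
{code}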

> Write ahead log exception on a toy project
> ------------------------------------------
>
>                 Key: SPARK-17766
>                 URL: https://issues.apache.org/jira/browse/SPARK-17766
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>            Reporter: Nadav Samet
>            Priority: Minor
>
> Write ahead log seems to get corrupted when the application is stopped 
> abruptly (Ctrl-C, or kill). Then, the application refuses to run due to this 
> exception:
> {code}
> 2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
> ...skipping...
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Code:
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.spark._
> import org.apache.spark.streaming._
> object ProtoDemo {
>   def createContext(dirName: String) = {
>     val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>     /*
>     conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
>     conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
>     */
>     val ssc = new StreamingContext(conf, Seconds(1))
>     ssc.checkpoint(dirName)
>     val lines = ssc.socketTextStream("127.0.0.1", 9999)
>     val words = lines.flatMap(_.split(" "))
>     val pairs = words.map(word => (word, 1))
>     val wordCounts = pairs.reduceByKey(_ + _)
>     val runningCounts = wordCounts.updateStateByKey[Int] {
>       (values: Seq[Int], oldValue: Option[Int]) =>
>         val s = values.sum
>         Some(oldValue.fold(s)(_ + s))
>       }
>     // Print the first ten elements of each RDD generated in this DStream to the console
>     runningCounts.print()
>     ssc
>   }
>   def main(args: Array[String]) = {
>     val hadoopConf = new Configuration()
>     val dirName = "/tmp/chkp"
>     val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> Steps to reproduce:
> 1. I put the code in a repository: git clone https://github.com/thesamet/spark-issue
> 2. In one terminal: {{while true; do nc -l localhost 9999; done}}
> 3. Start a new terminal.
> 4. Run "sbt run".
> 5. Type a few lines in the netcat terminal.
> 6. Kill the streaming project (Ctrl-C).
> 7. Go back to step 4 until you see the exception above.
> I tried the above with the local filesystem and also with S3, and got the 
> same result.


