Sean Owen commented on SPARK-16480:
-----------------------------------

It's just reading serialized data, here apparently using Java serialization.


Streaming checkpointing does not work well with SIGTERM
-------------------------------------------------------

Key: SPARK-16480
URL: https://issues.apache.org/jira/browse/SPARK-16480
Project: Spark
Issue Type: Bug
Components: Streaming
Affects Versions: 1.6.1
Reporter: Stavros Kontopoulos

A customer gets the following exception when trying to stop a streaming job gracefully with SIGTERM:

{quote}
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
{quote}

This implies that the checkpointed data is in an invalid state. The problem can be reproduced easily with the following code skeleton (a Kafka direct stream is used):

{quote}
myKafkaDirectStream
  .mapWithState(stateSpec)
  .stateSnapshots()
  .foreachRDD { data =>
    // back up the state
    data.cache()
    data.collect().foreach(println)
    data.map { x => x._1 + "," + x._2 }.saveAsTextFile(inputPath)
    // ...
  }
{quote}

The inputPath above is used both for the initialRdd and as the checkpointing dir (using a different path does not affect the issue). The streaming context is created with getOrCreate, and all transformations and actions are defined inside the creating function (see the sketch below).

To reproduce, run the job (1st run), stop it with kill -15 <pid>, and then restart it (2nd run). An empty local folder can be used for both the inputPath and the checkpointing path.
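For reference, a minimal sketch of the getOrCreate pattern described above, assuming the Spark 1.6 Streaming API; the names checkpointDir and createContext, the batch interval, and the app name are illustrative, not taken from the reporter's actual job:

{quote}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/checkpoint" // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpoint-repro") // illustrative name
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // All DStream transformations and actions (the skeleton above) must be
  // defined here, so they are re-created when recovering from the checkpoint.
  ssc
}

// On a clean start createContext() is called; after kill -15 and a restart,
// the context is instead rebuilt from the checkpoint data.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
{quote}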
The checkpointing blocks from the first run:

{quote}
16/07/11 13:59:51 DEBUG DirectKafkaInputDStream: Updated checkpoint data for time 1468234791000 ms: [
4 checkpoint files
1468234791000 ms -> [Lscala.Tuple4;@60a55c45
1468234790000 ms -> [Lscala.Tuple4;@58e5be3
1468234789000 ms -> [Lscala.Tuple4;@13cf6be7
1468234788000 ms -> [Lscala.Tuple4;@6017d6ae
]
16/07/11 13:59:51 DEBUG InternalMapWithStateDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files
]
16/07/11 13:59:51 DEBUG FlatMappedDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files
]
16/07/11 13:59:51 DEBUG ForEachDStream: Updated checkpoint data for time 1468234791000 ms: [
0 checkpoint files
]
{quote}

In the first run I successfully see:

{quote}
16/07/11 13:59:49 INFO StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
{quote}

Here is the log output from the second run, when recovery occurs from the checkpointing data:

{quote}
16/07/11 14:00:02 DEBUG FileBasedWriteAheadLogReader: Error reading next item, EOF reached
java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:432)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:212)
	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$recoverPastEvents$1.apply(ReceivedBlockTracker.scala:210)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.recoverPastEvents(ReceivedBlockTracker.scala:210)
	at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:81)
	at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
	at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
16/07/11 14:00:02 INFO JobGenerator: Batches during down time (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
16/07/11 14:00:02 INFO JobGenerator: Batches pending processing (0 batches):
16/07/11 14:00:02 INFO JobGenerator: Batches to reschedule (12 batches): 1468234791000 ms, 1468234792000 ms, 1468234793000 ms, 1468234794000 ms, 1468234795000 ms, 1468234796000 ms, 1468234797000 ms, 1468234798000 ms, 1468234799000 ms, 1468234800000 ms, 1468234801000 ms, 1468234802000 ms
16/07/11 14:00:02 DEBUG DStreamGraph: Generating jobs for time 1468234791000 ms
16/07/11 14:00:02 DEBUG FlatMappedDStream: Time 1468234791000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234791000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234790000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234789000 ms is valid
16/07/11 14:00:02 DEBUG InternalMapWithStateDStream: Time 1468234788000 ms is valid
16/07/11 14:00:02 INFO InternalMapWithStateDStream: Time 1468234787000 ms is invalid as zeroTime is 1468234787000 ms and slideDuration is 1000 ms and difference is 0 ms
16/07/11 14:00:02 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
	at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
	at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)
{quote}

Maybe the first exception in the 2nd run relates to
https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3CCAMz94CGJzCv6yHW1mOTU2QRX=Pdu2k_PCvJ0++JPNM=upfg...@mail.gmail.com%3E
and causes the issue (it is not clear what is happening there).

The issue was also reported to happen when using HDFS.
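As a side note on reproduction: the "Invoking stop(stopGracefully=true) from shutdown hook" line in the first-run log corresponds to the spark.streaming.stopGracefullyOnShutdown configuration flag. A minimal sketch of enabling it; the app name and batch interval are illustrative:

{quote}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With this flag set, the driver's shutdown hook calls
// stop(stopGracefully = true) when it receives SIGTERM (kill -15),
// which produces the "Invoking stop(stopGracefully=true)" log line above.
val conf = new SparkConf()
  .setAppName("graceful-shutdown") // illustrative name
  .set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(conf, Seconds(1))
{quote}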