[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14974515#comment-14974515 ]
Anna Kepler commented on SPARK-5206: ------------------------------------ In our case we have a broadcast variable that needs to be accessed in updateStateBykey() method. How can we resolve that? > Accumulators are not re-registered during recovering from checkpoint > -------------------------------------------------------------------- > > Key: SPARK-5206 > URL: https://issues.apache.org/jira/browse/SPARK-5206 > Project: Spark > Issue Type: Bug > Components: Streaming > Affects Versions: 1.1.0 > Reporter: vincent ye > > I got exception as following while my streaming application restarts from > crash from checkpoit: > 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR > scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, > 4) > java.util.NoSuchElementException: key not found: 1 > at scala.collection.MapLike$class.default(MapLike.scala:228) > at scala.collection.AbstractMap.default(Map.scala:58) > at scala.collection.mutable.HashMap.apply(HashMap.scala:64) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > I guess that an Accumulator is registered to a singleton Accumulators in Line > 58 of org.apache.spark.Accumulable: > Accumulators.register(this, true) > This code need to be executed in the driver once. But when the application is > recovered from checkpoint. It won't be executed in the driver. So when the > driver process it at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938), > It can't find the Accumulator because it's not re-register during the > recovery. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org