[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16385221#comment-16385221 ] msnreddy commented on SPARK-5206:
-

A broadcast variable cannot be used with {{mapWithState}} if we need to recover from the checkpoint directory in Spark Streaming. In that case it can only be used inside output operations, because it requires the Spark context to lazily initialize the broadcast:

{code}
class JavaWordBlacklist {
  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {
  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the blacklist Broadcast
  Broadcast<List<String>> blacklist =
      JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter =
      JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use blacklist to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (blacklist.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
});
{code}

> Accumulators are not re-registered during recovering from checkpoint
> Key: SPARK-5206
> URL: https://issues.apache.org/jira/browse/SPARK-5206
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 1.1.0
> Reporter: vincent ye
> Priority: Major
>
> I got the following exception while my streaming application restarts from a crash using the checkpoint:
> {code}
> 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, 4)
> java.util.NoSuchElementException: key not found: 1
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:58)
>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> I guess that an Accumulator is registered to the singleton Accumulators at line 58 of org.apache.spark.Accumulable:
> {code}
> Accumulators.register(this, true)
> {code}
> This code needs to be executed in the driver once, but when the application is recovered from a checkpoint it is not executed in the driver again. So when the driver processes the task completion at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938), it cannot find the Accumulator because it was not re-registered during the recovery.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14975572#comment-14975572 ] Tathagata Das commented on SPARK-5206:
--

Same singleton idea. Have a singleton reference to the broadcast variable that is lazily initialized and transient (so it does not get serialized into checkpoints). If the application restarts, the singleton will be null, and it will have to be initialized again before being used for the first time after recovery.
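The transient lazy singleton described above can be sketched without any Spark dependencies. This is a minimal illustration of the pattern, where {{Context}} is a hypothetical stand-in for {{JavaSparkContext}} (a real job would call {{jsc.broadcast(...)}} instead):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for JavaSparkContext, for illustration only.
class Context {
    <T> T broadcast(T value) { return value; }
}

class BlacklistHolder {
    // A static field is never serialized into a checkpoint, so after a
    // driver restart it starts out null again and is re-created lazily.
    private static volatile List<String> instance = null;

    static List<String> getInstance(Context ctx) {
        if (instance == null) {                  // first check, no lock
            synchronized (BlacklistHolder.class) {
                if (instance == null) {          // second check, under lock
                    instance = ctx.broadcast(Arrays.asList("a", "b", "c"));
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        List<String> first = BlacklistHolder.getInstance(ctx);
        List<String> second = BlacklistHolder.getInstance(ctx);
        System.out.println(first == second);  // initialized exactly once
    }
}
```

The double-checked locking with a {{volatile}} field is what makes {{getInstance}} safe to call from concurrent batch jobs while still creating the broadcast only once per driver lifetime.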
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14974515#comment-14974515 ] Anna Kepler commented on SPARK-5206:
-

In our case we have a broadcast variable that needs to be accessed in the {{updateStateByKey()}} method. How can we resolve that?
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562492#comment-14562492 ] Tathagata Das commented on SPARK-5206:
--

Good observations on the limitations. Currently there are no immediate concrete plans to fix this, but I welcome suggestions. The tricky thing is figuring out the right semantics. Your suggestion of resetting the value to zero may suit your purpose, where the value is reset after every batch anyway. For other purposes, however, such resetting would lead to non-intuitive behavior on failure: the accumulator would suddenly reset to zero, implicitly hiding the problem rather than explicitly failing (as it does now). So I am not convinced that automatically resetting to zero is a good solution, but I am open to hearing everyone's opinions on this matter.
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559071#comment-14559071 ] Adrian Tanase commented on SPARK-5206:
--

[~tdas] - are there any plans to make accumulators re-register automatically at checkpoint restore? I think this would be a valuable fix even if the values are reset to "zero". For example, we're using them to aggregate counters that are pushed to OpenTSDB after every micro-batch, after which they are reset to the initial value (to avoid rated metrics in OpenTSDB and keep the counters additive).

I implemented the above suggestion and it works - but it also has a number of downsides that make it impractical for a codebase that relies heavily on metrics and counters implemented as accumulators:
- it seems to limit usage to either output operations or transformed {{DStreams}} - it prevents using accumulators during transformations (e.g. simple ones like {{map}} or stateful ones like {{updateStateByKey}})
- one has to wrap all the DStream transformations in {{foreachRDD}} or {{transform}}, heavily altering the semantics of the streaming job and obscuring the business logic - which goes against Spark's mantra of ease of use

Is there another way that I'm missing in the above pattern that would simplify the implementation by only calling {{getInstance}} once per streaming context? Thanks!
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508271#comment-14508271 ] Zhichao Zhang commented on SPARK-5206:
---

[~tdas], thanks for your reply. Looking forward to built-in support for upgrade. :) (y)
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507545#comment-14507545 ] Tathagata Das commented on SPARK-5206:
--

1. Yes, there is no built-in way to persist the latest value of an accumulator and recover it again. Since everything dies with the driver, recovering the value has to be done by the user. If it is not too much data, you could persist the value to a key-value store after every batch, and then read it back on recovery when the application is restarted.
2. There is no built-in support for upgrade YET. It's on my wish list of things to add, but it's a non-trivial task. Hopefully I will get to it soon. :)
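The persist-and-restore idea in point 1 can be sketched outside Spark. This is a minimal illustration, using a local file as a stand-in for the key-value store; the class and method names ({{CounterStore}}, {{saveCount}}, {{loadCount}}) are hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;

class CounterStore {
    private final Path path;

    CounterStore(Path path) { this.path = path; }

    // Called at the end of every batch: persist the accumulator's latest value.
    void saveCount(long value) throws IOException {
        Files.write(path, Collections.singletonList(Long.toString(value)));
    }

    // Called once on restart: seed a fresh accumulator with the saved
    // value, or 0 if nothing has been persisted yet.
    long loadCount() throws IOException {
        if (!Files.exists(path)) {
            return 0L;
        }
        return Long.parseLong(Files.readAllLines(path).get(0).trim());
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("counter", ".txt");
        CounterStore store = new CounterStore(p);
        store.saveCount(42L);                  // end of a batch
        System.out.println(store.loadCount()); // after a driver restart
    }
}
```

In a real job the save would go in a {{foreachRDD}} output operation (which runs on the driver), and the load would run before the accumulator singleton is first initialized after recovery.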
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507322#comment-14507322 ] Zhichao Zhang commented on SPARK-5206:
---

The way you described works well - sorry for my carelessness. But there is still a problem: the accumulator starts from its initial value (0) after recovering from the checkpoint, not from the last value before the failure, which is what I expected. Any suggestions for this? Thanks a lot.
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506806#comment-14506806 ] Tathagata Das commented on SPARK-5206:
--

That's the incorrect way of doing it. It ends up trying to create a new accumulator in the executor, which in turn tries to access the SparkContext on the worker, which throws this exception. If you read the SQLContext example carefully, I am using the singleton to get or create a SQLContext in the driver (that is, not inside any RDD transformation, which would run on the workers). The correct way to do this would be:

{code}
val oriLogs = lines.transform((rdd: RDD[String], time: Time) => {
  // will get or create the accumulator at the driver
  val accum = AccumulatorsSingleton.getInstance(rdd.sparkContext)
  rdd.map(line => {
    ...
    accum += 1  // use the accumulator created at the driver in the executor
    ...
  })
})
{code}
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506770#comment-14506770 ] Zhichao Zhang commented on SPARK-5206: ---

[~tdas], thanks for your advice. There is an error with the accumulator, as follows:

{quote}
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 4 times, most recent failure: Lost task 2.3 in stage 2.0 (TID 11007, np06): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
    at org.apache.spark.rdd.RDD.sparkContext(RDD.scala:137)
    at cn.cnc.wslogs.TestSSCCheckpoint$$anonfun$2$$anonfun$apply$1.apply(TestSSCCheckpoint.scala:88)
    at cn.cnc.wslogs.TestSSCCheckpoint$$anonfun$2$$anonfun$apply$1.apply(TestSSCCheckpoint.scala:86)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
{quote}

My code:

{code}
object AccumulatorsSingleton {
  @transient private var instance: Accumulator[Long] = _

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      instance = sc.accumulator(0L, "Record Count")
    }
    instance
  }
}
{code}

{code}
val oriLogs = lines.transform((rdd: RDD[String], time: Time) => {
  rdd.map(line => {
    val lineArr = line.split("\t\\a")
    AccumulatorsSingleton.getInstance(rdd.sparkContext) += 1
    val fminute = lineArr(2).toLong / 300 * 300
    val sample = lineArr(8).toDouble
    val flow = if (lineArr(9).indexOf("-") != -1) 0.0 else lineArr(9).toDouble * sample
    val requestCnt = 1.0 * sample
    ((lineArr(1), lineArr(3), lineArr(23), lineArr(21), lineArr(22), lineArr(24), fminute),
      (flow, requestCnt))
  })
}).reduceByKey((pair1, pair2) => {
  (pair1._1 + pair2._1, pair1._2 + pair2._2)
}, numPartitions = config.getInt("numPartitions"))
{code}

Is *AccumulatorsSingleton.getInstance(rdd.sparkContext) += 1* an action?
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506290#comment-14506290 ] Tathagata Das commented on SPARK-5206: --

This has not been solved yet. This is a tricky problem, as the data of the accumulator is not saved through the restart. However, you could try a different approach. See how I suggest using a SQLContext in the relevant section of the streaming programming guide (http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations). This approach of getting or creating a singleton object allows you to either keep using the SQLContext you have already created, or create a new one (using the recovered SparkContext) after recovery. The same approach could work for accumulators and broadcast variables.
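The get-or-create singleton approach suggested above can be sketched, independently of Spark, as a lazily initialized holder with double-checked locking. All names here are hypothetical: an AtomicLong stands in for the accumulator, and in a real streaming job the instance would be created from the recovered SparkContext inside getInstance rather than constructed directly.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the get-or-create singleton pattern suggested above.
// AtomicLong stands in for the accumulator; a real streaming job would create
// it from the recovered SparkContext instead.
class CounterSingleton {
    private static volatile AtomicLong instance = null;

    // Returns the existing counter, or creates it on first use. After a
    // driver restart the static field is null again in the new JVM, so the
    // counter is transparently re-created there.
    public static AtomicLong getInstance() {
        if (instance == null) {
            synchronized (CounterSingleton.class) {
                if (instance == null) {
                    instance = new AtomicLong(0L);
                }
            }
        }
        return instance;
    }
}
```

This is the same shape as the JavaWordBlacklist and JavaDroppedWordsCounter helpers in the streaming programming guide: volatile field, synchronized re-check, and creation only in the driver.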
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506272#comment-14506272 ] Zhichao Zhang commented on SPARK-5206: ---

How can this be solved? Has there been no progress yet?
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341343#comment-14341343 ] zzc commented on SPARK-5206: -

I have the same problem, [~vincentye38]. How can this be resolved?
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14294339#comment-14294339 ] vincent ye commented on SPARK-5206: ---

I suggest adding life-cycle events to DStream, such as start, stop, and restart, so that we can put initialization code into them. Do you know where to submit suggestions to the Spark team?
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292896#comment-14292896 ] Saisai Shao commented on SPARK-5206: -

IMHO this is a general problem in Spark Streaming: any variable that must be registered on both the driver and executor sides will lead to an error when recovering from failure, if readObject does not re-register it on the driver. Objects like broadcast variables will also hit exceptions when recovering from a checkpoint, since the actual data is lost on the executor side, and recovery from the driver side is not possible, if I understand correctly.
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277554#comment-14277554 ] vincent ye commented on SPARK-5206: ---

Hi Tathagata,

The Accumulator object is created after the StreamingContext (ssc) is created, using:

{code}
val counter = ssc.sparkContext.accumulator()
{code}

The way I create the recoverable ssc is like this:

{code}
val ssc = StreamingContext.getOrCreate("/spark/applicationName", functionToCreateContext _)

def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(1))
  val counter = ssc.sparkContext.accumulator(0L, "message received")
  ...
  dstream.foreach(counter += 1)
  ...
  ssc
}

ssc.start()
ssc.awaitTermination()
{code}

If the app is recovering from a checkpoint, it won't execute functionToCreateContext. The counter Accumulator then won't be instantiated and registered with the Accumulators singleton object. But the counter inside dstream.foreach(counter += 1) will be created by the deserializer on a remote worker, which won't register the counter with the driver. I don't understand what you mean by explicitly referencing the Accumulator object.

Thanks
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14274681#comment-14274681 ]

Tathagata Das commented on SPARK-5206:
--------------------------------------

Interesting observation! Can this be solved just by explicitly referencing the Accumulator object at the beginning of your program? If that works, then we can add such a reference to the Accumulator in the StreamingContext object to make sure it is re-registered automatically.
[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273927#comment-14273927 ]

vincent ye commented on SPARK-5206:
-----------------------------------

I guess that an Accumulator is registered to the singleton Accumulators at line 58 of org.apache.spark.Accumulable:

Accumulators.register(this, true)

This code needs to be executed on the driver exactly once. But when the application is recovered from a checkpoint, it is not executed again on the driver. So when the driver reaches org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938), it cannot find the Accumulator, because the Accumulator was not re-registered during recovery.
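The failure mode vincent ye describes can be reproduced without Spark: registration happens in a constructor, but Java deserialization (which is how checkpoint recovery rebuilds objects) bypasses constructors, so the restored object never re-enters the registry. A hypothetical, self-contained sketch of that mechanism (all names here are illustrative, not Spark's actual classes):

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

// Stand-in for Spark's singleton Accumulators registry: instances register
// themselves exactly once, at construction time, on the driver.
class Registry {
    static final Map<Long, Counter> originals = new HashMap<>();
}

class Counter implements Serializable {
    final long id;
    long value = 0;

    Counter(long id) {
        this.id = id;
        Registry.originals.put(id, this); // runs only when the constructor runs
    }
}

public class CheckpointDemo {
    static byte[] checkpoint(Counter c) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(c);
        }
        return bos.toByteArray();
    }

    static Counter recover(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Counter) ois.readObject(); // constructor does NOT run again
        }
    }

    public static void main(String[] args) throws Exception {
        Counter c = new Counter(1L);
        byte[] saved = checkpoint(c);

        Registry.originals.clear();      // simulate a fresh driver process
        Counter restored = recover(saved);

        // The restored counter exists, but the lookup the scheduler performs
        // fails -- the analogue of "key not found: 1" in the stack trace above.
        System.out.println(Registry.originals.containsKey(restored.id)); // prints false
    }
}
```

The scheduler's `HashMap.apply` on the empty registry is what surfaces as the `NoSuchElementException` in the reported trace.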