[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2018-03-04 Thread msnreddy (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16385221#comment-16385221
 ] 

msnreddy commented on SPARK-5206:
-

A broadcast variable cannot be used with mapWithState if we need to recover from the 
checkpoint directory in Spark Streaming. In that case it can only be used inside output 
operations, because lazily initializing the broadcast requires the SparkContext, which is 
only available on the driver.


class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the blacklist Broadcast
  Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use blacklist to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (blacklist.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
});

 

> Accumulators are not re-registered during recovering from checkpoint
> 
>
> Key: SPARK-5206
> URL: https://issues.apache.org/jira/browse/SPARK-5206
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 1.1.0
>Reporter: vincent ye
>Priority: Major
>
> I got the following exception while my streaming application restarts from a 
> crash using the checkpoint:
> 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR 
> scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, 
> 4)
> java.util.NoSuchElementException: key not found: 1
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> I guess that an Accumulator is registered to the singleton Accumulators object in 
> line 58 of org.apache.spark.Accumulable:
> Accumulators.register(this, true)
> This code needs to be executed in the driver once, but when the application is 
> recovered from checkpoint it is not executed in the driver. So when the 
> driver processes the task completion at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938),
> it can't find the Accumulator because it was not re-registered during the 
> recovery.




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-10-26 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14975572#comment-14975572
 ] 

Tathagata Das commented on SPARK-5206:
--

Same singleton idea. Have a singleton reference to the broadcast variable, which 
gets lazily initialized and is transient (so it does not get serialized into 
checkpoints). If the application restarts, the singleton will be null and will 
have to be initialized again before being used for the first time after recovery.
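
A minimal Scala sketch of that pattern (the object name and the blacklist contents are 
purely illustrative, not taken from this issue):

{code}
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object WordListSingleton {

  // Transient + volatile: never written into the checkpoint, so after a driver
  // restart the field is null and the broadcast is re-created lazily from the
  // recovered SparkContext before its first use.
  @transient @volatile private var instance: Broadcast[Seq[String]] = _

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Seq("a", "b", "c"))
        }
      }
    }
    instance
  }
}
{code}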




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-10-26 Thread Anna Kepler (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14974515#comment-14974515
 ] 

Anna Kepler commented on SPARK-5206:


In our case we have a broadcast variable that needs to be accessed in the 
updateStateByKey() method. 
How can we resolve that?




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-05-28 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14562492#comment-14562492
 ] 

Tathagata Das commented on SPARK-5206:
--

Good observations on the limitations. Currently, there are no immediate 
concrete plans to fix this, but I welcome suggestions. The tricky thing is 
figuring out the right semantics. Your suggestion of resetting the value to zero 
may suffice for your purpose, where the value is reset after every batch. However, 
for other purposes such resetting leads to non-intuitive behavior in case 
of failure: it hides the problem (the accumulator suddenly resets to zero) by 
implicitly resetting instead of explicitly failing (as it does now). So I am not 
convinced that automatically resetting to zero is a good solution, but I am 
open to hearing everyone's opinions on this matter. 






[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-05-26 Thread Adrian Tanase (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559071#comment-14559071
 ] 

Adrian Tanase commented on SPARK-5206:
--

[~tdas] - are there any plans to make accumulators re-register automatically on 
checkpoint restore? I think this would be a valuable fix even if the values are 
reset to "zero".
For example, we're using them to aggregate counters that are pushed to OpenTSDB 
after every micro-batch, after which they are reset to the initial value (to 
avoid rated metrics in OpenTSDB and keep the counters additive).

I implemented the above suggestion and it works - but it also has a number of 
downsides that make it impractical for a codebase that relies heavily on metrics 
and counters implemented as accumulators:
- it seems to limit usage to either output operations or transformed {{DStreams}}
- it prevents using accumulators during transformations (e.g. simple ones like 
{{map}} or stateful ones like {{updateStateByKey}})
- one has to wrap all the DStream transformations in {{foreachRDD}} or 
{{transform}}, altering the semantics of the streaming job heavily and 
obscuring the business logic - something that goes against Spark's mantra of 
ease of use

Is there another way that I'm missing in the above pattern that would simplify 
the implementation by only calling {{getInstance}} once per streaming context?

Thanks!
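
For reference, a hedged sketch of the push-and-reset flow described above, using the 
get-or-create singleton shape from this thread inside {{foreachRDD}}. The {{push}} 
callback is a stand-in for an OpenTSDB client (not a real API), and the sketch assumes 
the Spark 2.x {{LongAccumulator}}; the 1.x {{Accumulator}} would use {{+=}} and 
{{setValue(0)}} instead.

{code}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.LongAccumulator

// Illustrative get-or-create singleton, same shape as the others in this thread.
object BatchCounterSingleton {
  @volatile private var instance: LongAccumulator = _
  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) synchronized {
      if (instance == null) instance = sc.longAccumulator("records.processed")
    }
    instance
  }
}

def wirePerBatchMetrics(records: DStream[String], push: (Long, Long) => Unit): Unit = {
  records.foreachRDD { (rdd, time: Time) =>
    val counter = BatchCounterSingleton.getInstance(rdd.sparkContext)
    rdd.foreach(_ => counter.add(1L))       // incremented on the executors
    push(counter.sum, time.milliseconds)    // driver-side push, e.g. to OpenTSDB
    counter.reset()                         // keep the counter additive per micro-batch
  }
}
{code}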




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-04-22 Thread Zhichao Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14508271#comment-14508271
 ] 

Zhichao  Zhang commented on SPARK-5206:
---

[~tdas], thanks for your reply. Looking forward to built-in support for 
upgrades. :) (y)




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-04-22 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507545#comment-14507545
 ] 

Tathagata Das commented on SPARK-5206:
--

1. Yes, there is no built-in way to persist the latest value of an accumulator
and recover it again. Since everything dies with the driver, recovering the
value has to be done by the user. If it is not too much data, you could
persist the value to a key-value store after every batch, and then use that
data to recover when restarted (see the sketch below).

2. There is no built-in support for upgrades YET. It's on my wish list of
things to add, but it's a non-trivial task. Hopefully I will get to it soon.
:)
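
A rough sketch of point 1, reusing the {{AccumulatorsSingleton}} object that appears later 
in this thread; the {{save}}/{{load}} callbacks stand in for whatever key-value store is 
used and are not part of Spark:

{code}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

def wireAccumulatorPersistence(stream: DStream[String], ssc: StreamingContext,
                               save: Long => Unit, load: () => Option[Long]): Unit = {
  // On (re)start, seed the freshly registered accumulator with the last persisted value.
  load().foreach(v => AccumulatorsSingleton.getInstance(ssc.sparkContext) += v)

  stream.foreachRDD { (rdd, _: Time) =>
    val accum = AccumulatorsSingleton.getInstance(rdd.sparkContext)
    // ... per-batch work that updates accum on the executors ...
    save(accum.value)   // persist the driver-side value after every batch
  }
}
{code}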






[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-04-22 Thread Zhichao Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14507322#comment-14507322
 ] 

Zhichao  Zhang commented on SPARK-5206:
---

According to what you said, it works well - sorry for my carelessness.
But there is still a problem: the accumulator starts from its initial value (0) 
after recovering from the checkpoint, not from the last value, which is what I 
expected on failure. Any suggestion for this? Thanks a lot.








[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-04-22 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506806#comment-14506806
 ] 

Tathagata Das commented on SPARK-5206:
--

That's the incorrect way of doing it. It ends up trying to create a
new accumulator in the executor, which in turn tries to access the
SparkContext on the worker, and that is what throws this exception. If you read
the SQLContext example carefully, I am using the singleton to get or create
a SQLContext in the driver (that is, not inside any RDD transformation,
which would run on the workers). The correct way to do this would be:

{code}
val oriLogs = lines.transform((rdd: RDD[String], time: Time) => {
  // get or create the accumulator at the driver
  val accum = AccumulatorsSingleton.getInstance(rdd.sparkContext)

  rdd.map(line => {
    ...
    accum += 1   // use the accumulator created at the driver in the executor
    ...
  })
})
{code}







[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-04-22 Thread Zhichao Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506770#comment-14506770
 ] 

Zhichao  Zhang commented on SPARK-5206:
---

[~tdas], thanks for your advice.
There is an error with the accumulator, as follows:
{quote}
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 2 in stage 2.0 failed 4 times, most recent failure: Lost 
task 2.3 in stage 2.0 (TID 11007, np06): org.apache.spark.SparkException: RDD 
transformations and actions can only be invoked by the driver, not inside of 
other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is 
invalid because the values transformation and count action cannot be performed 
inside of the rdd1.map transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
at org.apache.spark.rdd.RDD.sparkContext(RDD.scala:137)
at 
cn.cnc.wslogs.TestSSCCheckpoint$$anonfun$2$$anonfun$apply$1.apply(TestSSCCheckpoint.scala:88)
at 
cn.cnc.wslogs.TestSSCCheckpoint$$anonfun$2$$anonfun$apply$1.apply(TestSSCCheckpoint.scala:86)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
{quote}
My code:
{code}
object AccumulatorsSingleton {
  
  @transient private var instance: Accumulator[Long] = _
  
  def getInstance(sc: SparkContext): Accumulator[Long] = {
if (instance == null) {
  instance = sc.accumulator(0L, "Record Count")
}
instance
  }
}
{code}
{code}
val oriLogs = lines.transform((rdd: RDD[String], time: Time) => {
rdd.map(line => {
  val lineArr = line.split("\t\\a")
  AccumulatorsSingleton.getInstance(rdd.sparkContext) += 1
  
  val fminute = lineArr(2).toLong / 300 * 300
  
  val sample = lineArr(8).toDouble
  val flow = if (lineArr(9).indexOf("-") != -1) 0.0 else 
lineArr(9).toDouble * sample
  val requestCnt = 1.0 * sample
  
  ((lineArr(1), lineArr(3), lineArr(23), lineArr(21), lineArr(22), 
lineArr(24), fminute), (flow, requestCnt))
})
  }) 
  .reduceByKey((pair1, pair2) => {
(pair1._1 + pair2._1, pair1._2 + pair2._2)
  }, numPartitions = config.getInt("numPartitions"))
{code}

Is *AccumulatorsSingleton.getInstance(rdd.sparkContext) += 1* an action?


[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-04-21 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506290#comment-14506290
 ] 

Tathagata Das commented on SPARK-5206:
--

This has not been solved yet. This is a tricky problem, as the data of the 
accumulator is not saved through the restart.

However, you could try a different approach. See how I suggest using a 
SQLContext in the relevant section of the streaming programming guide 
(http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations).
 This approach of getting or creating a singleton object allows you to either 
keep using the same SQLContext that you created once, or create a new one 
(using the recovered SparkContext) after recovery. The same approach 
could work for accumulators and broadcast variables. 
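
For context, the get-or-create pattern from that section of the guide looks roughly like 
this (a paraphrased sketch, not a verbatim copy of the guide):

{code}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  // Lazily created on first use; after a checkpoint restart the field is null again,
  // so a new SQLContext is built from the recovered SparkContext.
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
{code}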





[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-04-21 Thread Zhichao Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14506272#comment-14506272
 ] 

Zhichao  Zhang commented on SPARK-5206:
---

How can this be solved? Is there no progress yet?




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-02-27 Thread zzc (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14341343#comment-14341343
 ] 

zzc commented on SPARK-5206:


I have the same problem, [~vincentye38]. How did you resolve this?




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-01-27 Thread vincent ye (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14294339#comment-14294339
 ] 

vincent ye commented on SPARK-5206:
---

I suggest adding life-cycle events to DStream, such as start, stop and restart, 
so we can put initialization code into them. Do you know where to submit 
suggestions to the Spark team?




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-01-26 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14292896#comment-14292896
 ] 

Saisai Shao commented on SPARK-5206:


IMHO this is a general problem in Spark Streaming: any variable that needs to be 
registered on both the driver and the executor side will lead to an error when 
recovering from failure, because readObject does not re-register it on the 
driver.

Objects like broadcast variables will also hit exceptions when recovering from 
checkpoint, since the actual data is lost on the executor side, and recovery 
from the driver side is not possible, if I understand correctly.




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-01-14 Thread vincent ye (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277554#comment-14277554
 ] 

vincent ye commented on SPARK-5206:
---

Hi Tathagata,
The Accumulator object is created after the StreamingContext (ssc) is created, using:
val counter = ssc.sparkContext.accumulator()
The way I create the recoverable ssc is like this:

val ssc = StreamingContext.getOrCreate("/spark/applicationName", functionToCreateContext())
def functionToCreateContext = {
   val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
   val ssc = new StreamingContext(conf, Seconds(1))
   val counter = ssc.sparkContext.accumulator(0L, "message received")
   .
   dstream.foreach(counter += 1)
   ..
   ssc
}
ssc.start
ssc.awaitTermination()

If the app is recovering from a checkpoint, it won't execute functionToCreateContext. Then the counter Accumulator won't be instantiated and registered with the Accumulators singleton object. But the counter inside dstream.foreach(counter += 1) will be created by the deserializer on a remote worker, which won't register the counter with the driver.
I don't understand what you mean by explicitly referencing the Accumulator object ...
Thanks




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-01-14 Thread vincent ye (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277552#comment-14277552
 ] 

vincent ye commented on SPARK-5206:
---

Hi Tathagata,
The Accumulator object is created after the StreamingContext (ssc) is created, using:
val counter = ssc.sparkContext.accumulator()

The way I create the recoverable ssc is like this:

val ssc = StreamingContext.getOrCreate("/spark/applicationName", 
functionToCreateContext())
def functionToCreateContext = {
   val conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
   val ssc = new StreamingContext(conf, Seconds(1))
   val counter = ssc.sparkContext.accumulator(0L, "message received")
   .
   dstream.foreach(counter += 1)
   ..
   ssc
}

ssc.start
ssc.awaitTermination()


If the app is recovering from a checkpoint, it won't execute functionToCreateContext. Then the counter Accumulator won't be instantiated and registered with the Accumulators singleton object. But the counter inside dstream.foreach(counter += 1) will be created by the deserializer on a remote worker, which won't register the counter with the driver.

I don't understand what you mean by explicitly referencing the Accumulator object ...



Thanks
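
For what it's worth, here is a hedged Scala sketch of a possible workaround for this setup. It is my own illustration, not something from this ticket: the idea is to fetch the accumulator lazily on the driver from inside an output operation, so it is created and registered even on a run where functionToCreateContext is skipped. MessageCounter is a made-up name, and the code assumes the old Accumulator API used above.

import org.apache.spark.{Accumulator, SparkContext}
import org.apache.spark.SparkContext._ // implicit AccumulatorParam[Long] on older Spark versions

object MessageCounter {
  @volatile private var instance: Accumulator[Long] = _

  // Lazily created on the driver, whether or not functionToCreateContext ran on this start.
  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "message received")
        }
      }
    }
    instance
  }
}

// Inside functionToCreateContext, instead of capturing a driver-created counter:
// dstream.foreachRDD { rdd =>
//   val counter = MessageCounter.getInstance(rdd.sparkContext) // runs on the driver
//   counter += rdd.count()
// }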
 




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-01-12 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14274681#comment-14274681
 ] 

Tathagata Das commented on SPARK-5206:
--

Interesting observation! Can this be solved just by explicitly referencing the Accumulator object at the beginning of your program? If that works, then we can add this reference to the Accumulator in the StreamingContext object to make sure it is automatically called.
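
One possible reading of this suggestion, as a hedged sketch only (my own, reusing the shape of functionToCreateContext from the comments above): create the accumulator unconditionally after getOrCreate, so the registration code also runs on the recovery path. Whether the tasks restored from the checkpoint would then resolve to this new instance is exactly the open question in this ticket, so treat this as an illustration of the idea rather than a confirmed fix.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._ // implicit AccumulatorParam[Long] on older Spark versions
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RestartWithExplicitReference {
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/spark/applicationName")
    // ... stream definitions as in the comments above ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate("/spark/applicationName", functionToCreateContext _)
    // "Explicitly reference" the accumulator on every start, including recovery;
    // the streaming code would then use this instance.
    val counter = ssc.sparkContext.accumulator(0L, "message received")
    ssc.start()
    ssc.awaitTermination()
  }
}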




[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

2015-01-12 Thread vincent ye (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14273927#comment-14273927
 ] 

vincent ye commented on SPARK-5206:
---

I guess that an Accumulator is registered with the singleton Accumulators object at line 58 of org.apache.spark.Accumulable:
Accumulators.register(this, true)

This code needs to be executed in the driver once, but when the application is recovered from a checkpoint it won't be executed in the driver. So when the driver processes it at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938), it can't find the Accumulator because it was not re-registered during the recovery.
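
As a toy illustration of the failing lookup (my own simplified model, not Spark's actual classes): the driver-side map is only filled in when the accumulator is constructed on the driver, so if that construction is skipped on recovery, looking the accumulator up by id fails with exactly this kind of "key not found" error.

import scala.collection.mutable

object ToyAccumulators {
  // id -> accumulator name; stands in for the driver-side singleton registry.
  val originals = mutable.HashMap.empty[Long, String]
  def register(id: Long, name: String): Unit = originals(id) = name
}

object ToyRecovery extends App {
  val recoveredFromCheckpoint = true // pretend the driver restarted from a checkpoint
  if (!recoveredFromCheckpoint) {
    ToyAccumulators.register(1L, "message received") // only happens on a fresh run
  }
  // Analogue of DAGScheduler.handleTaskCompletion updating accumulator id 1:
  ToyAccumulators.originals(1L) // throws java.util.NoSuchElementException: key not found: 1
}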



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org