1. How can I do this in Java?
2. Can broadcast variables also be recreated in the same way after
recovering from a checkpoint?
3. Is it safe to disable checkpointing and instead write the offsets to HDFS
from my own code at the end of each batch, then have the job use those
offsets when it creates the Kafka stream for the first time? How can I tell
the JavaStreamingContext to use these offsets only when the Kafka stream is
first created, and afterwards continue from the previous batch interval's
offsets?
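
For question 1, here is a rough, dependency-free sketch of how I understand the lazily created singleton idea from the quoted reply, written in Java 8. The names LazySingleton and RecordCounter are hypothetical, and AtomicLong is only a stand-in for a Spark accumulator so that the sketch runs without Spark on the classpath. It is not a confirmed recipe for Spark 1.3.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/**
 * Hypothetical holder that creates its value on first use and returns the
 * same instance afterwards (double-checked locking).
 */
final class LazySingleton<T> {
    // volatile is required for double-checked locking to be safe.
    private volatile T instance;

    T getOrCreate(Supplier<T> factory) {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}

/**
 * Hypothetical counterpart of SingletonObject.getOrCreateAccumulator().
 * The holder is static, so it is not part of any serialized closure; in a
 * fresh JVM it starts out empty and the value is created again on first use.
 */
final class RecordCounter {
    private static final LazySingleton<AtomicLong> HOLDER = new LazySingleton<>();

    static AtomicLong get() {
        // Assumption: in a real job the factory would create the accumulator
        // from the SparkContext of the RDD passed to transform(), instead of
        // creating an AtomicLong.
        return HOLDER.getOrCreate(() -> new AtomicLong(0));
    }
}
```

The intent is that the transform function calls RecordCounter.get() on every batch instead of capturing an accumulator created before the streaming context was started, so a restarted driver starts again from 0.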

On Thu, Jul 30, 2015 at 2:49 AM, Tathagata Das <t...@databricks.com> wrote:

> Rather than using the accumulator directly, you can lazily create it and
> use it as shown below (it will be lazily recreated if the driver restarts
> from a checkpoint):
>
>
> dstream.transform { rdd =>
>     // Singleton-object method that creates the accumulator, or returns
>     // the one that was already created.
>     val accum = SingletonObject.getOrCreateAccumulator()
>     rdd.map { x => /* use accum here */ x }
> }
>
>
> On Wed, Jul 29, 2015 at 1:15 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>
>> Hi,
>>
>> I am using Spark Streaming 1.3 with checkpointing enabled,
>> but the job fails to recover from the checkpoint on restart.
>>
>> 1. For the broadcast variable, this is reported at the point where I call
>> bcvariable.value() in the map function:
>> WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP):
>> java.lang.ClassCastException: [B cannot be cast to pkg.broadcastvariableclassname
>>
>>  at  mapfunction......
>>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
>>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
>>         at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>>         at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> 2. For the accumulator variable it says:
>> 15/07/29 19:23:12 ERROR DAGScheduler: Failed to update accumulators for ResultTask(1, 16)
>> java.util.NoSuchElementException: key not found: 2
>>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>>         at scala.collection.AbstractMap.default(Map.scala:58)
>>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:894)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:893)
>>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>>         at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:893)
>>
>> This is described in
>> https://issues.apache.org/jira/browse/SPARK-5206
>>
>> I can afford to reset the accumulator to 0 when the stream restarts. Is it
>> possible to get this working?
>>
>> Thanks
>>
>
