Can you try changing classOf[OutputFormat[String,
BoxedUnit]] to classOf[OutputFormat[String,
Put]] while configuring hconf?

On Sat, Oct 17, 2015, 11:44 AM Amit Hora <hora.a...@gmail.com> wrote:

> Hi,
>
> Regrets for the delayed response.
> Please find the full stack trace below:
>
> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
>     at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
>     at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
>     at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 15/10/16 18:50:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 1, localhost, ANY, 1185 bytes)
> 15/10/16 18:50:03 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 15/10/16 18:50:03 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> localhost): java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be
> cast to org.apache.hadoop.hbase.client.Mutation
>     at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
>     at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
>     at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> 15/10/16 18:50:03 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1
> times; aborting job
> 15/10/16 18:50:03 INFO TaskSchedulerImpl: Cancelling stage 0
> 15/10/16 18:50:03 INFO Executor: Executor is trying to kill task 1.0 in
> stage 0.0 (TID 1)
> 15/10/16 18:50:03 INFO TaskSchedulerImpl: Stage 0 was cancelled
> 15/10/16 18:50:03 INFO DAGScheduler: Job 0 failed: foreachRDD at
> TwitterStream.scala:150, took 5.956054 s
> 15/10/16 18:50:03 INFO JobScheduler: Starting job streaming job
> 1445001410000 ms.0 from job set of time 1445001410000 ms
> 15/10/16 18:50:03 ERROR JobScheduler: Error running job streaming job
> 1445001400000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.ClassCastException:
> scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
>     at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
>     at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
>     at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>     at scala.Option.foreach(Option.scala:236)
>     at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>     at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>     at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):
> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
>     at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
>     at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
>     at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>     at scala.Option.foreach(Option.scala:236)
>     at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>     at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>     at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/10/16 18:50:03 INFO CacheManager: Partition rdd_20_1 not found,
> computing it
>
> On Fri, Oct 16, 2015 at 7:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you show the complete stack trace ?
>>
>> Subclass of Mutation is expected. Put is a subclass.
>>
>> Have you tried replacing BoxedUnit with Put in your code ?
>>
>> Cheers
>>
>> On Fri, Oct 16, 2015 at 6:02 AM, Amit Singh Hora <hora.a...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I am using the code below to stream data from Kafka to HBase. Everything
>>> works fine until I restart the job so that it can restore its state from
>>> the checkpoint directory, but while trying to restore the state it gives
>>> me the error below:
>>>
>>> stage 0.0 (TID 0, localhost): java.lang.ClassCastException:
>>> scala.runtime.BoxedUnit cannot be cast to
>>> org.apache.hadoop.hbase.client.Mutation
>>>
>>> Please find the code below:
>>>
>>> tweetsRDD.foreachRDD(rdd=>{
>>>       val hconf = HBaseConfiguration.create();
>>>     hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename)
>>>     hconf.set("zookeeper.session.timeout",
>>> conf.getString("hbase.zookeepertimeout"));
>>>     hconf.set("hbase.client.retries.number", Integer.toString(1));
>>>     hconf.set("zookeeper.recovery.retry", Integer.toString(1));
>>>     hconf.set("hbase.master", conf.getString("hbase.hbase_master"));
>>>
>>>
>>> hconf.set("hbase.zookeeper.quorum",conf.getString("hbase.hbase_zkquorum"));
>>>     hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
>>>     hconf.set("hbase.zookeeper.property.clientPort",
>>> conf.getString("hbase.hbase_zk_port"));
>>>      hconf.setClass("mapreduce.outputformat.class",
>>> classOf[TableOutputFormat[String]], classOf[OutputFormat[String,
>>> BoxedUnit]])
>>>
>>>          rdd.map ( record =>(new ImmutableBytesWritable,{
>>>
>>>
>>>             var maprecord = new HashMap[String, String];
>>>               val mapper = new ObjectMapper();
>>>
>>>               //convert JSON string to Map
>>>
>>>               maprecord = mapper.readValue(record.toString(),
>>>                 new TypeReference[HashMap[String, String]]() {});
>>>
>>>
>>>               var ts:Long= maprecord.get("ts").toLong
>>>               var tweetID:Long= maprecord.get("id").toLong
>>>               val key=ts+"_"+tweetID;
>>>               val   put=new Put(Bytes.toBytes(key))
>>>                maprecord.foreach(kv => {
>>> //              println(kv._1+" - "+kv._2)
>>>
>>>
>>> put.add(Bytes.toBytes(colfamily.value),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
>>>
>>>
>>>               }
>>>                )
>>>        put
>>>
>>>         }
>>>          )
>>>          ).saveAsNewAPIHadoopDataset(hconf)
>>>
>>>   })
>>>
>>>
>>>
>>> Please help me solve this, as it is urgent for me.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/HBase-Spark-Streaming-giving-error-after-restore-tp25089.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>

Reply via email to