Can you try changing classOf[OutputFormat[String, BoxedUnit]] to classOf[OutputFormat[String, Put]] while configuring hconf?
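Something along these lines (a minimal sketch against the hconf from your snippet; only the interface argument of setClass changes, since TableOutputFormat's record writer expects a value that is a subclass of Mutation, and Put is one):

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.OutputFormat

    // declare Put (a Mutation subclass) as the value type instead of BoxedUnit
    hconf.setClass("mapreduce.outputformat.class",
      classOf[TableOutputFormat[String]],
      classOf[OutputFormat[String, Put]])

Your rdd.map(...) already emits (ImmutableBytesWritable, Put) pairs, so the rest of the pipeline should not need to change.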
On Sat, Oct 17, 2015, 11:44 AM Amit Hora <hora.a...@gmail.com> wrote:

> Hi,
>
> Regrets for the delayed response.
> Please find the full stack trace below:
>
> java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.hadoop.hbase.client.Mutation
>         at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 15/10/16 18:50:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1185 bytes)
> 15/10/16 18:50:03 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> 15/10/16 18:50:03 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.hadoop.hbase.client.Mutation
>         at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> 15/10/16 18:50:03 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
> 15/10/16 18:50:03 INFO TaskSchedulerImpl: Cancelling stage 0
> 15/10/16 18:50:03 INFO Executor: Executor is trying to kill task 1.0 in stage 0.0 (TID 1)
> 15/10/16 18:50:03 INFO TaskSchedulerImpl: Stage 0 was cancelled
> 15/10/16 18:50:03 INFO DAGScheduler: Job 0 failed: foreachRDD at TwitterStream.scala:150, took 5.956054 s
> 15/10/16 18:50:03 INFO JobScheduler: Starting job streaming job 1445001410000 ms.0 from job set of time 1445001410000 ms
> 15/10/16 18:50:03 ERROR JobScheduler: Error running job streaming job 1445001400000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.hadoop.hbase.client.Mutation
>         at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.hadoop.hbase.client.Mutation
>         at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:1000)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/10/16 18:50:03 INFO CacheManager: Partition rdd_20_1 not found, computing it
>
> On Fri, Oct 16, 2015 at 7:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you show the complete stack trace?
>>
>> A subclass of Mutation is expected; Put is such a subclass.
>>
>> Have you tried replacing BoxedUnit with Put in your code?
>>
>> Cheers
>>
>> On Fri, Oct 16, 2015 at 6:02 AM, Amit Singh Hora <hora.a...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am using the code below to stream data from Kafka to HBase. Everything
>>> works fine until I restart the job so that it can restore its state from
>>> the checkpoint directory, but while trying to restore the state it gives
>>> me the error below:
>>>
>>> Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException:
>>> scala.runtime.BoxedUnit cannot be cast to
>>> org.apache.hadoop.hbase.client.Mutation
>>>
>>> Please find the code below:
>>>
>>> tweetsRDD.foreachRDD(rdd => {
>>>   val hconf = HBaseConfiguration.create()
>>>   hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename)
>>>   hconf.set("zookeeper.session.timeout", conf.getString("hbase.zookeepertimeout"))
>>>   hconf.set("hbase.client.retries.number", Integer.toString(1))
>>>   hconf.set("zookeeper.recovery.retry", Integer.toString(1))
>>>   hconf.set("hbase.master", conf.getString("hbase.hbase_master"))
>>>   hconf.set("hbase.zookeeper.quorum", conf.getString("hbase.hbase_zkquorum"))
>>>   hconf.set("zookeeper.znode.parent", "/hbase-unsecure")
>>>   hconf.set("hbase.zookeeper.property.clientPort", conf.getString("hbase.hbase_zk_port"))
>>>   hconf.setClass("mapreduce.outputformat.class",
>>>     classOf[TableOutputFormat[String]], classOf[OutputFormat[String, BoxedUnit]])
>>>
>>>   rdd.map(record => (new ImmutableBytesWritable, {
>>>     // convert the JSON string to a Map
>>>     val mapper = new ObjectMapper()
>>>     val maprecord: HashMap[String, String] = mapper.readValue(
>>>       record.toString(), new TypeReference[HashMap[String, String]]() {})
>>>
>>>     val ts: Long = maprecord.get("ts").toLong
>>>     val tweetID: Long = maprecord.get("id").toLong
>>>     val key = ts + "_" + tweetID
>>>
>>>     val put = new Put(Bytes.toBytes(key))
>>>     maprecord.foreach(kv =>
>>>       put.add(Bytes.toBytes(colfamily.value), Bytes.toBytes(kv._1), Bytes.toBytes(kv._2))
>>>     )
>>>     put
>>>   })).saveAsNewAPIHadoopDataset(hconf)
>>> })
>>>
>>> Help me out in solving this, as it is urgent for me.