Hi All,

I am using below code to stream data from kafka to hbase ,everything works
fine until i restart the job so that it can restore the state from
checkpoint directory ,but while trying to restore the state it give me below
error

ge 0.0 (TID 0, localhost): java.lang.ClassCastException:
scala.runtime.BoxedUnit cannot be cast to
org.apache.hadoop.hbase.client.Mutation

please find below code

tweetsRDD.foreachRDD(rdd=>{
      val hconf = HBaseConfiguration.create();
    hconf.set(TableOutputFormat.OUTPUT_TABLE, hbasetablename)
    hconf.set("zookeeper.session.timeout",
conf.getString("hbase.zookeepertimeout"));
    hconf.set("hbase.client.retries.number", Integer.toString(1));
    hconf.set("zookeeper.recovery.retry", Integer.toString(1));
    hconf.set("hbase.master", conf.getString("hbase.hbase_master"));
   
hconf.set("hbase.zookeeper.quorum",conf.getString("hbase.hbase_zkquorum"));
    hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
    hconf.set("hbase.zookeeper.property.clientPort",
conf.getString("hbase.hbase_zk_port"));
     hconf.setClass("mapreduce.outputformat.class",
classOf[TableOutputFormat[String]], classOf[OutputFormat[String,
BoxedUnit]])

//i have also tried using 
     //     hconf.setClass("mapreduce.outputformat.class",
classOf[TableOutputFormat[String]], //classOf[OutputFormat[String,
Mutation]])
         rdd.map ( record =>(new ImmutableBytesWritable,{
         

            var maprecord = new HashMap[String, String];
              val mapper = new ObjectMapper();

              //convert JSON string to Map
            
              maprecord = mapper.readValue(record.toString(),
                new TypeReference[HashMap[String, String]]() {});
            
              
              var ts:Long= maprecord.get("ts").toLong
              var tweetID:Long= maprecord.get("id").toLong
              val key=ts+"_"+tweetID;
              val   put=new Put(Bytes.toBytes(key))
               maprecord.foreach(kv => {
//              println(kv._1+" - "+kv._2)
         
put.add(Bytes.toBytes(colfamily.value),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
          

              }
               )
       put
          
        }  
         ) 
         ).saveAsNewAPIHadoopDataset(hconf)
         
  })
         


help me out in solving this as it is urgent for me



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HBase-Spark-Streaming-giving-error-after-restore-tp25090.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to