Does this help?

https://issues.apache.org/jira/browse/SPARK-5206



On 10/27/15, 1:53 PM, "Amit Singh Hora" <hora.a...@gmail.com> wrote:

>Hi all ,
>
>I am using Cloudera's SparkObHbase to bulk insert in hbase ,Please find
>below code
>object test {
>  
>    def main(args: Array[String]): Unit = {
>    
>    
>    
>   val conf = ConfigFactory.load("connection.conf").getConfig("connection")
>    val checkpointDirectory=conf.getString("spark.checkpointDir")
>    val ssc = StreamingContext.getOrCreate(checkpointDirectory, ()=>{
>      functionToCreateContext(checkpointDirectory)
>    })
> 
>    
>    ssc.start()
>    ssc.awaitTermination()
>    
>         }
>    
>    def getHbaseContext(sc: SparkContext,conf: Config): HBaseContext={
>      println("always gets created")
>           val hconf = HBaseConfiguration.create();
>    val timeout= conf.getString("hbase.zookeepertimeout")
>    val master=conf.getString("hbase.hbase_master")
>    val zk=conf.getString("hbase.hbase_zkquorum")
>    val zkport=conf.getString("hbase.hbase_zk_port")
>    
>      hconf.set("zookeeper.session.timeout",timeout);
>    hconf.set("hbase.client.retries.number", Integer.toString(1));
>    hconf.set("zookeeper.recovery.retry", Integer.toString(1));
>    hconf.set("hbase.master", master);
>    hconf.set("hbase.zookeeper.quorum",zk);
>    hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
>    hconf.set("hbase.zookeeper.property.clientPort",zkport );
>
>   
>    val hbaseContext = new HBaseContext(sc, hconf);
>    return hbaseContext
>    }
>  def functionToCreateContext(checkpointDirectory: String): StreamingContext
>= {
>    println("creating for frst time")
>    val conf = ConfigFactory.load("connection.conf").getConfig("connection")
>    val brokerlist = conf.getString("kafka.broker")
>    val topic = conf.getString("kafka.topic")
>    
>    val Array(brokers, topics) = Array(brokerlist, topic)
>
>
>    val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample
>" )
>    sparkConf.set("spark.cleaner.ttl", "2");
>    sparkConf.setMaster("local[2]")
>    
>
>     val topicsSet = topic.split(",").toSet
>        val batchduration = conf.getString("spark.batchduration").toInt
>    val ssc: StreamingContext = new StreamingContext(sparkConf,
>Seconds(batchduration))
>      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
>     val kafkaParams = Map[String, String]("metadata.broker.list" ->
>brokerlist, "auto.offset.reset" -> "smallest")
>    val messages = KafkaUtils.createDirectStream[String, String,
>StringDecoder, StringDecoder](
>      ssc, kafkaParams, topicsSet)
>    val lines=messages.map(_._2)
>   
>
>
>    getHbaseContext(ssc.sparkContext,conf).streamBulkPut[String](lines,
>      "ecs_test",
>      (putRecord) => {
>        if (putRecord.length() > 0) {
>          var maprecord = new HashMap[String, String];
>                  val mapper = new ObjectMapper();
>
>                  //convert JSON string to Map
>                  maprecord = mapper.readValue(putRecord,
>                    new TypeReference[HashMap[String, String]]() {});
>                  
>                  var ts: Long = maprecord.get("ts").toLong
>                  
>                   var tweetID:Long= maprecord.get("id").toLong
>              val key=ts+"_"+tweetID;
>                  
>                  val put = new Put(Bytes.toBytes(key))
>                  maprecord.foreach(kv => {
>                 
>         
>put.add(Bytes.toBytes("test"),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2))
>          
>
>              })
>        
>        
>          put
>        } else {
>          null
>        }
>      },
>      false);
>
>    ssc
>      
>  }
>}
>
>i am not able to retrieve from checkpoint after restart ,always get 
>Unable to getConfig from broadcast
>
>after debugging more i can see that the method for creating the HbaseContext
>actually broadcasts the configuration ,context object passed
>
>as a solution i just want to recreate the hbase context in every condition
>weather the checkpoint exists or not
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>---------------------------------------------------------------------
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to