Does this help? https://issues.apache.org/jira/browse/SPARK-5206
On 10/27/15, 1:53 PM, "Amit Singh Hora" <hora.a...@gmail.com> wrote: >Hi all, > >I am using Cloudera's SparkOnHBase to bulk insert into HBase. Please find >the code below: >object test { > > def main(args: Array[String]): Unit = { > > > > val conf = ConfigFactory.load("connection.conf").getConfig("connection") > val checkpointDirectory=conf.getString("spark.checkpointDir") > val ssc = StreamingContext.getOrCreate(checkpointDirectory, ()=>{ > functionToCreateContext(checkpointDirectory) > }) > > > ssc.start() > ssc.awaitTermination() > > } > > def getHbaseContext(sc: SparkContext,conf: Config): HBaseContext={ > println("always gets created") > val hconf = HBaseConfiguration.create(); > val timeout= conf.getString("hbase.zookeepertimeout") > val master=conf.getString("hbase.hbase_master") > val zk=conf.getString("hbase.hbase_zkquorum") > val zkport=conf.getString("hbase.hbase_zk_port") > > hconf.set("zookeeper.session.timeout",timeout); > hconf.set("hbase.client.retries.number", Integer.toString(1)); > hconf.set("zookeeper.recovery.retry", Integer.toString(1)); > hconf.set("hbase.master", master); > hconf.set("hbase.zookeeper.quorum",zk); > hconf.set("zookeeper.znode.parent", "/hbase-unsecure"); > hconf.set("hbase.zookeeper.property.clientPort",zkport ); > > > val hbaseContext = new HBaseContext(sc, hconf); > return hbaseContext > } > def functionToCreateContext(checkpointDirectory: String): StreamingContext >= { > println("creating for frst time") > val conf = ConfigFactory.load("connection.conf").getConfig("connection") > val brokerlist = conf.getString("kafka.broker") > val topic = conf.getString("kafka.topic") > > val Array(brokers, topics) = Array(brokerlist, topic) > > > val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample >" ) > sparkConf.set("spark.cleaner.ttl", "2"); > sparkConf.setMaster("local[2]") > > > val topicsSet = topic.split(",").toSet > val batchduration = conf.getString("spark.batchduration").toInt > val ssc: StreamingContext = new 
StreamingContext(sparkConf, >Seconds(batchduration)) > ssc.checkpoint(checkpointDirectory) // set checkpoint directory > val kafkaParams = Map[String, String]("metadata.broker.list" -> >brokerlist, "auto.offset.reset" -> "smallest") > val messages = KafkaUtils.createDirectStream[String, String, >StringDecoder, StringDecoder]( > ssc, kafkaParams, topicsSet) > val lines=messages.map(_._2) > > > > getHbaseContext(ssc.sparkContext,conf).streamBulkPut[String](lines, > "ecs_test", > (putRecord) => { > if (putRecord.length() > 0) { > var maprecord = new HashMap[String, String]; > val mapper = new ObjectMapper(); > > //convert JSON string to Map > maprecord = mapper.readValue(putRecord, > new TypeReference[HashMap[String, String]]() {}); > > var ts: Long = maprecord.get("ts").toLong > > var tweetID:Long= maprecord.get("id").toLong > val key=ts+"_"+tweetID; > > val put = new Put(Bytes.toBytes(key)) > maprecord.foreach(kv => { > > >put.add(Bytes.toBytes("test"),Bytes.toBytes(kv._1),Bytes.toBytes(kv._2)) > > > }) > > > put > } else { > null > } > }, > false); > > ssc > > } >} > >I am not able to recover from the checkpoint after a restart; I always get >"Unable to getConfig from broadcast". > >After debugging further, I can see that the method for creating the HBaseContext >actually broadcasts the configuration, context object passed. > >As a solution, I just want to recreate the HBaseContext in every case, >whether the checkpoint exists or not. > > > >-- >View this message in context: >http://apache-spark-user-list.1001560.n3.nabble.com/SPARKONHBase-checkpointing-issue-tp25211.html >Sent from the Apache Spark User List mailing list archive at Nabble.com. 
> >--------------------------------------------------------------------- >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >For additional commands, e-mail: user-h...@spark.apache.org > --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org