Hi Philip/Hao,

I was wondering if there is a simple working example out there that I can just run and see work; then I can customize it for our needs. Unfortunately, this explanation still confuses me a little.

Here is a little about the environment we are working with: we have Cloudera's CDH 4.4.0 installed, which comes with HBase 0.94.6, and we get data streamed in using Flume-NG 1.4.0. All of this is set up and configured using Cloudera Manager 4.7.2. If you need any more information or are able to help, I would be glad to accommodate.

Thanks,
Ben
Date: Fri, 6 Dec 2013 18:07:08 -0700
From: philip.og...@oracle.com
To: user@spark.incubator.apache.org
Subject: Re: write data into HBase via spark

Hao,

Thank you for the detailed response! (even if delayed!)

I'm curious to know what version of HBase you added to your pom file.

Thanks,
Philip

On 11/14/2013 10:38 AM, Hao REN wrote:

Hi, Philip.

Basically, we need PairRDDFunctions.saveAsHadoopDataset to do the job; as HBase is not a filesystem, saveAsHadoopFile doesn't work.

def saveAsHadoopDataset(conf: JobConf): Unit

This function takes a JobConf parameter, which should be configured. Essentially, you need to set the output format and the name of the output table.

// step 1: JobConf setup
// Note: the mapred package is used, instead of the mapreduce package which contains the new Hadoop APIs.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
// ... some other imports

val conf = HBaseConfiguration.create()

// general HBase settings
conf.set("hbase.rootdir", "hdfs://" + nameNodeURL + ":" + hdfsPort + "/hbase")
conf.setBoolean("hbase.cluster.distributed", true)
conf.set("hbase.zookeeper.quorum", hostname)
conf.setInt("hbase.client.scanner.caching", 10000)
// ... some other settings

val jobConfig: JobConf = new JobConf(conf, this.getClass)

// Note: this TableOutputFormat is the deprecated mapred one, because JobConf is an old Hadoop API.
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)

// step 2: provide your mapping
// The last thing to do is map your local data schema to the HBase one.
// Say our HBase schema is as below:
//   row   cf:col_1   cf:col_2
// and in Spark you have an RDD of triples, like (1, 2, 3), (4, 5, 6), ...
// So you should map RDD[(Int, Int, Int)] to RDD[(ImmutableBytesWritable, Put)], where the Put carries the mapping.
// You can define a function to use with RDD.map, for example:

def convert(triple: (Int, Int, Int)) = {
  val p = new Put(Bytes.toBytes(triple._1))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
  // the old mapred TableOutputFormat ignores the key; the Put itself carries the row key
  (new ImmutableBytesWritable, p)
}

// Suppose you have an RDD[(Int, Int, Int)] called localData. Then writing the data to HBase can be done by:

new PairRDDFunctions(localData.map(convert)).saveAsHadoopDataset(jobConfig)

Voilà. That's all you need. Hopefully this simple example helps.

Hao.

2013/11/13 Philip Ogren <philip.og...@oracle.com>

Hao,

If you have worked out the code and turned it into an example that you can share, then please do! This task is in my queue of things to do, so any helpful details that you uncovered would be most appreciated.

Thanks,
Philip

On 11/13/2013 5:30 AM, Hao REN wrote:

Ok, I worked it out. The following thread helped a lot:
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201310.mbox/%3C7B4868A9-B83E-4507-BB2A-2721FCE8E738%40gmail.com%3E

Hao

2013/11/12 Hao REN <julien19890...@gmail.com>

Could someone show me a simple example of how to write data into HBase via Spark? I have checked the HbaseTest example, but it's only for reading from HBase.

Thank you.

--
REN Hao
Data Engineer @ ClaraVista
Paris, France
Tel: +33 06 14 54 57 24
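
Putting Hao's two steps together, a minimal self-contained sketch of the whole job might look like the following. This is hedged, not a confirmed working setup: it assumes a Spark 0.8-era API against HBase 0.94 (roughly matching Ben's CDH 4.4.0 stack); the local master, the "localhost" ZooKeeper quorum, and the pre-created table "test_table" with column family "cf" are all illustrative placeholders, not details from the thread.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.PairRDDFunctions

object HBaseWriteExample {
  def main(args: Array[String]) {
    // "local" master and the app name are placeholders; point these at your cluster.
    val sc = new SparkContext("local", "HBaseWriteExample")

    // HBase client configuration; an hbase-site.xml on the classpath can supply most of this.
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost") // placeholder quorum

    // Old mapred API: a JobConf plus the deprecated TableOutputFormat, as in Hao's step 1.
    val jobConfig = new JobConf(conf, getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "test_table") // assumed, pre-created table

    // Hao's step 2: map each triple to (key, Put); the key is ignored, the Put carries the row.
    def convert(triple: (Int, Int, Int)) = {
      val p = new Put(Bytes.toBytes(triple._1))
      p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
      p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, p)
    }

    // Sample in-memory data standing in for whatever RDD you actually have.
    val localData = sc.parallelize(List((1, 2, 3), (4, 5, 6), (7, 8, 9)))
    new PairRDDFunctions(localData.map(convert)).saveAsHadoopDataset(jobConfig)

    sc.stop()
  }
}

Note that with import org.apache.spark.SparkContext._ in scope, the implicit conversion lets you write localData.map(convert).saveAsHadoopDataset(jobConfig) directly, which is the more common spelling than constructing PairRDDFunctions by hand.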