Hi, Philip.
Basically, we need PairRDDFunctions.saveAsHadoopDataset to do the
job; since HBase is not a filesystem, saveAsHadoopFile doesn't work.

def saveAsHadoopDataset(conf: JobConf): Unit

This function takes a JobConf parameter which needs to be configured.
Essentially, you need to set the output format and the name of the output
table.
// step 1: JobConf setup
// Note: the mapred package is used here, not the mapreduce package,
// which contains the new Hadoop APIs.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf

// ... some other settings
val conf = HBaseConfiguration.create()

// general HBase settings
conf.set("hbase.rootdir", "hdfs://" + nameNodeURL + ":" + hdfsPort + "/hbase")
conf.setBoolean("hbase.cluster.distributed", true)
conf.set("hbase.zookeeper.quorum", hostname)
conf.setInt("hbase.client.scanner.caching", 10000)
// ... some other settings

val jobConfig: JobConf = new JobConf(conf, this.getClass)

// Note: this TableOutputFormat is the deprecated one, because JobConf is an old Hadoop API
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)
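One caveat: saveAsHadoopDataset assumes the output table already exists.
If it might not, something like the following should create it first. This
is an untested sketch against the 0.94-era HBase client API; "cf" matches
the column family used in the schema below:

import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin

// Sketch: create the output table with a single column family "cf" if it is missing.
val admin = new HBaseAdmin(conf)
if (!admin.tableExists(outputTable)) {
  val tableDesc = new HTableDescriptor(outputTable) // String-based constructor (old API)
  tableDesc.addFamily(new HColumnDescriptor("cf"))
  admin.createTable(tableDesc)
}
admin.close()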
// step 2: specify the mapping

// The last thing to do is map your local data schema to the HBase one.
// Say our HBase schema is as below:
//   row   cf:col_1   cf:col_2
// And in Spark you have an RDD of triples, like (1, 2, 3), (4, 5, 6), ...
// So you should map RDD[(Int, Int, Int)] to RDD[(ImmutableBytesWritable, Put)],
// where the Put carries the mapping.
// You can define a function to use with RDD.map, for example:
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

def convert(triple: (Int, Int, Int)) = {
  // triple._1 becomes the row key; the other two fields fill cf:col_1 and cf:col_2
  val p = new Put(Bytes.toBytes(triple._1))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
  // TableOutputFormat ignores the key, so an empty ImmutableBytesWritable will do
  (new ImmutableBytesWritable, p)
}
// Suppose you have an RDD[(Int, Int, Int)] called localData; then writing
// the data to HBase is done by:

new PairRDDFunctions(localData.map(convert)).saveAsHadoopDataset(jobConfig)
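A side note: with import org.apache.spark.SparkContext._ in scope, the
implicit conversion to PairRDDFunctions lets you drop the explicit wrapper
and call saveAsHadoopDataset directly on the mapped RDD. And if you would
rather avoid the deprecated mapred classes altogether, newer Spark versions
also provide saveAsNewAPIHadoopDataset for the mapreduce API. Here is a
rough, untested sketch, reusing conf, outputTable, localData and convert
from above:

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat // mapreduce, not mapred
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._ // implicit conversion to PairRDDFunctions

val job = new Job(conf) // wraps the HBase configuration from step 1
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, outputTable)

// No explicit PairRDDFunctions wrapper needed here.
localData.map(convert).saveAsNewAPIHadoopDataset(job.getConfiguration)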
Voilà. That's all you need. Hopefully this simple example helps.
Hao.
2013/11/13 Philip Ogren <philip.og...@oracle.com>
Hao,
If you have worked out the code and turned it into an example that
you can share, then please do! This task is in my queue of things
to do, so any helpful details that you uncovered would be most
appreciated.
Thanks,
Philip
On 11/13/2013 5:30 AM, Hao REN wrote:
Ok, I worked it out.
The following thread helps a lot.
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201310.mbox/%3C7B4868A9-B83E-4507-BB2A-2721FCE8E738%40gmail.com%3E
Hao
2013/11/12 Hao REN <julien19890...@gmail.com>
Could someone show me a simple example of how to write
data into HBase via Spark?
I have checked the HbaseTest example, but it's only for reading from
HBase.
Thank you.
--
REN Hao
Data Engineer @ ClaraVista
Paris, France
Tel: +33 06 14 54 57 24