Hao,

Thank you for the detailed response (even if delayed)!

I'm curious to know what version of HBase you added to your pom file.
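
For example, is it something along these lines? (The artifact and version below are only a guess on my part; the version is exactly the part I'm unsure about.)

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <!-- placeholder version, not necessarily what you used -->
  <version>0.94.6</version>
</dependency>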

Thanks,
Philip

On 11/14/2013 10:38 AM, Hao REN wrote:
Hi, Philip.

Basically, we need PairRDDFunctions.saveAsHadoopDataset to do the job; since HBase is not a filesystem, saveAsHadoopFile doesn't work.

def saveAsHadoopDataset(conf: JobConf): Unit

This function takes a JobConf parameter, which needs to be configured. Essentially, you need to set the output format and the name of the output table.

// Step 1: JobConf setup

// Note: the mapred package (old Hadoop API) is used here, not the mapreduce package that holds the new Hadoop API.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._                    // Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable   // used in step 2
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes                  // used in step 2
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.PairRDDFunctions               // used in step 2
// ... other imports as needed

val conf = HBaseConfiguration.create()

// general HBase settings
// (nameNodeURL, hdfsPort, hostname and outputTable are placeholders for your own values)
conf.set("hbase.rootdir", "hdfs://" + nameNodeURL + ":" + hdfsPort + "/hbase")
conf.setBoolean("hbase.cluster.distributed", true)
conf.set("hbase.zookeeper.quorum", hostname)
conf.setInt("hbase.client.scanner.caching", 10000)
// ... some other settings

val jobConfig: JobConf = new JobConf(conf, this.getClass)

// Note: this TableOutputFormat is the deprecated mapred one, because JobConf belongs to the old Hadoop API.
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)


// Step 2: define your mapping

// The last thing to do is to map your local data schema to the HBase one.
// Say our HBase schema is as below:
//   row    cf:col_1    cf:col_2

// And in Spark, you have an RDD of triples, like (1, 2, 3), (4, 5, 6), ...
// So you should map RDD[(Int, Int, Int)] to RDD[(ImmutableBytesWritable, Put)], where the Put carries the mapping.

// You can define a function used by RDD.map, for example:

// convert one (row, col_1, col_2) triple into an HBase Put keyed by the first element
def convert(triple: (Int, Int, Int)) = {
  val p = new Put(Bytes.toBytes(triple._1))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_1"), Bytes.toBytes(triple._2))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("col_2"), Bytes.toBytes(triple._3))
  (new ImmutableBytesWritable, p)
}

// Suppose you have an RDD[(Int, Int, Int)] called localData; then writing the data to HBase can be done by:

new PairRDDFunctions(localData.map(convert)).saveAsHadoopDataset(jobConfig)
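
// Side note (a small variant, not part of the steps above): with the standard SparkContext
// implicits in scope, the explicit PairRDDFunctions wrapper isn't needed and the same write
// can be expressed directly on the mapped RDD:

import org.apache.spark.SparkContext._

localData.map(convert).saveAsHadoopDataset(jobConfig)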

Voilà. That's all you need. Hopefully this simple example helps.

Hao.





2013/11/13 Philip Ogren <philip.og...@oracle.com>

    Hao,

    If you have worked out the code and can turn it into an example
    that you can share, then please do! This task is in my queue of
    things to do, so any helpful details that you uncovered would be
    most appreciated.

    Thanks,
    Philip



    On 11/13/2013 5:30 AM, Hao REN wrote:
    Ok, I worked it out.

    The following thread helps a lot:

    http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201310.mbox/%3C7B4868A9-B83E-4507-BB2A-2721FCE8E738%40gmail.com%3E

    Hao


    2013/11/12 Hao REN <julien19890...@gmail.com>

        Could someone show me a simple example of how to write
        data into HBase via Spark?

        I have checked the HbaseTest example, but it only covers
        reading from HBase.

        Thank you.

        --
        REN Hao

        Data Engineer @ ClaraVista

        Paris, France

        Tel: +33 06 14 54 57 24




    --
    REN Hao

    Data Engineer @ ClaraVista

    Paris, France

    Tel: +33 06 14 54 57 24




--
REN Hao

Data Engineer @ ClaraVista

Paris, France

Tel: +33 06 14 54 57 24
