Thanks Sun.
My understanding is that saveAsNewAPIHadoopFile saves the data as HFiles on HDFS.
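
To check that I have the two steps right, this is roughly how I picture the whole flow: write the HFiles, then hand them to HBase. It is an untested sketch, not something I have working. It assumes the spark-shell (so `sc` exists), an existing table "t1" with column family "cf", and an output directory name (/tmp/hfiles_t1) that I made up. LoadIncrementalHFiles is the HBase class that I believe performs the second step.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._

// Assumption: run inside spark-shell, table "t1" with family "cf" already exists.
val conf = HBaseConfiguration.create()
val table = new HTable(conf, "t1")
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Step 1: build (row key, KeyValue) pairs. HFileOutputFormat expects the
// pairs in ascending row-key order; 1 to 10 is already in that order.
val kvs = sc.parallelize(1 to 10).map { x =>
  val row = Bytes.toBytes(x)
  val kv = new KeyValue(row, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
    Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(row), kv)
}

// Hypothetical output directory; it must not exist before the job runs.
val hfileDir = "/tmp/hfiles_t1"
kvs.saveAsNewAPIHadoopFile(hfileDir, classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration)

// Step 2: ask HBase to adopt the generated HFiles into table "t1".
new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfileDir), table)
```

Note that I pass job.getConfiguration rather than conf to the save call, since Job works on its own copy of the configuration and configureIncrementalLoad changes that copy.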

Is it possible to use saveAsNewAPIHadoopDataset to load directly into HBase?
If so, is there any sample code for that?
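
From what I can piece together, I would guess it looks something like the sketch below, using the mapreduce TableOutputFormat with Put values instead of KeyValue. I have not run this, so please correct me if the approach is wrong. It assumes the spark-shell (so `sc` exists) and an existing table "t1" with column family "cf"; the names directConf and directJob are just mine.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._

// Assumption: run inside spark-shell, table "t1" with family "cf" already exists.
val directConf = HBaseConfiguration.create()
directConf.set(TableOutputFormat.OUTPUT_TABLE, "t1")

// The Job only carries the output settings; no MapReduce job is submitted.
val directJob = Job.getInstance(directConf)
directJob.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
directJob.setOutputKeyClass(classOf[ImmutableBytesWritable])
directJob.setOutputValueClass(classOf[Put])

// TableOutputFormat takes Put values, so no KeyValue conversion is needed here.
val puts = sc.parallelize(1 to 10).map { x =>
  val put = new Put(Bytes.toBytes(x))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
}

// Each partition writes its Puts to "t1" through the normal HBase client path.
puts.saveAsNewAPIHadoopDataset(directJob.getConfiguration)
```

If that is right, this would go through the regular write path rather than producing HFiles, so it would not really be a bulk load.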

Thanks!

On Tue, Jan 27, 2015 at 6:07 PM, [email protected] <[email protected]>
wrote:

> Hi, Jim
> Your generated RDD should be of type RDD[(ImmutableBytesWritable,
> KeyValue)], while your current one is RDD[(ImmutableBytesWritable, Put)].
> You can do it like this, and the result should be of type
> RDD[(ImmutableBytesWritable, KeyValue)], which can be saved with
> saveAsNewAPIHadoopFile:
>      val result = num.flatMap(v => keyValueBuilder(v).map(kv => (kv, 1)))
>        .map(v => (new ImmutableBytesWritable(v._1.getBuffer(),
>          v._1.getRowOffset(), v._1.getRowLength()), v._1))
>
> where keyValueBuilder is a function of type T => List[KeyValue];
> for example:
>            val keyValueBuilder = (data: (Int, Int))  =>{
>                val rowkeyBytes = Bytes.toBytes(data._1)
>                val colfam = Bytes.toBytes("cf")
>                val qual = Bytes.toBytes("c1")
>                val value = Bytes.toBytes("val_xxx")
>
>                val kv = new KeyValue(rowkeyBytes,colfam,qual,value)
>                List(kv)
>           }
>
>
> Thanks,
> Sun
> ------------------------------
> [email protected]
>
>
> *From:* Jim Green <[email protected]>
> *Date:* 2015-01-28 04:44
> *To:* Ted Yu <[email protected]>
> *CC:* user <[email protected]>
> *Subject:* Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
> I used the code below, and it still failed with the same error.
> Does anyone have experience with bulk loading using Scala?
> Thanks.
>
> import org.apache.spark._
> import org.apache.spark.rdd.NewHadoopRDD
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.HTable;
> import org.apache.hadoop.hbase.mapred.TableOutputFormat
> import org.apache.hadoop.mapred.JobConf
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.mapreduce.Job
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
> import org.apache.hadoop.hbase.KeyValue
> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>
> val conf = HBaseConfiguration.create()
> val tableName = "t1"
> val table = new HTable(conf, tableName)
>
> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
> val job = Job.getInstance(conf)
> job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
> job.setMapOutputValueClass (classOf[KeyValue])
> HFileOutputFormat.configureIncrementalLoad (job, table)
>
> val num = sc.parallelize(1 to 10)
> val rdd = num.map(x=>{
>     val put: Put = new Put(Bytes.toBytes(x))
>     put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>     (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
> })
> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx13", classOf[ImmutableBytesWritable],
> classOf[KeyValue], classOf[HFileOutputFormat], conf)
>
>
>
> On Tue, Jan 27, 2015 at 12:17 PM, Jim Green <[email protected]> wrote:
>
>> Thanks Ted. Could you give me a simple example that loads one row of data
>> into HBase? How should I generate the KeyValue?
>> I have tried multiple times and still cannot figure it out.
>>
>> On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu <[email protected]> wrote:
>>
>>> Here is the method signature used by HFileOutputFormat :
>>>       public void write(ImmutableBytesWritable row, KeyValue kv)
>>>
>>> Meaning, KeyValue is expected, not Put.
>>>
>>> On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <[email protected]>
>>> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I need some help writing a Scala script to bulk load some data into HBase.
>>>> *Env:*
>>>> hbase 0.94
>>>> spark-1.0.2
>>>>
>>>> I am trying the code below to bulk load some data into the HBase table
>>>> “t1”.
>>>>
>>>> import org.apache.spark._
>>>> import org.apache.spark.rdd.NewHadoopRDD
>>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>>> import org.apache.hadoop.fs.Path;
>>>> import org.apache.hadoop.hbase.HColumnDescriptor
>>>> import org.apache.hadoop.hbase.util.Bytes
>>>> import org.apache.hadoop.hbase.client.Put;
>>>> import org.apache.hadoop.hbase.client.HTable;
>>>> import org.apache.hadoop.hbase.mapred.TableOutputFormat
>>>> import org.apache.hadoop.mapred.JobConf
>>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>>> import org.apache.hadoop.mapreduce.Job
>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>>>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>>>> import org.apache.hadoop.hbase.KeyValue
>>>> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>>>>
>>>> val conf = HBaseConfiguration.create()
>>>> val tableName = "t1"
>>>> val table = new HTable(conf, tableName)
>>>>
>>>> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
>>>> val job = Job.getInstance(conf)
>>>> job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
>>>> job.setMapOutputValueClass (classOf[KeyValue])
>>>> HFileOutputFormat.configureIncrementalLoad (job, table)
>>>>
>>>> val num = sc.parallelize(1 to 10)
>>>> val rdd = num.map(x=>{
>>>>     val put: Put = new Put(Bytes.toBytes(x))
>>>>     put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>>>>     (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
>>>> })
>>>> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8",
>>>> classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat],
>>>> conf)
>>>>
>>>>
>>>> However, I always get the error below:
>>>> java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
>>>>     at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
>>>>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
>>>>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> My questions are:
>>>> 1. Is there sample code for bulk loading into HBase directly?
>>>> Can we use saveAsNewAPIHadoopFile?
>>>>
>>>> 2. Is there any other way to do this?
>>>> For example, first write an HFile to HDFS, and then use an HBase command
>>>> to bulk load it?
>>>> Is there any sample code in Scala?
>>>>
>>>> Thanks.
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> www.openkb.info
>>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>>>
>>>
>>>
>>
>>
>> --
>> Thanks,
>> www.openkb.info
>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>
>
>
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>
>


-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
