Hi Jim,
Your generated RDD should be of type RDD[(ImmutableBytesWritable, KeyValue)],
while your current one is RDD[(ImmutableBytesWritable, Put)].
You can do something like the following; the result will be an
RDD[(ImmutableBytesWritable, KeyValue)] that can be written with saveAsNewAPIHadoopFile:
val result = num
  .flatMap(v => keyValueBuilder(v))
  .map(kv => (new ImmutableBytesWritable(kv.getBuffer(),
    kv.getRowOffset(), kv.getRowLength()), kv))
where keyValueBuilder would be a function T => List[KeyValue] that turns one
input record into one or more KeyValues. For example, with your num (an RDD[Int])
you can go:
val keyValueBuilder = (data: Int) => {
  val rowkeyBytes = Bytes.toBytes(data)
  val colfam = Bytes.toBytes("cf")
  val qual = Bytes.toBytes("c1")
  val value = Bytes.toBytes("val_xxx")
  val kv = new KeyValue(rowkeyBytes, colfam, qual, value)
  List(kv)
}
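From there, a rough sketch of writing the HFiles and bulk loading them (the output
path is just a placeholder, conf/job/table are the ones from your setup below, and
note that HFileOutputFormat expects the KeyValues in each file to be sorted by row key):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

// Write the (ImmutableBytesWritable, KeyValue) pairs out as HFiles.
result.saveAsNewAPIHadoopFile(
  "/tmp/hfiles_t1",                   // placeholder HDFS output directory
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat],
  job.getConfiguration)

// Load the generated HFiles into the table (the same thing the
// "completebulkload" command-line tool does).
val loader = new LoadIncrementalHFiles(conf)
loader.doBulkLoad(new Path("/tmp/hfiles_t1"), table)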
Thanks,
Sun
[email protected]
From: Jim Green
Date: 2015-01-28 04:44
To: Ted Yu
CC: user
Subject: Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
I used the code below, and it still failed with the same error.
Does anyone have experience with bulk loading from Scala?
Thanks.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
val put: Put = new Put(Bytes.toBytes(x))
put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx13", classOf[ImmutableBytesWritable],
classOf[KeyValue], classOf[HFileOutputFormat], conf)
On Tue, Jan 27, 2015 at 12:17 PM, Jim Green <[email protected]> wrote:
Thanks Ted. Could you give me a simple example that loads one row of data into hbase?
How should I generate the KeyValue?
I tried multiple times and still cannot figure it out.
On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu <[email protected]> wrote:
Here is the method signature used by HFileOutputFormat:
public void write(ImmutableBytesWritable row, KeyValue kv)
Meaning, KeyValue is expected, not Put.
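A minimal sketch of a pair matching that signature (the row key, column family,
qualifier, and value below are placeholders, not from your data):

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Build one KeyValue for row "r1", column cf:c1 (placeholder names).
val rowkey = Bytes.toBytes("r1")
val kv = new KeyValue(rowkey, Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("v1"))
// The (row, KeyValue) pair HFileOutputFormat expects per record:
val pair = (new ImmutableBytesWritable(rowkey), kv)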
On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <[email protected]> wrote:
Hi Team,
I need some help writing a Scala program to bulk load some data into hbase.
Env:
hbase 0.94
spark-1.0.2
I am trying the code below to bulk load some data into the hbase table “t1”.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
val put: Put = new Put(Bytes.toBytes(x))
put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8", classOf[ImmutableBytesWritable],
classOf[Put], classOf[HFileOutputFormat], conf)
However, I am always getting the error below:
java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
My questions are:
1. Do we have sample code to bulk load into hbase directly?
Can we use saveAsNewAPIHadoopFile?
2. Is there any other way to do this?
For example, first write an HFile on hdfs, and then use an hbase command to bulk
load it?
Is there any sample code in Scala?
Thanks.
--
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)