Thank you all.

With your help, I managed to create HFiles for a small RDD.
But for a large RDD, I gave up (at least for now).
The reasons are as follows.

First, here is the flow of my code (it's Java):

{
  // Create an RDD which holds the records.

  JavaPairRDD<MyKey, byte[]> rdd =
          // MyKey has a byte[] member for the rowkey
    buildRDDWithSomeBusinessLogic()
    .flatMapToPair(...)
          // Builds (MyKey, byte[]) pairs.
          // The number of records increases almost 100 times. MyKey is not distinct.
    .groupByKey()
    .mapValues(...)
          // Makes MyKey distinct. Combines values with compression.
          // The number of records decreases dramatically.
          // The average size of a value is around 1KB.
    .sortByKey(..., numRegions)
          // Sorts, since HFileOutputFormat requires that the records be sorted.
          // Also sets the number of partitions equal to the number of regions.
    .persist(StorageLevel.MEMORY_AND_DISK_SER());
          // Persisted, since I must run multiple tasks on the RDD.

  // Get keys for pre-splitting by fetching the first record of each partition.

  List<MyKey> splitKeys =
    rdd.mapPartitionsWithIndex(...)
          // Gets the first record of each partition.
          // The first partition's first record is excluded, since it's not needed.
    .collect();
  Collections.sort(splitKeys);

  // Now we have the split keys.
  // Create an HBase table pre-split on them.

  createHBaseTableWithSplitKeys(tableName, splitKeys);

  // Use the saveAsNewAPIHadoopFile API with HFileOutputFormat.

  Configuration conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", ...);
  conf.set("zookeeper.znode.parent", ...);
  Job job = Job.getInstance(conf);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(KeyValue.class);
  HTable table = new HTable(conf, tableName);
  HFileOutputFormat.configureIncrementalLoad(job, table);

  rdd
    .mapToPair(new PairFunction<Tuple2<MyKey, byte[]>, ImmutableBytesWritable, KeyValue>() { ... })
        // Maps to the key/value types that HFileOutputFormat expects.
    .saveAsNewAPIHadoopFile(outputDirName,
        ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class,
        job.getConfiguration());
        // job.getConfiguration() carries the settings made by configureIncrementalLoad().
}
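
For reference, the elided mapToPair step looks roughly like this (a simplified sketch; getRowKey(), CF_BYTES and QUAL_BYTES are placeholders for my actual accessor and column family/qualifier constants):

  // Sketch only. Imports: org.apache.hadoop.hbase.KeyValue,
  // org.apache.hadoop.hbase.io.ImmutableBytesWritable, scala.Tuple2,
  // org.apache.spark.api.java.function.PairFunction
  JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileRdd = rdd.mapToPair(
      new PairFunction<Tuple2<MyKey, byte[]>, ImmutableBytesWritable, KeyValue>() {
        @Override
        public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<MyKey, byte[]> t) throws Exception {
          byte[] rowKey = t._1().getRowKey();  // placeholder accessor on MyKey
          KeyValue kv = new KeyValue(rowKey, CF_BYTES, QUAL_BYTES, t._2());
          return new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowKey), kv);
        }
      });

The KeyValues must come out in sorted row-key order, which is why the sortByKey above matters.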

For a small RDD, HFiles were successfully created.
But for a large RDD, the following issues appeared.

1. OutOfMemory exception on mapPartitionsWithIndex() for splitKeys.

In my case, the number of regions is fairly small relative to the RDD, so each region is big.
This is intentional, since a reasonable size for an HBase region is several GB.
But for Spark, that is too big for a partition to be handled by a single executor.
I thought mapPartitionsWithIndex would not load the entire partition, but I was wrong.
It seems to have loaded the whole partition, while I only wanted to fetch the first record from the iterator.
But to acquire the split keys, I see no other way than mapPartitionsWithIndex.
So, I'm basically stuck.
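
To be concrete, the split-key step is roughly this (a simplified sketch of the elided mapPartitionsWithIndex(...) above; Function2 is org.apache.spark.api.java.function.Function2):

  List<MyKey> splitKeys = rdd.mapPartitionsWithIndex(
      new Function2<Integer, Iterator<Tuple2<MyKey, byte[]>>, Iterator<MyKey>>() {
        @Override
        public Iterator<MyKey> call(Integer index, Iterator<Tuple2<MyKey, byte[]>> it) throws Exception {
          List<MyKey> first = new ArrayList<MyKey>();
          if (index > 0 && it.hasNext()) {   // partition 0's first key is not needed as a split point
            first.add(it.next()._1());       // touch only the first record; do not drain the iterator
          }
          return first.iterator();
        }
      }, false)
    .collect();
  Collections.sort(splitKeys);

Even though the function reads only one record, the executor apparently still ends up materializing the whole multi-GB partition.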

I could save all the partitions with a save...() API, then load each partition separately and call first().
But that doesn't feel right; parallelism is lost.

2. Strange Spark behavior

It is not as fatal as issue 1, but it's strange.
In my code, the flow is: flatMapToPair -> groupByKey -> mapValues -> sortByKey.
But when I watch the Spark UI, it is executed as: flatMapToPair -> sortByKey -> sortByKey (again!) -> mapValues.
Since in my case the number of records is very large between flatMapToPair and mapValues, it seems that Spark executes sortByKey at the worst possible time.
I tried to trick Spark by replacing mapValues with mapToPair, but the execution order did not change.
Why?

Maybe because of issue 2, the export process already takes more time than the version built with regular Put (WAL disabled), even before it fails with OOM.
So, for me it is virtually meaningless to use HFileOutputFormat.

Any suggestion or correction would be very helpful.

Thanks.


-----Original Message-----
From: Soumitra Kumar [mailto:kumar.soumi...@gmail.com] 
Sent: Saturday, September 20, 2014 1:44 PM
To: Ted Yu
Cc: innowireless TaeYun Kim; user; Aniket Bhatnagar
Subject: Re: Bulk-load to HBase

I successfully did this once.

RDD map to RDD[(ImmutableBytesWritable, KeyValue)], then:

  val conf = HBaseConfiguration.create()
  val job = new Job(conf, "CEF2HFile")
  job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setMapOutputValueClass(classOf[KeyValue])
  val table = new HTable(conf, "output")
  HFileOutputFormat.configureIncrementalLoad(job, table)
  saveAsNewAPIHadoopFile("hdfs://localhost.localdomain:8020/user/cloudera/spark",
    classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf)

Then I do

  hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/cloudera/spark output

to load the HFiles into HBase.

----- Original Message -----
From: "Ted Yu" <yuzhih...@gmail.com>
To: "Aniket Bhatnagar" <aniket.bhatna...@gmail.com>
Cc: "innowireless TaeYun Kim" <taeyun....@innowireless.co.kr>, "user" 
<user@spark.apache.org>
Sent: Friday, September 19, 2014 2:29:51 PM
Subject: Re: Bulk-load to HBase


Please see http://hbase.apache.org/book.html#completebulkload
LoadIncrementalHFiles has a main() method. 
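It can also be driven from code; roughly (an untested sketch against the HBase 0.98-era API, exception handling omitted, using the table and directory from Soumitra's example):

  Configuration conf = HBaseConfiguration.create();
  HTable table = new HTable(conf, "output");
  LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  loader.doBulkLoad(new Path("/user/cloudera/spark"), table);  // directory containing the generated HFiles
  table.close();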


On Fri, Sep 19, 2014 at 5:41 AM, Aniket Bhatnagar < aniket.bhatna...@gmail.com 
> wrote: 



Agreed that the bulk import would be faster. In my case, I wasn't expecting a lot of data to be uploaded to HBase, and I didn't want to take the pain of importing generated HFiles into HBase. Is there a way to invoke the HBase HFile import batch script programmatically? 




On 19 September 2014 17:58, innowireless TaeYun Kim < 
taeyun....@innowireless.co.kr > wrote: 






In fact, it seems that Put can be used by HFileOutputFormat, so the Put object itself may not be the problem. 

The problem is that TableOutputFormat uses the Put object in the normal way (which goes through the normal write path), while HFileOutputFormat uses it to directly build the HFiles. 





From: innowireless TaeYun Kim [mailto: taeyun....@innowireless.co.kr ] 
Sent: Friday, September 19, 2014 9:20 PM 


To: user@spark.apache.org 
Subject: RE: Bulk-load to HBase 







Thank you for the example code. 



Currently I use foreachPartition() + Put(), but your example code can be used 
to clean up my code. 



BTW, since the data uploaded by Put() goes through normal HBase write path, it 
can be slow. 

So, it would be nice if bulk-load could be used, since it bypasses the write 
path. 



Thanks. 



From: Aniket Bhatnagar [ mailto:aniket.bhatna...@gmail.com ] 
Sent: Friday, September 19, 2014 9:01 PM 
To: innowireless TaeYun Kim 
Cc: user 
Subject: Re: Bulk-load to HBase 




I have been using saveAsNewAPIHadoopDataset, but I use TableOutputFormat instead of HFileOutputFormat. Still, hopefully this should help you: 

  val hbaseZookeeperQuorum = s"$zookeeperHost:$zookeeperPort:$zookeeperHbasePath"
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum)
  conf.set(TableOutputFormat.QUORUM_ADDRESS, hbaseZookeeperQuorum)
  conf.set(TableOutputFormat.QUORUM_PORT, zookeeperPort.toString)
  conf.setClass("mapreduce.outputformat.class",
    classOf[TableOutputFormat[Object]], classOf[OutputFormat[Object, Writable]])
  conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

  // Some RDD that contains row key, column qualifier and data
  val rddToSave: RDD[(Array[Byte], Array[Byte], Array[Byte])] = ...

  val putRDD = rddToSave.map(tuple => {
    val (rowKey, column, data) = tuple
    val put: Put = new Put(rowKey)
    put.add(COLUMN_FAMILY_RAW_DATA_BYTES, column, data)
    (new ImmutableBytesWritable(rowKey), put)
  })

  putRDD.saveAsNewAPIHadoopDataset(conf)








On 19 September 2014 16:52, innowireless TaeYun Kim < 
taeyun....@innowireless.co.kr > wrote: 



Hi, 



Sorry, I just found saveAsNewAPIHadoopDataset. 

Then, can I use HFileOutputFormat with saveAsNewAPIHadoopDataset? Is there any example code for that? 



Thanks. 





From: innowireless TaeYun Kim [mailto: taeyun....@innowireless.co.kr ] 
Sent: Friday, September 19, 2014 8:18 PM 
To: user@spark.apache.org 
Subject: RE: Bulk-load to HBase 





Hi, 



After reading several documents, it seems that saveAsHadoopDataset cannot use HFileOutputFormat. 

It's because the saveAsHadoopDataset method uses JobConf, so it belongs to the old Hadoop API, while HFileOutputFormat is a member of the mapreduce package, which is for the new Hadoop API. 



Am I right? 

If so, is there another method to bulk-load to HBase from RDD? 



Thanks. 





From: innowireless TaeYun Kim [ mailto:taeyun....@innowireless.co.kr ] 
Sent: Friday, September 19, 2014 7:17 PM 
To: user@spark.apache.org 
Subject: Bulk-load to HBase 



Hi, 



Is there a way to bulk-load to HBase from RDD? 

HBase offers the HFileOutputFormat class for bulk loading via a MapReduce job, but I cannot figure out how to use it with saveAsHadoopDataset. 



Thanks. 



