Thank you all. With your help, I managed to create HFiles for a small RDD. But for a large RDD, I gave up (at least for now). The reasons are as follows:
First, the flow of my code (it's Java) is as follows:

```java
// Build an RDD of the records. MyKey has a byte[] member for the row key.
JavaPairRDD<MyKey, byte[]> rdd =
    buildRDDWithSomeBusinessLogic()
        .flatMapToPair(...)  // Builds (MyKey, byte[]) pairs.
                             // The number of records increases almost 100x; MyKey is not distinct.
        .groupByKey()
        .mapValues(...)      // Makes MyKey distinct; combines values with compression.
                             // The number of records decreases dramatically.
                             // The average value size is around 1 KB.
        .sortByKey(..., numRegions)  // Sorts, since HFileOutputFormat requires sorted records,
                                     // and sets the number of partitions to the number of regions.
        .persist(StorageLevel.MEMORY_AND_DISK_SER());  // Persisted, since multiple tasks run on the RDD.

// Get the keys for pre-splitting by fetching the first record of each partition.
// The first partition's first record is excluded, since it is not needed.
List<MyKey> splitKeys = rdd
    .mapPartitionsWithIndex(...)
    .collect();
Collections.sort(splitKeys);  // Now we have the split keys.

// Create an HBase table pre-split on those keys.
createHBaseTableWithSplitKeys(tableName, splitKeys);

// Use the saveAsNewAPIHadoopFile API with HFileOutputFormat.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", ...);
conf.set("zookeeper.znode.parent", ...);
Job job = Job.getInstance(conf);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
HTable table = new HTable(conf, tableName);
HFileOutputFormat.configureIncrementalLoad(job, table);
rdd
    .mapToPair(new PairFunction<Tuple2<MyKey, byte[]>, ImmutableBytesWritable, KeyValue>(...))  // Maps to the HFile record types.
    .saveAsNewAPIHadoopFile(outputDirName, ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat.class, conf);
```

For a small RDD, the HFiles were successfully created. But for a large RDD, the following issues appeared.

1. OutOfMemory exception on mapPartitionsWithIndex() for splitKeys
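For context on this issue: the function passed to mapPartitionsWithIndex receives a plain Iterator, and pulling only its first element is cheap in itself; any memory pressure comes from whatever must be materialized to produce that iterator (here, the sorted, persisted partition). A minimal plain-Java sketch of the "first record of each partition" pattern, with hypothetical names and no Spark involved:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class FirstOfPartition {
    // A lazy iterator that records how many elements were actually pulled.
    static class CountingIterator implements Iterator<Integer> {
        int produced = 0;
        final int size;
        CountingIterator(int size) { this.size = size; }
        public boolean hasNext() { return produced < size; }
        public Integer next() { return produced++; }
    }

    // Mirrors the split-key function: emit only the first record of a partition,
    // skipping partition 0 since its first key is not needed as a split point.
    static List<Integer> firstRecord(int partitionIndex, Iterator<Integer> it) {
        if (partitionIndex == 0 || !it.hasNext()) {
            return Collections.emptyList();
        }
        return Collections.singletonList(it.next()); // pulls exactly one element
    }

    public static void main(String[] args) {
        CountingIterator partition = new CountingIterator(1_000_000);
        System.out.println(firstRecord(1, partition)); // [0]
        System.out.println(partition.produced);        // 1: the rest is never produced
    }
}
```

The lambda itself is lazy, which is why the OOM described below points at the upstream sort rather than at the "take first" step.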
In my case, the number of regions is fairly small relative to the RDD, and each region is big. This is intentional, since a reasonable size for an HBase region is several GB. But for Spark, that is too big for a partition handled by one executor. I thought mapPartitionsWithIndex would not load the entire partition, but I was wrong: it apparently loaded the whole partition, while I only wanted to fetch the first record of the iterator. But to acquire the split keys, there seems to be no way other than mapPartitionsWithIndex, so I'm basically stuck. I could save all the partitions with a save...() API, then load each partition separately and call first(), but that does not feel right, and parallelism is lost.

2. Strange Spark behavior

This is not as fatal as issue 1, but it's strange. In my code, the flow is:

flatMapToPair -> groupByKey -> mapValues -> sortByKey

But in the Spark UI, it is executed as:

flatMapToPair -> sortByKey -> sortByKey (again!) -> mapValues

Since in my case the number of records is very large between flatMapToPair and mapValues, Spark seems to execute sortByKey at the worst possible time. I tried to trick Spark by replacing mapValues with mapToPair, but the execution order did not change. Why?

Maybe because of issue 2, the export process already takes more time than the process built with regular Put (WAL disabled), even before it fails with OOM. So it is virtually meaningless to use HFileOutputFormat.

Any suggestion or correction would be very helpful.

Thanks.

-----Original Message-----
From: Soumitra Kumar [mailto:kumar.soumi...@gmail.com]
Sent: Saturday, September 20, 2014 1:44 PM
To: Ted Yu
Cc: innowireless TaeYun Kim; user; Aniket Bhatnagar
Subject: Re: Bulk-load to HBase

I successfully did this once.
Map the RDD to RDD[(ImmutableBytesWritable, KeyValue)], then:

```scala
val conf = HBaseConfiguration.create()
val job = new Job(conf, "CEF2HFile")
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
val table = new HTable(conf, "output")
HFileOutputFormat.configureIncrementalLoad(job, table)
saveAsNewAPIHadoopFile("hdfs://localhost.localdomain:8020/user/cloudera/spark",
  classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
```

Then I do

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/cloudera/spark output

to load the HFiles into HBase.

----- Original Message -----
From: "Ted Yu" <yuzhih...@gmail.com>
To: "Aniket Bhatnagar" <aniket.bhatna...@gmail.com>
Cc: "innowireless TaeYun Kim" <taeyun....@innowireless.co.kr>, "user" <user@spark.apache.org>
Sent: Friday, September 19, 2014 2:29:51 PM
Subject: Re: Bulk-load to HBase

Please see http://hbase.apache.org/book.html#completebulkload
LoadIncrementalHFiles has a main() method.

On Fri, Sep 19, 2014 at 5:41 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

Agreed that the bulk import would be faster. In my case, I wasn't expecting a lot of data to be uploaded to HBase, and I didn't want to take the pain of importing the generated HFiles into HBase. Is there a way to invoke the HBase HFile import batch script programmatically?

On 19 September 2014 17:58, innowireless TaeYun Kim <taeyun....@innowireless.co.kr> wrote:

In fact, it seems that Put can be used by HFileOutputFormat, so the Put object itself may not be the problem. The problem is that TableOutputFormat uses the Put object in the normal way (one that goes through the normal write path), while HFileOutputFormat uses it to build the HFile directly.

From: innowireless TaeYun Kim [mailto:taeyun....@innowireless.co.kr]
Sent: Friday, September 19, 2014 9:20 PM
To: user@spark.apache.org
Subject: RE: Bulk-load to HBase

Thank you for the example code.
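On Ted's note that LoadIncrementalHFiles has a main() method: the shell step above can also be driven from code by calling main with the same arguments the CLI would receive (for Hadoop tools, ToolRunner.run is the usual route). A toy sketch with a hypothetical stand-in class, since the real tool needs a live cluster:

```java
public class ProgrammaticBulkLoad {
    // Hypothetical stand-in for a CLI tool class that, like
    // org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles, exposes main(String[]).
    static class FakeLoadTool {
        static String lastInvocation = null;
        public static void main(String[] args) {
            // The real tool would read HFiles from args[0] and load them into table args[1].
            lastInvocation = String.join(" ", args);
        }
    }

    public static void main(String[] args) {
        // Same arguments the shell command passes after the class name:
        //   hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/cloudera/spark output
        FakeLoadTool.main(new String[]{"/user/cloudera/spark", "output"});
        System.out.println(FakeLoadTool.lastInvocation);
    }
}
```

This only illustrates the calling convention; error handling and exit codes of the real tool are not modeled.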
Currently I use foreachPartition() + Put(), but your example code can be used to clean up my code.
BTW, since data uploaded by Put() goes through the normal HBase write path, it can be slow. So it would be nice if bulk-load could be used, since that bypasses the write path.
Thanks.

From: Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com]
Sent: Friday, September 19, 2014 9:01 PM
To: innowireless TaeYun Kim
Cc: user
Subject: Re: Bulk-load to HBase

I have been using saveAsNewAPIHadoopDataset, but with TableOutputFormat instead of HFileOutputFormat. Still, hopefully this helps you:

```scala
val hbaseZookeeperQuorum = s"$zookeeperHost:$zookeeperPort:$zookeeperHbasePath"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum)
conf.set(TableOutputFormat.QUORUM_ADDRESS, hbaseZookeeperQuorum)
conf.set(TableOutputFormat.QUORUM_PORT, zookeeperPort.toString)
conf.setClass("mapreduce.outputformat.class",
  classOf[TableOutputFormat[Object]], classOf[OutputFormat[Object, Writable]])
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

// Some RDD that contains row key, column qualifier and data.
val rddToSave: RDD[(Array[Byte], Array[Byte], Array[Byte])] = ...

val putRDD = rddToSave.map { tuple =>
  val (rowKey, column, data) = tuple
  val put: Put = new Put(rowKey)
  put.add(COLUMN_FAMILY_RAW_DATA_BYTES, column, data)
  (new ImmutableBytesWritable(rowKey), put)
}
putRDD.saveAsNewAPIHadoopDataset(conf)
```

On 19 September 2014 16:52, innowireless TaeYun Kim <taeyun....@innowireless.co.kr> wrote:

Hi,
Sorry, I just found saveAsNewAPIHadoopDataset. Can I use HFileOutputFormat with saveAsNewAPIHadoopDataset? Is there any example code for that?
Thanks.

From: innowireless TaeYun Kim [mailto:taeyun....@innowireless.co.kr]
Sent: Friday, September 19, 2014 8:18 PM
To: user@spark.apache.org
Subject: RE: Bulk-load to HBase

Hi,
After reading several documents, it seems that saveAsHadoopDataset cannot use HFileOutputFormat.
It's because the saveAsHadoopDataset method uses JobConf, so it belongs to the old Hadoop API, while HFileOutputFormat is a member of the mapreduce package, which is for the new Hadoop API. Am I right?
If so, is there another way to bulk-load to HBase from an RDD?
Thanks.

From: innowireless TaeYun Kim [mailto:taeyun....@innowireless.co.kr]
Sent: Friday, September 19, 2014 7:17 PM
To: user@spark.apache.org
Subject: Bulk-load to HBase

Hi,
Is there a way to bulk-load to HBase from an RDD? HBase offers the HFileOutputFormat class for bulk loading by a MapReduce job, but I cannot figure out how to use it with saveAsHadoopDataset.
Thanks.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org