Hello Bryan & Oliver,

I am using suggestions from both of you to do the bulk upload. The problem I am running into is that the job that uses 'HFileOutputFormat.configureIncrementalLoad' is taking a very long time to complete. One thing I noticed is that it's using only 1 Reducer.
When I looked at the source code for HFileOutputFormat, I noticed that the number of Reducers is determined by this:

    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
        + "to match current region count");
    job.setNumReduceTasks(startKeys.size());

When I look at the log I see this:

    12/05/16 03:11:02 INFO mapreduce.HFileOutputFormat: Configuring 301 reduce partitions to match current region count

which implies that the regions were created successfully. But shouldn't this set the number of Reducers to 301? What am I missing? Thanks for your help.

On Thu, May 10, 2012 at 9:04 AM, Bryan Beaudreault <bbeaudrea...@hubspot.com> wrote:

> I don't think there is. You need to have a table seeded with the right
> regions in order to run the bulk loader jobs.
>
> My machines are sufficiently fast that it did not take that long to sort.
> One thing I did do to speed this up was add a mapper to the job that
> generates the splits, which would calculate the size of each KeyValue. So
> instead of passing around the KeyValues I would pass around just the size
> of the KeyValues. You could do a similar thing with the Puts. Here are my
> keys/values for the job in full:
>
> Mapper:
>   KeyIn: ImmutableBytesWritable
>   ValueIn: KeyValue
>   KeyOut: ImmutableBytesWritable
>   ValueOut: IntWritable
>
> Reducer:
>   KeyIn: ImmutableBytesWritable
>   ValueIn: IntWritable
>
> At this point I would just add up the ints from the IntWritable. This cuts
> down drastically on the amount of data passed around in the sort.
>
> Hope this helps. If it is still too slow you might have to experiment with
> using many reducers and making sure you don't have holes or regions that
> are too big due to the way the keys are partitioned. I was lucky enough to
> not have to go that far.
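For reference, Bryan's sizing job above can be sketched in plain Java (class and method names here are mine, not from the thread, and the Hadoop plumbing is omitted): the mapper emits (rowKey, KeyValue-size-in-bytes) instead of (rowKey, KeyValue), and the reducer just sums the sizes per rowKey.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Plain-Java sketch of the sizing job (hypothetical names; the real job
// uses ImmutableBytesWritable keys and IntWritable values).
public class SizingJobSketch {

    // Stand-in for the shuffle + reduce: Hadoop sorts the keys and groups
    // the int values; a TreeMap.merge gives the same sorted, summed result
    // locally.
    static TreeMap<String, Long> sizePerRow(List<String[]> rowKeyAndSize) {
        TreeMap<String, Long> totals = new TreeMap<>();
        for (String[] pair : rowKeyAndSize) {
            // pair[0] = rowKey, pair[1] = serialized size of one KeyValue
            totals.merge(pair[0], Long.parseLong(pair[1]), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String[]> emitted = Arrays.asList(
                new String[]{"row1", "120"},
                new String[]{"row2", "300"},
                new String[]{"row1", "80"});
        System.out.println(sizePerRow(emitted)); // prints {row1=200, row2=300}
    }
}
```

The point of the trick is that only a handful of bytes per KeyValue go through the sort, rather than the KeyValues themselves.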
>
> On Thu, May 10, 2012 at 11:55 AM, Something Something <mailinglist...@gmail.com> wrote:
>
> > I am beginning to get a sinking feeling about this :( But I won't give up!
> >
> > The problem is that when I use one Reducer the job runs for a long time. I
> > killed it after about an hour. Keep in mind, we do have a decent cluster
> > size. The Map stage completes in a minute, & when I set the number of
> > Reducers to 0 (which is not what we want) the job completes in 12 minutes.
> > In other words, the sorting is taking very, very long! What could be the
> > problem?
> >
> > Is there no other way to do the bulk upload without first *learning* the
> > data?
> >
> > On Thu, May 10, 2012 at 7:15 AM, Bryan Beaudreault <bbeaudrea...@hubspot.com> wrote:
> >
> > > Since our Key was ImmutableBytesWritable (representing a rowKey) and the
> > > Value was KeyValue, there could be many KeyValues per row key (and thus
> > > many values per hadoop key in the reducer). So yes, what we did is very
> > > much the same as what you described. Hadoop will sort the
> > > ImmutableBytesWritable keys before sending them to the reducer. This is
> > > the primary sort. We then loop over the values for each key, adding up
> > > the size of each KeyValue until we reach the region size. Each time that
> > > happens we record the rowKey from the hadoop key and use that as the
> > > start key for a new region.
> > >
> > > A secondary sort is not necessary unless the order of the values matters
> > > for you. In this case (with the row key as the reducer key), I don't
> > > think that matters.
> > >
> > > On Thu, May 10, 2012 at 3:22 AM, Something Something <mailinglist...@gmail.com> wrote:
> > >
> > > > Thank you Tim & Bryan for the responses. Sorry for the delayed
> > > > response. Got busy with other things.
> > > >
> > > > Bryan - I decided to focus on the region split problem first. The
> > > > challenge here is to find the correct start key for each region, right?
> > > > Here are the steps I could think of:
> > > >
> > > > 1) Sort the keys.
> > > > 2) Count how many keys there are & divide by the # of regions we want
> > > > to create (e.g. 300). This gives us the # of keys in a region (the
> > > > region size).
> > > > 3) Loop through the sorted keys &, every time the region size is
> > > > reached, write down the region # & starting key. This info can later
> > > > be used to create the table.
> > > >
> > > > Honestly, I am not sure what you mean by "hadoop does this
> > > > automatically". If you used a single reducer, did you use a secondary
> > > > sort (setOutputValueGroupingComparator) to sort the keys? Did you loop
> > > > through the *values* to find regions? Would appreciate it if you would
> > > > describe this MR job. Thanks.
> > > >
> > > > On Wed, May 9, 2012 at 8:25 AM, Bryan Beaudreault <bbeaudrea...@hubspot.com> wrote:
> > > >
> > > > > I also recently had this problem, trying to index 6+ billion records
> > > > > into HBase. The job would take about 4 hours before it brought down
> > > > > the entire cluster, at only around 60% complete.
> > > > >
> > > > > After trying a bunch of things, we went to bulk loading. This is
> > > > > actually pretty easy, though the hardest part is that you need to
> > > > > have a table ready with the region splits you are going to use.
> > > > > Region splits aside, there are 2 steps:
> > > > >
> > > > > 1) Change your job so that instead of executing your Puts, it just
> > > > > outputs them using context.write. Put is Writable. (We used
> > > > > ImmutableBytesWritable as the Key, representing the rowKey.)
> > > > > 2) Add another job that reads that output and configure it using
> > > > > HFileOutputFormat.configureIncrementalLoad(Job job, HTable table);
> > > > > this will add the right reducer.
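The count-based steps 1-3 quoted above can be sketched in plain Java (hypothetical names, no Hadoop plumbing): sort the keys, divide the count by the number of regions wanted, and record a start key each time a region's worth of keys has gone by.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the count-based region split calculation.
public class RegionSplitsByCount {

    static List<String> startKeys(List<String> keys, int numRegions) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);                                // step 1: sort the keys
        int perRegion = Math.max(1, sorted.size() / numRegions); // step 2: keys per region
        List<String> starts = new ArrayList<>();
        // step 3: the first region implicitly starts at the empty key, so we
        // only record the start keys of regions 2..numRegions
        for (int i = perRegion; i < sorted.size(); i += perRegion) {
            starts.add(sorted.get(i));
        }
        return starts;
    }

    public static void main(String[] args) {
        List<String> keys = java.util.Arrays.asList("f", "a", "d", "b", "e", "c");
        System.out.println(startKeys(keys, 3)); // prints [c, e]
    }
}
```

Note this assumes roughly uniform row sizes; if rows vary a lot in size, the size-based approach Bryan describes is the safer one.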
> > > > > Once those two have run, you can finalize the process using the
> > > > > completebulkload tool documented at
> > > > > http://hbase.apache.org/bulk-loads.html
> > > > >
> > > > > For the region splits problem, we created another job which sorted
> > > > > all of the puts by key (hadoop does this automatically) and had a
> > > > > single reducer. It stepped through all of the Puts, adding up the
> > > > > total size until it reached some threshold. When it did, it recorded
> > > > > the byte array and used that as the start of the next region. We
> > > > > used the result of this job to create a new table. There is probably
> > > > > a better way to do this but it takes like 20 minutes to write.
> > > > >
> > > > > This whole process took less than an hour, with the bulk load part
> > > > > only taking 15 minutes. Much better!
> > > > >
> > > > > On Wed, May 9, 2012 at 11:08 AM, Something Something <mailinglist...@gmail.com> wrote:
> > > > >
> > > > > > Hey Oliver,
> > > > > >
> > > > > > Thanks a "billion" for the response -:) I will take any code you
> > > > > > can provide even if it's a hack! I will even send you an Amazon
> > > > > > gift card - not that you care or need it -:)
> > > > > >
> > > > > > Can you share some performance statistics? Thanks again.
> > > > > >
> > > > > > On Wed, May 9, 2012 at 8:02 AM, Oliver Meyn (GBIF) <om...@gbif.org> wrote:
> > > > > >
> > > > > > > Heya Something,
> > > > > > >
> > > > > > > I had a similar task recently, and by far the best way to go
> > > > > > > about this is with bulk loading after pre-splitting your target
> > > > > > > table.
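The size-based split job Bryan describes above (step through the sorted Puts, accumulate bytes, cut a region at each threshold) can be sketched in plain Java as follows (hypothetical names, Hadoop plumbing omitted):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the single-reducer, size-based split calculation.
public class RegionSplitsBySize {

    // sortedRowSizes maps rowKey -> total bytes for that row, already in
    // sorted key order (which the reducer gets for free from the shuffle).
    static List<String> startKeys(LinkedHashMap<String, Long> sortedRowSizes, long threshold) {
        List<String> starts = new ArrayList<>();
        long running = 0;
        for (Map.Entry<String, Long> e : sortedRowSizes.entrySet()) {
            if (running >= threshold) {
                starts.add(e.getKey()); // this rowKey opens the next region
                running = 0;
            }
            running += e.getValue();
        }
        return starts;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Long> rows = new LinkedHashMap<>();
        rows.put("a", 100L);
        rows.put("b", 100L);
        rows.put("c", 100L);
        rows.put("d", 100L);
        System.out.println(startKeys(rows, 150)); // prints [c]
    }
}
```

Unlike the count-based approach, this keeps each region near a target byte size even when rows vary widely in size.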
> > > > > > > As you know, ImportTsv doesn't understand Avro files, so I
> > > > > > > hacked together my own ImportAvro class to create the HFiles
> > > > > > > that I eventually moved into HBase with completebulkload. I
> > > > > > > haven't committed my class anywhere because it's a pretty ugly
> > > > > > > hack, but I'm happy to share it with you as a starting point.
> > > > > > > Doing billions of puts will just drive you crazy.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Oliver
> > > > > > >
> > > > > > > On 2012-05-09, at 4:51 PM, Something Something wrote:
> > > > > > >
> > > > > > > > I ran the following MR job that reads AVRO files & puts them
> > > > > > > > on HBase. The files have tons of data (billions of records).
> > > > > > > > We have a fairly decent size cluster. When I ran this MR job,
> > > > > > > > it brought down HBase. When I commented out the Puts on HBase,
> > > > > > > > the job completed in 45 seconds (yes, that's seconds).
> > > > > > > >
> > > > > > > > Obviously, my HBase configuration is not ideal. I am using all
> > > > > > > > the default HBase configurations that come out of Cloudera's
> > > > > > > > distribution: 0.90.4+49.
> > > > > > > >
> > > > > > > > I am planning to read up on the following two:
> > > > > > > >
> > > > > > > > http://hbase.apache.org/book/important_configurations.html
> > > > > > > > http://www.cloudera.com/blog/2011/04/hbase-dos-and-donts/
> > > > > > > >
> > > > > > > > But can someone quickly take a look and recommend a list of
> > > > > > > > priorities, such as "try this first..."? That would be greatly
> > > > > > > > appreciated. As always, thanks for the time.
> > > > > > > >
> > > > > > > > Here's the Mapper.
> > > > > > > > (There's no reducer):
> > > > > > > >
> > > > > > > > public class AvroProfileMapper extends AvroMapper<GenericData.Record, NullWritable> {
> > > > > > > >     private static final Logger logger = LoggerFactory.getLogger(AvroProfileMapper.class);
> > > > > > > >
> > > > > > > >     final private String SEPARATOR = "*";
> > > > > > > >
> > > > > > > >     private HTable table;
> > > > > > > >
> > > > > > > >     private String datasetDate;
> > > > > > > >     private String tableName;
> > > > > > > >
> > > > > > > >     @Override
> > > > > > > >     public void configure(JobConf jobConf) {
> > > > > > > >         super.configure(jobConf);
> > > > > > > >         datasetDate = jobConf.get("datasetDate");
> > > > > > > >         tableName = jobConf.get("tableName");
> > > > > > > >
> > > > > > > >         // Open table for writing
> > > > > > > >         try {
> > > > > > > >             table = new HTable(jobConf, tableName);
> > > > > > > >             table.setAutoFlush(false);
> > > > > > > >             table.setWriteBufferSize(1024 * 1024 * 12);
> > > > > > > >         } catch (IOException e) {
> > > > > > > >             throw new RuntimeException("Failed table construction", e);
> > > > > > > >         }
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     @Override
> > > > > > > >     public void map(GenericData.Record record, AvroCollector<NullWritable> collector,
> > > > > > > >                     Reporter reporter) throws IOException {
> > > > > > > >
> > > > > > > >         String u1 = record.get("u1").toString();
> > > > > > > >
> > > > > > > >         GenericData.Array<GenericData.Record> fields =
> > > > > > > >                 (GenericData.Array<GenericData.Record>) record.get("bag");
> > > > > > > >         for (GenericData.Record rec : fields) {
> > > > > > > >             Integer s1 = (Integer) rec.get("s1");
> > > > > > > >             Integer n1 = (Integer) rec.get("n1");
> > > > > > > >             Integer c1 = (Integer) rec.get("c1");
> > > > > > > >             Integer freq = (Integer) rec.get("freq");
> > > > > > > >             if (freq == null) {
> > > > > > > >                 freq = 0;
> > > > > > > >             }
> > > > > > > >
> > > > > > > >             String key = u1 + SEPARATOR + n1 + SEPARATOR + c1 + SEPARATOR + s1;
> > > > > > > >             Put put = new Put(Bytes.toBytes(key));
> > > > > > > >             put.setWriteToWAL(false);
> > > > > > > >             put.add(Bytes.toBytes("info"), Bytes.toBytes("frequency"),
> > > > > > > >                     Bytes.toBytes(freq.toString()));
> > > > > > > >             try {
> > > > > > > >                 table.put(put);
> > > > > > > >             } catch (IOException e) {
> > > > > > > >                 throw new RuntimeException("Error while writing to " + table + " table.", e);
> > > > > > > >             }
> > > > > > > >         }
> > > > > > > >         logger.error("------------ Finished processing user: " + u1);
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     @Override
> > > > > > > >     public void close() throws IOException {
> > > > > > > >         table.close();
> > > > > > > >     }
> > > > > > > > }
> > > > > > >
> > > > > > > --
> > > > > > > Oliver Meyn
> > > > > > > Software Developer
> > > > > > > Global Biodiversity Information Facility (GBIF)
> > > > > > > +45 35 32 15 12
> > > > > > > http://www.gbif.org