Hello Bryan & Oliver,

I am using suggestions from both of you to do the bulk upload.  The problem
I am running into is that the job that uses
'HFileOutputFormat.configureIncrementalLoad' is taking very long to complete.
One thing I noticed is that it's using only 1 Reducer.

When I looked at the source code for HFileOutputFormat, I noticed that the
number of Reducers is determined by this:

    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());


When I look at the log I see this:

12/05/16 03:11:02 INFO mapreduce.HFileOutputFormat: Configuring 301 reduce partitions to match current region count

which implies that the regions were created successfully.  But shouldn't
this set the number of Reducers to 301?  What am I missing?
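
For reference, here is my understanding of what "one reduce partition per region" should mean, as a minimal plain-Java sketch: each row key goes to the reducer whose region start key is the greatest one not exceeding it. The class and method names (RegionPartitionSketch, partitionFor) are made up for illustration, and this is not the actual HBase or Hadoop partitioner code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not HBase/Hadoop code.
class RegionPartitionSketch {

    // Unsigned lexicographic comparison of byte arrays (the order row keys sort in).
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    // Returns the index of the last start key that is <= rowKey.
    // Assumes startKeys is sorted and its first entry is the empty key.
    static int partitionFor(List<byte[]> startKeys, byte[] rowKey) {
        int lo = 0;
        int hi = startKeys.size() - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1;
            if (compareBytes(startKeys.get(mid), rowKey) <= 0) {
                lo = mid;
            } else {
                hi = mid - 1;
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        List<byte[]> startKeys = new ArrayList<>();
        startKeys.add(new byte[0]); // first region starts at the empty key
        startKeys.add("g".getBytes(StandardCharsets.UTF_8));
        startKeys.add("p".getBytes(StandardCharsets.UTF_8));
        for (String key : new String[] {"a", "g", "z"}) {
            int p = partitionFor(startKeys, key.getBytes(StandardCharsets.UTF_8));
            System.out.println(key + " -> reducer " + p);
        }
    }
}
```

With 301 start keys I would expect this kind of lookup to spread the keys over 301 reducers, which is why seeing only 1 confuses me.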

Thanks for your help.



On Thu, May 10, 2012 at 9:04 AM, Bryan Beaudreault
<bbeaudrea...@hubspot.com> wrote:

> I don't think there is.  You need to have a table seeded with the right
> regions in order to run the bulk loader jobs.
>
> My machines are sufficiently fast that it did not take that long to sort.
> One thing I did do to speed this up was add a mapper to the job that
> generates the splits, which would calculate the size of each KeyValue.  So
> instead of passing around the KeyValues I would pass around just the sizes
> of the KeyValues.  You could do a similar thing with the Puts.  Here are my
> keys/values for the job in full:
>
> Mapper:
>
> KeyIn: ImmutableBytesWritable
> ValueIn: KeyValue
>
> KeyOut: ImmutableBytesWritable
> ValueOut: IntWritable
>
> Reducer:
>
> KeyIn: ImmutableBytesWritable
> ValueIn: IntWritable
>
> At this point I would just add up the ints from the IntWritable.  This cuts
> down drastically on the amount of data passed around in the sort.
>
> Hope this helps.  If it is still too slow you might have to experiment with
> using many reducers and making sure you don't have holes or regions that
> are too big due to the way the keys are partitioned.  I was lucky enough to
> not have to go that far.
>
>
> On Thu, May 10, 2012 at 11:55 AM, Something Something <
> mailinglist...@gmail.com> wrote:
>
> > I am beginning to get a sinking feeling about this :(  But I won't give
> > up!
> >
> > Problem is that when I use one Reducer the job runs for a long time.  I
> > killed it after about an hour.  Keep in mind, we do have a decent cluster
> > size.  The Map stage completes in a minute & when I set the no. of
> > reducers to 0 (which is not what we want) the job completes in 12 minutes.
> > In other words, sorting is taking very, very long!  What could be the
> > problem?
> >
> > Is there no other way to do the bulk upload without first *learning* the
> > data?
> >
> > On Thu, May 10, 2012 at 7:15 AM, Bryan Beaudreault
> > <bbeaudrea...@hubspot.com> wrote:
> >
> > > Since our Key was ImmutableBytesWritable (representing a rowKey) and the
> > > Value was KeyValue, there could be many KeyValues per row key (thus many
> > > values per hadoop key in the reducer).  So yes, what we did is very much
> > > the same as what you described.  Hadoop will sort the
> > > ImmutableBytesWritable keys before sending them to the reducer.  This is
> > > the primary sort.  We then loop over the values for each key, adding up
> > > the size of each KeyValue until we reach the region size.  Each time that
> > > happens we record the rowKey from the hadoop key and use that as the
> > > start key for a new region.
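
The single-reducer loop described above can be sketched in plain Java, with no Hadoop or HBase types. This is only an illustration under assumptions: the names (SizeBasedSplitSketch, splitKeys) are hypothetical, row keys are modelled as Strings in a sorted map instead of ImmutableBytesWritable, and the per-row Integer lists stand in for the KeyValue sizes emitted by the mapper. Since a row cannot span two regions, the row that pushes the running total past the threshold becomes the start of the next region.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical illustration only; not the job from this thread.
class SizeBasedSplitSketch {

    // rowSizes: row key -> sizes (in bytes) of that row's KeyValues, iterated in
    // sorted key order, as a single reducer would see them.
    // Returns the start keys of every region after the first one.
    static List<String> splitKeys(SortedMap<String, List<Integer>> rowSizes,
                                  long regionSizeBytes) {
        List<String> splits = new ArrayList<>();
        long current = 0;
        boolean firstRow = true;
        for (Map.Entry<String, List<Integer>> row : rowSizes.entrySet()) {
            long rowBytes = 0;
            for (int size : row.getValue()) {
                rowBytes += size;
            }
            current += rowBytes;
            // Threshold reached: this row starts a new region and counting
            // restarts with this row's bytes. The first region always starts
            // at the empty key, so the first row is never recorded.
            if (!firstRow && current >= regionSizeBytes) {
                splits.add(row.getKey());
                current = rowBytes;
            }
            firstRow = false;
        }
        return splits;
    }

    public static void main(String[] args) {
        SortedMap<String, List<Integer>> rows = new TreeMap<>();
        rows.put("a", Arrays.asList(40, 30));
        rows.put("b", Arrays.asList(50));
        rows.put("c", Arrays.asList(20));
        rows.put("d", Arrays.asList(70, 10));
        System.out.println(splitKeys(rows, 100));
    }
}
```

The returned keys would then be used as the split points when creating the table.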
> > >
> > > Secondary sort is not necessary unless the order of the values matters
> > > for you.  In this case (with the row key as the reducer key), I don't
> > > think that matters.
> > >
> > > On Thu, May 10, 2012 at 3:22 AM, Something Something <
> > > mailinglist...@gmail.com> wrote:
> > >
> > > > Thank you Tim & Bryan for the responses.  Sorry for the delayed
> > > > response.  Got busy with other things.
> > > >
> > > > Bryan - I decided to focus on the region split problem first.  The
> > > > challenge here is to find the correct start key for each region, right?
> > > > Here are the steps I could think of:
> > > >
> > > > 1)  Sort the keys.
> > > > 2)  Count how many keys & divide by # of regions we want to create
> > > > (e.g. 300).  This gives us # of keys in a region (region size).
> > > > 3)  Loop thru the sorted keys & every time region size is reached,
> > > > write down region # & starting key.  This info can later be used to
> > > > create the table.
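
The three steps above can be sketched in plain Java as follows. It is a minimal illustration, not code from this thread: the names (CountBasedSplitSketch, splitKeys) are hypothetical, keys are assumed to be distinct Strings that fit in memory, and at billions of keys the sort would be done by the MapReduce shuffle instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not code from this thread.
class CountBasedSplitSketch {

    // Returns the start keys of every region after the first one.
    static List<String> splitKeys(List<String> keys, int numRegions) {
        if (numRegions <= 0) {
            throw new IllegalArgumentException("numRegions must be positive");
        }
        // Step 1: sort the keys.
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        // Step 2: keys per region, rounded up so we never exceed numRegions.
        int perRegion = (int) Math.ceil(sorted.size() / (double) numRegions);
        // Step 3: every perRegion-th key starts a new region.
        List<String> splits = new ArrayList<>();
        if (perRegion == 0) {
            return splits; // no keys, so no split points
        }
        for (int i = perRegion; i < sorted.size(); i += perRegion) {
            splits.add(sorted.get(i));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "k07", "k01", "k10", "k04", "k02", "k09", "k05", "k03", "k08", "k06");
        System.out.println(splitKeys(keys, 3));
    }
}
```

Splitting by key count gives evenly sized regions only if rows are roughly the same size; otherwise summing bytes per row is the safer measure.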
> > > >
> > > > Honestly, I am not sure what you mean by "hadoop does this
> > > > automatically".  If you used a single reducer, did you use secondary
> > > > sort (setOutputValueGroupingComparator) to sort the keys?  Did you loop
> > > > thru the *values* to find regions?  Would appreciate it if you would
> > > > describe this MR job.  Thanks.
> > > >
> > > >
> > > > On Wed, May 9, 2012 at 8:25 AM, Bryan Beaudreault
> > > > <bbeaudrea...@hubspot.com> wrote:
> > > >
> > > > > I also recently had this problem, trying to index 6+ billion records
> > > > > into HBase.  The job would take about 4 hours before it brought down
> > > > > the entire cluster, at only around 60% complete.
> > > > >
> > > > > After trying a bunch of things, we went to bulk loading.  This is
> > > > > actually pretty easy, though the hardest part is that you need to have
> > > > > a table ready with the region splits you are going to use.  Region
> > > > > splits aside, there are 2 steps:
> > > > >
> > > > > 1) Change your job so that instead of executing your Puts, it just
> > > > > outputs them using context.write.  Put is Writable. (We used
> > > > > ImmutableBytesWritable as the Key, representing the rowKey)
> > > > > 2) Add another job that reads that output as its input and configure it
> > > > > using HFileOutputFormat.configureIncrementalLoad(Job job, HTable table);
> > > > > This will add the right reducer.
> > > > >
> > > > > Once those two have run, you can finalize the process using the
> > > > > completebulkload tool documented at
> > > > > http://hbase.apache.org/bulk-loads.html
> > > > >
> > > > > For the region splits problem, we created another job which sorted all
> > > > > of the puts by the key (hadoop does this automatically) and had a
> > > > > single reducer.  It stepped through all of the Puts, adding up the
> > > > > total size until it reached some threshold.  When it did, it recorded
> > > > > the byte array of the current key and used that for the start of the
> > > > > next region.  We used the result of this job to create a new table.
> > > > > There is probably a better way to do this but it takes like 20 minutes
> > > > > to write.
> > > > >
> > > > > This whole process took less than an hour, with the bulk load part
> > > > > only taking 15 minutes.  Much better!
> > > > >
> > > > > On Wed, May 9, 2012 at 11:08 AM, Something Something <
> > > > > mailinglist...@gmail.com> wrote:
> > > > >
> > > > > > Hey Oliver,
> > > > > >
> > > > > > Thanks a "billion" for the response -:)  I will take any code you
> > > > > > can provide even if it's a hack!  I will even send you an Amazon
> > > > > > gift card - not that you care or need it -:)
> > > > > >
> > > > > > Can you share some performance statistics?  Thanks again.
> > > > > >
> > > > > >
> > > > > > On Wed, May 9, 2012 at 8:02 AM, Oliver Meyn (GBIF)
> > > > > > <om...@gbif.org> wrote:
> > > > > >
> > > > > > > Heya Something,
> > > > > > >
> > > > > > > I had a similar task recently and by far the best way to go about
> > > > > > > this is with bulk loading after pre-splitting your target table.
> > > > > > > As you know ImportTsv doesn't understand Avro files so I hacked
> > > > > > > together my own ImportAvro class to create the HFiles that I
> > > > > > > eventually moved into HBase with completebulkload.  I haven't
> > > > > > > committed my class anywhere because it's a pretty ugly hack, but
> > > > > > > I'm happy to share it with you as a starting point.  Doing
> > > > > > > billions of puts will just drive you crazy.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Oliver
> > > > > > >
> > > > > > > On 2012-05-09, at 4:51 PM, Something Something wrote:
> > > > > > >
> > > > > > > > I ran the following MR job that reads AVRO files & puts them on
> > > > > > > > HBase.  The files have tons of data (billions).  We have a
> > > > > > > > fairly decent size cluster.  When I ran this MR job, it brought
> > > > > > > > down HBase.  When I commented out the Puts on HBase, the job
> > > > > > > > completed in 45 seconds (yes that's seconds).
> > > > > > > >
> > > > > > > > Obviously, my HBase configuration is not ideal.  I am using all
> > > > > > > > the default HBase configurations that come out of Cloudera's
> > > > > > > > distribution: 0.90.4+49.
> > > > > > > >
> > > > > > > > I am planning to read up on the following two:
> > > > > > > >
> > > > > > > > http://hbase.apache.org/book/important_configurations.html
> > > > > > > > http://www.cloudera.com/blog/2011/04/hbase-dos-and-donts/
> > > > > > > >
> > > > > > > > But can someone quickly take a look and recommend a list of
> > > > > > > > priorities, such as "try this first..."?  That would be greatly
> > > > > > > > appreciated.  As always, thanks for the time.
> > > > > > > >
> > > > > > > >
> > > > > > > > Here's the Mapper. (There's no reducer):
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > public class AvroProfileMapper
> > > > > > > >        extends AvroMapper<GenericData.Record, NullWritable> {
> > > > > > > >    private static final Logger logger =
> > > > > > > >            LoggerFactory.getLogger(AvroProfileMapper.class);
> > > > > > > >
> > > > > > > >    final private String SEPARATOR = "*";
> > > > > > > >
> > > > > > > >    private HTable table;
> > > > > > > >
> > > > > > > >    private String datasetDate;
> > > > > > > >    private String tableName;
> > > > > > > >
> > > > > > > >    @Override
> > > > > > > >    public void configure(JobConf jobConf) {
> > > > > > > >        super.configure(jobConf);
> > > > > > > >        datasetDate = jobConf.get("datasetDate");
> > > > > > > >        tableName = jobConf.get("tableName");
> > > > > > > >
> > > > > > > >        // Open table for writing
> > > > > > > >        try {
> > > > > > > >            table = new HTable(jobConf, tableName);
> > > > > > > >            table.setAutoFlush(false);
> > > > > > > >            table.setWriteBufferSize(1024 * 1024 * 12);
> > > > > > > >        } catch (IOException e) {
> > > > > > > >            throw new RuntimeException("Failed table construction", e);
> > > > > > > >        }
> > > > > > > >    }
> > > > > > > >
> > > > > > > >    @Override
> > > > > > > >    public void map(GenericData.Record record,
> > > > > > > >                    AvroCollector<NullWritable> collector,
> > > > > > > >                    Reporter reporter) throws IOException {
> > > > > > > >
> > > > > > > >        String u1 = record.get("u1").toString();
> > > > > > > >
> > > > > > > >        GenericData.Array<GenericData.Record> fields =
> > > > > > > >                (GenericData.Array<GenericData.Record>) record.get("bag");
> > > > > > > >        for (GenericData.Record rec : fields) {
> > > > > > > >            Integer s1 = (Integer) rec.get("s1");
> > > > > > > >            Integer n1 = (Integer) rec.get("n1");
> > > > > > > >            Integer c1 = (Integer) rec.get("c1");
> > > > > > > >            Integer freq = (Integer) rec.get("freq");
> > > > > > > >            if (freq == null) {
> > > > > > > >                freq = 0;
> > > > > > > >            }
> > > > > > > >
> > > > > > > >            String key = u1 + SEPARATOR + n1 + SEPARATOR + c1
> > > > > > > >                    + SEPARATOR + s1;
> > > > > > > >            Put put = new Put(Bytes.toBytes(key));
> > > > > > > >            put.setWriteToWAL(false);
> > > > > > > >            put.add(Bytes.toBytes("info"), Bytes.toBytes("frequency"),
> > > > > > > >                    Bytes.toBytes(freq.toString()));
> > > > > > > >            try {
> > > > > > > >                table.put(put);
> > > > > > > >            } catch (IOException e) {
> > > > > > > >                throw new RuntimeException("Error while writing to "
> > > > > > > >                        + table + " table.", e);
> > > > > > > >            }
> > > > > > > >
> > > > > > > >        }
> > > > > > > >        logger.error("------------  Finished processing user: " + u1);
> > > > > > > >    }
> > > > > > > >
> > > > > > > >    @Override
> > > > > > > >    public void close() throws IOException {
> > > > > > > >        table.close();
> > > > > > > >    }
> > > > > > > >
> > > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Oliver Meyn
> > > > > > > Software Developer
> > > > > > > Global Biodiversity Information Facility (GBIF)
> > > > > > > +45 35 32 15 12
> > > > > > > http://www.gbif.org
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
