I ran the following MR job that reads Avro files and writes the records to
HBase.  The files contain billions of records, and we have a fairly
decent-sized cluster.  When I ran the job, it brought down HBase.  When I
commented out the HBase Puts, the job completed in 45 seconds (yes,
seconds).

Obviously, my HBase configuration is not ideal.  I am using all the default
HBase configuration that comes with Cloudera's distribution (0.90.4+49).
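
From skimming the docs so far, the settings below look like the write-path
knobs people revisit first.  The values are illustrative guesses on my part,
not anything I've tested on our cluster:

<!-- hbase-site.xml: illustrative values only, untested -->
<property>
  <name>hbase.regionserver.handler.count</name>
  <value>30</value>  <!-- default is 10 in 0.90; heavy write loads often need more -->
</property>
<property>
  <name>hbase.hregion.memstore.flush.size</name>
  <value>134217728</value>  <!-- 128 MB memstore before flushing to disk -->
</property>
<property>
  <name>hbase.hregion.max.filesize</name>
  <value>1073741824</value>  <!-- 1 GB before a region splits; fewer splits under load -->
</property>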

I am planning to read up on the following two:

http://hbase.apache.org/book/important_configurations.html
http://www.cloudera.com/blog/2011/04/hbase-dos-and-donts/

But can someone quickly take a look and recommend a prioritized list, such
as "try this first..."?  That would be greatly appreciated.  As always,
thanks for your time.
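
In case it matters: one thing I already suspect is region hot-spotting,
since the table was created with a single region, so every Put initially
lands on one region server.  Here's a minimal sketch of what I understand
pre-splitting at creation time to look like with the 0.90 client API
(the table name "profiles" and the split points are made-up placeholders
for our key space):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // "profiles" and the split points below are placeholders; real
        // splits should reflect the actual row-key distribution.
        HTableDescriptor desc = new HTableDescriptor("profiles");
        desc.addFamily(new HColumnDescriptor("info"));

        byte[][] splits = new byte[][] {
                Bytes.toBytes("3"), Bytes.toBytes("6"), Bytes.toBytes("9")
        };

        // Creating the table pre-split spreads the initial write load
        // across region servers instead of hot-spotting one region.
        admin.createTable(desc, splits);
    }
}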


Here's the mapper (there's no reducer):



import java.io.IOException;

import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroProfileMapper extends AvroMapper<GenericData.Record, NullWritable> {
    private static final Logger logger = LoggerFactory.getLogger(AvroProfileMapper.class);

    private static final String SEPARATOR = "*";

    private HTable table;

    private String datasetDate;  // currently unused in map()
    private String tableName;

    @Override
    public void configure(JobConf jobConf) {
        super.configure(jobConf);
        datasetDate = jobConf.get("datasetDate");
        tableName = jobConf.get("tableName");

        // Open the table for writing; buffer Puts client-side instead of
        // sending one RPC per Put.
        try {
            table = new HTable(jobConf, tableName);
            table.setAutoFlush(false);
            table.setWriteBufferSize(1024 * 1024 * 12);  // 12 MB write buffer
        } catch (IOException e) {
            throw new RuntimeException("Failed table construction", e);
        }
    }

    @Override
    public void map(GenericData.Record record, AvroCollector<NullWritable> collector,
                    Reporter reporter) throws IOException {

        String u1 = record.get("u1").toString();

        @SuppressWarnings("unchecked")
        GenericData.Array<GenericData.Record> fields =
                (GenericData.Array<GenericData.Record>) record.get("bag");
        for (GenericData.Record rec : fields) {
            Integer s1 = (Integer) rec.get("s1");
            Integer n1 = (Integer) rec.get("n1");
            Integer c1 = (Integer) rec.get("c1");
            Integer freq = (Integer) rec.get("freq");
            if (freq == null) {
                freq = 0;
            }

            String key = u1 + SEPARATOR + n1 + SEPARATOR + c1 + SEPARATOR + s1;
            Put put = new Put(Bytes.toBytes(key));
            put.setWriteToWAL(false);  // skips the WAL: faster, but data is lost on a region server crash
            put.add(Bytes.toBytes("info"), Bytes.toBytes("frequency"),
                    Bytes.toBytes(freq.toString()));
            try {
                table.put(put);
            } catch (IOException e) {
                throw new RuntimeException("Error while writing to " + tableName + " table.", e);
            }
        }
        // Per-user progress; keep at debug so billions of records don't flood the logs.
        logger.debug("Finished processing user: {}", u1);
    }

    @Override
    public void close() throws IOException {
        // close() flushes the buffered Puts and releases resources.
        table.close();
    }
}
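
And in case buffered Puts still overwhelm the cluster, I've read that the
bulk-load path sidesteps the write path entirely: the job writes HFiles
via HFileOutputFormat, which are then handed to the region servers.  A
rough, untested driver sketch, based on my reading of the HBase 0.90
docs; note it uses the new mapreduce API, and AvroToPutMapper is a
placeholder for a mapper emitting (rowkey, Put) pairs, not the old-API
class above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "avro-to-hfiles");
        job.setJarByClass(BulkLoadDriver.class);

        // Placeholder mapper: would emit (ImmutableBytesWritable rowKey, Put)
        // pairs instead of writing to the table directly.
        job.setMapperClass(AvroToPutMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Configures total-order partitioning, sorting, and the reducer
        // that writes HFiles matching the table's current region boundaries.
        HTable table = new HTable(conf, "profiles");  // "profiles" is a placeholder
        HFileOutputFormat.configureIncrementalLoad(job, table);

        Path out = new Path("/tmp/hfiles");  // placeholder output directory
        FileOutputFormat.setOutputPath(job, out);

        if (job.waitForCompletion(true)) {
            // Moves the finished HFiles into the running table.
            new LoadIncrementalHFiles(conf).doBulkLoad(out, table);
        }
    }
}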
