On Fri, Jun 11, 2010 at 3:56 PM, Mark Laffoon <mlaff...@semanticresearch.com> wrote:
> Follow-up to this issue, with a test map job to demonstrate it.
>
> I created a RandomInputFormat that allows you to configure the number of
> input splits and the size of each split. The record reader generates a
> random key (a UUID) and a value that is unique to each split. For example,
> you can set it up to have 10 splits of 100,000 records each. This will
> produce 1,000,000 records, each with a unique key, and values ranging from
> 0-9 (each repeated 100,000 times).
>
> Then I created a simple map job that accepts the input, writes an HBase
> row for each record, counts the records, and, at the end of the job in
> cleanup(), increments count columns (using HTable.ICV) in a special
> "count" row. For the example given, the count row would have columns
> c:0, c:1, ..., c:9, each with the value 100,000, and an additional column
> c:n with the total, 1,000,000.
>
> I'm running all this on a 16-node cluster. For small jobs it works fine.
> For larger jobs restricted to a single CPU it works fine. However, if I
> crank up the number of splits and the split size, and let it run on
> multiple nodes, I start to lose counts. For example, I just ran 80 x
> 120,000. All the individual count column values looked good (120,000),
> but the total was 8,640,000 instead of 9,600,000.

Are you sure that cleanup() always runs? I don't know the semantics of
cleanup() in the new API, but the fact that you got such a nice round
number indicates that several entire processes didn't get counted, not
just some lost edits due to a race (the shortfall, 960,000, is exactly
8 x 120,000, i.e., eight whole tasks' worth of increments).

> Is there some behavior of ICV I'm not grokking?
>
> I'm in the process of trying to simplify the test, but any advice, ideas,
> or thoughts would be appreciated.
>
> Thanks,
> Mark
>
> P.S. Here is the code for the mapper ...
>
> public static class MyMapper extends Mapper<Text, Text, Text, Text> {
>     private HTable table;
>     Map<String, Long> counts = new HashMap<String, Long>();
>
>     @Override
>     protected void setup(Context context)
>             throws IOException, InterruptedException {
>         final HBaseConfiguration conf =
>                 new HBaseConfiguration(context.getConfiguration());
>         table = new HTable(conf, TABLENAME);
>         table.setAutoFlush(false);
>     }
>
>     @Override
>     protected void cleanup(Context context)
>             throws IOException, InterruptedException {
>         long totalCount = 0;
>         for (Map.Entry<String, Long> entry : counts.entrySet()) {
>             final String countStr = entry.getKey();
>             final long count = entry.getValue();
>             table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
>                     countStr.getBytes(), count);
>             totalCount += count;
>         }
>         table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
>                 COUNT_QUALIFIER, totalCount);
>         table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
>                 COUNT_QUALIFIER, totalCount);
>         table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
>                 COUNT2_QUALIFIER, totalCount);
>         table.close();
>     }
>
>     @Override
>     protected void map(Text key, Text value, Context context)
>             throws IOException, InterruptedException {
>         Put put = new Put(key.getBytes());
>         put.add(DATA_FAMILY, DATA_QUALIFIER, value.getBytes());
>         table.put(put);
>         table.flushCommits();
>         final String count = value.toString();
>         counts.put(count, 1L + (counts.containsKey(count)
>                 ? counts.get(count) : 0L));
>         context.getCounter("Debug", "ICV count").increment(1);
>         context.write(key, value);
>     }
> }

--
Todd Lipcon
Software Engineer, Cloudera
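
(For reference, since the RandomInputFormat itself wasn't posted: below is a
minimal sketch of what its record reader might look like under the new API.
The class names, the random.records.per.split config key, and the
RandomInputSplit companion class are assumptions for illustration, not Mark's
actual code.)

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.UUID;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class RandomRecordReader extends RecordReader<Text, Text> {
        private long recordsPerSplit;
        private long emitted = 0;
        private int splitIndex;        // the value shared by every record in this split
        private final Text key = new Text();
        private final Text value = new Text();

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) {
            Configuration conf = context.getConfiguration();
            recordsPerSplit = conf.getLong("random.records.per.split", 100000L);
            splitIndex = ((RandomInputSplit) split).getIndex();
        }

        @Override
        public boolean nextKeyValue() {
            if (emitted >= recordsPerSplit) {
                return false;                         // this split is exhausted
            }
            key.set(UUID.randomUUID().toString());    // unique key per record
            value.set(Integer.toString(splitIndex));  // same value for the whole split
            emitted++;
            return true;
        }

        @Override public Text getCurrentKey() { return key; }
        @Override public Text getCurrentValue() { return value; }
        @Override public float getProgress() { return (float) emitted / recordsPerSplit; }
        @Override public void close() { }

        // Companion split that only records which split it is; the format's
        // getSplits() would hand out one of these per configured split.
        public static class RandomInputSplit extends InputSplit implements Writable {
            private int index;
            public RandomInputSplit() { }
            public RandomInputSplit(int index) { this.index = index; }
            public int getIndex() { return index; }
            @Override public long getLength() { return 0; }
            @Override public String[] getLocations() { return new String[0]; }
            public void write(DataOutput out) throws IOException { out.writeInt(index); }
            public void readFields(DataInput in) throws IOException { index = in.readInt(); }
        }
    }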
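
(A cheap way to test the cleanup() theory, sketched here as a suggestion
rather than anything from the thread: have cleanup() bump a job counter as
its very last statement. If the counter comes back lower than the number of
map tasks after the job, some cleanups never finished, and their ICVs went
with them. The counter group and name below are made up.)

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class InstrumentedMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);   // pass-through; the real work goes here
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // ... the ICV increments would go here ...
            // Last statement of the task: record that this cleanup() completed.
            // For the 80 x 120,000 run, anything less than 80 here means whole
            // tasks' worth of increments were lost.
            context.getCounter("Debug", "cleanup completed").increment(1);
        }
    }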
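
(And a sanity check on the count row itself, again a suggestion rather than
code from the thread: read the row back and compare the sum of the per-value
columns against the c:n total. The table name, row key, and column family
below are guesses standing in for Mark's TABLENAME, COUNT_ROWKEY, and
COUNT_FAMILY constants.)

    import java.util.Map;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CheckCounts {
        public static void main(String[] args) throws Exception {
            HTable table = new HTable(new HBaseConfiguration(), "testtable");
            Result row = table.get(new Get(Bytes.toBytes("count")));
            long sum = 0;
            long total = -1;
            for (Map.Entry<byte[], byte[]> e :
                    row.getFamilyMap(Bytes.toBytes("c")).entrySet()) {
                String qualifier = Bytes.toString(e.getKey());
                long v = Bytes.toLong(e.getValue());   // ICV stores 8-byte longs
                if (qualifier.equals("n")) {
                    total = v;                         // the grand-total column c:n
                } else {
                    sum += v;                          // a per-value column c:0, c:1, ...
                }
            }
            System.out.println("per-value sum = " + sum + ", c:n = " + total
                    + (sum == total ? " (consistent)" : " (MISMATCH)"));
            table.close();
        }
    }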