On Fri, Jun 11, 2010 at 3:56 PM, Mark Laffoon <mlaff...@semanticresearch.com> wrote:

> Follow-up to this issue, with a test map job to demonstrate it.
>
> I created a RandomInputFormat that lets you configure the number of
> input splits and the size of each split. The record reader generates a
> random key (UUID) and a value that is unique to each split. For example,
> you can set it up to have 10 splits of 100,000 records each. This will
> produce 1,000,000 records, each with a unique key, and values ranging
> from 0-9 (each repeated 100,000 times).
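>
> The record reader boils down to something like this (a stripped-down
> sketch, not the actual class; the real one takes the records-per-split
> from the job configuration, and here I just use the map task id as the
> split-unique value):
>
>    public static class RandomRecordReader extends RecordReader<Text, Text> {
>        // (uses org.apache.hadoop.mapreduce RecordReader, InputSplit,
>        //  TaskAttemptContext, plus org.apache.hadoop.io.Text and java.util.UUID)
>        private int recordsPerSplit;
>        private int splitId;
>        private int emitted = 0;
>        private final Text key = new Text();
>        private final Text value = new Text();
>
>        @Override
>        public void initialize(InputSplit split, TaskAttemptContext context) {
>            // "random.records.per.split" is just a made-up name for the config knob
>            recordsPerSplit = context.getConfiguration()
>                    .getInt("random.records.per.split", 100000);
>            // map tasks and splits are 1:1, so the task id works as a
>            // value that is unique to each split
>            splitId = context.getTaskAttemptID().getTaskID().getId();
>        }
>
>        @Override
>        public boolean nextKeyValue() {
>            if (emitted >= recordsPerSplit) {
>                return false;
>            }
>            key.set(UUID.randomUUID().toString()); // random, unique key
>            value.set(Integer.toString(splitId));  // value unique to the split
>            emitted++;
>            return true;
>        }
>
>        @Override
>        public Text getCurrentKey() { return key; }
>
>        @Override
>        public Text getCurrentValue() { return value; }
>
>        @Override
>        public float getProgress() {
>            return recordsPerSplit == 0 ? 1.0f : (float) emitted / recordsPerSplit;
>        }
>
>        @Override
>        public void close() { }
>    }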
>
> Then I created a simple map job that accepts the input, writes an HBase
> row for each record, counts the records, and, at the end of each map
> task in cleanup(), increments count columns (using
> HTable.incrementColumnValue(), i.e. ICV) in a special "count" row. For
> the example given, the count row would have columns c:0,c:1,...,c:9,
> each with the value 100,000, and an additional column c:n with the
> total, 1,000,000.
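>
> To check the results I just read the count row back once the job is
> done, roughly along these lines (a sketch; the constants are the same
> ones the mapper code below uses, with COUNT_QUALIFIER being the "n"
> column, and 10 splits assumed):
>
>    HTable table = new HTable(new HBaseConfiguration(), TABLENAME);
>    Result countRow = table.get(new Get(COUNT_ROWKEY));
>    for (int i = 0; i < 10; i++) {
>        // per-split counts live in c:0 ... c:9
>        byte[] qualifier = Bytes.toBytes(Integer.toString(i));
>        long count = Bytes.toLong(countRow.getValue(COUNT_FAMILY, qualifier));
>        System.out.println("c:" + i + " = " + count);
>    }
>    // the grand total (the c:n column)
>    long total = Bytes.toLong(countRow.getValue(COUNT_FAMILY, COUNT_QUALIFIER));
>    System.out.println("c:n = " + total);
>    table.close();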
>
> I'm running all this on a 16-node cluster. For small jobs it works fine.
> For larger jobs restricted to a single CPU it works fine. However, if I
> crank up the number of splits and split size, and let it run on multiple
> nodes, I start to lose counts. For example, I just ran 80 splits x
> 120,000 records each. All the individual count column values looked good
> (120,000 each), but the total was 8,640,000 instead of 9,600,000.
>

Are you sure that cleanup() always runs? I don't know the semantics of
cleanup() in the new API, but the fact that you got such a nice round number
indicates that several entire processes didn't get counted (not just some
lost edits due to a race).
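
If my arithmetic is right, the shortfall is 9,600,000 - 8,640,000 =
960,000 = 8 x 120,000, i.e. exactly eight map tasks' worth of cleanup()
increments. A cheap way to check that theory would be to bump a counter at
the top of cleanup() and compare it to the number of launched map tasks
when the job finishes, something like:

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // if this counter ends up below the number of map tasks, some
        // cleanup() calls never happened (or died before the ICVs)
        context.getCounter("Debug", "cleanup calls").increment(1);
        // then do the ICV increments as before
    }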


>
> Is there some behavior of ICV I'm not grokking?
>
> I'm in the process of trying to simplify the test but any advice, ideas,
> thoughts would be appreciated.
>
> Thanks,
> Mark
>
> P.S. Here is the code for the mapper ...
>
>    public static class MyMapper extends Mapper<Text, Text, Text, Text> {
>        private HTable table;
>        // per-task tally of how many records carried each value
>        Map<String, Long> counts = new HashMap<String, Long>();
>
>        @Override
>        protected void setup(Context context)
>                throws IOException, InterruptedException {
>            final HBaseConfiguration conf =
>                    new HBaseConfiguration(context.getConfiguration());
>            table = new HTable(conf, TABLENAME);
>            table.setAutoFlush(false);
>        }
>
>        @Override
>        protected void cleanup(Context context)
>                throws IOException, InterruptedException {
>            // push the per-task tallies into the "count" row via ICV
>            long totalCount = 0;
>            for (Map.Entry<String, Long> entry : counts.entrySet()) {
>                final String countStr = entry.getKey();
>                final long count = entry.getValue();
>                table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
>                        countStr.getBytes(), count);
>                totalCount += count;
>            }
>            // grand total, written to three different columns
>            table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
>                    COUNT_QUALIFIER, totalCount);
>            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
>                    COUNT_QUALIFIER, totalCount);
>            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
>                    COUNT2_QUALIFIER, totalCount);
>            table.close();
>        }
>
>        @Override
>        protected void map(Text key, Text value, Context context)
>                throws IOException, InterruptedException {
>            // one HBase row per input record
>            Put put = new Put(key.getBytes());
>            put.add(DATA_FAMILY, DATA_QUALIFIER, value.getBytes());
>            table.put(put);
>            table.flushCommits();
>            // remember how many times this value has been seen
>            final String count = value.toString();
>            counts.put(count,
>                    1L + (counts.containsKey(count) ? counts.get(count) : 0L));
>            context.getCounter("Debug", "ICV count").increment(1);
>            context.write(key, value);
>        }
>    }
>



-- 
Todd Lipcon
Software Engineer, Cloudera
