cleanup() must be getting called, because most of the counts are set
properly and all of the ICV calls happen in cleanup(). It could be that
cleanup() isn't finishing because of an exception, but I have tried hard
to catch that happening and I don't see it. The round numbers are due to
the batched nature of the count updates; I should probably make the split
sizes vary slightly from the configured value so I can discern a pattern.
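Something like this is what I have in mind for the jitter (just a sketch;
the helper class and names are made up, not the actual RandomInputFormat
code):

    import java.util.Random;

    // Sketch: jitter each split's record count by up to +/-10% of the
    // configured size, so a dropped or double-counted split shows up as a
    // non-round total instead of a clean multiple of the split size.
    public final class SplitSizeJitter {
        private static final Random RND = new Random();

        /** Returns the configured size plus a random offset in [-10%, +10%]. */
        public static long jitteredSize(long configuredSize) {
            long maxJitter = configuredSize / 10;
            long offset = (long) ((RND.nextDouble() * 2.0 - 1.0) * maxJitter);
            return configuredSize + offset;
        }
    }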

The other thing I didn't mention: I ran the 80x120000 test a few more
times. Sometimes it works, and sometimes it doesn't <sigh>. Could there be
an issue with data being moved between regions?
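
One thing I'm going to try, to rule cleanup() in or out: bump a pair of
counters at the start and end of cleanup() and compare them to the number
of map tasks in the job history (sketch only; the counter names are made
up, and this drops into the mapper shown below):

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        context.getCounter("Debug", "cleanup entered").increment(1);
        // ... existing ICV calls against the count row go here ...
        table.close();
        context.getCounter("Debug", "cleanup completed").increment(1);
    }

If "cleanup completed" comes back lower than the number of map tasks, then
cleanup() really is getting cut short somewhere.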

Thanks,
Mark


-----Original Message-----
From: Ryan Rawson [mailto:ryano...@gmail.com]
Sent: Friday, June 11, 2010 4:06 PM
To: user@hbase.apache.org
Subject: Re: ICV concurrency problem (?)

I would tend to agree with Todd here; I have generally used 'close()'
in the Hadoop 0.20 MR API to accomplish such tasks.

-ryan

On Fri, Jun 11, 2010 at 4:02 PM, Todd Lipcon <t...@cloudera.com> wrote:
> On Fri, Jun 11, 2010 at 3:56 PM, Mark Laffoon <mlaff...@semanticresearch.com> wrote:
>
>> Follow-up to this issue, with a test map job to demonstrate it.
>>
>> I created RandomInputFormat that allows you to configure the number of
>> input splits and the size of each split. The record reader generates a
>> random key (UUID) and a value that is unique to each split. For example,
>> you can set it up to have 10 splits of 100,000 records each. This will
>> produce 1,000,000 records, each with a unique key, and values ranging
>> from 0-99 (each repeated 100,000 times).
>>
>> Then I created a simple map job that accepts the input, writes an HBase
>> row for each record, counts the records, and, at the end of the job in
>> cleanup(), increments count columns (using HTable.ICV) in a special
>> "count" row. For the example given, the count row would have columns
>> c:0,c:1,...,c:9, each with the value 100,000, and an additional column
>> c:n with the total, 1,000,000.
>>
>> I'm running all this on a 16 node cluster. For small jobs it works fine.
>> For larger jobs restricted to a single CPU it works fine. However, if I
>> crank up the number of splits and split size, and let it run on multiple
>> nodes, I start to lose counts. For example, I just ran 80 x 120000. All
>> the individual count column values looked good (120000), but the total
>> was 8640000 instead of 9600000.
>>
>
> Are you sure that cleanup() always runs? I don't know the semantics of
> cleanup in the new API, but the fact that you got such a nice round
> number indicates that several entire processes didn't get counted (not
> just some lost edits due to a race).
>
>
>>
>> Is there some behavior of ICV I'm not grokking?
>>
>> I'm in the process of trying to simplify the test, but any advice,
>> ideas, or thoughts would be appreciated.
>>
>> Thanks,
>> Mark
>>
>> P.S. here is the code for the mapper ...
>>
>>    public static class MyMapper extends Mapper<Text, Text, Text, Text> {
>>        private HTable table;
>>        Map<String, Long> counts = new HashMap<String, Long>();
>>
>>        @Override
>>        protected void setup(Context context)
>>                throws IOException, InterruptedException {
>>            final HBaseConfiguration conf =
>>                    new HBaseConfiguration(context.getConfiguration());
>>            table = new HTable(conf, TABLENAME);
>>            table.setAutoFlush(false);
>>        }
>>
>>        @Override
>>        protected void cleanup(Context context)
>>                throws IOException, InterruptedException {
>>
>>            long totalCount = 0;
>>            for (Map.Entry<String,Long> entry : counts.entrySet()) {
>>                final String countStr = entry.getKey();
>>                final long count = entry.getValue();
>>                table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
>>                        countStr.getBytes(), count);
>>                totalCount += count;
>>            }
>>            table.incrementColumnValue(COUNT_ROWKEY, COUNT_FAMILY,
>>                    COUNT_QUALIFIER, totalCount);
>>            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
>>                    COUNT_QUALIFIER, totalCount);
>>            table.incrementColumnValue(COUNT_ROWKEY, DATA_FAMILY,
>>                    COUNT2_QUALIFIER, totalCount);
>>            table.close();
>>        }
>>
>>        @Override
>>        protected void map(Text key, Text value, Context context)
>>                throws IOException, InterruptedException {
>>
>>            Put put = new Put(key.getBytes());
>>            put.add(DATA_FAMILY, DATA_QUALIFIER, value.getBytes());
>>            table.put(put);
>>            table.flushCommits();
>>            final String count = value.toString();
>>            counts.put(count, 1L + (counts.containsKey(count) ?
>>                    counts.get(count) : 0L));
>>            context.getCounter("Debug", "ICV count").increment(1);
>>            context.write(key, value);
>>        }
>>    }
>>
>
>
>
> --
> Todd Lipcon
> Software Engineer, Cloudera
>
