Ruslan,

I see you did all the required homework but this mail is really hard
to read. Can you create a jira
(http://issues.apache.org/jira/browse/HBASE) and attach all the code?
This will also make it easier to track.

thx!

J-D

On Wed, Mar 24, 2010 at 6:02 PM, Ruslan Salyakhov <rusla...@gmail.com> wrote:
> Hi!
>
> I'm trying to use bulk import that writing HFiles directly into HDFS and
> have a problem with multiple reducers. If I run MR to prepare HFIles with
> more than one reducer then some values for keys are not appeared in the
> table after loadtable.rb script execution. With one reducer everything works
> fine. Let's take a look at details:
>
> Environment:
> - Hadoop 0.20.1
> - HBase release 0.20.3
>
> http://hadoop.apache.org/hbase/docs/r0.20.3/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk
> - the row id must be formatted as a ImmutableBytesWritable
> - MR job should ensure a total ordering among all keys
>
> http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt)
> - TotalOrderPartitioner that uses the new API
>
> https://issues.apache.org/jira/browse/HBASE-2063
> - patched HFileOutputFormat
>
> Sample data of my keys:
> 1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1
> 1.306.CAN.ON.-1.LONDON.1.1.0.1
> 1.306.USA.CO.751.FT COLLINS.1.1.1.0
> 1.306.USA.CO.751.LITTLETON.1.1.1.0
> 4.6.USA.TX.623.MUENSTER.1.1.0.0
> 4.67.USA.MI.563.GRAND RAPIDS.1.1.0.0
> 4.68.USA.CT.533.WILLINGTON.1.1.1.0
> 4.68.USA.LA.642.LAFAYETTE.1.1.1.0
> 4.9.USA.CT.501.STAMFORD.1.1.0.0
> 4.9.USA.NJ.504.PRINCETON.1.1.0.1
> 4.92.USA.IN.527.INDIANAPOLIS.1.1.0.0
>
> I've put everything together:
>
> 1) Test of TotalOrderPartitioner that checks how it works with my keys.
> note that I've set up my comparator to pass that test
> conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class,
> Object.class);
>
> import java.io.IOException;
> import java.util.ArrayList;
>
> import junit.framework.TestCase;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.WritableComparable;
> import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
>
> public class TestTotalOrderPartitionerForHFileKeys extends TestCase {
>
>    private static final ImmutableBytesWritable[] splitKeys = new
> ImmutableBytesWritable[] {
>            // -inf
>                    // 0
>            new
> ImmutableBytesWritable(Bytes.toBytes("0.27.USA.OK.650.FAIRVIEW.1.1.0.1")),
>        // 1
>            new
> ImmutableBytesWritable(Bytes.toBytes("0.430.USA.TX.625.Rollup.1.1.0.0")),
>        // 2
>            new ImmutableBytesWritable(Bytes.toBytes("0.9.USA.NY.501.NEW
> YORK.1.1.0.0")),         // 3
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.103.USA.DC.511.Rollup.1.1.0.0")),
>        // 4
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.QC.-1.MONTREAL.1.1.1.0")),
>        // 5
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NC.Rollup.Rollup.1.1.1.0")),
>    // 6
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.225.USA.Rollup.Rollup.Rollup.1.1.0.1")),//
> 7
>            new
> ImmutableBytesWritable(Bytes.toBytes("1.245.ZAF.WC.-1.PAROW.1.1.0.1")),
>    // 8
>            new ImmutableBytesWritable(Bytes.toBytes("1.249.USA.MI.513.BAY
> CITY.1.1.0.0"))         // 9
>    };
>
>    private static final ArrayList<Check<ImmutableBytesWritable>> testKeys =
> new ArrayList<Check<ImmutableBytesWritable>>();
>    static {
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.10.USA.CA.825.SAN DIEGO.1.1.0.1")), 0));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.103.FRA.J.-1.PARIS.1.1.0.1")), 0));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.3.GBR.SCT.826032.PERTH.1.1.0.1")), 1));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.42.GBR.ENG.Rollup.Rollup.1.1.0.1")), 1));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("0.7.USA.CA.807.SANTA CLARA.1.1.0.0")), 2));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.10.SWE.AB.-1.STOCKHOLM.1.1.0.0")), 3));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.108.ABW.Rollup.Rollup.Rollup.1.1.0.0")), 4));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.11.CAN.NB.-1.SACKVILLE.1.1.0.1")), 4));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.11.CAN.Rollup.Rollup.Rollup.1.1.0.0")), 5));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.220.USA.NM.790.ALBUQUERQUE.1.1.0.0")), 6));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.23.GBR.ENG.826005.NEWHAM.1.1.0.0")), 7));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.248.GBR.ENG.826012.HULL.1.1.0.1")), 8));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.25.CAN.AB.-1.GRANDE PRAIRIE.1.1.0.0")), 9));
>        testKeys.add(new Check<ImmutableBytesWritable>(new
> ImmutableBytesWritable(Bytes
>                .toBytes("1.25.CAN.AB.Rollup.Rollup.1.1.0.0")), 9));
>    };
>
>    public void testTotalOrderHFileKeyBinarySearch() throws Exception {
>        TotalOrderPartitioner<ImmutableBytesWritable, NullWritable>
> partitioner = new TotalOrderPartitioner<ImmutableBytesWritable,
> NullWritable>();
>        Configuration conf = new Configuration();
>        Path p =
> TestTotalOrderPartitionerForHFileKeys.<ImmutableBytesWritable>
> writePartitionFile(
>                "totalorderbinarysearch", conf, splitKeys);
>        conf.setBoolean("total.order.partitioner.natural.order", false);
>        conf.setClass("mapred.mapoutput.key.class",
> ImmutableBytesWritable.class, Object.class);
>        conf.setClass("mapred.output.key.comparator.class",
> MyKeyComparator.class, Object.class);
>
>        try {
>            partitioner.setConf(conf);
>            NullWritable nw = NullWritable.get();
>            for (Check<ImmutableBytesWritable> chk : testKeys) {
>                log(Bytes.toString(chk.data.get()) + ", chk.part: " +
> chk.part + ", should be: "
>                        + partitioner.getPartition(chk.data, nw,
> splitKeys.length + 1));
>
>                assertEquals(Bytes.toString(chk.data.get()), chk.part,
>                        partitioner.getPartition(chk.data, nw,
> splitKeys.length + 1));
>
>            }
>        } finally {
>            p.getFileSystem(conf).delete(p, true);
>        }
>    }
>
>    public void testInventoryKeyComparator() {
>        InventoryKeyComparator comparator = new InventoryKeyComparator();
>        for (int i = 0; i < splitKeys.length - 2; i++) {
>            // splitKeys should be sorted in ascending order
>            int res1 = comparator.compare(splitKeys[i], splitKeys[i + 1]);
>            assertTrue(res1 < 0);
>
>            int res2 = comparator.compare(splitKeys[i].get(), 0,
> splitKeys[i].get().length,
>                    splitKeys[i + 1].get(), 0, splitKeys[i +
> 1].get().length);
>
>            assertTrue(res2 < 0);
>            assertTrue(res1 == res2);
>        }
>    }
>
>    // ----------------------------------------
>    // Copy-Paste from TestTotalOrderPartitoner
>    // http://issues.apache.org/jira/browse/MAPREDUCE-366
>    // ----------------------------------------
>    static class Check<T> {
>        T data;
>        int part;
>
>        Check(T data, int part) {
>            this.data = data;
>            this.part = part;
>        }
>    }
>
>    private static <T extends WritableComparable<?>> Path
> writePartitionFile(String testname,
>            Configuration conf, T[] splits) throws IOException {
>        final FileSystem fs = FileSystem.getLocal(conf);
>        final Path testdir = new Path(System.getProperty("test.build.data",
> "/tmp"))
>                .makeQualified(fs);
>        Path p = new Path(testdir, testname + "/_partition.lst");
>        TotalOrderPartitioner.setPartitionFile(conf, p);
>        conf.setInt("mapred.reduce.tasks", splits.length + 1);
>        SequenceFile.Writer w = null;
>        try {
>            w = SequenceFile.createWriter(fs, conf, p, splits[0].getClass(),
> NullWritable.class,
>                    SequenceFile.CompressionType.NONE);
>            for (int i = 0; i < splits.length; ++i) {
>                w.append(splits[i], NullWritable.get());
>            }
>        } finally {
>            if (null != w)
>                w.close();
>        }
>        return p;
>    }
>
>    private static void log(String message) {
>        System.out.println(message);
>    }
> }
>
> 2) MyKeyComparator
> I've wrote it to pass test above.
>
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.RawComparator;
>
> public class MyKeyComparator implements
> RawComparator<ImmutableBytesWritable> {
>
>    public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2)
> {
>        return Bytes.compareTo(o1.get(), o2.get());
>    }
>
>    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
> {
>        return Bytes.compareTo(b1, s1, l1, b2, s2, l2);
>    }
> }
>
> 3) MySampler
> this code is based on InputSampler from
> http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt)
> BUT I've put the following string into getSample:
>            reader.initialize(splits.get(i), new
> TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
> without that string it doesn't work
>
>
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import java.util.Random;
> import java.util.StringTokenizer;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.RawComparator;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.RecordReader;
> import org.apache.hadoop.mapreduce.TaskAttemptContext;
> import org.apache.hadoop.mapreduce.TaskAttemptID;
> import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
> import org.apache.hadoop.util.ReflectionUtils;
>
> /**
>  * based on org.apache.hadoop.mapreduce.lib.partition.InputSampler
>  */
> public class MySampler extends Configured  {
>      private static final Log LOG = LogFactory.getLog(MySampler.class);
>
>      public MySampler(Configuration conf) {
>        setConf(conf);
>      }
>
>      /**
>       * Sample from random points in the input.
>       * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs
> from
>       * each split.
>       */
>      public static class RandomSampler {
>        private double freq;
>        private final int numSamples;
>        private final int maxSplitsSampled;
>
>        /**
>         * Create a new RandomSampler sampling <em>all</em> splits.
>         * This will read every split at the client, which is very
> expensive.
>         * @param freq Probability with which a key will be chosen.
>         * @param numSamples Total number of samples to obtain from all
> selected
>         *                   splits.
>         */
>        public RandomSampler(double freq, int numSamples) {
>          this(freq, numSamples, Integer.MAX_VALUE);
>        }
>
>        /**
>         * Create a new RandomSampler.
>         * @param freq Probability with which a key will be chosen.
>         * @param numSamples Total number of samples to obtain from all
> selected
>         *                   splits.
>         * @param maxSplitsSampled The maximum number of splits to examine.
>         */
>        public RandomSampler(double freq, int numSamples, int
> maxSplitsSampled) {
>          this.freq = freq;
>          this.numSamples = numSamples;
>          this.maxSplitsSampled = maxSplitsSampled;
>        }
>
>        /**
>         * Randomize the split order, then take the specified number of keys
> from
>         * each split sampled, where each key is selected with the specified
>         * probability and possibly replaced by a subsequently selected key
> when
>         * the quota of keys from that split is satisfied.
>         */
>       �...@suppresswarnings("unchecked") // ArrayList::toArray doesn't
> preserve type
>        public ImmutableBytesWritable[]
> getSample(InputFormat<ImmutableBytesWritable, Text> inf, Job job)
>            throws IOException, InterruptedException {
>          List<InputSplit> splits = inf.getSplits(job);
>          ArrayList<ImmutableBytesWritable> samples = new
> ArrayList<ImmutableBytesWritable>(numSamples);
>          int splitsToSample = Math.min(maxSplitsSampled, splits.size());
>
>          Random r = new Random();
>          long seed = r.nextLong();
>          r.setSeed(seed);
>          LOG.debug("seed: " + seed);
>          // shuffle splits
>          for (int i = 0; i < splits.size(); ++i) {
>            InputSplit tmp = splits.get(i);
>            int j = r.nextInt(splits.size());
>            splits.set(i, splits.get(j));
>            splits.set(j, tmp);
>          }
>          // our target rate is in terms of the maximum number of sample
> splits,
>          // but we accept the possibility of sampling additional splits to
> hit
>          // the target sample keyset
>          for (int i = 0; i < splitsToSample ||
>                         (i < splits.size() && samples.size() < numSamples);
> ++i) {
>            RecordReader<ImmutableBytesWritable, Text> reader =
> inf.createRecordReader(splits.get(i),
>              new TaskAttemptContext(job.getConfiguration(), new
> TaskAttemptID()));
>
>            reader.initialize(splits.get(i), new
> TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
>
>            while (reader.nextKeyValue()) {
>              if (r.nextDouble() <= freq) {
>                if (samples.size() < numSamples) {
>                  samples.add(composeKey(reader.getCurrentValue()));
>                } else {
>                  // When exceeding the maximum number of samples, replace a
>                  // random element with this one, then adjust the frequency
>                  // to reflect the possibility of existing elements being
>                  // pushed out
>                  int ind = r.nextInt(numSamples);
>                  if (ind != numSamples) {
>                    samples.set(ind, composeKey(reader.getCurrentValue()));
>                  }
>                  freq *= (numSamples - 1) / (double) numSamples;
>                }
>              }
>            }
>            reader.close();
>          }
>          return (ImmutableBytesWritable[])samples.toArray(new
> ImmutableBytesWritable[samples.size()]);
>        }
>
>      }
>
>      private static ImmutableBytesWritable composeKey(Text value) {
>          StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
>            int cnt = 0;
>            String[] vals = new
> String[AdvancedRawLogUploader.RAW_LOG_NUM_FILEDS];
>            while (itr.hasMoreTokens()) {
>                vals[cnt] = itr.nextToken();
>                cnt++;
>            }
>
>            String newKeyStr =
> AdvancedRawLogUploader.RawLogMapper.generateKey(vals, 0,
>                    AdvancedRawLogUploader.NUM_KEY_FILEDS,
> AdvancedRawLogUploader.KEY_PARTS_DELIMITER);
>            info(newKeyStr);
>            ImmutableBytesWritable newKey = new ImmutableBytesWritable(Bytes
>                    .toBytes(newKeyStr));
>          return newKey;
>      }
>
>      /**
>       * Write a partition file for the given job, using the Sampler
> provided.
>       * Queries the sampler for a sample keyset, sorts by the output key
>       * comparator, selects the keys for each rank, and writes to the
> destination
>       * returned from {...@link TotalOrderPartitioner#getPartitionFile}.
>       */
>     �...@suppresswarnings("unchecked") // getInputFormat,
> getOutputKeyComparator
>      public static void writePartitionFile(Job job, RandomSampler sampler)
>          throws IOException, ClassNotFoundException, InterruptedException {
>        Configuration conf = job.getConfiguration();
>        final InputFormat inf =
>            ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
>        int numPartitions = job.getNumReduceTasks();
>        ImmutableBytesWritable[] samples = sampler.getSample(inf, job);
>        LOG.info("Using " + samples.length + " samples");
>        RawComparator<ImmutableBytesWritable> comparator =
>          (RawComparator<ImmutableBytesWritable>) job.getSortComparator();
>        Arrays.sort(samples, comparator);
>        Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
>        FileSystem fs = dst.getFileSystem(conf);
>        if (fs.exists(dst)) {
>          fs.delete(dst, false);
>        }
>        SequenceFile.Writer writer = SequenceFile.createWriter(fs,
>          conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
>        NullWritable nullValue = NullWritable.get();
>        float stepSize = samples.length / (float) numPartitions;
>        int last = -1;
>        for(int i = 1; i < numPartitions; ++i) {
>          int k = Math.round(stepSize * i);
>          while (last >= k && comparator.compare(samples[last], samples[k])
> == 0) {
>            ++k;
>          }
>          writer.append(samples[k], nullValue);
>          last = k;
>        }
>        writer.close();
>      }
>
>      private static void info(String message) {
> //          LOG.info(message);
>          System.out.println(message);
>      }
>
>
> }
>
> 4) and finally the definition of my MR job:
>
> MySampler.RandomSampler sampler = new MySampler.RandomSampler(0.1, 10000,
> 10);
> in = in.makeQualified(in.getFileSystem(conf));
> Path partitionFile = new Path(in.getParent(), "_partitions");
> // Use TotalOrderPartitioner based on the new API: from
> http://issues.apache.org/jira/browse/MAPREDUCE-366
> TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
> URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
> DistributedCache.addCacheFile(partitionUri, conf);
> DistributedCache.createSymlink(conf);
>
> Job job = new Job(conf, "Write HFiles at " + out.getName());
> job.setNumReduceTasks(numReduceTasks);
> job.setJarByClass(MyHFilesWriter.class);
> job.setMapperClass(MyMapper.class);
> job.setMapOutputKeyClass(ImmutableBytesWritable.class);
> job.setMapOutputValueClass(KeyValue.class);
> job.setReducerClass(KeyValueSortReducer.class);
> job.setOutputFormatClass(HFileOutputFormat.class);
>
> //job.setSortComparatorClass(MyKeyComparator.class);
> // if you uncomment the code above you will get:
>        /*
> 10/03/24 15:00:43 INFO mapred.JobClient: Task Id :
> attempt_201003171417_0063_r_000000_0, Status : FAILED
> java.io.IOException: Added a key not lexically larger than previous
> key=1.9.USA.AOL.0.AOL.1.1.0.0valsCategoryRollupFlag,
> lastkey=2.14.USA.MA.0.?.1.1.0.0valsTagFormatId
>    at
> org.apache.hadoop.hbase.io.hfile.HFile$Writer.checkKey(HFile.java:551)
>    at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:513)
>    at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:481)
>    at
> com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:77)
>    at
> com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:49)
>    at
> org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
>    at
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>    at
> org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:46)
>    at
> org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:35)
>    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>    at
> org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
>    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
>    at org.apache.hadoop.mapred.Child.main(Child.java:170)
>
> */
>
> FileOutputFormat.setOutputPath(job, out);
> FileInputFormat.addInputPath(job, in);
> job.setPartitionerClass(TotalOrderPartitioner.class);
> MySampler.writePartitionFile(job, sampler);
>
> System.exit(job.waitForCompletion(true) ? 0 : 1);
>
> So if I do not use the MyKeyComparator class (
> job.setSortComparatorClass(MyKeyComparator.class);) then nothing is changed
> - it works but the values for some keys are not appeared in the table,
> otherwise (with MyKeyComparator) the error occurs "Added a key not lexically
> larger than previous key".
>
> What am I doing wrong? I want to run my MR with more than one reducer and
> get all data in HBase table after loadtable.rb execution.
> Thank you have read this far, I hope you didn't get headache :)
>
> Ruslan Salyakhov | rus...@jalent.ru
>

Reply via email to