Ruslan, I see you did all the required homework, but this mail is really hard to read. Can you create a jira (http://issues.apache.org/jira/browse/HBASE) and attach all the code? This will also make it easier to track.
thx!

J-D

On Wed, Mar 24, 2010 at 6:02 PM, Ruslan Salyakhov <rusla...@gmail.com> wrote:

Hi!

I'm trying to use the bulk import that writes HFiles directly into HDFS, and I have a problem with multiple reducers. If I run the MR job that prepares the HFiles with more than one reducer, some values for keys do not appear in the table after the loadtable.rb script is executed. With one reducer everything works fine. Let's take a look at the details.

Environment:
- Hadoop 0.20.1
- HBase release 0.20.3

http://hadoop.apache.org/hbase/docs/r0.20.3/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk
- the row id must be formatted as an ImmutableBytesWritable
- the MR job should ensure a total ordering among all keys (see the sketch after this list)

http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt)
- TotalOrderPartitioner that uses the new API

https://issues.apache.org/jira/browse/HBASE-2063
- patched HFileOutputFormat
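To make those two requirements concrete, here is roughly what the mapper side looks like. This is only a sketch, not my real mapper: it assumes tab-separated text input, and the column family "vals" and the qualifier "someColumn" are placeholders, not my real schema.

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: illustrates the two bulk-load requirements listed above.
public class SketchMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split("\t");
        // requirement 1: the row id is emitted as an ImmutableBytesWritable
        byte[] row = Bytes.toBytes(fields[0]);
        ImmutableBytesWritable outKey = new ImmutableBytesWritable(row);
        // "vals" and "someColumn" are placeholder family/qualifier names
        KeyValue kv = new KeyValue(row, Bytes.toBytes("vals"), Bytes.toBytes("someColumn"),
                System.currentTimeMillis(), Bytes.toBytes(fields[1]));
        // requirement 2: these keys must end up totally ordered across all reducers,
        // so that each reducer writes one contiguous, sorted key range into its HFile
        context.write(outKey, kv);
    }
}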
Sample data of my keys:
1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1
1.306.CAN.ON.-1.LONDON.1.1.0.1
1.306.USA.CO.751.FT COLLINS.1.1.1.0
1.306.USA.CO.751.LITTLETON.1.1.1.0
4.6.USA.TX.623.MUENSTER.1.1.0.0
4.67.USA.MI.563.GRAND RAPIDS.1.1.0.0
4.68.USA.CT.533.WILLINGTON.1.1.1.0
4.68.USA.LA.642.LAFAYETTE.1.1.1.0
4.9.USA.CT.501.STAMFORD.1.1.0.0
4.9.USA.NJ.504.PRINCETON.1.1.0.1
4.92.USA.IN.527.INDIANAPOLIS.1.1.0.0
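Since the keys are plain ASCII strings, comparing them as raw bytes with Bytes.compareTo gives exactly the string order shown above. For example (illustration only, not part of the job):

import org.apache.hadoop.hbase.util.Bytes;

public class KeyOrderCheck {
    public static void main(String[] args) {
        // "4.9.USA.CT.501.STAMFORD..." sorts before "4.92.USA.IN.527.INDIANAPOLIS..."
        // because '.' (0x2E) < '2' (0x32) at the first byte where the two keys differ.
        System.out.println(Bytes.compareTo(
                Bytes.toBytes("4.9.USA.CT.501.STAMFORD.1.1.0.0"),
                Bytes.toBytes("4.92.USA.IN.527.INDIANAPOLIS.1.1.0.0"))); // prints a negative number
    }
}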
I've put everything together:

1) A test of TotalOrderPartitioner that checks how it works with my keys. Note that I've set up my comparator so that the test passes:
conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class, Object.class);

import java.io.IOException;
import java.util.ArrayList;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TestTotalOrderPartitionerForHFileKeys extends TestCase {

    private static final ImmutableBytesWritable[] splitKeys = new ImmutableBytesWritable[] {
            // -inf                                                                              // 0
            new ImmutableBytesWritable(Bytes.toBytes("0.27.USA.OK.650.FAIRVIEW.1.1.0.1")),       // 1
            new ImmutableBytesWritable(Bytes.toBytes("0.430.USA.TX.625.Rollup.1.1.0.0")),        // 2
            new ImmutableBytesWritable(Bytes.toBytes("0.9.USA.NY.501.NEW YORK.1.1.0.0")),        // 3
            new ImmutableBytesWritable(Bytes.toBytes("1.103.USA.DC.511.Rollup.1.1.0.0")),        // 4
            new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.QC.-1.MONTREAL.1.1.1.0")),        // 5
            new ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NC.Rollup.Rollup.1.1.1.0")),     // 6
            new ImmutableBytesWritable(Bytes.toBytes("1.225.USA.Rollup.Rollup.Rollup.1.1.0.1")), // 7
            new ImmutableBytesWritable(Bytes.toBytes("1.245.ZAF.WC.-1.PAROW.1.1.0.1")),          // 8
            new ImmutableBytesWritable(Bytes.toBytes("1.249.USA.MI.513.BAY CITY.1.1.0.0"))       // 9
    };

    private static final ArrayList<Check<ImmutableBytesWritable>> testKeys =
            new ArrayList<Check<ImmutableBytesWritable>>();
    static {
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("0.10.USA.CA.825.SAN DIEGO.1.1.0.1")), 0));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("0.103.FRA.J.-1.PARIS.1.1.0.1")), 0));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("0.3.GBR.SCT.826032.PERTH.1.1.0.1")), 1));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("0.42.GBR.ENG.Rollup.Rollup.1.1.0.1")), 1));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("0.7.USA.CA.807.SANTA CLARA.1.1.0.0")), 2));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.10.SWE.AB.-1.STOCKHOLM.1.1.0.0")), 3));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.108.ABW.Rollup.Rollup.Rollup.1.1.0.0")), 4));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.11.CAN.NB.-1.SACKVILLE.1.1.0.1")), 4));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.11.CAN.Rollup.Rollup.Rollup.1.1.0.0")), 5));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.220.USA.NM.790.ALBUQUERQUE.1.1.0.0")), 6));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.23.GBR.ENG.826005.NEWHAM.1.1.0.0")), 7));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.248.GBR.ENG.826012.HULL.1.1.0.1")), 8));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.25.CAN.AB.-1.GRANDE PRAIRIE.1.1.0.0")), 9));
        testKeys.add(new Check<ImmutableBytesWritable>(new ImmutableBytesWritable(Bytes
                .toBytes("1.25.CAN.AB.Rollup.Rollup.1.1.0.0")), 9));
    }

    public void testTotalOrderHFileKeyBinarySearch() throws Exception {
        TotalOrderPartitioner<ImmutableBytesWritable, NullWritable> partitioner =
                new TotalOrderPartitioner<ImmutableBytesWritable, NullWritable>();
        Configuration conf = new Configuration();
        Path p = TestTotalOrderPartitionerForHFileKeys.<ImmutableBytesWritable> writePartitionFile(
                "totalorderbinarysearch", conf, splitKeys);
        conf.setBoolean("total.order.partitioner.natural.order", false);
        conf.setClass("mapred.mapoutput.key.class", ImmutableBytesWritable.class, Object.class);
        conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class, Object.class);

        try {
            partitioner.setConf(conf);
            NullWritable nw = NullWritable.get();
            for (Check<ImmutableBytesWritable> chk : testKeys) {
                log(Bytes.toString(chk.data.get()) + ", chk.part: " + chk.part + ", should be: "
                        + partitioner.getPartition(chk.data, nw, splitKeys.length + 1));

                assertEquals(Bytes.toString(chk.data.get()), chk.part,
                        partitioner.getPartition(chk.data, nw, splitKeys.length + 1));
            }
        } finally {
            p.getFileSystem(conf).delete(p, true);
        }
    }

    public void testMyKeyComparator() {
        MyKeyComparator comparator = new MyKeyComparator();
        for (int i = 0; i < splitKeys.length - 1; i++) {
            // splitKeys should be sorted in ascending order
            int res1 = comparator.compare(splitKeys[i], splitKeys[i + 1]);
            assertTrue(res1 < 0);

            int res2 = comparator.compare(splitKeys[i].get(), 0, splitKeys[i].get().length,
                    splitKeys[i + 1].get(), 0, splitKeys[i + 1].get().length);

            assertTrue(res2 < 0);
            assertTrue(res1 == res2);
        }
    }

    // ----------------------------------------
    // Copy-Paste from TestTotalOrderPartitioner
    // http://issues.apache.org/jira/browse/MAPREDUCE-366
    // ----------------------------------------
    static class Check<T> {
        T data;
        int part;

        Check(T data, int part) {
            this.data = data;
            this.part = part;
        }
    }

    private static <T extends WritableComparable<?>> Path writePartitionFile(String testname,
            Configuration conf, T[] splits) throws IOException {
        final FileSystem fs = FileSystem.getLocal(conf);
        final Path testdir = new Path(System.getProperty("test.build.data", "/tmp"))
                .makeQualified(fs);
        Path p = new Path(testdir, testname + "/_partition.lst");
        TotalOrderPartitioner.setPartitionFile(conf, p);
        conf.setInt("mapred.reduce.tasks", splits.length + 1);
        SequenceFile.Writer w = null;
        try {
            w = SequenceFile.createWriter(fs, conf, p, splits[0].getClass(), NullWritable.class,
                    SequenceFile.CompressionType.NONE);
            for (int i = 0; i < splits.length; ++i) {
                w.append(splits[i], NullWritable.get());
            }
        } finally {
            if (null != w)
                w.close();
        }
        return p;
    }

    private static void log(String message) {
        System.out.println(message);
    }
}

2) MyKeyComparator
I wrote it so that the test above passes.

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;

public class MyKeyComparator implements RawComparator<ImmutableBytesWritable> {

    public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2) {
        return Bytes.compareTo(o1.get(), o2.get());
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Bytes.compareTo(b1, s1, l1, b2, s2, l2);
    }
}
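Just to spell out how the comparator gets wired in (this is only a sketch of the two equivalent ways to set it; in my actual job the setSortComparatorClass call is commented out, see section 4 below):

// Sketch only, reusing the conf and job from section 4 below. Both calls set the
// same underlying property, mapred.output.key.comparator.class, which sorts the
// map output and, once natural order is disabled, is also the comparator
// TotalOrderPartitioner uses against the split points (this is what the test above sets).
job.setSortComparatorClass(MyKeyComparator.class);                             // via the Job API
conf.setClass("mapred.output.key.comparator.class",
        MyKeyComparator.class, Object.class);                                  // via the raw property, as in the test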
3) MySampler
This code is based on InputSampler from http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt), BUT I've added the following line to getSample:
reader.initialize(splits.get(i), new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
Without that line it doesn't work.
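In other words, a new-API RecordReader is not usable until initialize() has been called on it; inside the framework the map task does that for you, but when sampling splits on the client you have to do it yourself. The pattern inside getSample boils down to:

// create the reader, then initialize it before the first nextKeyValue()
RecordReader<ImmutableBytesWritable, Text> reader = inf.createRecordReader(splits.get(i),
        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
reader.initialize(splits.get(i),
        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
while (reader.nextKeyValue()) {
    // sample reader.getCurrentValue() ...
}
reader.close();

The full class: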
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * based on org.apache.hadoop.mapreduce.lib.partition.InputSampler
 */
public class MySampler extends Configured {
    private static final Log LOG = LogFactory.getLog(MySampler.class);

    public MySampler(Configuration conf) {
        setConf(conf);
    }

    /**
     * Sample from random points in the input.
     * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
     * each split.
     */
    public static class RandomSampler {
        private double freq;
        private final int numSamples;
        private final int maxSplitsSampled;

        /**
         * Create a new RandomSampler sampling <em>all</em> splits.
         * This will read every split at the client, which is very expensive.
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected splits.
         */
        public RandomSampler(double freq, int numSamples) {
            this(freq, numSamples, Integer.MAX_VALUE);
        }

        /**
         * Create a new RandomSampler.
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected splits.
         * @param maxSplitsSampled The maximum number of splits to examine.
         */
        public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
            this.freq = freq;
            this.numSamples = numSamples;
            this.maxSplitsSampled = maxSplitsSampled;
        }

        /**
         * Randomize the split order, then take the specified number of keys from
         * each split sampled, where each key is selected with the specified
         * probability and possibly replaced by a subsequently selected key when
         * the quota of keys from that split is satisfied.
         */
        @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
        public ImmutableBytesWritable[] getSample(InputFormat<ImmutableBytesWritable, Text> inf, Job job)
                throws IOException, InterruptedException {
            List<InputSplit> splits = inf.getSplits(job);
            ArrayList<ImmutableBytesWritable> samples =
                    new ArrayList<ImmutableBytesWritable>(numSamples);
            int splitsToSample = Math.min(maxSplitsSampled, splits.size());

            Random r = new Random();
            long seed = r.nextLong();
            r.setSeed(seed);
            LOG.debug("seed: " + seed);
            // shuffle splits
            for (int i = 0; i < splits.size(); ++i) {
                InputSplit tmp = splits.get(i);
                int j = r.nextInt(splits.size());
                splits.set(i, splits.get(j));
                splits.set(j, tmp);
            }
            // our target rate is in terms of the maximum number of sample splits,
            // but we accept the possibility of sampling additional splits to hit
            // the target sample keyset
            for (int i = 0; i < splitsToSample ||
                    (i < splits.size() && samples.size() < numSamples); ++i) {
                RecordReader<ImmutableBytesWritable, Text> reader = inf.createRecordReader(
                        splits.get(i),
                        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));

                reader.initialize(splits.get(i),
                        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));

                while (reader.nextKeyValue()) {
                    if (r.nextDouble() <= freq) {
                        if (samples.size() < numSamples) {
                            samples.add(composeKey(reader.getCurrentValue()));
                        } else {
                            // When exceeding the maximum number of samples, replace a
                            // random element with this one, then adjust the frequency
                            // to reflect the possibility of existing elements being
                            // pushed out
                            int ind = r.nextInt(numSamples);
                            if (ind != numSamples) {
                                samples.set(ind, composeKey(reader.getCurrentValue()));
                            }
                            freq *= (numSamples - 1) / (double) numSamples;
                        }
                    }
                }
                reader.close();
            }
            return (ImmutableBytesWritable[]) samples.toArray(new ImmutableBytesWritable[samples.size()]);
        }
    }

    private static ImmutableBytesWritable composeKey(Text value) {
        StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
        int cnt = 0;
        String[] vals = new String[AdvancedRawLogUploader.RAW_LOG_NUM_FILEDS];
        while (itr.hasMoreTokens()) {
            vals[cnt] = itr.nextToken();
            cnt++;
        }

        String newKeyStr = AdvancedRawLogUploader.RawLogMapper.generateKey(vals, 0,
                AdvancedRawLogUploader.NUM_KEY_FILEDS, AdvancedRawLogUploader.KEY_PARTS_DELIMITER);
        info(newKeyStr);
        ImmutableBytesWritable newKey = new ImmutableBytesWritable(Bytes.toBytes(newKeyStr));
        return newKey;
    }

    /**
     * Write a partition file for the given job, using the Sampler provided.
     * Queries the sampler for a sample keyset, sorts by the output key
     * comparator, selects the keys for each rank, and writes to the destination
     * returned from {@link TotalOrderPartitioner#getPartitionFile}.
     */
    @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
    public static void writePartitionFile(Job job, RandomSampler sampler)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = job.getConfiguration();
        final InputFormat inf = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
        int numPartitions = job.getNumReduceTasks();
        ImmutableBytesWritable[] samples = sampler.getSample(inf, job);
        LOG.info("Using " + samples.length + " samples");
        RawComparator<ImmutableBytesWritable> comparator =
                (RawComparator<ImmutableBytesWritable>) job.getSortComparator();
        Arrays.sort(samples, comparator);
        Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
        FileSystem fs = dst.getFileSystem(conf);
        if (fs.exists(dst)) {
            fs.delete(dst, false);
        }
        SequenceFile.Writer writer = SequenceFile.createWriter(fs,
                conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
        NullWritable nullValue = NullWritable.get();
        float stepSize = samples.length / (float) numPartitions;
        int last = -1;
        for (int i = 1; i < numPartitions; ++i) {
            int k = Math.round(stepSize * i);
            while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
                ++k;
            }
            writer.append(samples[k], nullValue);
            last = k;
        }
        writer.close();
    }

    private static void info(String message) {
        // LOG.info(message);
        System.out.println(message);
    }
}

4) And finally, the definition of my MR job:

MySampler.RandomSampler sampler = new MySampler.RandomSampler(0.1, 10000, 10);
in = in.makeQualified(in.getFileSystem(conf));
Path partitionFile = new Path(in.getParent(), "_partitions");
// Use the TotalOrderPartitioner based on the new API, from http://issues.apache.org/jira/browse/MAPREDUCE-366
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);

Job job = new Job(conf, "Write HFiles at " + out.getName());
job.setNumReduceTasks(numReduceTasks);
job.setJarByClass(MyHFilesWriter.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setOutputFormatClass(HFileOutputFormat.class);

//job.setSortComparatorClass(MyKeyComparator.class);
// if you uncomment the line above you will get:
/*
10/03/24 15:00:43 INFO mapred.JobClient: Task Id : attempt_201003171417_0063_r_000000_0, Status : FAILED
java.io.IOException: Added a key not lexically larger than previous key=1.9.USA.AOL.0.AOL.1.1.0.0valsCategoryRollupFlag, lastkey=2.14.USA.MA.0.?.1.1.0.0valsTagFormatId
        at org.apache.hadoop.hbase.io.hfile.HFile$Writer.checkKey(HFile.java:551)
        at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:513)
        at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:481)
        at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:77)
        at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:49)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:46)
        at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:35)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)
*/

FileOutputFormat.setOutputPath(job, out);
FileInputFormat.addInputPath(job, in);
job.setPartitionerClass(TotalOrderPartitioner.class);
MySampler.writePartitionFile(job, sampler);

System.exit(job.waitForCompletion(true) ? 0 : 1);
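For what it's worth, one check I can run is to read back the _partitions file that MySampler.writePartitionFile wrote and confirm the split keys come out in the order the reducers are supposed to cover. This is just a sketch of such a check, not part of my job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

public class DumpPartitionFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path p = new Path(args[0]); // the "_partitions" path used above
        SequenceFile.Reader reader = new SequenceFile.Reader(p.getFileSystem(conf), p, conf);
        ImmutableBytesWritable key = new ImmutableBytesWritable();
        NullWritable val = NullWritable.get();
        // print the split keys in file order; they should already be sorted
        while (reader.next(key, val)) {
            System.out.println(Bytes.toString(key.get()));
        }
        reader.close();
    }
}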
So if I do not set MyKeyComparator (job.setSortComparatorClass(MyKeyComparator.class);), nothing changes: the job runs, but the values for some keys do not appear in the table. Otherwise (with MyKeyComparator) I get the "Added a key not lexically larger than previous key" error.

What am I doing wrong? I want to run my MR job with more than one reducer and get all the data into the HBase table after loadtable.rb execution.

Thank you for reading this far, I hope you didn't get a headache :)

Ruslan Salyakhov | rus...@jalent.ru