Jean-Daniel,

https://issues.apache.org/jira/browse/HBASE-2378
Ruslan

On Thu, Mar 25, 2010 at 8:22 PM, Jean-Daniel Cryans <jdcry...@apache.org> wrote:
> Ruslan,
>
> I see you did all the required homework, but this mail is really hard
> to read. Can you create a jira (http://issues.apache.org/jira/browse/HBASE)
> and attach all the code? This will also make it easier to track.
>
> thx!
>
> J-D

On Wed, Mar 24, 2010 at 6:02 PM, Ruslan Salyakhov <rusla...@gmail.com> wrote:

Hi!

I'm trying to use the bulk import path that writes HFiles directly into HDFS, and
I have a problem with multiple reducers. If I run the MR job that prepares the
HFiles with more than one reducer, some values for keys do not appear in the
table after running the loadtable.rb script. With one reducer everything works
fine. Let's take a look at the details:

Environment:
- Hadoop 0.20.1
- HBase release 0.20.3

http://hadoop.apache.org/hbase/docs/r0.20.3/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk
- the row id must be formatted as an ImmutableBytesWritable
- the MR job should ensure a total ordering among all keys

http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt)
- TotalOrderPartitioner that uses the new API

https://issues.apache.org/jira/browse/HBASE-2063
- patched HFileOutputFormat

Sample data of my keys:

1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1
1.306.CAN.ON.-1.LONDON.1.1.0.1
1.306.USA.CO.751.FT COLLINS.1.1.1.0
1.306.USA.CO.751.LITTLETON.1.1.1.0
4.6.USA.TX.623.MUENSTER.1.1.0.0
4.67.USA.MI.563.GRAND RAPIDS.1.1.0.0
4.68.USA.CT.533.WILLINGTON.1.1.1.0
4.68.USA.LA.642.LAFAYETTE.1.1.1.0
4.9.USA.CT.501.STAMFORD.1.1.0.0
4.9.USA.NJ.504.PRINCETON.1.1.0.1
4.92.USA.IN.527.INDIANAPOLIS.1.1.0.0
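To make that ordering requirement concrete: HBase and the HFile writer order rows by a
plain lexicographic comparison of the raw key bytes, so both the order in which rows
reach each reducer and the split points between reducers have to agree with that byte
order. Here is a tiny standalone sketch (not part of the job code, the class name is
mine) using two keys from the sample list above:

import org.apache.hadoop.hbase.util.Bytes;

public class KeyOrderSketch {
    public static void main(String[] args) {
        byte[] a = Bytes.toBytes("1.3.SWE.AB.-1.UPPLANDS-VASBY.1.1.0.1");
        byte[] b = Bytes.toBytes("1.306.CAN.ON.-1.LONDON.1.1.0.1");

        // Prints a negative number: "1.3.SWE..." sorts before "1.306.CAN..."
        // because '.' (0x2E) is smaller than '0' (0x30) at the first position
        // where the two keys differ.
        System.out.println(Bytes.compareTo(a, b));
    }
}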
I've put everything together:

1) Test of TotalOrderPartitioner that checks how it works with my keys.
Note that I've set up my comparator so that the test passes:

conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class, Object.class);

import java.io.IOException;
import java.util.ArrayList;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TestTotalOrderPartitionerForHFileKeys extends TestCase {

    private static final ImmutableBytesWritable[] splitKeys = new ImmutableBytesWritable[] {
            // -inf
            // 0
            new ImmutableBytesWritable(Bytes.toBytes("0.27.USA.OK.650.FAIRVIEW.1.1.0.1")),       // 1
            new ImmutableBytesWritable(Bytes.toBytes("0.430.USA.TX.625.Rollup.1.1.0.0")),        // 2
            new ImmutableBytesWritable(Bytes.toBytes("0.9.USA.NY.501.NEW YORK.1.1.0.0")),        // 3
            new ImmutableBytesWritable(Bytes.toBytes("1.103.USA.DC.511.Rollup.1.1.0.0")),        // 4
            new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.QC.-1.MONTREAL.1.1.1.0")),        // 5
            new ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NC.Rollup.Rollup.1.1.1.0")),     // 6
            new ImmutableBytesWritable(Bytes.toBytes("1.225.USA.Rollup.Rollup.Rollup.1.1.0.1")), // 7
            new ImmutableBytesWritable(Bytes.toBytes("1.245.ZAF.WC.-1.PAROW.1.1.0.1")),          // 8
            new ImmutableBytesWritable(Bytes.toBytes("1.249.USA.MI.513.BAY CITY.1.1.0.0"))       // 9
    };

    private static final ArrayList<Check<ImmutableBytesWritable>> testKeys =
            new ArrayList<Check<ImmutableBytesWritable>>();
    static {
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.10.USA.CA.825.SAN DIEGO.1.1.0.1")), 0));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.103.FRA.J.-1.PARIS.1.1.0.1")), 0));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.3.GBR.SCT.826032.PERTH.1.1.0.1")), 1));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.42.GBR.ENG.Rollup.Rollup.1.1.0.1")), 1));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("0.7.USA.CA.807.SANTA CLARA.1.1.0.0")), 2));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.10.SWE.AB.-1.STOCKHOLM.1.1.0.0")), 3));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.108.ABW.Rollup.Rollup.Rollup.1.1.0.0")), 4));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.NB.-1.SACKVILLE.1.1.0.1")), 4));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.11.CAN.Rollup.Rollup.Rollup.1.1.0.0")), 5));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.220.USA.NM.790.ALBUQUERQUE.1.1.0.0")), 6));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.23.GBR.ENG.826005.NEWHAM.1.1.0.0")), 7));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.248.GBR.ENG.826012.HULL.1.1.0.1")), 8));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.25.CAN.AB.-1.GRANDE PRAIRIE.1.1.0.0")), 9));
        testKeys.add(new Check<ImmutableBytesWritable>(
                new ImmutableBytesWritable(Bytes.toBytes("1.25.CAN.AB.Rollup.Rollup.1.1.0.0")), 9));
    }

    public void testTotalOrderHFileKeyBinarySearch() throws Exception {
        TotalOrderPartitioner<ImmutableBytesWritable, NullWritable> partitioner =
                new TotalOrderPartitioner<ImmutableBytesWritable, NullWritable>();
        Configuration conf = new Configuration();
        Path p = TestTotalOrderPartitionerForHFileKeys.<ImmutableBytesWritable> writePartitionFile(
                "totalorderbinarysearch", conf, splitKeys);
        conf.setBoolean("total.order.partitioner.natural.order", false);
        conf.setClass("mapred.mapoutput.key.class", ImmutableBytesWritable.class, Object.class);
        conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class, Object.class);

        try {
            partitioner.setConf(conf);
            NullWritable nw = NullWritable.get();
            for (Check<ImmutableBytesWritable> chk : testKeys) {
                log(Bytes.toString(chk.data.get()) + ", chk.part: " + chk.part + ", should be: "
                        + partitioner.getPartition(chk.data, nw, splitKeys.length + 1));

                assertEquals(Bytes.toString(chk.data.get()), chk.part,
                        partitioner.getPartition(chk.data, nw, splitKeys.length + 1));
            }
        } finally {
            p.getFileSystem(conf).delete(p, true);
        }
    }

    public void testMyKeyComparator() {
        MyKeyComparator comparator = new MyKeyComparator();
        for (int i = 0; i < splitKeys.length - 1; i++) {
            // splitKeys should be sorted in ascending order
            int res1 = comparator.compare(splitKeys[i], splitKeys[i + 1]);
            assertTrue(res1 < 0);

            int res2 = comparator.compare(splitKeys[i].get(), 0, splitKeys[i].get().length,
                    splitKeys[i + 1].get(), 0, splitKeys[i + 1].get().length);

            assertTrue(res2 < 0);
            assertTrue(res1 == res2);
        }
    }

    // ----------------------------------------
    // Copy-Paste from TestTotalOrderPartitioner
    // http://issues.apache.org/jira/browse/MAPREDUCE-366
    // ----------------------------------------
    static class Check<T> {
        T data;
        int part;

        Check(T data, int part) {
            this.data = data;
            this.part = part;
        }
    }

    private static <T extends WritableComparable<?>> Path writePartitionFile(String testname,
            Configuration conf, T[] splits) throws IOException {
        final FileSystem fs = FileSystem.getLocal(conf);
        final Path testdir =
                new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(fs);
        Path p = new Path(testdir, testname + "/_partition.lst");
        TotalOrderPartitioner.setPartitionFile(conf, p);
        conf.setInt("mapred.reduce.tasks", splits.length + 1);
        SequenceFile.Writer w = null;
        try {
            w = SequenceFile.createWriter(fs, conf, p, splits[0].getClass(), NullWritable.class,
                    SequenceFile.CompressionType.NONE);
            for (int i = 0; i < splits.length; ++i) {
                w.append(splits[i], NullWritable.get());
            }
        } finally {
            if (null != w)
                w.close();
        }
        return p;
    }

    private static void log(String message) {
        System.out.println(message);
    }
}
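A quick aside on what the first test exercises: with total.order.partitioner.natural.order
set to false, TotalOrderPartitioner (as far as I can tell from the MAPREDUCE-366 code) does
a binary search of each key against the sorted split points using the configured comparator.
The rough standalone sketch below mimics that idea; the class and helper names are mine, and
the inline comparator simply stands in for the in-memory compare() of MyKeyComparator from
section 2 below:

import java.util.Arrays;
import java.util.Comparator;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

public class PartitionSketch {

    // Plain byte-order comparator, equivalent to the in-memory compare()
    // of MyKeyComparator below.
    static final Comparator<ImmutableBytesWritable> BYTE_ORDER =
            new Comparator<ImmutableBytesWritable>() {
                public int compare(ImmutableBytesWritable a, ImmutableBytesWritable b) {
                    return Bytes.compareTo(a.get(), b.get());
                }
            };

    // Partition = number of split points that are <= key; keys equal to a
    // split point fall into the higher partition.
    static int partitionFor(ImmutableBytesWritable key, ImmutableBytesWritable[] splits) {
        int pos = Arrays.binarySearch(splits, key, BYTE_ORDER);
        return (pos < 0) ? -(pos + 1) : pos + 1;
    }

    public static void main(String[] args) {
        ImmutableBytesWritable[] splits = new ImmutableBytesWritable[] {
                new ImmutableBytesWritable(Bytes.toBytes("0.27.USA.OK.650.FAIRVIEW.1.1.0.1")),
                new ImmutableBytesWritable(Bytes.toBytes("0.430.USA.TX.625.Rollup.1.1.0.0")) };

        // "0.10..." sorts before the first split point, so partition 0;
        // "0.3.GBR..." falls between the two split points, so partition 1.
        System.out.println(partitionFor(new ImmutableBytesWritable(
                Bytes.toBytes("0.10.USA.CA.825.SAN DIEGO.1.1.0.1")), splits));
        System.out.println(partitionFor(new ImmutableBytesWritable(
                Bytes.toBytes("0.3.GBR.SCT.826032.PERTH.1.1.0.1")), splits));
    }
}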
2) MyKeyComparator

I wrote it to make the test above pass.

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.RawComparator;

public class MyKeyComparator implements RawComparator<ImmutableBytesWritable> {

    public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2) {
        return Bytes.compareTo(o1.get(), o2.get());
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Bytes.compareTo(b1, s1, l1, b2, s2, l2);
    }
}
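For completeness: the property the unit test sets directly on the Configuration should be
the same one that job.setSortComparatorClass(...) sets in the driver in section 4, at
least as I read the 0.20 API. A small sketch (the class name is mine) showing both
spellings side by side:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ComparatorWiringSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // What the unit test above does directly on the Configuration...
        conf.setClass("mapred.output.key.comparator.class", MyKeyComparator.class, Object.class);

        // ...and the equivalent call on the new-API Job object, as used in the
        // driver in section 4. Both should end up in the same property.
        Job job = new Job(conf, "comparator-wiring-sketch");
        job.setSortComparatorClass(MyKeyComparator.class);
        System.out.println(job.getConfiguration().get("mapred.output.key.comparator.class"));
    }
}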
3) MySampler

This code is based on the InputSampler from
http://issues.apache.org/jira/browse/MAPREDUCE-366 (patch-5668-3.txt),
BUT I've added the following line to getSample():

reader.initialize(splits.get(i), new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));

Without that line it doesn't work (there is a short sketch of the reader
lifecycle right after the class below).

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Based on org.apache.hadoop.mapreduce.lib.partition.InputSampler.
 */
public class MySampler extends Configured {
    private static final Log LOG = LogFactory.getLog(MySampler.class);

    public MySampler(Configuration conf) {
        setConf(conf);
    }

    /**
     * Sample from random points in the input.
     * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
     * each split.
     */
    public static class RandomSampler {
        private double freq;
        private final int numSamples;
        private final int maxSplitsSampled;

        /**
         * Create a new RandomSampler sampling <em>all</em> splits.
         * This will read every split at the client, which is very expensive.
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected splits.
         */
        public RandomSampler(double freq, int numSamples) {
            this(freq, numSamples, Integer.MAX_VALUE);
        }

        /**
         * Create a new RandomSampler.
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected splits.
         * @param maxSplitsSampled The maximum number of splits to examine.
         */
        public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
            this.freq = freq;
            this.numSamples = numSamples;
            this.maxSplitsSampled = maxSplitsSampled;
        }

        /**
         * Randomize the split order, then take the specified number of keys from
         * each split sampled, where each key is selected with the specified
         * probability and possibly replaced by a subsequently selected key when
         * the quota of keys from that split is satisfied.
         */
        @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
        public ImmutableBytesWritable[] getSample(InputFormat<ImmutableBytesWritable, Text> inf,
                Job job) throws IOException, InterruptedException {
            List<InputSplit> splits = inf.getSplits(job);
            ArrayList<ImmutableBytesWritable> samples =
                    new ArrayList<ImmutableBytesWritable>(numSamples);
            int splitsToSample = Math.min(maxSplitsSampled, splits.size());

            Random r = new Random();
            long seed = r.nextLong();
            r.setSeed(seed);
            LOG.debug("seed: " + seed);
            // shuffle splits
            for (int i = 0; i < splits.size(); ++i) {
                InputSplit tmp = splits.get(i);
                int j = r.nextInt(splits.size());
                splits.set(i, splits.get(j));
                splits.set(j, tmp);
            }
            // our target rate is in terms of the maximum number of sample splits,
            // but we accept the possibility of sampling additional splits to hit
            // the target sample keyset
            for (int i = 0;
                    i < splitsToSample || (i < splits.size() && samples.size() < numSamples);
                    ++i) {
                RecordReader<ImmutableBytesWritable, Text> reader = inf.createRecordReader(
                        splits.get(i),
                        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));

                reader.initialize(splits.get(i),
                        new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));

                while (reader.nextKeyValue()) {
                    if (r.nextDouble() <= freq) {
                        if (samples.size() < numSamples) {
                            samples.add(composeKey(reader.getCurrentValue()));
                        } else {
                            // When exceeding the maximum number of samples, replace a
                            // random element with this one, then adjust the frequency
                            // to reflect the possibility of existing elements being
                            // pushed out
                            int ind = r.nextInt(numSamples);
                            if (ind != numSamples) {
                                samples.set(ind, composeKey(reader.getCurrentValue()));
                            }
                            freq *= (numSamples - 1) / (double) numSamples;
                        }
                    }
                }
                reader.close();
            }
            return (ImmutableBytesWritable[]) samples.toArray(
                    new ImmutableBytesWritable[samples.size()]);
        }
    }

    private static ImmutableBytesWritable composeKey(Text value) {
        StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
        int cnt = 0;
        String[] vals = new String[AdvancedRawLogUploader.RAW_LOG_NUM_FILEDS];
        while (itr.hasMoreTokens()) {
            vals[cnt] = itr.nextToken();
            cnt++;
        }

        String newKeyStr = AdvancedRawLogUploader.RawLogMapper.generateKey(vals, 0,
                AdvancedRawLogUploader.NUM_KEY_FILEDS,
                AdvancedRawLogUploader.KEY_PARTS_DELIMITER);
        info(newKeyStr);
        ImmutableBytesWritable newKey = new ImmutableBytesWritable(Bytes.toBytes(newKeyStr));
        return newKey;
    }

    /**
     * Write a partition file for the given job, using the Sampler provided.
     * Queries the sampler for a sample keyset, sorts by the output key
     * comparator, selects the keys for each rank, and writes to the destination
     * returned from {@link TotalOrderPartitioner#getPartitionFile}.
     */
    @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
    public static void writePartitionFile(Job job, RandomSampler sampler)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = job.getConfiguration();
        final InputFormat inf = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
        int numPartitions = job.getNumReduceTasks();
        ImmutableBytesWritable[] samples = sampler.getSample(inf, job);
        LOG.info("Using " + samples.length + " samples");
        RawComparator<ImmutableBytesWritable> comparator =
                (RawComparator<ImmutableBytesWritable>) job.getSortComparator();
        Arrays.sort(samples, comparator);
        Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
        FileSystem fs = dst.getFileSystem(conf);
        if (fs.exists(dst)) {
            fs.delete(dst, false);
        }
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst,
                job.getMapOutputKeyClass(), NullWritable.class);
        NullWritable nullValue = NullWritable.get();
        float stepSize = samples.length / (float) numPartitions;
        int last = -1;
        for (int i = 1; i < numPartitions; ++i) {
            int k = Math.round(stepSize * i);
            while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
                ++k;
            }
            writer.append(samples[k], nullValue);
            last = k;
        }
        writer.close();
    }

    private static void info(String message) {
        // LOG.info(message);
        System.out.println(message);
    }
}
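The initialize() call I added follows the usual lifecycle of a new-API RecordReader when
it is driven by hand outside a real map task: create it, initialize it with the split and
a TaskAttemptContext, then iterate with nextKeyValue(). A minimal standalone sketch of
that pattern (TextInputFormat over a path passed on the command line; the class name is
mine):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class ReaderLifecycleSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "reader-lifecycle-sketch");
        FileInputFormat.addInputPath(job, new Path(args[0]));

        TextInputFormat inf = new TextInputFormat();
        for (InputSplit split : inf.getSplits(job)) {
            // A reader created outside a real map task still has to be
            // initialize()d before nextKeyValue() can be used.
            TaskAttemptContext ctx =
                    new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
            RecordReader<LongWritable, Text> reader = inf.createRecordReader(split, ctx);
            reader.initialize(split, ctx);
            while (reader.nextKeyValue()) {
                System.out.println(reader.getCurrentValue());
            }
            reader.close();
        }
    }
}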
4) And finally, the definition of my MR job:

MySampler.RandomSampler sampler = new MySampler.RandomSampler(0.1, 10000, 10);
in = in.makeQualified(in.getFileSystem(conf));
Path partitionFile = new Path(in.getParent(), "_partitions");
// Use the TotalOrderPartitioner based on the new API, from
// http://issues.apache.org/jira/browse/MAPREDUCE-366
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);

Job job = new Job(conf, "Write HFiles at " + out.getName());
job.setNumReduceTasks(numReduceTasks);
job.setJarByClass(MyHFilesWriter.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setOutputFormatClass(HFileOutputFormat.class);

// job.setSortComparatorClass(MyKeyComparator.class);
// if you uncomment the line above you will get:
/*
10/03/24 15:00:43 INFO mapred.JobClient: Task Id : attempt_201003171417_0063_r_000000_0, Status : FAILED
java.io.IOException: Added a key not lexically larger than previous key=1.9.USA.AOL.0.AOL.1.1.0.0valsCategoryRollupFlag, lastkey=2.14.USA.MA.0.?.1.1.0.0valsTagFormatId
        at org.apache.hadoop.hbase.io.hfile.HFile$Writer.checkKey(HFile.java:551)
        at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:513)
        at org.apache.hadoop.hbase.io.hfile.HFile$Writer.append(HFile.java:481)
        at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:77)
        at com.contextweb.hadoop.hbase.mapred.HFileOutputFormat$1.write(HFileOutputFormat.java:49)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:46)
        at org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer.reduce(KeyValueSortReducer.java:35)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)
*/

FileOutputFormat.setOutputPath(job, out);
FileInputFormat.addInputPath(job, in);
job.setPartitionerClass(TotalOrderPartitioner.class);
MySampler.writePartitionFile(job, sampler);

System.exit(job.waitForCompletion(true) ? 0 : 1);

So if I do not set MyKeyComparator (job.setSortComparatorClass(MyKeyComparator.class)),
nothing changes: the job runs, but the values for some keys do not appear in the table.
With MyKeyComparator set, the reducers fail with the "Added a key not lexically larger
than previous key" error shown above.
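For a bit of context on that "Added a key not lexically larger" message: as far as I can
tell, KeyValueSortReducer only sorts the KeyValues that belong to a single reducer input
key (one row); the order of the rows themselves is whatever the shuffle delivered, and
HFile.Writer.append() then requires every appended key to be strictly larger, in KeyValue
order, than the previous one. A tiny standalone sketch; the two rows are taken from the
error above, while the family, qualifiers and values are my guesses:

import java.util.TreeSet;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class AppendOrderSketch {
    public static void main(String[] args) {
        // KeyValue.COMPARATOR is the full-key order that HFile.Writer.append()
        // enforces via checkKey(): each key must be strictly larger than the last.
        TreeSet<KeyValue> sorted = new TreeSet<KeyValue>(KeyValue.COMPARATOR);

        sorted.add(new KeyValue(Bytes.toBytes("2.14.USA.MA.0.?.1.1.0.0"),
                Bytes.toBytes("vals"), Bytes.toBytes("TagFormatId"), Bytes.toBytes("x")));
        sorted.add(new KeyValue(Bytes.toBytes("1.9.USA.AOL.0.AOL.1.1.0.0"),
                Bytes.toBytes("vals"), Bytes.toBytes("CategoryRollupFlag"), Bytes.toBytes("y")));

        // Iterating the set yields the only order a single HFile will accept:
        // "1.9..." first, then "2.14...". If a reducer emits them the other way
        // round, checkKey() throws exactly the IOException shown above.
        for (KeyValue kv : sorted) {
            System.out.println(Bytes.toString(kv.getRow()));
        }
    }
}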
What am I doing wrong? I want to run my MR job with more than one reducer and still get
all of the data into the HBase table after running loadtable.rb.

Thank you for reading this far, I hope you didn't get a headache :)

Ruslan Salyakhov | rus...@jalent.ru