Hi,

Below is some code that improves the read performance of large tables by processing each region on the host that holds it. We measured 50-60% lower network bandwidth.
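For context, here is roughly how it plugs into a job driver. This is only a sketch: the driver class name is a placeholder, and the mapper, table name, and input columns are configured exactly as for the stock TableInputFormat.

import org.apache.hadoop.mapred.JobConf;
import ellerdale.mapreduce.TableInputFormatFix;

public class MyTableJob {                              // placeholder driver class
  public static JobConf createJobConf() {
    JobConf jobconf = new JobConf(MyTableJob.class);
    // mapper, reducer, output, table name, and input columns are set up
    // exactly as for org.apache.hadoop.hbase.mapred.TableInputFormat;
    // only the input format class changes:
    jobconf.setInputFormat(TableInputFormatFix.class);
    return jobconf;
  }
}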
In short, to use this class instead of the org.apache.hadoop.hbase.mapred.TableInputFormat class, use: jobconf.setInputFormat(ellerdale.mapreduce.TableInputFormatFix.class);

Please send me feedback if you can think of better ways to do this.

--
Arthur van Hoff - Grand Master of Alphabetical Order
The Ellerdale Project, Menlo Park, CA
[EMAIL PROTECTED], 650-283-0842

-- TableInputFormatFix.java --

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Author: Arthur van Hoff, [EMAIL PROTECTED]

package ellerdale.mapreduce;

import java.io.*;
import java.util.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;

import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.mapred.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.*;

//
// Attempt to fix the lack of locality in table splits.
// Compute table splits so that each region is processed locally.
// Combine multiple regions into one split so that the number of splits does not exceed numSplits.
// Sort the resulting splits so that the shortest ones are processed last.
// The resulting savings in network bandwidth are significant (we measured 60%).
//
public class TableInputFormatFix extends TableInputFormat {

  public static final int ORIGINAL  = 0;
  public static final int LOCALIZED = 1;
  public static final int OPTIMIZED = 2; // not yet functional

  //
  // A table split with a location.
  //
  static class LocationTableSplit extends TableSplit implements Comparable {
    String location;

    public LocationTableSplit() {
    }

    public LocationTableSplit(byte[] tableName, byte[] startRow, byte[] endRow, String location) {
      super(tableName, startRow, endRow);
      this.location = location;
    }

    public String[] getLocations() {
      return new String[] {location};
    }

    public void readFields(DataInput in) throws IOException {
      super.readFields(in);
      this.location = Bytes.toString(Bytes.readByteArray(in));
    }

    public void write(DataOutput out) throws IOException {
      super.write(out);
      Bytes.writeByteArray(out, Bytes.toBytes(location));
    }

    public int compareTo(Object other) {
      LocationTableSplit otherSplit = (LocationTableSplit) other;
      int result = Bytes.compareTo(getStartRow(), otherSplit.getStartRow());
      return result;
    }

    public String toString() {
      return location.substring(0, location.indexOf('.')) + ": "
          + Bytes.toString(getStartRow()) + "-" + Bytes.toString(getEndRow());
    }
  }

  //
  // A table split with a location that covers multiple regions.
  //
  static class MultiRegionTableSplit extends LocationTableSplit {
    byte[][] regions;

    public MultiRegionTableSplit() {
    }

    public MultiRegionTableSplit(byte[] tableName, String location, byte[][] regions) throws IOException {
      super(tableName, regions[0], regions[regions.length - 1], location);
      this.location = location;
      this.regions = regions;
    }

    public void readFields(DataInput in) throws IOException {
      super.readFields(in);
      int n = in.readInt();
      regions = new byte[n][];
      for (int i = 0; i < n; i++) {
        regions[i] = Bytes.readByteArray(in);
      }
    }

    public void write(DataOutput out) throws IOException {
      super.write(out);
      out.writeInt(regions.length);
      for (int i = 0; i < regions.length; i++) {
        Bytes.writeByteArray(out, regions[i]);
      }
    }

    public String toString() {
      String str = location.substring(0, location.indexOf('.')) + ": ";
      for (int i = 0; i < regions.length; i += 2) {
        if (i > 0) {
          str += ", ";
        }
        str += Bytes.toString(regions[i]) + "-" + Bytes.toString(regions[i + 1]);
      }
      return str;
    }

    public int compareTo(Object other) {
      MultiRegionTableSplit otherSplit = (MultiRegionTableSplit) other;
      int result = otherSplit.regions.length - regions.length;
      if (result == 0) {
        result = Bytes.compareTo(getStartRow(), otherSplit.getStartRow());
      }
      return result;
    }
  }

  //
  // TableRecordReader that can handle multiple regions.
  //
  protected class MultiRegionTableRecordReader implements RecordReader<ImmutableBytesWritable, RowResult> {
    private HTable htable;
    private byte[][] trrInputColumns;
    private int currentregion;
    private byte[][] regions;
    private byte[] lastRow;
    private Scanner scanner;

    void setHTable(HTable htable) {
      this.htable = htable;
    }

    void setInputColumns(final byte[][] inputColumns) {
      this.trrInputColumns = inputColumns;
    }

    void setRegions(byte[][] regions) {
      this.regions = regions;
    }

    int getRegion(byte[] row) {
      for (int i = 0; i < regions.length; i += 2) {
        byte[] startRow = regions[i + 0];
        byte[] endRow = regions[i + 1];
        if (startRow.length > 0 && Bytes.compareTo(startRow, row) > 0) {
          continue;
        }
        if (endRow.length > 0 && Bytes.compareTo(row, endRow) >= 0) {
          continue;
        }
        return i / 2;
      }
      return -1;
    }

    //
    // The business end of the reader.
    //
    public void init() throws IOException {
      restart(regions[0]);
    }

    public void restart(byte[] row) throws IOException {
      currentregion = getRegion(row);
      byte[] startRow = regions[currentregion * 2 + 0];
      byte[] endRow = regions[currentregion * 2 + 1];
      if (endRow.length > 0) {
        scanner = htable.getScanner(trrInputColumns, startRow, endRow);
      } else {
        scanner = htable.getScanner(trrInputColumns, startRow);
      }
    }

    public ImmutableBytesWritable createKey() {
      return new ImmutableBytesWritable();
    }

    public RowResult createValue() {
      return new RowResult();
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      int nregions = regions.length / 2;
      return ((100 * (2 * currentregion + 1)) / (2 * nregions)) / 100f;
    }

    public boolean next(ImmutableBytesWritable key, RowResult value) throws IOException {
      while (true) {
        RowResult result;
        try {
          result = scanner.next();
        } catch (UnknownScannerException e) {
          restart(lastRow);
          scanner.next(); // skip presumed already mapped row
          result = scanner.next();
        }
        boolean hasMore = result != null && result.size() > 0;
        if (!hasMore && currentregion + 1 < regions.length / 2) {
          // move to the next region
          restart(regions[(currentregion + 1) * 2]);
          continue;
        }
        if (hasMore) {
          key.set(result.getRow());
          lastRow = key.get();
          Writables.copyWritable(result, value);
        }
        return hasMore;
      }
    }

    public void close() {
      scanner.close();
    }
  }

  //
  // Main class
  //
  int type;
  HTable table;
  byte[][] inputColumns;
  MultiRegionTableRecordReader reader;

  public TableInputFormatFix() {
    this(OPTIMIZED);
  }

  public TableInputFormatFix(int type) {
    this.type = type;
    this.reader = new MultiRegionTableRecordReader();
  }

  protected void setHTable(HTable table) {
    this.table = table;
    super.setHTable(table);
  }

  protected void setInputColums(byte[][] inputColumns) {
    this.inputColumns = inputColumns;
    super.setInputColums(inputColumns);
  }

  //
  // Create RecordReader
  //
  public RecordReader<ImmutableBytesWritable, RowResult> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    TableSplit tSplit = (TableSplit) split;
    MultiRegionTableRecordReader trr = reader;
    trr.setHTable(this.table);
    trr.setInputColumns(this.inputColumns);
    //trr.setRowFilter(this.rowFilter);
    if (tSplit instanceof MultiRegionTableSplit) {
      trr.setRegions(((MultiRegionTableSplit) tSplit).regions);
    } else {
      trr.setRegions(new byte[][] {tSplit.getStartRow(), tSplit.getEndRow()});
    }
    trr.init();
    return trr;
  }

  //
  // Compute the splits.
  //
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    if (this.table == null) {
      throw new IOException("No table was provided");
    }
    if (this.inputColumns == null || this.inputColumns.length == 0) {
      throw new IOException("Expecting at least one column");
    }
    byte[][] startKeys = this.table.getStartKeys();
    if (startKeys == null || startKeys.length == 0) {
      throw new IOException("Expecting at least one region");
    }
    InputSplit[] splits = null;
    switch (type) {
      case ORIGINAL: {
        // This is the original algorithm with no locations.
        int realNumSplits = numSplits > startKeys.length ? startKeys.length : numSplits;
        splits = new InputSplit[realNumSplits];
        int middle = startKeys.length / realNumSplits;
        int startPos = 0;
        for (int i = 0; i < realNumSplits; i++) {
          int lastPos = startPos + middle;
          lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
          splits[i] = new TableSplit(this.table.getTableName(), startKeys[startPos],
              ((i + 1) < realNumSplits) ? startKeys[lastPos] : HConstants.EMPTY_START_ROW);
          startPos = lastPos;
        }
        break;
      }
      case LOCALIZED: {
        // This is the original algorithm with a minor fix: add the likely location of each region.
        int realNumSplits = numSplits > startKeys.length ? startKeys.length : numSplits;
        splits = new InputSplit[realNumSplits];
        int middle = startKeys.length / realNumSplits;
        int startPos = 0;
        for (int i = 0; i < realNumSplits; i++) {
          int lastPos = startPos + middle;
          lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
          String regionLocation = table.getRegionLocation(startKeys[startPos]).getServerAddress().getHostname();
          splits[i] = new LocationTableSplit(this.table.getTableName(), startKeys[startPos],
              ((i + 1) < realNumSplits) ? startKeys[lastPos] : HConstants.EMPTY_START_ROW, regionLocation);
          startPos = lastPos;
        }
        break;
      }
      case OPTIMIZED: {
        // This is an optimized algorithm that bundles multiple regions of the same host into each split.
        int nregions = 0;
        int nhosts = 0;
        HashMap<String, ArrayList<HRegionInfo>> hosts = new HashMap<String, ArrayList<HRegionInfo>>();
        for (java.util.Map.Entry<HRegionInfo, HServerAddress> e : table.getRegionsInfo().entrySet()) {
          String host = e.getValue().getHostname();
          ArrayList<HRegionInfo> regions = hosts.get(host);
          if (regions == null) {
            regions = new ArrayList<HRegionInfo>();
            hosts.put(host, regions);
            nhosts++;
          }
          regions.add(e.getKey());
          nregions++;
        }
        if (numSplits < nhosts) {
          numSplits = nhosts;
        }
        if (numSplits > nregions) {
          numSplits = nregions;
        }
        float sph = (float) numSplits / nhosts;
        float sphremainder = 0f;
        ArrayList<InputSplit> splitlist = new ArrayList<InputSplit>();
        for (String host : hosts.keySet()) {
          ArrayList<HRegionInfo> regions = hosts.get(host);
          float rps = ((regions.size() - 1) + sphremainder) / (sph - 1);
          sphremainder = sph;
          float rpsremainder = 0f;
          for (int i = 0; i < regions.size();) {
            rpsremainder += rps;
            int splitSize = Math.max(1, (int) rpsremainder);
            if (i + splitSize > regions.size()) {
              splitSize = regions.size() - i;
            }
            //System.out.println(host + ": " + numSplits + "/" + nregions + "/" + splitSize + ":");
            byte[][] splitregions = new byte[splitSize * 2][];
            for (int j = 0; j < splitSize; j++) {
              HRegionInfo region = regions.get(i + j);
              splitregions[j * 2 + 0] = region.getStartKey();
              splitregions[j * 2 + 1] = region.getEndKey();
            }
            splitlist.add(new MultiRegionTableSplit(table.getTableName(), host, splitregions));
            i += splitSize;
            rpsremainder -= splitSize;
            sphremainder -= 1;
          }
        }

        // copy into a real array (there must be a better way)
        int n = splitlist.size();
        //n = 1;
        splits = new InputSplit[n];
        for (int i = splits.length; i-- > 0;) {
          splits[i] = splitlist.get(i);
        }
        Arrays.sort(splits);
        break;
      }
    }

    // dump the splits
    if (false) {
      System.out.println("---- " + splits.length + " splits ----");
      for (int i = 0; i < splits.length; i++) {
        System.out.println(i + ": " + splits[i].toString());
      }
      System.exit(1);
    }

    // return the splits
    return splits;
  }
}
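One note on choosing between ORIGINAL, LOCALIZED, and OPTIMIZED: jobconf.setInputFormat takes a class, and Hadoop creates the input format through its no-argument constructor, so jobs pick up the OPTIMIZED strategy by default. If you want to experiment with one of the other strategies, a trivial subclass does it; the class name below is just an example.

package ellerdale.mapreduce;

// Example only: forces the LOCALIZED split strategy, because the framework
// always instantiates the input format through its no-argument constructor.
public class LocalizedTableInputFormat extends TableInputFormatFix {
  public LocalizedTableInputFormat() {
    super(LOCALIZED);
  }
}

and then register it with jobconf.setInputFormat(ellerdale.mapreduce.LocalizedTableInputFormat.class);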