http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java new file mode 100644 index 0000000..c72a0c3 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -0,0 +1,786 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Collections; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators; + +public class SyncTable extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(SyncTable.class); + + static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir"; + static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name"; + static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name"; + static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster"; + static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster"; + static final String 
DRY_RUN_CONF_KEY="sync.table.dry.run"; + + Path sourceHashDir; + String sourceTableName; + String targetTableName; + + String sourceZkCluster; + String targetZkCluster; + boolean dryRun; + + Counters counters; + + public SyncTable(Configuration conf) { + super(conf); + } + + public Job createSubmittableJob(String[] args) throws IOException { + FileSystem fs = sourceHashDir.getFileSystem(getConf()); + if (!fs.exists(sourceHashDir)) { + throw new IOException("Source hash dir not found: " + sourceHashDir); + } + + HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir); + LOG.info("Read source hash manifest: " + tableHash); + LOG.info("Read " + tableHash.partitions.size() + " partition keys"); + if (!tableHash.tableName.equals(sourceTableName)) { + LOG.warn("Table name mismatch - manifest indicates hash was taken from: " + + tableHash.tableName + " but job is reading from: " + sourceTableName); + } + if (tableHash.numHashFiles != tableHash.partitions.size() + 1) { + throw new RuntimeException("Hash data appears corrupt. The number of of hash files created" + + " should be 1 more than the number of partition keys. However, the manifest file " + + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys" + + " found in the partitions file is " + tableHash.partitions.size()); + } + + Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR); + int dataSubdirCount = 0; + for (FileStatus file : fs.listStatus(dataDir)) { + if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) { + dataSubdirCount++; + } + } + + if (dataSubdirCount != tableHash.numHashFiles) { + throw new RuntimeException("Hash data appears corrupt. The number of of hash files created" + + " should be 1 more than the number of partition keys. However, the number of data dirs" + + " found is " + dataSubdirCount + " but the number of partition keys" + + " found in the partitions file is " + tableHash.partitions.size()); + } + + Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name", + "syncTable_" + sourceTableName + "-" + targetTableName)); + Configuration jobConf = job.getConfiguration(); + job.setJarByClass(HashTable.class); + jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString()); + jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName); + jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName); + if (sourceZkCluster != null) { + jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster); + } + if (targetZkCluster != null) { + jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster); + } + jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun); + + TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), + SyncMapper.class, null, null, job); + + job.setNumReduceTasks(0); + + if (dryRun) { + job.setOutputFormatClass(NullOutputFormat.class); + } else { + // No reducers. Just write straight to table. Call initTableReducerJob + // because it sets up the TableOutputFormat. 
+ TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, + targetZkCluster, null, null); + + // would be nice to add an option for bulk load instead + } + + // Obtain an authentication token, for the specified cluster, on behalf of the current user + if (sourceZkCluster != null) { + Configuration peerConf = + HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster); + TableMapReduceUtil.initCredentialsForCluster(job, peerConf); + } + return job; + } + + public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> { + Path sourceHashDir; + + Connection sourceConnection; + Connection targetConnection; + Table sourceTable; + Table targetTable; + boolean dryRun; + + HashTable.TableHash sourceTableHash; + HashTable.TableHash.Reader sourceHashReader; + ImmutableBytesWritable currentSourceHash; + ImmutableBytesWritable nextSourceKey; + HashTable.ResultHasher targetHasher; + + Throwable mapperException; + + public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS, + SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES, + MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED}; + + @Override + protected void setup(Context context) throws IOException { + + Configuration conf = context.getConfiguration(); + sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY)); + sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null); + targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, + TableOutputFormat.OUTPUT_CONF_PREFIX); + sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); + targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); + dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false); + + sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir); + LOG.info("Read source hash manifest: " + sourceTableHash); + LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys"); + + TableSplit split = (TableSplit) context.getInputSplit(); + ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow()); + + sourceHashReader = sourceTableHash.newReader(conf, splitStartKey); + findNextKeyHashPair(); + + // create a hasher, but don't start it right away + // instead, find the first hash batch at or after the start row + // and skip any rows that come before. they will be caught by the previous task + targetHasher = new HashTable.ResultHasher(); + } + + private static Connection openConnection(Configuration conf, String zkClusterConfKey, + String configPrefix) + throws IOException { + String zkCluster = conf.get(zkClusterConfKey); + Configuration clusterConf = HBaseConfiguration.createClusterConf(conf, + zkCluster, configPrefix); + return ConnectionFactory.createConnection(clusterConf); + } + + private static Table openTable(Connection connection, Configuration conf, + String tableNameConfKey) throws IOException { + return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey))); + } + + /** + * Attempt to read the next source key/hash pair. 
+ * If there are no more, set nextSourceKey to null + */ + private void findNextKeyHashPair() throws IOException { + boolean hasNext = sourceHashReader.next(); + if (hasNext) { + nextSourceKey = sourceHashReader.getCurrentKey(); + } else { + // no more keys - last hash goes to the end + nextSourceKey = null; + } + } + + @Override + protected void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + try { + // first, finish any hash batches that end before the scanned row + while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) { + moveToNextBatch(context); + } + + // next, add the scanned row (as long as we've reached the first batch) + if (targetHasher.isBatchStarted()) { + targetHasher.hashResult(value); + } + } catch (Throwable t) { + mapperException = t; + Throwables.propagateIfInstanceOf(t, IOException.class); + Throwables.propagateIfInstanceOf(t, InterruptedException.class); + Throwables.propagate(t); + } + } + + /** + * If there is an open hash batch, complete it and sync if there are diffs. + * Start a new batch, and seek to read the + */ + private void moveToNextBatch(Context context) throws IOException, InterruptedException { + if (targetHasher.isBatchStarted()) { + finishBatchAndCompareHashes(context); + } + targetHasher.startBatch(nextSourceKey); + currentSourceHash = sourceHashReader.getCurrentHash(); + + findNextKeyHashPair(); + } + + /** + * Finish the currently open hash batch. + * Compare the target hash to the given source hash. + * If they do not match, then sync the covered key range. + */ + private void finishBatchAndCompareHashes(Context context) + throws IOException, InterruptedException { + targetHasher.finishBatch(); + context.getCounter(Counter.BATCHES).increment(1); + if (targetHasher.getBatchSize() == 0) { + context.getCounter(Counter.EMPTY_BATCHES).increment(1); + } + ImmutableBytesWritable targetHash = targetHasher.getBatchHash(); + if (targetHash.equals(currentSourceHash)) { + context.getCounter(Counter.HASHES_MATCHED).increment(1); + } else { + context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1); + + ImmutableBytesWritable stopRow = nextSourceKey == null + ? new ImmutableBytesWritable(sourceTableHash.stopRow) + : nextSourceKey; + + if (LOG.isDebugEnabled()) { + LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey()) + + " to " + toHex(stopRow) + + " sourceHash: " + toHex(currentSourceHash) + + " targetHash: " + toHex(targetHash)); + } + + syncRange(context, targetHasher.getBatchStartKey(), stopRow); + } + } + private static String toHex(ImmutableBytesWritable bytes) { + return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength()); + } + + private static final CellScanner EMPTY_CELL_SCANNER + = new CellScanner(Collections.<Result>emptyIterator()); + + /** + * Rescan the given range directly from the source and target tables. 
+ * Count and log differences, and if this is not a dry run, output Puts and Deletes + * to make the target table match the source table for this range + */ + private void syncRange(Context context, ImmutableBytesWritable startRow, + ImmutableBytesWritable stopRow) throws IOException, InterruptedException { + Scan scan = sourceTableHash.initScan(); + scan.setStartRow(startRow.copyBytes()); + scan.setStopRow(stopRow.copyBytes()); + + ResultScanner sourceScanner = sourceTable.getScanner(scan); + CellScanner sourceCells = new CellScanner(sourceScanner.iterator()); + + ResultScanner targetScanner = targetTable.getScanner(new Scan(scan)); + CellScanner targetCells = new CellScanner(targetScanner.iterator()); + + boolean rangeMatched = true; + byte[] nextSourceRow = sourceCells.nextRow(); + byte[] nextTargetRow = targetCells.nextRow(); + while(nextSourceRow != null || nextTargetRow != null) { + boolean rowMatched; + int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow); + if (rowComparison < 0) { + if (LOG.isInfoEnabled()) { + LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow)); + } + context.getCounter(Counter.TARGETMISSINGROWS).increment(1); + + rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER); + nextSourceRow = sourceCells.nextRow(); // advance only source to next row + } else if (rowComparison > 0) { + if (LOG.isInfoEnabled()) { + LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow)); + } + context.getCounter(Counter.SOURCEMISSINGROWS).increment(1); + + rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells); + nextTargetRow = targetCells.nextRow(); // advance only target to next row + } else { + // current row is the same on both sides, compare cell by cell + rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells); + nextSourceRow = sourceCells.nextRow(); + nextTargetRow = targetCells.nextRow(); + } + + if (!rowMatched) { + rangeMatched = false; + } + } + + sourceScanner.close(); + targetScanner.close(); + + context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED) + .increment(1); + } + + private static class CellScanner { + private final Iterator<Result> results; + + private byte[] currentRow; + private Result currentRowResult; + private int nextCellInRow; + + private Result nextRowResult; + + public CellScanner(Iterator<Result> results) { + this.results = results; + } + + /** + * Advance to the next row and return its row key. + * Returns null iff there are no more rows. + */ + public byte[] nextRow() { + if (nextRowResult == null) { + // no cached row - check scanner for more + while (results.hasNext()) { + nextRowResult = results.next(); + Cell nextCell = nextRowResult.rawCells()[0]; + if (currentRow == null + || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(), + nextCell.getRowOffset(), nextCell.getRowLength())) { + // found next row + break; + } else { + // found another result from current row, keep scanning + nextRowResult = null; + } + } + + if (nextRowResult == null) { + // end of data, no more rows + currentRowResult = null; + currentRow = null; + return null; + } + } + + // advance to cached result for next row + currentRowResult = nextRowResult; + nextCellInRow = 0; + currentRow = currentRowResult.getRow(); + nextRowResult = null; + return currentRow; + } + + /** + * Returns the next Cell in the current row or null iff none remain. 
+ */ + public Cell nextCellInRow() { + if (currentRowResult == null) { + // nothing left in current row + return null; + } + + Cell nextCell = currentRowResult.rawCells()[nextCellInRow]; + nextCellInRow++; + if (nextCellInRow == currentRowResult.size()) { + if (results.hasNext()) { + Result result = results.next(); + Cell cell = result.rawCells()[0]; + if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(), + cell.getRowOffset(), cell.getRowLength())) { + // result is part of current row + currentRowResult = result; + nextCellInRow = 0; + } else { + // result is part of next row, cache it + nextRowResult = result; + // current row is complete + currentRowResult = null; + } + } else { + // end of data + currentRowResult = null; + } + } + return nextCell; + } + } + + /** + * Compare the cells for the given row from the source and target tables. + * Count and log any differences. + * If not a dry run, output a Put and/or Delete needed to sync the target table + * to match the source table. + */ + private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells, + CellScanner targetCells) throws IOException, InterruptedException { + Put put = null; + Delete delete = null; + long matchingCells = 0; + boolean matchingRow = true; + Cell sourceCell = sourceCells.nextCellInRow(); + Cell targetCell = targetCells.nextCellInRow(); + while (sourceCell != null || targetCell != null) { + + int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell); + if (cellKeyComparison < 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Target missing cell: " + sourceCell); + } + context.getCounter(Counter.TARGETMISSINGCELLS).increment(1); + matchingRow = false; + + if (!dryRun) { + if (put == null) { + put = new Put(rowKey); + } + put.add(sourceCell); + } + + sourceCell = sourceCells.nextCellInRow(); + } else if (cellKeyComparison > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Source missing cell: " + targetCell); + } + context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1); + matchingRow = false; + + if (!dryRun) { + if (delete == null) { + delete = new Delete(rowKey); + } + // add a tombstone to exactly match the target cell that is missing on the source + delete.addColumn(CellUtil.cloneFamily(targetCell), + CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp()); + } + + targetCell = targetCells.nextCellInRow(); + } else { + // the cell keys are equal, now check values + if (CellUtil.matchingValue(sourceCell, targetCell)) { + matchingCells++; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Different values: "); + LOG.debug(" source cell: " + sourceCell + + " value: " + Bytes.toHex(sourceCell.getValueArray(), + sourceCell.getValueOffset(), sourceCell.getValueLength())); + LOG.debug(" target cell: " + targetCell + + " value: " + Bytes.toHex(targetCell.getValueArray(), + targetCell.getValueOffset(), targetCell.getValueLength())); + } + context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1); + matchingRow = false; + + if (!dryRun) { + // overwrite target cell + if (put == null) { + put = new Put(rowKey); + } + put.add(sourceCell); + } + } + sourceCell = sourceCells.nextCellInRow(); + targetCell = targetCells.nextCellInRow(); + } + + if (!dryRun && sourceTableHash.scanBatch > 0) { + if (put != null && put.size() >= sourceTableHash.scanBatch) { + context.write(new ImmutableBytesWritable(rowKey), put); + put = null; + } + if (delete != null && delete.size() >= sourceTableHash.scanBatch) { + context.write(new ImmutableBytesWritable(rowKey), 
delete); + delete = null; + } + } + } + + if (!dryRun) { + if (put != null) { + context.write(new ImmutableBytesWritable(rowKey), put); + } + if (delete != null) { + context.write(new ImmutableBytesWritable(rowKey), delete); + } + } + + if (matchingCells > 0) { + context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells); + } + if (matchingRow) { + context.getCounter(Counter.MATCHINGROWS).increment(1); + return true; + } else { + context.getCounter(Counter.ROWSWITHDIFFS).increment(1); + return false; + } + } + + /** + * Compare row keys of the given Result objects. + * Nulls are after non-nulls + */ + private static int compareRowKeys(byte[] r1, byte[] r2) { + if (r1 == null) { + return 1; // source missing row + } else if (r2 == null) { + return -1; // target missing row + } else { + // Sync on no META tables only. We can directly do what CellComparator is doing inside. + // Never the call going to MetaCellComparator. + return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length); + } + } + + /** + * Compare families, qualifiers, and timestamps of the given Cells. + * They are assumed to be of the same row. + * Nulls are after non-nulls. + */ + private static int compareCellKeysWithinRow(Cell c1, Cell c2) { + if (c1 == null) { + return 1; // source missing cell + } + if (c2 == null) { + return -1; // target missing cell + } + + int result = CellComparator.compareFamilies(c1, c2); + if (result != 0) { + return result; + } + + result = CellComparator.compareQualifiers(c1, c2); + if (result != 0) { + return result; + } + + // note timestamp comparison is inverted - more recent cells first + return CellComparator.compareTimestamps(c1, c2); + } + + @Override + protected void cleanup(Context context) + throws IOException, InterruptedException { + if (mapperException == null) { + try { + finishRemainingHashRanges(context); + } catch (Throwable t) { + mapperException = t; + } + } + + try { + sourceTable.close(); + targetTable.close(); + sourceConnection.close(); + targetConnection.close(); + } catch (Throwable t) { + if (mapperException == null) { + mapperException = t; + } else { + LOG.error("Suppressing exception from closing tables", t); + } + } + + // propagate first exception + if (mapperException != null) { + Throwables.propagateIfInstanceOf(mapperException, IOException.class); + Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class); + Throwables.propagate(mapperException); + } + } + + private void finishRemainingHashRanges(Context context) throws IOException, + InterruptedException { + TableSplit split = (TableSplit) context.getInputSplit(); + byte[] splitEndRow = split.getEndRow(); + boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow); + + // if there are more hash batches that begin before the end of this split move to them + while (nextSourceKey != null + && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) { + moveToNextBatch(context); + } + + if (targetHasher.isBatchStarted()) { + // need to complete the final open hash batch + + if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0) + || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) { + // the open hash range continues past the end of this region + // add a scan to complete the current hash range + Scan scan = sourceTableHash.initScan(); + scan.setStartRow(splitEndRow); + if (nextSourceKey == null) { + scan.setStopRow(sourceTableHash.stopRow); + } else { + scan.setStopRow(nextSourceKey.copyBytes()); + } + + ResultScanner 
targetScanner = null; + try { + targetScanner = targetTable.getScanner(scan); + for (Result row : targetScanner) { + targetHasher.hashResult(row); + } + } finally { + if (targetScanner != null) { + targetScanner.close(); + } + } + } // else current batch ends exactly at split end row + + finishBatchAndCompareHashes(context); + } + } + } + + private static final int NUM_ARGS = 3; + private static void printUsage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + System.err.println(); + } + System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>"); + System.err.println(); + System.err.println("Options:"); + + System.err.println(" sourcezkcluster ZK cluster key of the source table"); + System.err.println(" (defaults to cluster in classpath's config)"); + System.err.println(" targetzkcluster ZK cluster key of the target table"); + System.err.println(" (defaults to cluster in classpath's config)"); + System.err.println(" dryrun if true, output counters but no writes"); + System.err.println(" (defaults to false)"); + System.err.println(); + System.err.println("Args:"); + System.err.println(" sourcehashdir path to HashTable output dir for source table"); + System.err.println(" (see org.apache.hadoop.hbase.mapreduce.HashTable)"); + System.err.println(" sourcetable Name of the source table to sync from"); + System.err.println(" targettable Name of the target table to sync to"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" For a dry run SyncTable of tableA from a remote source cluster"); + System.err.println(" to a local target cluster:"); + System.err.println(" $ hbase " + + "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true" + + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase" + + " hdfs://nn:9000/hashes/tableA tableA tableA"); + } + + private boolean doCommandLine(final String[] args) { + if (args.length < NUM_ARGS) { + printUsage(null); + return false; + } + try { + sourceHashDir = new Path(args[args.length - 3]); + sourceTableName = args[args.length - 2]; + targetTableName = args[args.length - 1]; + + for (int i = 0; i < args.length - NUM_ARGS; i++) { + String cmd = args[i]; + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(null); + return false; + } + + final String sourceZkClusterKey = "--sourcezkcluster="; + if (cmd.startsWith(sourceZkClusterKey)) { + sourceZkCluster = cmd.substring(sourceZkClusterKey.length()); + continue; + } + + final String targetZkClusterKey = "--targetzkcluster="; + if (cmd.startsWith(targetZkClusterKey)) { + targetZkCluster = cmd.substring(targetZkClusterKey.length()); + continue; + } + + final String dryRunKey = "--dryrun="; + if (cmd.startsWith(dryRunKey)) { + dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length())); + continue; + } + + printUsage("Invalid argument '" + cmd + "'"); + return false; + } + + + } catch (Exception e) { + e.printStackTrace(); + printUsage("Can't start because " + e.getMessage()); + return false; + } + return true; + } + + /** + * Main entry point. 
+ */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + if (!doCommandLine(otherArgs)) { + return 1; + } + + Job job = createSubmittableJob(otherArgs); + if (!job.waitForCompletion(true)) { + LOG.info("Map-reduce job failed!"); + return 1; + } + counters = job.getCounters(); + return 0; + } + +}
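The three positional arguments and optional flags parsed by doCommandLine() above can also be supplied programmatically through ToolRunner, exactly as main() does. The sketch below is illustrative only: the hash directory, table names, and ZooKeeper cluster key are placeholders, not values taken from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.SyncTable;
import org.apache.hadoop.util.ToolRunner;

public class SyncTableDriverExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Argument layout mirrors printUsage(): [options] <sourcehashdir> <sourcetable> <targettable>
    String[] syncArgs = {
        "--dryrun=true",                                   // report differences only, write nothing
        "--sourcezkcluster=zk1.example.com:2181:/hbase",   // placeholder source cluster key
        "hdfs://nn:9000/hashes/tableA",                    // output dir of a prior HashTable run
        "tableA",                                          // source table name
        "tableA"                                           // target table name
    };
    int exitCode = ToolRunner.run(new SyncTable(conf), syncArgs);
    System.exit(exitCode);
  }
}

A non-zero exit code indicates a usage error or a failed MapReduce job; per-range and per-cell difference counts are reported through the job counters defined in SyncMapper.Counter.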
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java new file mode 100644 index 0000000..63868da --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -0,0 +1,294 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Locale; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; + +/** + * Convert HBase tabular data into a format that is consumable by Map/Reduce. + */ +@InterfaceAudience.Public +public class TableInputFormat extends TableInputFormatBase +implements Configurable { + + @SuppressWarnings("hiding") + private static final Log LOG = LogFactory.getLog(TableInputFormat.class); + + /** Job parameter that specifies the input table. */ + public static final String INPUT_TABLE = "hbase.mapreduce.inputtable"; + /** + * If specified, use start keys of this table to split. + * This is useful when you are preparing data for bulkload. + */ + private static final String SPLIT_TABLE = "hbase.mapreduce.splittable"; + /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified. + * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details. 
+ */ + public static final String SCAN = "hbase.mapreduce.scan"; + /** Scan start row */ + public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start"; + /** Scan stop row */ + public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop"; + /** Column Family to Scan */ + public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family"; + /** Space delimited list of columns and column families to scan. */ + public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns"; + /** The timestamp used to filter columns with a specific timestamp. */ + public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp"; + /** The starting timestamp used to filter columns with a specific range of versions. */ + public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start"; + /** The ending timestamp used to filter columns with a specific range of versions. */ + public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end"; + /** The maximum number of version to return. */ + public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions"; + /** Set to false to disable server-side caching of blocks for this scan. */ + public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks"; + /** The number of rows for caching that will be passed to scanners. */ + public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows"; + /** Set the maximum number of values to return for each call to next(). */ + public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize"; + /** Specify if we have to shuffle the map tasks. */ + public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps"; + + /** The configuration. */ + private Configuration conf = null; + + /** + * Returns the current configuration. + * + * @return The current configuration. + * @see org.apache.hadoop.conf.Configurable#getConf() + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Sets the configuration. This is used to set the details for the table to + * be scanned. + * + * @param configuration The configuration to set. + * @see org.apache.hadoop.conf.Configurable#setConf( + * org.apache.hadoop.conf.Configuration) + */ + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION", + justification="Intentional") + public void setConf(Configuration configuration) { + this.conf = configuration; + + Scan scan = null; + + if (conf.get(SCAN) != null) { + try { + scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN)); + } catch (IOException e) { + LOG.error("An error occurred.", e); + } + } else { + try { + scan = createScanFromConfiguration(conf); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + + setScan(scan); + } + + /** + * Sets up a {@link Scan} instance, applying settings from the configuration property + * constants defined in {@code TableInputFormat}. 
This allows specifying things such as: + * <ul> + * <li>start and stop rows</li> + * <li>column qualifiers or families</li> + * <li>timestamps or timerange</li> + * <li>scanner caching and batch size</li> + * </ul> + */ + public static Scan createScanFromConfiguration(Configuration conf) throws IOException { + Scan scan = new Scan(); + + if (conf.get(SCAN_ROW_START) != null) { + scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START))); + } + + if (conf.get(SCAN_ROW_STOP) != null) { + scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP))); + } + + if (conf.get(SCAN_COLUMNS) != null) { + addColumns(scan, conf.get(SCAN_COLUMNS)); + } + + for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) { + scan.addFamily(Bytes.toBytes(columnFamily)); + } + + if (conf.get(SCAN_TIMESTAMP) != null) { + scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); + } + + if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { + scan.setTimeRange( + Long.parseLong(conf.get(SCAN_TIMERANGE_START)), + Long.parseLong(conf.get(SCAN_TIMERANGE_END))); + } + + if (conf.get(SCAN_MAXVERSIONS) != null) { + scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); + } + + if (conf.get(SCAN_CACHEDROWS) != null) { + scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); + } + + if (conf.get(SCAN_BATCHSIZE) != null) { + scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE))); + } + + // false by default, full table scans generate too much BC churn + scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); + + return scan; + } + + @Override + protected void initialize(JobContext context) throws IOException { + // Do we have to worry about mis-matches between the Configuration from setConf and the one + // in this context? + TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); + try { + initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + + /** + * Parses a combined family and qualifier and adds either both or just the + * family in case there is no qualifier. This assumes the older colon + * divided notation, e.g. "family:qualifier". + * + * @param scan The Scan to update. + * @param familyAndQualifier family and qualifier + * @throws IllegalArgumentException When familyAndQualifier is invalid. + */ + private static void addColumn(Scan scan, byte[] familyAndQualifier) { + byte [][] fq = KeyValue.parseColumn(familyAndQualifier); + if (fq.length == 1) { + scan.addFamily(fq[0]); + } else if (fq.length == 2) { + scan.addColumn(fq[0], fq[1]); + } else { + throw new IllegalArgumentException("Invalid familyAndQualifier provided."); + } + } + + /** + * Adds an array of columns specified using old format, family:qualifier. + * <p> + * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])}for any families in the + * input. + * + * @param scan The Scan to update. + * @param columns array of columns, formatted as <code>family:qualifier</code> + * @see Scan#addColumn(byte[], byte[]) + */ + public static void addColumns(Scan scan, byte [][] columns) { + for (byte[] column : columns) { + addColumn(scan, column); + } + } + + /** + * Calculates the splits that will serve as input for the map tasks. The + * number of splits matches the number of regions in a table. Splits are shuffled if + * required. + * @param context The current job context. + * @return The list of input splits. 
+ * @throws IOException When creating the list of splits fails. + * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( + * org.apache.hadoop.mapreduce.JobContext) + */ + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException { + List<InputSplit> splits = super.getSplits(context); + if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) { + Collections.shuffle(splits); + } + return splits; + } + + /** + * Convenience method to parse a string representation of an array of column specifiers. + * + * @param scan The Scan to update. + * @param columns The columns to parse. + */ + private static void addColumns(Scan scan, String columns) { + String[] cols = columns.split(" "); + for (String col : cols) { + addColumn(scan, Bytes.toBytes(col)); + } + } + + @Override + protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException { + if (conf.get(SPLIT_TABLE) != null) { + TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE)); + try (Connection conn = ConnectionFactory.createConnection(getConf())) { + try (RegionLocator rl = conn.getRegionLocator(splitTableName)) { + return rl.getStartEndKeys(); + } + } + } + + return super.getStartEndKeys(); + } + + /** + * Sets split table in map-reduce job. + */ + public static void configureSplitTable(Job job, TableName tableName) { + job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java new file mode 100644 index 0000000..fb38ebe --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -0,0 +1,652 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.StringUtils; + +/** + * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, + * an {@link Scan} instance that defines the input columns etc. Subclasses may use + * other TableRecordReader implementations. + * + * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to + * function properly. Each of the entry points to this class used by the MapReduce framework, + * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, + * will call {@link #initialize(JobContext)} as a convenient centralized location to handle + * retrieving the necessary configuration information. If your subclass overrides either of these + * methods, either call the parent version or call initialize yourself. + * + * <p> + * An example of a subclass: + * <pre> + * class ExampleTIF extends TableInputFormatBase { + * + * {@literal @}Override + * protected void initialize(JobContext context) throws IOException { + * // We are responsible for the lifecycle of this connection until we hand it over in + * // initializeTable. + * Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( + * job.getConfiguration())); + * TableName tableName = TableName.valueOf("exampleTable"); + * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. + * initializeTable(connection, tableName); + * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + * Bytes.toBytes("columnB") }; + * // optional, by default we'll get everything for the table. 
+ * Scan scan = new Scan(); + * for (byte[] family : inputColumns) { + * scan.addFamily(family); + * } + * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + * scan.setFilter(exampleFilter); + * setScan(scan); + * } + * } + * </pre> + */ +@InterfaceAudience.Public +public abstract class TableInputFormatBase +extends InputFormat<ImmutableBytesWritable, Result> { + + /** Specify if we enable auto-balance for input in M/R jobs.*/ + public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance"; + /** Specify if ratio for data skew in M/R jobs, it goes well with the enabling hbase.mapreduce + * .input.autobalance property.*/ + public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" + + ".maxskewratio"; + /** Specify if the row key in table is text (ASCII between 32~126), + * default is true. False means the table is using binary row key*/ + public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey"; + + private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); + + private static final String NOT_INITIALIZED = "The input format instance has not been properly " + + "initialized. Ensure you call initializeTable either in your constructor or initialize " + + "method"; + private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + + " previous error. Please look at the previous logs lines from" + + " the task's full log for more details."; + + /** Holds the details for the internal scanner. + * + * @see Scan */ + private Scan scan = null; + /** The {@link Admin}. */ + private Admin admin; + /** The {@link Table} to scan. */ + private Table table; + /** The {@link RegionLocator} of the table. */ + private RegionLocator regionLocator; + /** The reader scanning the table, can be a custom one. */ + private TableRecordReader tableRecordReader = null; + /** The underlying {@link Connection} of the table. */ + private Connection connection; + + + /** The reverse DNS lookup cache mapping: IPAddress => HostName */ + private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>(); + + /** + * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses + * the default. + * + * @param split The split to work with. + * @param context The current context. + * @return The newly created record reader. + * @throws IOException When creating the reader fails. + * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( + * org.apache.hadoop.mapreduce.InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext) + */ + @Override + public RecordReader<ImmutableBytesWritable, Result> createRecordReader( + InputSplit split, TaskAttemptContext context) + throws IOException { + // Just in case a subclass is relying on JobConfigurable magic. + if (table == null) { + initialize(context); + } + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); + } + TableSplit tSplit = (TableSplit) split; + LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes."); + final TableRecordReader trr = + this.tableRecordReader != null ? 
this.tableRecordReader : new TableRecordReader(); + Scan sc = new Scan(this.scan); + sc.setStartRow(tSplit.getStartRow()); + sc.setStopRow(tSplit.getEndRow()); + trr.setScan(sc); + trr.setTable(getTable()); + return new RecordReader<ImmutableBytesWritable, Result>() { + + @Override + public void close() throws IOException { + trr.close(); + closeTable(); + } + + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { + return trr.getCurrentKey(); + } + + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return trr.getCurrentValue(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return trr.getProgress(); + } + + @Override + public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, + InterruptedException { + trr.initialize(inputsplit, context); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return trr.nextKeyValue(); + } + }; + } + + protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { + return getRegionLocator().getStartEndKeys(); + } + + /** + * Calculates the splits that will serve as input for the map tasks. The + * number of splits matches the number of regions in a table. + * + * @param context The current job context. + * @return The list of input splits. + * @throws IOException When creating the list of splits fails. + * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( + * org.apache.hadoop.mapreduce.JobContext) + */ + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException { + boolean closeOnFinish = false; + + // Just in case a subclass is relying on JobConfigurable magic. + if (table == null) { + initialize(context); + closeOnFinish = true; + } + + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. 
+ throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); + } + + try { + RegionSizeCalculator sizeCalculator = + new RegionSizeCalculator(getRegionLocator(), getAdmin()); + + TableName tableName = getTable().getName(); + + Pair<byte[][], byte[][]> keys = getStartEndKeys(); + if (keys == null || keys.getFirst() == null || + keys.getFirst().length == 0) { + HRegionLocation regLoc = + getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + List<InputSplit> splits = new ArrayList<>(1); + long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName()); + TableSplit split = new TableSplit(tableName, scan, + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc + .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize); + splits.add(split); + return splits; + } + List<InputSplit> splits = new ArrayList<>(keys.getFirst().length); + for (int i = 0; i < keys.getFirst().length; i++) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && + (stopRow.length == 0 || + Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? + keys.getFirst()[i] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && + keys.getSecond()[i].length > 0 ? + keys.getSecond()[i] : stopRow; + + HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false); + // The below InetSocketAddress creation does a name resolution. + InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort()); + if (isa.isUnresolved()) { + LOG.warn("Failed resolve " + isa); + } + InetAddress regionAddress = isa.getAddress(); + String regionLocation; + regionLocation = reverseDNS(regionAddress); + + byte[] regionName = location.getRegionInfo().getRegionName(); + String encodedRegionName = location.getRegionInfo().getEncodedName(); + long regionSize = sizeCalculator.getRegionSize(regionName); + TableSplit split = new TableSplit(tableName, scan, + splitStart, splitStop, regionLocation, encodedRegionName, regionSize); + splits.add(split); + if (LOG.isDebugEnabled()) { + LOG.debug("getSplits: split -> " + i + " -> " + split); + } + } + } + //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled. + boolean enableAutoBalance = context.getConfiguration() + .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false); + if (enableAutoBalance) { + long totalRegionSize=0; + for (int i = 0; i < splits.size(); i++){ + TableSplit ts = (TableSplit)splits.get(i); + totalRegionSize += ts.getLength(); + } + long averageRegionSize = totalRegionSize / splits.size(); + // the averageRegionSize must be positive. 
+ if (averageRegionSize <= 0) { + LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " + + "set it to 1."); + averageRegionSize = 1; + } + return calculateRebalancedSplits(splits, context, averageRegionSize); + } else { + return splits; + } + } finally { + if (closeOnFinish) { + closeTable(); + } + } + } + + String reverseDNS(InetAddress ipAddress) throws UnknownHostException { + String hostName = this.reverseDNSCacheMap.get(ipAddress); + if (hostName == null) { + String ipAddressString = null; + try { + ipAddressString = DNS.reverseDns(ipAddress, null); + } catch (Exception e) { + // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the + // name service. Also, in case of ipv6, we need to use the InetAddress since resolving + // reverse DNS using jndi doesn't work well with ipv6 addresses. + ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName(); + } + if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress); + hostName = Strings.domainNamePointerToHostName(ipAddressString); + this.reverseDNSCacheMap.put(ipAddress, hostName); + } + return hostName; + } + + /** + * Calculates the number of MapReduce input splits for the map tasks. The number of + * MapReduce input splits depends on the average region size and the "data skew ratio" user set in + * configuration. + * + * @param list The list of input splits before balance. + * @param context The current job context. + * @param average The average size of all regions . + * @return The list of input splits. + * @throws IOException When creating the list of splits fails. + * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( + * org.apache.hadoop.mapreduce.JobContext) + */ + private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context, + long average) throws IOException { + List<InputSplit> resultList = new ArrayList<>(); + Configuration conf = context.getConfiguration(); + //The default data skew ratio is 3 + long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3); + //It determines which mode to use: text key mode or binary key mode. The default is text mode. + boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true); + long dataSkewThreshold = dataSkewRatio * average; + int count = 0; + while (count < list.size()) { + TableSplit ts = (TableSplit)list.get(count); + TableName tableName = ts.getTable(); + String regionLocation = ts.getRegionLocation(); + String encodedRegionName = ts.getEncodedRegionName(); + long regionSize = ts.getLength(); + if (regionSize >= dataSkewThreshold) { + // if the current region size is large than the data skew threshold, + // split the region into two MapReduce input splits. + byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey); + if (Arrays.equals(ts.getEndRow(), splitKey)) { + // Not splitting since the end key is the same as the split key + resultList.add(ts); + } else { + //Set the size of child TableSplit as 1/2 of the region size. The exact size of the + // MapReduce input splits is not far off. 
+ TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey, + regionLocation, regionSize / 2); + TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation, + regionSize - regionSize / 2); + resultList.add(t1); + resultList.add(t2); + } + count++; + } else if (regionSize >= average) { + // if the region size between average size and data skew threshold size, + // make this region as one MapReduce input split. + resultList.add(ts); + count++; + } else { + // if the total size of several small continuous regions less than the average region size, + // combine them into one MapReduce input split. + long totalSize = regionSize; + byte[] splitStartKey = ts.getStartRow(); + byte[] splitEndKey = ts.getEndRow(); + count++; + for (; count < list.size(); count++) { + TableSplit nextRegion = (TableSplit)list.get(count); + long nextRegionSize = nextRegion.getLength(); + if (totalSize + nextRegionSize <= dataSkewThreshold) { + totalSize = totalSize + nextRegionSize; + splitEndKey = nextRegion.getEndRow(); + } else { + break; + } + } + TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey, + regionLocation, encodedRegionName, totalSize); + resultList.add(t); + } + } + return resultList; + } + + /** + * select a split point in the region. The selection of the split point is based on an uniform + * distribution assumption for the keys in a region. + * Here are some examples: + * + * <table> + * <tr> + * <th>start key</th> + * <th>end key</th> + * <th>is text</th> + * <th>split point</th> + * </tr> + * <tr> + * <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td> + * <td>'a', 'a', 'a', 'f', 'f', 'f'</td> + * <td>true</td> + * <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td> + * </tr> + * <tr> + * <td>'1', '1', '1', '0', '0', '0'</td> + * <td>'1', '1', '2', '5', '7', '9', '0'</td> + * <td>true</td> + * <td>'1', '1', '1', -78, -77, -76, -104</td> + * </tr> + * <tr> + * <td>'1', '1', '1', '0'</td> + * <td>'1', '1', '2', '0'</td> + * <td>true</td> + * <td>'1', '1', '1', -80</td> + * </tr> + * <tr> + * <td>13, -19, 126, 127</td> + * <td>13, -19, 127, 0</td> + * <td>false</td> + * <td>13, -19, 126, -65</td> + * </tr> + * </table> + * + * Set this function as "public static", make it easier for test. + * + * @param start Start key of the region + * @param end End key of the region + * @param isText It determines to use text key mode or binary key mode + * @return The split point in the region. + */ + @InterfaceAudience.Private + public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) { + byte upperLimitByte; + byte lowerLimitByte; + //Use text mode or binary mode. + if (isText) { + //The range of text char set in ASCII is [32,126], the lower limit is space and the upper + // limit is '~'. 
+ upperLimitByte = '~'; + lowerLimitByte = ' '; + } else { + upperLimitByte = -1; + lowerLimitByte = 0; + } + // For special case + // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h" + // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~" + if (start.length == 0 && end.length == 0){ + return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)}; + } + if (start.length == 0 && end.length != 0){ + return new byte[]{ end[0] }; + } + if (start.length != 0 && end.length == 0){ + byte[] result =new byte[start.length]; + result[0]=start[0]; + for (int k = 1; k < start.length; k++){ + result[k] = upperLimitByte; + } + return result; + } + return Bytes.split(start, end, false, 1)[1]; + } + + /** + * Test if the given region is to be included in the InputSplit while splitting + * the regions of a table. + * <p> + * This optimization is effective when there is a specific reasoning to exclude an entire region from the M-R job, + * (and hence, not contributing to the InputSplit), given the start and end keys of the same. <br> + * Useful when we need to remember the last-processed top record and revisit the [last, current) interval for M-R processing, + * continuously. In addition to reducing InputSplits, reduces the load on the region server as well, due to the ordering of the keys. + * <br> + * <br> + * Note: It is possible that <code>endKey.length() == 0 </code> , for the last (recent) region. + * <br> + * Override this method, if you want to bulk exclude regions altogether from M-R. By default, no region is excluded( i.e. all regions are included). + * + * + * @param startKey Start key of the region + * @param endKey End key of the region + * @return true, if this region needs to be included as part of the input (default). + * + */ + protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { + return true; + } + + /** + * Allows subclasses to get the {@link RegionLocator}. + */ + protected RegionLocator getRegionLocator() { + if (regionLocator == null) { + throw new IllegalStateException(NOT_INITIALIZED); + } + return regionLocator; + } + + /** + * Allows subclasses to get the {@link Table}. + */ + protected Table getTable() { + if (table == null) { + throw new IllegalStateException(NOT_INITIALIZED); + } + return table; + } + + /** + * Allows subclasses to get the {@link Admin}. + */ + protected Admin getAdmin() { + if (admin == null) { + throw new IllegalStateException(NOT_INITIALIZED); + } + return admin; + } + + /** + * Allows subclasses to initialize the table information. + * + * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. + * @param tableName The {@link TableName} of the table to process. + * @throws IOException + */ + protected void initializeTable(Connection connection, TableName tableName) throws IOException { + if (this.table != null || this.connection != null) { + LOG.warn("initializeTable called multiple times. Overwriting connection and table " + + "reference; TableInputFormatBase will not close these old references when done."); + } + this.table = connection.getTable(tableName); + this.regionLocator = connection.getRegionLocator(tableName); + this.admin = connection.getAdmin(); + this.connection = connection; + } + + /** + * Gets the scan defining the actual details like columns etc. + * + * @return The internal scan instance. 
+ */ + public Scan getScan() { + if (this.scan == null) this.scan = new Scan(); + return scan; + } + + /** + * Sets the scan defining the actual details like columns etc. + * + * @param scan The scan to set. + */ + public void setScan(Scan scan) { + this.scan = scan; + } + + /** + * Allows subclasses to set the {@link TableRecordReader}. + * + * @param tableRecordReader A different {@link TableRecordReader} + * implementation. + */ + protected void setTableRecordReader(TableRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } + + /** + * Handle subclass specific set up. + * Each of the entry points used by the MapReduce framework, + * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, + * will call {@link #initialize(JobContext)} as a convenient centralized location to handle + * retrieving the necessary configuration information and calling + * {@link #initializeTable(Connection, TableName)}. + * + * Subclasses should implement their initialize call such that it is safe to call multiple times. + * The current TableInputFormatBase implementation relies on a non-null table reference to decide + * if an initialize call is needed, but this behavior may change in the future. In particular, + * it is critical that initializeTable not be called multiple times since this will leak + * Connection instances. + * + */ + protected void initialize(JobContext context) throws IOException { + } + + /** + * Close the Table and related objects that were initialized via + * {@link #initializeTable(Connection, TableName)}. + * + * @throws IOException + */ + protected void closeTable() throws IOException { + close(admin, table, regionLocator, connection); + admin = null; + table = null; + regionLocator = null; + connection = null; + } + + private void close(Closeable... closables) throws IOException { + for (Closeable c : closables) { + if(c != null) { c.close(); } + } + } + +}
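TableInputFormat is normally wired up via TableMapReduceUtil, but the SCAN_* keys defined above also let a job be configured directly, with setConf()/createScanFromConfiguration() rebuilding the Scan on the task side. A minimal sketch follows, assuming a hypothetical table "myTable" with a column family "cf" (both placeholders) and a mapper that emits (row key, Result) pairs.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.Job;

public class TableInputFormatConfigExample {
  public static Job createJob() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "myTable");        // table to read
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");      // limit the scan to one family
    conf.set(TableInputFormat.SCAN_ROW_START, "row-0000");    // start row, binary-escaped string form
    conf.set(TableInputFormat.SCAN_CACHEDROWS, "500");        // scanner caching passed to Scan#setCaching
    conf.setBoolean(TableInputFormat.SHUFFLE_MAPS, true);     // shuffle splits before scheduling
    Job job = Job.getInstance(conf, "scan-myTable");
    job.setInputFormatClass(TableInputFormat.class);
    // Assumes the (omitted) mapper emits the row key and Result unchanged.
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Result.class);
    return job;
  }
}

The mapper, reducer, and output format are omitted here; one split is produced per region, or more when hbase.mapreduce.input.autobalance is enabled in TableInputFormatBase.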