http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
deleted file mode 100644
index 7fea254..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ /dev/null
@@ -1,902 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Writes HFiles. Passed Cells must arrive in order.
- * Writes current time as the sequence id for the file. Sets the major compacted
- * attribute on created {@link HFile}s. Calling write(null, null) will forcibly roll
- * all HFiles being written.
- * <p>
- * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
- */
-@InterfaceAudience.Public
-public class HFileOutputFormat2
-    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
-  private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
-  static class TableInfo {
-    private TableDescriptor tableDescriptor;
-    private RegionLocator regionLocator;
-
-    public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
-      this.tableDescriptor = tableDescriptor;
-      this.regionLocator = regionLocator;
-    }
-
-    /**
-     * Modifications to the returned HTD do not affect the inner TD.
-     * @return A clone of the inner table descriptor
-     * @deprecated use {@link #getTableDescriptor}
-     */
-    @Deprecated
-    public HTableDescriptor getHTableDescriptor() {
-      return new HTableDescriptor(tableDescriptor);
-    }
-
-    public TableDescriptor getTableDescriptor() {
-      return tableDescriptor;
-    }
-
-    public RegionLocator getRegionLocator() {
-      return regionLocator;
-    }
-  }
-
-  protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8);
-
-  protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
-    return Bytes.add(tableName, tableSeparator, suffix);
-  }
-
-  // The following constants are private since these are used by
-  // HFileOutputFormat2 to internally transfer data between job setup and
-  // reducer run using conf.
-  // These should not be changed by the client.
-  static final String COMPRESSION_FAMILIES_CONF_KEY =
-      "hbase.hfileoutputformat.families.compression";
-  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
-      "hbase.hfileoutputformat.families.bloomtype";
-  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.blocksize";
-  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
-
-  // This constant is public since the client can modify this when setting
-  // up their conf object and thus refer to this symbol.
-  // It is present for backwards compatibility reasons. Use it only to
-  // override the auto-detection of datablock encoding.
-  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-
-  /**
-   * Keep locality while generating HFiles for bulkload. See HBASE-12596
-   */
-  public static final String LOCALITY_SENSITIVE_CONF_KEY =
-      "hbase.bulkload.locality.sensitive.enabled";
-  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
-  static final String OUTPUT_TABLE_NAME_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.table.name";
-  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
-      "hbase.mapreduce.use.multi.table.hfileoutputformat";
-
-  public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
-  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
-
-  @Override
-  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
-      final TaskAttemptContext context) throws IOException, InterruptedException {
-    return createRecordWriter(context);
-  }
-
-  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
-    return combineTableNameSuffix(tableName, family);
-  }
-
-  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
-      createRecordWriter(final TaskAttemptContext context)
-          throws IOException {
-
-    // Get the path of the temporary output file
-    final Path outputPath = FileOutputFormat.getOutputPath(context);
-    final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
-    final Configuration conf = context.getConfiguration();
-    final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
-    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
-    if (writeTableNames == null || writeTableNames.isEmpty()) {
-      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
-          + " cannot be empty");
-    }
-    final FileSystem fs = outputDir.getFileSystem(conf);
-    // These configs. are from hbase-*.xml
-    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
-        HConstants.DEFAULT_MAX_FILE_SIZE);
-    // Invented config. Add to hbase-*.xml if other than default compression.
-    final String defaultCompressionStr = conf.get("hfile.compression",
-        Compression.Algorithm.NONE.getName());
-    final Algorithm defaultCompression = HFileWriterImpl
-        .compressionByName(defaultCompressionStr);
-    final boolean compactionExclude = conf.getBoolean(
-        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
-    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
-        Bytes.toString(tableSeparator))).collect(Collectors.toSet());
-
-    // create a map from column family to the compression algorithm
-    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
-    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
-    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
-
-    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
-    final Map<byte[], DataBlockEncoding> datablockEncodingMap
-        = createFamilyDataBlockEncodingMap(conf);
-    final DataBlockEncoding overriddenEncoding;
-    if (dataBlockEncodingStr != null) {
-      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
-    } else {
-      overriddenEncoding = null;
-    }
-
-    return new RecordWriter<ImmutableBytesWritable, V>() {
-      // Map of families to writers and how much has been output on the writer.
-      private final Map<byte[], WriterLength> writers =
-          new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
-      private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
-      private boolean rollRequested = false;
-
-      @Override
-      public void write(ImmutableBytesWritable row, V cell)
-          throws IOException {
-        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-
-        // null input == user explicitly wants to flush
-        if (row == null && kv == null) {
-          rollWriters();
-          return;
-        }
-
-        byte[] rowKey = CellUtil.cloneRow(kv);
-        long length = kv.getLength();
-        byte[] family = CellUtil.cloneFamily(kv);
-        byte[] tableNameBytes = null;
-        if (writeMultipleTables) {
-          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
-          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
-            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
-                "' not expected");
-          }
-        } else {
-          tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8);
-        }
-        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
-        WriterLength wl = this.writers.get(tableAndFamily);
-
-        // If this is a new column family, verify that the directory exists
-        if (wl == null) {
-          Path writerPath = null;
-          if (writeMultipleTables) {
-            writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes),
-                Bytes.toString(family)));
-          } else {
-            writerPath = new Path(outputDir, Bytes.toString(family));
-          }
-          fs.mkdirs(writerPath);
-          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
-        }
-
-        // If any of the HFiles for the column families has reached
-        // maxsize, we need to roll all the writers
-        if (wl != null && wl.written + length >= maxsize) {
-          this.rollRequested = true;
-        }
-
-        // This can only happen once a row is finished though
-        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-          rollWriters();
-        }
-
-        // create a new HFile writer, if necessary
-        if (wl == null || wl.writer == null) {
-          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-            HRegionLocation loc = null;
-
-            String tableName = Bytes.toString(tableNameBytes);
-            if (tableName != null) {
-              try (Connection connection = ConnectionFactory.createConnection(conf);
-                  RegionLocator locator =
-                      connection.getRegionLocator(TableName.valueOf(tableName))) {
-                loc = locator.getRegionLocation(rowKey);
-              } catch (Throwable e) {
-                LOG.warn("Something went wrong while locating rowkey: " +
-                    Bytes.toString(rowKey) + " for tablename: " + tableName, e);
-                loc = null;
-              }
-            }
-
-            if (null == loc) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Failed to get region location, so use default writer for rowkey: " +
-                    Bytes.toString(rowKey));
-              }
-              wl = getNewWriter(tableNameBytes, family, conf, null);
-            } else {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("First rowkey: [" + Bytes.toString(rowKey) + "]");
-              }
-              InetSocketAddress initialIsa =
-                  new InetSocketAddress(loc.getHostname(), loc.getPort());
-              if (initialIsa.isUnresolved()) {
-                if (LOG.isTraceEnabled()) {
-                  LOG.trace("Failed to resolve bind address: " + loc.getHostname() + ":"
-                      + loc.getPort() + ", so use default writer");
-                }
-                wl = getNewWriter(tableNameBytes, family, conf, null);
-              } else {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Use favored nodes writer: " + initialIsa.getHostString());
-                }
-                wl = getNewWriter(tableNameBytes, family, conf,
-                    new InetSocketAddress[] { initialIsa });
-              }
-            }
-          } else {
-            wl = getNewWriter(tableNameBytes, family, conf, null);
-          }
-        }
-
-        // we now have the proper HFile writer. full steam ahead
-        kv.updateLatestStamp(this.now);
-        wl.writer.append(kv);
-        wl.written += length;
-
-        // Copy the row so we know when a row transitions.
-        this.previousRow = rowKey;
-      }
-
-      private void rollWriters() throws IOException {
-        for (WriterLength wl : this.writers.values()) {
-          if (wl.writer != null) {
-            LOG.info(
-                "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
-            close(wl.writer);
-          }
-          wl.writer = null;
-          wl.written = 0;
-        }
-        this.rollRequested = false;
-      }
-
-      /*
-       * Create a new StoreFile.Writer.
-       * @param family
-       * @return A WriterLength, containing a new StoreFile.Writer.
-       * @throws IOException
-       */
-      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
-          justification="Not important")
-      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
-          InetSocketAddress[] favoredNodes) throws IOException {
-        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
-        Path familydir = new Path(outputDir, Bytes.toString(family));
-        if (writeMultipleTables) {
-          familydir = new Path(outputDir,
-              new Path(Bytes.toString(tableName), Bytes.toString(family)));
-        }
-        WriterLength wl = new WriterLength();
-        Algorithm compression = compressionMap.get(tableAndFamily);
-        compression = compression == null ? defaultCompression : compression;
-        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
-        bloomType = bloomType == null ? BloomType.NONE : bloomType;
-        Integer blockSize = blockSizeMap.get(tableAndFamily);
-        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
-        DataBlockEncoding encoding = overriddenEncoding;
-        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
-        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
-        Configuration tempConf = new Configuration(conf);
-        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-        HFileContextBuilder contextBuilder = new HFileContextBuilder()
-            .withCompression(compression)
-            .withChecksumType(HStore.getChecksumType(conf))
-            .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
-            .withBlockSize(blockSize);
-
-        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
-          contextBuilder.withIncludesTags(true);
-        }
-
-        contextBuilder.withDataBlockEncoding(encoding);
-        HFileContext hFileContext = contextBuilder.build();
-        if (null == favoredNodes) {
-          wl.writer =
-              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
-                  .withOutputDir(familydir).withBloomType(bloomType)
-                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
-        } else {
-          wl.writer =
-              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-                  .withOutputDir(familydir).withBloomType(bloomType)
-                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-                  .withFavoredNodes(favoredNodes).build();
-        }
-
-        this.writers.put(tableAndFamily, wl);
-        return wl;
-      }
-
-      private void close(final StoreFileWriter w) throws IOException {
-        if (w != null) {
-          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-              Bytes.toBytes(System.currentTimeMillis()));
-          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-              Bytes.toBytes(context.getTaskAttemptID().toString()));
-          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
-              Bytes.toBytes(true));
-          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-              Bytes.toBytes(compactionExclude));
-          w.appendTrackedTimestampsToMetadata();
-          w.close();
-        }
-      }
-
-      @Override
-      public void close(TaskAttemptContext c)
-          throws IOException, InterruptedException {
-        for (WriterLength wl : this.writers.values()) {
-          close(wl.writer);
-        }
-      }
-    };
-  }
-
-  /**
-   * Configure block storage policy for CF after the directory is created.
-   */
-  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
-      byte[] tableAndFamily, Path cfPath) {
-    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
-      return;
-    }
-
-    String policy =
-        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
-            conf.get(STORAGE_POLICY_PROPERTY));
-    FSUtils.setStoragePolicy(fs, cfPath, policy);
-  }
-
-  /*
-   * Data structure to hold a Writer and amount of data written on it.
-   */
-  static class WriterLength {
-    long written = 0;
-    StoreFileWriter writer = null;
-  }
-
-  /**
-   * Return the start keys of all of the regions in this table,
-   * as a list of ImmutableBytesWritable.
-   */
-  private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
-      boolean writeMultipleTables) throws IOException {
-
-    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
-    for (RegionLocator regionLocator : regionLocators) {
-      TableName tableName = regionLocator.getName();
-      LOG.info("Looking up current regions for table " + tableName);
-      byte[][] byteKeys = regionLocator.getStartKeys();
-      for (byte[] byteKey : byteKeys) {
-        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
-        if (writeMultipleTables) {
-          // MultiTableHFileOutputFormat use case
-          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" +
-              Bytes.toStringBinary(fullKey) + "]");
-        }
-        ret.add(new ImmutableBytesWritable(fullKey));
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Write out a {@link SequenceFile} that can be read by
-   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
-   */
-  @SuppressWarnings("deprecation")
-  private static void writePartitions(Configuration conf, Path partitionsPath,
-      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
-    LOG.info("Writing partition information to " + partitionsPath);
-    if (startKeys.isEmpty()) {
-      throw new IllegalArgumentException("No regions passed");
-    }
-
-    // We're generating a list of split points, and we don't ever
-    // have keys < the first region (which has an empty start key)
-    // so we need to remove it. Otherwise we would end up with an
-    // empty reducer with index 0
-    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
-    ImmutableBytesWritable first = sorted.first();
-    if (writeMultipleTables) {
-      first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
-    }
-    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
-      throw new IllegalArgumentException(
-          "First region of table should have empty start key. Instead has: "
-              + Bytes.toStringBinary(first.get()));
-    }
-    sorted.remove(sorted.first());
-
-    // Write the actual file
-    FileSystem fs = partitionsPath.getFileSystem(conf);
-    SequenceFile.Writer writer = SequenceFile.createWriter(
-        fs, conf, partitionsPath, ImmutableBytesWritable.class,
-        NullWritable.class);
-
-    try {
-      for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
-      }
-    } finally {
-      writer.close();
-    }
-  }
-
-  /**
-   * Configure a MapReduce Job to perform an incremental load into the given
-   * table. This
-   * <ul>
-   *   <li>Inspects the table to configure a total order partitioner</li>
-   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-   *     PutSortReducer)</li>
-   * </ul>
-   * The user should be sure to set the map output value class to either KeyValue or Put before
-   * running this function.
-   */
-  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
-      throws IOException {
-    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-  }
-
-  /**
-   * Configure a MapReduce Job to perform an incremental load into the given
-   * table. This
-   * <ul>
-   *   <li>Inspects the table to configure a total order partitioner</li>
-   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-   *     PutSortReducer)</li>
-   * </ul>
-   * The user should be sure to set the map output value class to either KeyValue or Put before
-   * running this function.
-   */
-  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
-      RegionLocator regionLocator) throws IOException {
-    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
-    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
-    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
-  }
-
-  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
-      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
-    Configuration conf = job.getConfiguration();
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(KeyValue.class);
-    job.setOutputFormatClass(cls);
-
-    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
-      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
-    }
-    boolean writeMultipleTables = false;
-    if (MultiTableHFileOutputFormat.class.equals(cls)) {
-      writeMultipleTables = true;
-      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
-    }
-    // Based on the configured map output class, set the correct reducer to properly
-    // sort the incoming values.
-    // TODO it would be nice to pick one or the other of these formats.
-    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(KeyValueSortReducer.class);
-    } else if (Put.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(PutSortReducer.class);
-    } else if (Text.class.equals(job.getMapOutputValueClass())) {
-      job.setReducerClass(TextSortReducer.class);
-    } else {
-      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
-    }
-
-    conf.setStrings("io.serializations", conf.get("io.serializations"),
-        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
-        KeyValueSerialization.class.getName());
-
-    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-      LOG.info("bulkload locality sensitive enabled");
-    }
-
-    /* Now get the region start keys for every table required */
-    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
-    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
-    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
-
-    for (TableInfo tableInfo : multiTableInfo) {
-      regionLocators.add(tableInfo.getRegionLocator());
-      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
-      tableDescriptors.add(tableInfo.getTableDescriptor());
-    }
-    // Record table names for creating writers by favored nodes, and for decoding compression,
-    // block size, and other attributes of each column family per table
-    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames,
-        Bytes.toString(tableSeparator)));
-    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
-    // Use table's region boundaries for TOP (TotalOrderPartitioner) split points.
-    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
-        "to match current region count for all tables");
-    job.setNumReduceTasks(startKeys.size());
-
-    configurePartitioner(job, startKeys, writeMultipleTables);
-    // Set compression algorithms based on column families
-
-    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(compressionDetails, tableDescriptors));
-    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(blockSizeDetails, tableDescriptors));
-    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(bloomTypeDetails, tableDescriptors));
-    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
-
-    TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
-  }
-
-  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
-      throws IOException {
-    Configuration conf = job.getConfiguration();
-
-    job.setOutputKeyClass(ImmutableBytesWritable.class);
-    job.setOutputValueClass(KeyValue.class);
-    job.setOutputFormatClass(HFileOutputFormat2.class);
-
-    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
-    singleTableDescriptor.add(tableDescriptor);
-
-    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
-    // Set compression algorithms based on column families
-    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
-    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
-    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
-    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
-        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
-
-    TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
-  }
-
-  /**
-   * Runs inside the task to deserialize the column family to compression algorithm
-   * map from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @return a map from column family to the configured compression algorithm
-   */
-  @VisibleForTesting
-  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
-    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-        COMPRESSION_FAMILIES_CONF_KEY);
-    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
-      compressionMap.put(e.getKey(), algorithm);
-    }
-    return compressionMap;
-  }
-
-  /**
-   * Runs inside the task to deserialize the column family to bloom filter type
-   * map from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @return a map from column family to the configured bloom filter type
-   */
-  @VisibleForTesting
-  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
-    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-        BLOOM_TYPE_FAMILIES_CONF_KEY);
-    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      BloomType bloomType = BloomType.valueOf(e.getValue());
-      bloomTypeMap.put(e.getKey(), bloomType);
-    }
-    return bloomTypeMap;
-  }
-
-  /**
-   * Runs inside the task to deserialize the column family to block size
-   * map from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @return a map from column family to the configured block size
-   */
-  @VisibleForTesting
-  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
-    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-        BLOCK_SIZE_FAMILIES_CONF_KEY);
-    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      Integer blockSize = Integer.parseInt(e.getValue());
-      blockSizeMap.put(e.getKey(), blockSize);
-    }
-    return blockSizeMap;
-  }
-
-  /**
-   * Runs inside the task to deserialize the column family to data block encoding
-   * type map from the configuration.
-   *
-   * @param conf to read the serialized values from
-   * @return a map from column family to HFileDataBlockEncoder for the
-   *         configured data block type for the family
-   */
-  @VisibleForTesting
-  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
-      Configuration conf) {
-    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
-        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
-    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf(e.getValue()));
-    }
-    return encoderMap;
-  }
-
-
-  /**
-   * Runs inside the task to deserialize a column family to conf value map.
-   *
-   * @param conf to read the serialized values from
-   * @param confName conf key to read from the configuration
-   * @return a map of column family to the given configuration value
-   */
-  private static Map<byte[], String> createFamilyConfValueMap(
-      Configuration conf, String confName) {
-    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    String confVal = conf.get(confName, "");
-    for (String familyConf : confVal.split("&")) {
-      String[] familySplit = familyConf.split("=");
-      if (familySplit.length != 2) {
-        continue;
-      }
-      try {
-        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
-            URLDecoder.decode(familySplit[1], "UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        // will not happen with UTF-8 encoding
-        throw new AssertionError(e);
-      }
-    }
-    return confValMap;
-  }
-
-  /**
-   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
-   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
-   */
-  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
-      boolean writeMultipleTables) throws IOException {
-    Configuration conf = job.getConfiguration();
-    // create the partitions file
-    FileSystem fs = FileSystem.get(conf);
-    String hbaseTmpFsDir =
-        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-            HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
-    fs.makeQualified(partitionsPath);
-    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
-    fs.deleteOnExit(partitionsPath);
-
-    // configure job to use it
-    job.setPartitionerClass(TotalOrderPartitioner.class);
-    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
-  @VisibleForTesting
-  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
-      List<TableDescriptor> allTables) throws UnsupportedEncodingException {
-    StringBuilder attributeValue = new StringBuilder();
-    int i = 0;
-    for (TableDescriptor tableDescriptor : allTables) {
-      if (tableDescriptor == null) {
-        // could happen with mock table instance
-        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
-        return "";
-      }
-      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
-        if (i++ > 0) {
-          attributeValue.append('&');
-        }
-        attributeValue.append(URLEncoder.encode(
-            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
-                familyDescriptor.getName())), "UTF-8"));
-        attributeValue.append('=');
-        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
-      }
-    }
-    // Get rid of the last ampersand
-    return attributeValue.toString();
-  }
-
-  /**
-   * Serialize column family to compression algorithm map to configuration.
-   * Invoked while configuring the MR job for incremental load.
-   *
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
-   * @throws IOException on failure to read column family descriptors
-   */
-  @VisibleForTesting
-  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
-      familyDescriptor.getCompressionType().getName();
-
-  /**
-   * Serialize column family to block size map to configuration. Invoked while
-   * configuring the MR job for incremental load.
-   *
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
-   * @throws IOException on failure to read column family descriptors
-   */
-  @VisibleForTesting
-  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor ->
-      String.valueOf(familyDescriptor.getBlocksize());
-
-  /**
-   * Serialize column family to bloom type map to configuration. Invoked while
-   * configuring the MR job for incremental load.
-   *
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
-   * @throws IOException on failure to read column family descriptors
-   */
-  @VisibleForTesting
-  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
-    String bloomType = familyDescriptor.getBloomFilterType().toString();
-    if (bloomType == null) {
-      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
-    }
-    return bloomType;
-  };
-
-  /**
-   * Serialize column family to data block encoding map to configuration.
-   * Invoked while configuring the MR job for incremental load.
-   *
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
-   * @throws IOException on failure to read column family descriptors
-   */
-  @VisibleForTesting
-  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
-    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
-    if (encoding == null) {
-      encoding = DataBlockEncoding.NONE;
-    }
-    return encoding.toString();
-  };
-
-}
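For context on the API being removed above: a bulk-load driver typically wires HFileOutputFormat2 into a preparation job as sketched below. This is a minimal sketch, not part of the patch; the table name "usertable", the argument paths, and MyKVMapper (a user-supplied mapper emitting ImmutableBytesWritable keys and KeyValue values) are assumed placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class BulkLoadDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "bulk-load-prepare");
        job.setJarByClass(BulkLoadDriver.class);
        // MyKVMapper is hypothetical: it must emit ImmutableBytesWritable row
        // keys and KeyValue (or Put) values, as the javadoc above requires.
        job.setMapperClass(MyKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        try (Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf("usertable"));
            RegionLocator locator = conn.getRegionLocator(table.getName())) {
          // Sets the TotalOrderPartitioner, the sort reducer, the reduce task
          // count, and the per-family compression/bloom/blocksize/encoding
          // conf keys serialized by the methods shown in the diff above.
          HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
          System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
      }
    }

The generated HFiles under args[1] would then be handed to LoadIncrementalHFiles for the actual load.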
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
deleted file mode 100644
index 3475a48..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * This is used to partition the output keys into groups of keys.
- * Keys are grouped according to the regions that currently exist
- * so that each reducer fills a single region, and load is distributed.
- *
- * <p>This class is not suitable as a partitioner for creating hfiles
- * for incremental bulk loads, since the region spread will likely change between the time of
- * hfile creation and load time. See {@link LoadIncrementalHFiles}
- * and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
- *
- * @param <KEY> The type of the key.
- * @param <VALUE> The type of the value.
- */
-@InterfaceAudience.Public
-public class HRegionPartitioner<KEY, VALUE>
-    extends Partitioner<ImmutableBytesWritable, VALUE>
-    implements Configurable {
-
-  private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
-  private Configuration conf = null;
-  // Connection and locator are not cleaned up; they just die when the partitioner is done.
-  private Connection connection;
-  private RegionLocator locator;
-  private byte[][] startKeys;
-
-  /**
-   * Gets the partition number for a given key (hence record) given the total
-   * number of partitions, i.e. the number of reduce tasks for the job.
-   *
-   * <p>Typically a hash function on all or a subset of the key.</p>
-   *
-   * @param key The key to be partitioned.
-   * @param value The entry value.
- * @param numPartitions The total number of partitions. - * @return The partition number for the <code>key</code>. - * @see org.apache.hadoop.mapreduce.Partitioner#getPartition( - * java.lang.Object, java.lang.Object, int) - */ - @Override - public int getPartition(ImmutableBytesWritable key, - VALUE value, int numPartitions) { - byte[] region = null; - // Only one region return 0 - if (this.startKeys.length == 1){ - return 0; - } - try { - // Not sure if this is cached after a split so we could have problems - // here if a region splits while mapping - region = this.locator.getRegionLocation(key.get()).getRegionInfo().getStartKey(); - } catch (IOException e) { - LOG.error(e); - } - for (int i = 0; i < this.startKeys.length; i++){ - if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){ - if (i >= numPartitions-1){ - // cover if we have less reduces then regions. - return (Integer.toString(i).hashCode() - & Integer.MAX_VALUE) % numPartitions; - } - return i; - } - } - // if above fails to find start key that match we need to return something - return 0; - } - - /** - * Returns the current configuration. - * - * @return The current configuration. - * @see org.apache.hadoop.conf.Configurable#getConf() - */ - @Override - public Configuration getConf() { - return conf; - } - - /** - * Sets the configuration. This is used to determine the start keys for the - * given table. - * - * @param configuration The configuration to set. - * @see org.apache.hadoop.conf.Configurable#setConf( - * org.apache.hadoop.conf.Configuration) - */ - @Override - public void setConf(Configuration configuration) { - this.conf = HBaseConfiguration.create(configuration); - try { - this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf)); - TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE)); - this.locator = this.connection.getRegionLocator(tableName); - } catch (IOException e) { - LOG.error(e); - } - try { - this.startKeys = this.locator.getStartKeys(); - } catch (IOException e) { - LOG.error(e); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java deleted file mode 100644 index dfac471..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java +++ /dev/null @@ -1,747 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.MapFile; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; -import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Charsets; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Ordering; - -public class HashTable extends Configured implements Tool { - - private static final Log LOG = LogFactory.getLog(HashTable.class); - - private static final int DEFAULT_BATCH_SIZE = 8000; - - private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size"; - final static String PARTITIONS_FILE_NAME = "partitions"; - final static String MANIFEST_FILE_NAME = "manifest"; - final static String HASH_DATA_DIR = "hashes"; - final static String OUTPUT_DATA_FILE_PREFIX = "part-r-"; - private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp"; - - TableHash tableHash = new TableHash(); - Path destPath; - - public HashTable(Configuration conf) { - super(conf); - } - - public static class TableHash { - - Path hashDir; - - String tableName; - String families = null; - long batchSize = DEFAULT_BATCH_SIZE; - int numHashFiles = 0; - byte[] startRow = HConstants.EMPTY_START_ROW; - byte[] stopRow = HConstants.EMPTY_END_ROW; - int scanBatch = 0; - int versions = -1; - long startTime = 0; - long endTime = 0; - - List<ImmutableBytesWritable> partitions; - - public static TableHash read(Configuration conf, Path hashDir) throws IOException { - TableHash tableHash = new TableHash(); - FileSystem fs = hashDir.getFileSystem(conf); - tableHash.hashDir = hashDir; - tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME)); - tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME)); - return tableHash; - } - - void writePropertiesFile(FileSystem fs, Path path) throws IOException { - Properties p = new Properties(); - p.setProperty("table", tableName); - 
if (families != null) { - p.setProperty("columnFamilies", families); - } - p.setProperty("targetBatchSize", Long.toString(batchSize)); - p.setProperty("numHashFiles", Integer.toString(numHashFiles)); - if (!isTableStartRow(startRow)) { - p.setProperty("startRowHex", Bytes.toHex(startRow)); - } - if (!isTableEndRow(stopRow)) { - p.setProperty("stopRowHex", Bytes.toHex(stopRow)); - } - if (scanBatch > 0) { - p.setProperty("scanBatch", Integer.toString(scanBatch)); - } - if (versions >= 0) { - p.setProperty("versions", Integer.toString(versions)); - } - if (startTime != 0) { - p.setProperty("startTimestamp", Long.toString(startTime)); - } - if (endTime != 0) { - p.setProperty("endTimestamp", Long.toString(endTime)); - } - - try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) { - p.store(osw, null); - } - } - - void readPropertiesFile(FileSystem fs, Path path) throws IOException { - Properties p = new Properties(); - try (FSDataInputStream in = fs.open(path)) { - try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) { - p.load(isr); - } - } - tableName = p.getProperty("table"); - families = p.getProperty("columnFamilies"); - batchSize = Long.parseLong(p.getProperty("targetBatchSize")); - numHashFiles = Integer.parseInt(p.getProperty("numHashFiles")); - - String startRowHex = p.getProperty("startRowHex"); - if (startRowHex != null) { - startRow = Bytes.fromHex(startRowHex); - } - String stopRowHex = p.getProperty("stopRowHex"); - if (stopRowHex != null) { - stopRow = Bytes.fromHex(stopRowHex); - } - - String scanBatchString = p.getProperty("scanBatch"); - if (scanBatchString != null) { - scanBatch = Integer.parseInt(scanBatchString); - } - - String versionString = p.getProperty("versions"); - if (versionString != null) { - versions = Integer.parseInt(versionString); - } - - String startTimeString = p.getProperty("startTimestamp"); - if (startTimeString != null) { - startTime = Long.parseLong(startTimeString); - } - - String endTimeString = p.getProperty("endTimestamp"); - if (endTimeString != null) { - endTime = Long.parseLong(endTimeString); - } - } - - Scan initScan() throws IOException { - Scan scan = new Scan(); - scan.setCacheBlocks(false); - if (startTime != 0 || endTime != 0) { - scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); - } - if (scanBatch > 0) { - scan.setBatch(scanBatch); - } - if (versions >= 0) { - scan.setMaxVersions(versions); - } - if (!isTableStartRow(startRow)) { - scan.setStartRow(startRow); - } - if (!isTableEndRow(stopRow)) { - scan.setStopRow(stopRow); - } - if(families != null) { - for(String fam : families.split(",")) { - scan.addFamily(Bytes.toBytes(fam)); - } - } - return scan; - } - - /** - * Choose partitions between row ranges to hash to a single output file - * Selects region boundaries that fall within the scan range, and groups them - * into the desired number of partitions. 
- */ - void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) { - List<byte[]> startKeys = new ArrayList<>(); - for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) { - byte[] regionStartKey = regionStartEndKeys.getFirst()[i]; - byte[] regionEndKey = regionStartEndKeys.getSecond()[i]; - - // if scan begins after this region, or starts before this region, then drop this region - // in other words: - // IF (scan begins before the end of this region - // AND scan ends before the start of this region) - // THEN include this region - if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey) - || Bytes.compareTo(startRow, regionEndKey) < 0) - && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey) - || Bytes.compareTo(stopRow, regionStartKey) > 0)) { - startKeys.add(regionStartKey); - } - } - - int numRegions = startKeys.size(); - if (numHashFiles == 0) { - numHashFiles = numRegions / 100; - } - if (numHashFiles == 0) { - numHashFiles = 1; - } - if (numHashFiles > numRegions) { - // can't partition within regions - numHashFiles = numRegions; - } - - // choose a subset of start keys to group regions into ranges - partitions = new ArrayList<>(numHashFiles - 1); - // skip the first start key as it is not a partition between ranges. - for (long i = 1; i < numHashFiles; i++) { - int splitIndex = (int) (numRegions * i / numHashFiles); - partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex))); - } - } - - void writePartitionFile(Configuration conf, Path path) throws IOException { - FileSystem fs = path.getFileSystem(conf); - @SuppressWarnings("deprecation") - SequenceFile.Writer writer = SequenceFile.createWriter( - fs, conf, path, ImmutableBytesWritable.class, NullWritable.class); - - for (int i = 0; i < partitions.size(); i++) { - writer.append(partitions.get(i), NullWritable.get()); - } - writer.close(); - } - - private void readPartitionFile(FileSystem fs, Configuration conf, Path path) - throws IOException { - @SuppressWarnings("deprecation") - SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); - ImmutableBytesWritable key = new ImmutableBytesWritable(); - partitions = new ArrayList<>(); - while (reader.next(key)) { - partitions.add(new ImmutableBytesWritable(key.copyBytes())); - } - reader.close(); - - if (!Ordering.natural().isOrdered(partitions)) { - throw new IOException("Partitions are not ordered!"); - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("tableName=").append(tableName); - if (families != null) { - sb.append(", families=").append(families); - } - sb.append(", batchSize=").append(batchSize); - sb.append(", numHashFiles=").append(numHashFiles); - if (!isTableStartRow(startRow)) { - sb.append(", startRowHex=").append(Bytes.toHex(startRow)); - } - if (!isTableEndRow(stopRow)) { - sb.append(", stopRowHex=").append(Bytes.toHex(stopRow)); - } - if (scanBatch >= 0) { - sb.append(", scanBatch=").append(scanBatch); - } - if (versions >= 0) { - sb.append(", versions=").append(versions); - } - if (startTime != 0) { - sb.append("startTime=").append(startTime); - } - if (endTime != 0) { - sb.append("endTime=").append(endTime); - } - return sb.toString(); - } - - static String getDataFileName(int hashFileIndex) { - return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex); - } - - /** - * Open a TableHash.Reader starting at the first hash at or after the given key. 
- * @throws IOException - */ - public Reader newReader(Configuration conf, ImmutableBytesWritable startKey) - throws IOException { - return new Reader(conf, startKey); - } - - public class Reader implements java.io.Closeable { - private final Configuration conf; - - private int hashFileIndex; - private MapFile.Reader mapFileReader; - - private boolean cachedNext; - private ImmutableBytesWritable key; - private ImmutableBytesWritable hash; - - Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException { - this.conf = conf; - int partitionIndex = Collections.binarySearch(partitions, startKey); - if (partitionIndex >= 0) { - // if the key is equal to a partition, then go the file after that partition - hashFileIndex = partitionIndex+1; - } else { - // if the key is between partitions, then go to the file between those partitions - hashFileIndex = -1-partitionIndex; - } - openHashFile(); - - // MapFile's don't make it easy to seek() so that the subsequent next() returns - // the desired key/value pair. So we cache it for the first call of next(). - hash = new ImmutableBytesWritable(); - key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash); - if (key == null) { - cachedNext = false; - hash = null; - } else { - cachedNext = true; - } - } - - /** - * Read the next key/hash pair. - * Returns true if such a pair exists and false when at the end of the data. - */ - public boolean next() throws IOException { - if (cachedNext) { - cachedNext = false; - return true; - } - key = new ImmutableBytesWritable(); - hash = new ImmutableBytesWritable(); - while (true) { - boolean hasNext = mapFileReader.next(key, hash); - if (hasNext) { - return true; - } - hashFileIndex++; - if (hashFileIndex < TableHash.this.numHashFiles) { - mapFileReader.close(); - openHashFile(); - } else { - key = null; - hash = null; - return false; - } - } - } - - /** - * Get the current key - * @return the current key or null if there is no current key - */ - public ImmutableBytesWritable getCurrentKey() { - return key; - } - - /** - * Get the current hash - * @return the current hash or null if there is no current hash - */ - public ImmutableBytesWritable getCurrentHash() { - return hash; - } - - private void openHashFile() throws IOException { - if (mapFileReader != null) { - mapFileReader.close(); - } - Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR); - Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex)); - mapFileReader = new MapFile.Reader(dataFile, conf); - } - - @Override - public void close() throws IOException { - mapFileReader.close(); - } - } - } - - static boolean isTableStartRow(byte[] row) { - return Bytes.equals(HConstants.EMPTY_START_ROW, row); - } - - static boolean isTableEndRow(byte[] row) { - return Bytes.equals(HConstants.EMPTY_END_ROW, row); - } - - public Job createSubmittableJob(String[] args) throws IOException { - Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME); - generatePartitions(partitionsPath); - - Job job = Job.getInstance(getConf(), - getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName)); - Configuration jobConf = job.getConfiguration(); - jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize); - job.setJarByClass(HashTable.class); - - TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(), - HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); - - // use a TotalOrderPartitioner and reducers to group region output into hash files - 
job.setPartitionerClass(TotalOrderPartitioner.class); - TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath); - job.setReducerClass(Reducer.class); // identity reducer - job.setNumReduceTasks(tableHash.numHashFiles); - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(ImmutableBytesWritable.class); - job.setOutputFormatClass(MapFileOutputFormat.class); - FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR)); - - return job; - } - - private void generatePartitions(Path partitionsPath) throws IOException { - Connection connection = ConnectionFactory.createConnection(getConf()); - Pair<byte[][], byte[][]> regionKeys - = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys(); - connection.close(); - - tableHash.selectPartitions(regionKeys); - LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath); - - tableHash.writePartitionFile(getConf(), partitionsPath); - } - - static class ResultHasher { - private MessageDigest digest; - - private boolean batchStarted = false; - private ImmutableBytesWritable batchStartKey; - private ImmutableBytesWritable batchHash; - private long batchSize = 0; - - - public ResultHasher() { - try { - digest = MessageDigest.getInstance("MD5"); - } catch (NoSuchAlgorithmException e) { - Throwables.propagate(e); - } - } - - public void startBatch(ImmutableBytesWritable row) { - if (batchStarted) { - throw new RuntimeException("Cannot start new batch without finishing existing one."); - } - batchStarted = true; - batchSize = 0; - batchStartKey = row; - batchHash = null; - } - - public void hashResult(Result result) { - if (!batchStarted) { - throw new RuntimeException("Cannot add to batch that has not been started."); - } - for (Cell cell : result.rawCells()) { - int rowLength = cell.getRowLength(); - int familyLength = cell.getFamilyLength(); - int qualifierLength = cell.getQualifierLength(); - int valueLength = cell.getValueLength(); - digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength); - digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength); - digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength); - long ts = cell.getTimestamp(); - for (int i = 8; i > 0; i--) { - digest.update((byte) ts); - ts >>>= 8; - } - digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength); - - batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength; - } - } - - public void finishBatch() { - if (!batchStarted) { - throw new RuntimeException("Cannot finish batch that has not started."); - } - batchStarted = false; - batchHash = new ImmutableBytesWritable(digest.digest()); - } - - public boolean isBatchStarted() { - return batchStarted; - } - - public ImmutableBytesWritable getBatchStartKey() { - return batchStartKey; - } - - public ImmutableBytesWritable getBatchHash() { - return batchHash; - } - - public long getBatchSize() { - return batchSize; - } - } - - public static class HashMapper - extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { - - private ResultHasher hasher; - private long targetBatchSize; - - private ImmutableBytesWritable currentRow; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - targetBatchSize = context.getConfiguration() - .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); - hasher = new ResultHasher(); - - TableSplit split = (TableSplit) context.getInputSplit(); - 
- public static class HashMapper - extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { - - private ResultHasher hasher; - private long targetBatchSize; - - private ImmutableBytesWritable currentRow; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - targetBatchSize = context.getConfiguration() - .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); - hasher = new ResultHasher(); - - TableSplit split = (TableSplit) context.getInputSplit(); - hasher.startBatch(new ImmutableBytesWritable(split.getStartRow())); - } - - @Override - protected void map(ImmutableBytesWritable key, Result value, Context context) - throws IOException, InterruptedException { - - if (currentRow == null || !currentRow.equals(key)) { - currentRow = new ImmutableBytesWritable(key); // not immutable - - if (hasher.getBatchSize() >= targetBatchSize) { - hasher.finishBatch(); - context.write(hasher.getBatchStartKey(), hasher.getBatchHash()); - hasher.startBatch(currentRow); - } - } - - hasher.hashResult(value); - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - hasher.finishBatch(); - context.write(hasher.getBatchStartKey(), hasher.getBatchHash()); - } - } - - private void writeTempManifestFile() throws IOException { - Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME); - FileSystem fs = tempManifestPath.getFileSystem(getConf()); - tableHash.writePropertiesFile(fs, tempManifestPath); - } - - private void completeManifest() throws IOException { - Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME); - Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME); - FileSystem fs = tempManifestPath.getFileSystem(getConf()); - fs.rename(tempManifestPath, manifestPath); - } - 
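
writeTempManifestFile() and completeManifest() together implement the usual write-then-rename commit pattern: the manifest is written under a temporary name before the job starts and is only renamed to its final name after the job succeeds, so a reader never observes a manifest for a half-finished hash. One caveat: the code above ignores the boolean returned by FileSystem.rename(). A sketch of the same idiom with the result checked (paths hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ManifestCommitSketch {
      // Renames a temporary manifest into place once the job has succeeded.
      public static void commit(Configuration conf, Path tmp, Path manifest)
          throws IOException {
        FileSystem fs = tmp.getFileSystem(conf);
        // On HDFS the rename is atomic, so the final name only ever refers
        // to a fully written file.
        if (!fs.rename(tmp, manifest)) {
          throw new IOException("Failed to rename " + tmp + " to " + manifest);
        }
      }

      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical paths mirroring TMP_MANIFEST_FILE_NAME / MANIFEST_FILE_NAME.
        commit(conf, new Path("/hashes/testTable/manifest.tmp"),
            new Path("/hashes/testTable/manifest"));
      }
    }
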
- private static final int NUM_ARGS = 2; - private static void printUsage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - System.err.println(); - } - System.err.println("Usage: HashTable [options] <tablename> <outputpath>"); - System.err.println(); - System.err.println("Options:"); - System.err.println(" batchsize the target number of bytes to hash in each batch"); - System.err.println(" rows are added to the batch until this size is reached"); - System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)"); - System.err.println(" numhashfiles the number of hash files to create"); - System.err.println(" if set to fewer than the number of regions then"); - System.err.println(" the job will create this number of reducers"); - System.err.println(" (defaults to 1/100 of regions -- at least 1)"); - System.err.println(" startrow the start row"); - System.err.println(" stoprow the stop row"); - System.err.println(" starttime beginning of the time range (unixtime in millis)"); - System.err.println(" without endtime means from starttime to forever"); - System.err.println(" endtime end of the time range. Ignored if no starttime specified."); - System.err.println(" scanbatch scanner batch size to support intra row scans"); - System.err.println(" versions number of cell versions to include"); - System.err.println(" families comma-separated list of families to include"); - System.err.println(); - System.err.println("Args:"); - System.err.println(" tablename Name of the table to hash"); - System.err.println(" outputpath Filesystem path to put the output data"); - System.err.println(); - System.err.println("Examples:"); - System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:"); - System.err.println(" $ hbase " + - "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50" - + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3" - + " TestTable /hashes/testTable"); - } - - private boolean doCommandLine(final String[] args) { - if (args.length < NUM_ARGS) { - printUsage(null); - return false; - } - try { - - tableHash.tableName = args[args.length-2]; - destPath = new Path(args[args.length-1]); - - for (int i = 0; i < args.length - NUM_ARGS; i++) { - String cmd = args[i]; - if (cmd.equals("-h") || cmd.startsWith("--h")) { - printUsage(null); - return false; - } - - final String batchSizeArgKey = "--batchsize="; - if (cmd.startsWith(batchSizeArgKey)) { - tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length())); - continue; - } - - final String numHashFilesArgKey = "--numhashfiles="; - if (cmd.startsWith(numHashFilesArgKey)) { - tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length())); - continue; - } - - final String startRowArgKey = "--startrow="; - if (cmd.startsWith(startRowArgKey)) { - tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length())); - continue; - } - - final String stopRowArgKey = "--stoprow="; - if (cmd.startsWith(stopRowArgKey)) { - tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length())); - continue; - } - - final String startTimeArgKey = "--starttime="; - if (cmd.startsWith(startTimeArgKey)) { - tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); - continue; - } - - final String endTimeArgKey = "--endtime="; - if (cmd.startsWith(endTimeArgKey)) { - tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); - continue; - } - - final String scanBatchArgKey = "--scanbatch="; - if (cmd.startsWith(scanBatchArgKey)) { - tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length())); - continue; - } - - final String versionsArgKey = "--versions="; - if (cmd.startsWith(versionsArgKey)) { - tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); - continue; - } - - final String familiesArgKey = "--families="; - if (cmd.startsWith(familiesArgKey)) { - tableHash.families = cmd.substring(familiesArgKey.length()); - continue; - } - - printUsage("Invalid argument '" + cmd + "'"); - return false; - } - if ((tableHash.startTime != 0 || tableHash.endTime != 0) - && (tableHash.startTime >= tableHash.endTime)) { - printUsage("Invalid time range filter: starttime=" - + tableHash.startTime + " >= endtime=" + tableHash.endTime); - return false; - } - - } catch (Exception e) { - e.printStackTrace(); - printUsage("Can't start because " + e.getMessage()); - return false; - } - return true; - } - 
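
Note that doCommandLine() parses --startrow and --stoprow with Bytes.fromHex, so row keys must be passed as hex strings rather than raw text. For example, a hypothetical invocation limiting the hash to the row range ['row-a', 'row-z') would encode both keys:

    $ hbase org.apache.hadoop.hbase.mapreduce.HashTable \
        --startrow=726f772d61 --stoprow=726f772d7a \
        TestTable /hashes/testTable
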
- /** - * Main entry point. - */ - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args); - System.exit(ret); - } - - @Override - public int run(String[] args) throws Exception { - String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); - if (!doCommandLine(otherArgs)) { - return 1; - } - - Job job = createSubmittableJob(otherArgs); - writeTempManifestFile(); - if (!job.waitForCompletion(true)) { - LOG.info("Map-reduce job failed!"); - return 1; - } - completeManifest(); - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java deleted file mode 100644 index 7103ef8..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapreduce.Job; - -/** - * Pass the given key and record as-is to the reduce phase. - */ -@InterfaceAudience.Public -public class IdentityTableMapper -extends TableMapper<ImmutableBytesWritable, Result> { - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table The table name. - * @param scan The scan with the columns to scan. - * @param mapper The mapper class. - * @param job The job configuration. - * @throws IOException When setting up the job fails. - */ - @SuppressWarnings("rawtypes") - public static void initJob(String table, Scan scan, - Class<? extends TableMapper> mapper, Job job) throws IOException { - TableMapReduceUtil.initTableMapperJob(table, scan, mapper, - ImmutableBytesWritable.class, Result.class, job); - } - - /** - * Pass the key, value to reduce. - * - * @param key The current key. - * @param value The current value. - * @param context The current context. - * @throws IOException When writing the record fails. - * @throws InterruptedException When the job is aborted. 
- */ - public void map(ImmutableBytesWritable key, Result value, Context context) - throws IOException, InterruptedException { - context.write(key, value); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java deleted file mode 100644 index 5289f46..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.io.Writable; - -/** - * Convenience class that simply writes all values (which must be - * {@link org.apache.hadoop.hbase.client.Put Put} or - * {@link org.apache.hadoop.hbase.client.Delete Delete} instances) - * passed to it out to the configured HBase table. This works in combination - * with {@link TableOutputFormat} which actually does the writing to HBase.<p> - * - * Keys are passed along but ignored in TableOutputFormat. However, they can - * be used to control how your values will be divided up amongst the specified - * number of reducers. <p> - * - * You can also use the {@link TableMapReduceUtil} class to set up the two - * classes in one step: - * <blockquote><code> - * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job); - * </code></blockquote> - * This will also set the proper {@link TableOutputFormat} which is given the - * <code>table</code> parameter. The - * {@link org.apache.hadoop.hbase.client.Put Put} or - * {@link org.apache.hadoop.hbase.client.Delete Delete} define the - * row and columns implicitly. - */ -@InterfaceAudience.Public -public class IdentityTableReducer -extends TableReducer<Writable, Mutation, Writable> { - - @SuppressWarnings("unused") - private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class); - - /** - * Writes each given record, consisting of the row key and the given values, - * to the configured {@link org.apache.hadoop.mapreduce.OutputFormat}. - * It emits the row key and each {@link org.apache.hadoop.hbase.client.Put Put} - * or {@link org.apache.hadoop.hbase.client.Delete Delete} as a separate pair. - - * @param key The current row key. 
- * @param values The {@link org.apache.hadoop.hbase.client.Put Put} or - * {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given - * row. - * @param context The context of the reduce. - * @throws IOException When writing the record fails. - * @throws InterruptedException When the job gets interrupted. - */ - @Override - public void reduce(Writable key, Iterable<Mutation> values, Context context) - throws IOException, InterruptedException { - for(Mutation putOrDelete : values) { - context.write(key, putOrDelete); - } - } -}
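
The two identity classes are meant to be wired up through the helpers shown above: IdentityTableMapper.initJob configures the scan side, and TableMapReduceUtil.initTableReducerJob (quoted in the class comment) attaches IdentityTableReducer together with the TableOutputFormat for the sink table. A sketch of the combined setup; the table names are hypothetical, and note that IdentityTableReducer expects Put or Delete values, so pairing it with IdentityTableMapper (which emits Result) would additionally require converting each Result into Mutations:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class IdentityJobSketch {
      public static Job createJob() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "identity-copy-sketch"); // hypothetical name

        // Source side: scan 'sourceTable' and pass each (key, Result) through as-is.
        Scan scan = new Scan();
        IdentityTableMapper.initJob("sourceTable", scan, IdentityTableMapper.class, job);

        // Sink side: route reducer output to 'targetTable' via TableOutputFormat.
        // IdentityTableReducer writes each Put/Delete it receives unchanged, so
        // the map output values must already be Mutations in a real job.
        TableMapReduceUtil.initTableReducerJob("targetTable",
            IdentityTableReducer.class, job);
        return job;
      }
    }
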