http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java new file mode 100644 index 0000000..ff458ff --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -0,0 +1,1027 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.TokenUtil; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.StringUtils; + +import com.codahale.metrics.MetricRegistry; + +/** + * Utility for {@link TableMapper} and {@link TableReducer} + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +@InterfaceAudience.Public +public class TableMapReduceUtil { + private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class); + + /** + * Use this before submitting a TableMap job. 
It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job) + throws IOException { + initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, + job, true); + } + + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(TableName table, + Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, + Job job) throws IOException { + initTableMapperJob(table.getNameAsString(), + scan, + mapper, + outputKeyClass, + outputValueClass, + job, + true); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table Binary representation of the table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(byte[] table, Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job) + throws IOException { + initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, + job, true); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job, + boolean addDependencyJars, Class<? 
extends InputFormat> inputFormatClass) + throws IOException { + initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, true, inputFormatClass); + } + + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @param initCredentials whether to initialize hbase auth credentials for the job + * @param inputFormatClass the input format + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job, + boolean addDependencyJars, boolean initCredentials, + Class<? extends InputFormat> inputFormatClass) + throws IOException { + job.setInputFormatClass(inputFormatClass); + if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); + if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass); + job.setMapperClass(mapper); + if (Put.class.equals(outputValueClass)) { + job.setCombinerClass(PutCombiner.class); + } + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + conf.set(TableInputFormat.INPUT_TABLE, table); + conf.set(TableInputFormat.SCAN, convertScanToString(scan)); + conf.setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + if (addDependencyJars) { + addDependencyJars(job); + } + if (initCredentials) { + initCredentials(job); + } + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table Binary representation of the table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @param inputFormatClass The class of the input format + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(byte[] table, Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job, + boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) + throws IOException { + initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, inputFormatClass); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table Binary representation of the table name to read from. 
+ * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(byte[] table, Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job, + boolean addDependencyJars) + throws IOException { + initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, TableInputFormat.class); + } + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(String table, Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job, + boolean addDependencyJars) + throws IOException { + initTableMapperJob(table, scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, TableInputFormat.class); + } + + /** + * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on + * direct memory will likely cause the map tasks to OOM when opening the region. This + * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user + * wants to override this behavior in their job. + */ + public static void resetCacheConfig(Configuration conf) { + conf.setFloat( + HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); + conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f); + conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); + } + + /** + * Sets up the job for reading from one or more table snapshots, with one or more scans + * per snapshot. + * It bypasses hbase servers and read directly from snapshot files. + * + * @param snapshotScans map of snapshot name to scans on that snapshot. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + */ + public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans, + Class<? 
extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, + Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { + MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); + + job.setInputFormatClass(MultiTableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + + if (addDependencyJars) { + addDependencyJars(job); + addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class); + } + + resetCacheConfig(job.getConfiguration()); + } + + /** + * Sets up the job for reading from a table snapshot. It bypasses hbase servers + * and read directly from snapshot files. + * + * @param snapshotName The name of the snapshot (of a table) to read from. + * @param scan The scan instance with the columns, time range etc. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * + * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restore directory can be deleted. + * @throws IOException When setting up the details fails. + * @see TableSnapshotInputFormat + */ + public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job, + boolean addDependencyJars, Path tmpRestoreDir) + throws IOException { + TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); + initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, + outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); + resetCacheConfig(job.getConfiguration()); + } + + /** + * Use this before submitting a Multi TableMap job. It will appropriately set + * up the job. + * + * @param scans The list of {@link Scan} objects to read from. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is carrying + * all necessary HBase configuration. + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(List<Scan> scans, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job) throws IOException { + initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, + true); + } + + /** + * Use this before submitting a Multi TableMap job. It will appropriately set + * up the job. + * + * @param scans The list of {@link Scan} objects to read from. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. 
Make sure the passed job is carrying + * all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the + * configured job classes via the distributed cache (tmpjars). + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(List<Scan> scans, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job, + boolean addDependencyJars) throws IOException { + initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, true); + } + + /** + * Use this before submitting a Multi TableMap job. It will appropriately set + * up the job. + * + * @param scans The list of {@link Scan} objects to read from. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is carrying + * all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the + * configured job classes via the distributed cache (tmpjars). + * @param initCredentials whether to initialize hbase auth credentials for the job + * @throws IOException When setting up the details fails. + */ + public static void initTableMapperJob(List<Scan> scans, + Class<? extends TableMapper> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, Job job, + boolean addDependencyJars, + boolean initCredentials) throws IOException { + job.setInputFormatClass(MultiTableInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + List<String> scanStrings = new ArrayList<>(); + + for (Scan scan : scans) { + scanStrings.add(convertScanToString(scan)); + } + job.getConfiguration().setStrings(MultiTableInputFormat.SCANS, + scanStrings.toArray(new String[scanStrings.size()])); + + if (addDependencyJars) { + addDependencyJars(job); + } + + if (initCredentials) { + initCredentials(job); + } + } + + public static void initCredentials(Job job) throws IOException { + UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); + if (userProvider.isHadoopSecurityEnabled()) { + // propagate delegation related props from launcher job to MR job + if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { + job.getConfiguration().set("mapreduce.job.credentials.binary", + System.getenv("HADOOP_TOKEN_FILE_LOCATION")); + } + } + + if (userProvider.isHBaseSecurityEnabled()) { + try { + // init credentials for remote cluster + String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); + User user = userProvider.getCurrent(); + if (quorumAddress != null) { + Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), + quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); + Connection peerConn = ConnectionFactory.createConnection(peerConf); + try { + TokenUtil.addTokenForJob(peerConn, user, job); + } finally { + peerConn.close(); + } + } + + Connection conn = ConnectionFactory.createConnection(job.getConfiguration()); + try { + TokenUtil.addTokenForJob(conn, user, job); + } finally { + conn.close(); + } + } catch (InterruptedException ie) { + LOG.info("Interrupted obtaining 
user authentication token"); + Thread.currentThread().interrupt(); + } + } + } + + /** + * Obtain an authentication token, for the specified cluster, on behalf of the current user + * and add it to the credentials for the given map reduce job. + * + * The quorumAddress is the key to the ZK ensemble, which contains: + * hbase.zookeeper.quorum, hbase.zookeeper.client.port and + * zookeeper.znode.parent + * + * @param job The job that requires the permission. + * @param quorumAddress string that contains the 3 required configuratins + * @throws IOException When the authentication token cannot be obtained. + * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead. + */ + @Deprecated + public static void initCredentialsForCluster(Job job, String quorumAddress) + throws IOException { + Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), + quorumAddress); + initCredentialsForCluster(job, peerConf); + } + + /** + * Obtain an authentication token, for the specified cluster, on behalf of the current user + * and add it to the credentials for the given map reduce job. + * + * @param job The job that requires the permission. + * @param conf The configuration to use in connecting to the peer cluster + * @throws IOException When the authentication token cannot be obtained. + */ + public static void initCredentialsForCluster(Job job, Configuration conf) + throws IOException { + UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); + if (userProvider.isHBaseSecurityEnabled()) { + try { + Connection peerConn = ConnectionFactory.createConnection(conf); + try { + TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); + } finally { + peerConn.close(); + } + } catch (InterruptedException e) { + LOG.info("Interrupted obtaining user authentication token"); + Thread.interrupted(); + } + } + } + + /** + * Writes the given scan into a Base64 encoded string. + * + * @param scan The scan to write out. + * @return The scan saved in a Base64 encoded string. + * @throws IOException When writing the scan fails. + */ + public static String convertScanToString(Scan scan) throws IOException { + ClientProtos.Scan proto = ProtobufUtil.toScan(scan); + return Base64.encodeBytes(proto.toByteArray()); + } + + /** + * Converts the given Base64 string back into a Scan instance. + * + * @param base64 The scan details. + * @return The newly created Scan instance. + * @throws IOException When reading the scan instance fails. + */ + public static Scan convertStringToScan(String base64) throws IOException { + byte [] decoded = Base64.decode(base64); + return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded)); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. + * @throws IOException When determining the region count fails. + */ + public static void initTableReducerJob(String table, + Class<? extends TableReducer> reducer, Job job) + throws IOException { + initTableReducerJob(table, reducer, job, null); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. + * @param partitioner Partitioner to use. Pass <code>null</code> to use + * default partitioner. 
+ * @throws IOException When determining the region count fails. + */ + public static void initTableReducerJob(String table, + Class<? extends TableReducer> reducer, Job job, + Class partitioner) throws IOException { + initTableReducerJob(table, reducer, job, partitioner, null, null, null); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param partitioner Partitioner to use. Pass <code>null</code> to use + * default partitioner. + * @param quorumAddress Distant cluster to write to; default is null for + * output to the cluster that is designated in <code>hbase-site.xml</code>. + * Set this String to the zookeeper ensemble of an alternate remote cluster + * when you would have the reduce write a cluster that is other than the + * default; e.g. copying tables between clusters, the source would be + * designated by <code>hbase-site.xml</code> and this param would have the + * ensemble address of the remote cluster. The format to pass is particular. + * Pass <code> <hbase.zookeeper.quorum>:< + * hbase.zookeeper.client.port>:<zookeeper.znode.parent> + * </code> such as <code>server,server2,server3:2181:/hbase</code>. + * @param serverClass redefined hbase.regionserver.class + * @param serverImpl redefined hbase.regionserver.impl + * @throws IOException When determining the region count fails. + */ + public static void initTableReducerJob(String table, + Class<? extends TableReducer> reducer, Job job, + Class partitioner, String quorumAddress, String serverClass, + String serverImpl) throws IOException { + initTableReducerJob(table, reducer, job, partitioner, quorumAddress, + serverClass, serverImpl, true); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param partitioner Partitioner to use. Pass <code>null</code> to use + * default partitioner. + * @param quorumAddress Distant cluster to write to; default is null for + * output to the cluster that is designated in <code>hbase-site.xml</code>. + * Set this String to the zookeeper ensemble of an alternate remote cluster + * when you would have the reduce write a cluster that is other than the + * default; e.g. copying tables between clusters, the source would be + * designated by <code>hbase-site.xml</code> and this param would have the + * ensemble address of the remote cluster. The format to pass is particular. + * Pass <code> <hbase.zookeeper.quorum>:< + * hbase.zookeeper.client.port>:<zookeeper.znode.parent> + * </code> such as <code>server,server2,server3:2181:/hbase</code>. + * @param serverClass redefined hbase.regionserver.class + * @param serverImpl redefined hbase.regionserver.impl + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws IOException When determining the region count fails. + */ + public static void initTableReducerJob(String table, + Class<? 
extends TableReducer> reducer, Job job, + Class partitioner, String quorumAddress, String serverClass, + String serverImpl, boolean addDependencyJars) throws IOException { + + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + job.setOutputFormatClass(TableOutputFormat.class); + if (reducer != null) job.setReducerClass(reducer); + conf.set(TableOutputFormat.OUTPUT_TABLE, table); + conf.setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName()); + // If passed a quorum/ensemble address, pass it on to TableOutputFormat. + if (quorumAddress != null) { + // Calling this will validate the format + ZKConfig.validateClusterKey(quorumAddress); + conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); + } + if (serverClass != null && serverImpl != null) { + conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); + conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl); + } + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Writable.class); + if (partitioner == HRegionPartitioner.class) { + job.setPartitionerClass(HRegionPartitioner.class); + int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table)); + if (job.getNumReduceTasks() > regions) { + job.setNumReduceTasks(regions); + } + } else if (partitioner != null) { + job.setPartitionerClass(partitioner); + } + + if (addDependencyJars) { + addDependencyJars(job); + } + + initCredentials(job); + } + + /** + * Ensures that the given number of reduce tasks for the given job + * configuration does not exceed the number of regions for the given table. + * + * @param table The table to get the region count for. + * @param job The current job to adjust. + * @throws IOException When retrieving the table details fails. + */ + public static void limitNumReduceTasks(String table, Job job) + throws IOException { + int regions = + MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table)); + if (job.getNumReduceTasks() > regions) + job.setNumReduceTasks(regions); + } + + /** + * Sets the number of reduce tasks for the given job configuration to the + * number of regions the given table has. + * + * @param table The table to get the region count for. + * @param job The current job to adjust. + * @throws IOException When retrieving the table details fails. + */ + public static void setNumReduceTasks(String table, Job job) + throws IOException { + job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(), + TableName.valueOf(table))); + } + + /** + * Sets the number of rows to return and cache with each scanner iteration. + * Higher caching values will enable faster mapreduce jobs at the expense of + * requiring more heap to contain the cached rows. + * + * @param job The current job to adjust. + * @param batchSize The number of rows to return in batch with each scanner + * iteration. + */ + public static void setScannerCaching(Job job, int batchSize) { + job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize); + } + + /** + * Add HBase and its dependencies (only) to the job configuration. + * <p> + * This is intended as a low-level API, facilitating code reuse between this + * class and its mapred counterpart. It also of use to external tools that + * need to build a MapReduce job that interacts with HBase but want + * fine-grained control over the jars shipped to the cluster. 
+ * </p> + * @param conf The Configuration object to extend with dependencies. + * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil + * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a> + */ + public static void addHBaseDependencyJars(Configuration conf) throws IOException { + + // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar + // dependencies, MR jobs that write encoded hfiles will fail. + // We used reflection here so to prevent a circular module dependency. + // TODO - if we extract the MR into a module, make it depend on hbase-prefix-tree. + Class prefixTreeCodecClass = null; + try { + prefixTreeCodecClass = + Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"); + } catch (ClassNotFoundException e) { + // this will show up in unit tests but should not show in real deployments + LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." + + " Continuing without it."); + } + + addDependencyJarsForClasses(conf, + // explicitly pull a class from each module + org.apache.hadoop.hbase.HConstants.class, // hbase-common + org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol + org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded + org.apache.hadoop.hbase.client.Put.class, // hbase-client + org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat + org.apache.hadoop.hbase.mapreduce.JobUtil.class, // hbase-hadoop2-compat + org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-server + org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class, // hbase-metrics + org.apache.hadoop.hbase.metrics.Snapshot.class, // hbase-metrics-api + prefixTreeCodecClass, // hbase-prefix-tree (if null will be skipped) + // pull necessary dependencies + org.apache.zookeeper.ZooKeeper.class, + org.apache.hadoop.hbase.shaded.io.netty.channel.Channel.class, + com.google.protobuf.Message.class, + org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists.class, + org.apache.htrace.Trace.class, + com.codahale.metrics.MetricRegistry.class); + } + + /** + * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. + * Also exposed to shell scripts via `bin/hbase mapredcp`. + */ + public static String buildDependencyClasspath(Configuration conf) { + if (conf == null) { + throw new IllegalArgumentException("Must provide a configuration object."); + } + Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars")); + if (paths.isEmpty()) { + throw new IllegalArgumentException("Configuration contains no tmpjars."); + } + StringBuilder sb = new StringBuilder(); + for (String s : paths) { + // entries can take the form 'file:/path/to/file.jar'. + int idx = s.indexOf(":"); + if (idx != -1) s = s.substring(idx + 1); + if (sb.length() > 0) sb.append(File.pathSeparator); + sb.append(s); + } + return sb.toString(); + } + + /** + * Add the HBase dependency jars as well as jars for any of the configured + * job classes to the job configuration, so that JobClient will ship them + * to the cluster and add them to the DistributedCache. 
+ */ + public static void addDependencyJars(Job job) throws IOException { + addHBaseDependencyJars(job.getConfiguration()); + try { + addDependencyJarsForClasses(job.getConfiguration(), + // when making changes here, consider also mapred.TableMapReduceUtil + // pull job classes + job.getMapOutputKeyClass(), + job.getMapOutputValueClass(), + job.getInputFormatClass(), + job.getOutputKeyClass(), + job.getOutputValueClass(), + job.getOutputFormatClass(), + job.getPartitionerClass(), + job.getCombinerClass()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + } + + /** + * Add the jars containing the given classes to the job's configuration + * such that JobClient will ship them to the cluster and add them to + * the DistributedCache. + * @deprecated rely on {@link #addDependencyJars(Job)} instead. + */ + @Deprecated + public static void addDependencyJars(Configuration conf, + Class<?>... classes) throws IOException { + LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it" + + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " + + "instead. See HBASE-8386 for more details."); + addDependencyJarsForClasses(conf, classes); + } + + /** + * Add the jars containing the given classes to the job's configuration + * such that JobClient will ship them to the cluster and add them to + * the DistributedCache. + * + * N.B. that this method at most adds one jar per class given. If there is more than one + * jar available containing a class with the same name as a given class, we don't define + * which of those jars might be chosen. + * + * @param conf The Hadoop Configuration to modify + * @param classes will add just those dependencies needed to find the given classes + * @throws IOException if an underlying library call fails. + */ + @InterfaceAudience.Private + public static void addDependencyJarsForClasses(Configuration conf, + Class<?>... classes) throws IOException { + + FileSystem localFs = FileSystem.getLocal(conf); + Set<String> jars = new HashSet<>(); + // Add jars that are already in the tmpjars variable + jars.addAll(conf.getStringCollection("tmpjars")); + + // add jars as we find them to a map of contents jar name so that we can avoid + // creating new jars for classes that have already been packaged. + Map<String, String> packagedClasses = new HashMap<>(); + + // Add jars containing the specified classes + for (Class<?> clazz : classes) { + if (clazz == null) continue; + + Path path = findOrCreateJar(clazz, localFs, packagedClasses); + if (path == null) { + LOG.warn("Could not find jar for class " + clazz + + " in order to ship it to the cluster."); + continue; + } + if (!localFs.exists(path)) { + LOG.warn("Could not validate jar file " + path + " for class " + + clazz); + continue; + } + jars.add(path.toString()); + } + if (jars.isEmpty()) return; + + conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); + } + + /** + * Finds the Jar for a class or creates it if it doesn't exist. If the class is in + * a directory in the classpath, it creates a Jar on the fly with the + * contents of the directory and returns the path to that Jar. If a Jar is + * created, it is created in the system temporary directory. Otherwise, + * returns an existing jar that contains a class of the same name. Maintains + * a mapping from jar contents to the tmp jar created. + * @param my_class the class to find. + * @param fs the FileSystem with which to qualify the returned path. 
+ * @param packagedClasses a map of class name to path. + * @return a jar file that contains the class. + * @throws IOException + */ + private static Path findOrCreateJar(Class<?> my_class, FileSystem fs, + Map<String, String> packagedClasses) + throws IOException { + // attempt to locate an existing jar for the class. + String jar = findContainingJar(my_class, packagedClasses); + if (null == jar || jar.isEmpty()) { + jar = getJar(my_class); + updateMap(jar, packagedClasses); + } + + if (null == jar || jar.isEmpty()) { + return null; + } + + LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar)); + return new Path(jar).makeQualified(fs); + } + + /** + * Add entries to <code>packagedClasses</code> corresponding to class files + * contained in <code>jar</code>. + * @param jar The jar who's content to list. + * @param packagedClasses map[class -> jar] + */ + private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException { + if (null == jar || jar.isEmpty()) { + return; + } + ZipFile zip = null; + try { + zip = new ZipFile(jar); + for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) { + ZipEntry entry = iter.nextElement(); + if (entry.getName().endsWith("class")) { + packagedClasses.put(entry.getName(), jar); + } + } + } finally { + if (null != zip) zip.close(); + } + } + + /** + * Find a jar that contains a class of the same name, if any. It will return + * a jar file, even if that is not the first thing on the class path that + * has a class with the same name. Looks first on the classpath and then in + * the <code>packagedClasses</code> map. + * @param my_class the class to find. + * @return a jar file that contains the class, or null. + * @throws IOException + */ + private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses) + throws IOException { + ClassLoader loader = my_class.getClassLoader(); + + String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; + + if (loader != null) { + // first search the classpath + for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) { + URL url = itr.nextElement(); + if ("jar".equals(url.getProtocol())) { + String toReturn = url.getPath(); + if (toReturn.startsWith("file:")) { + toReturn = toReturn.substring("file:".length()); + } + // URLDecoder is a misnamed class, since it actually decodes + // x-www-form-urlencoded MIME type rather than actual + // URL encoding (which the file path has). Therefore it would + // decode +s to ' 's which is incorrect (spaces are actually + // either unencoded or encoded as "%20"). Replace +s first, so + // that they are kept sacred during the decoding process. + toReturn = toReturn.replaceAll("\\+", "%2B"); + toReturn = URLDecoder.decode(toReturn, "UTF-8"); + return toReturn.replaceAll("!.*$", ""); + } + } + } + + // now look in any jars we've packaged using JarFinder. Returns null when + // no jar is found. + return packagedClasses.get(class_file); + } + + /** + * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job + * configuration contexts (HBASE-8140) and also for testing on MRv2. + * check if we have HADOOP-9426. + * @param my_class the class to find. + * @return a jar file that contains the class, or null. 
+ */ + private static String getJar(Class<?> my_class) { + String ret = null; + try { + ret = JarFinder.getJar(my_class); + } catch (Exception e) { + // toss all other exceptions, related to reflection failure + throw new RuntimeException("getJar invocation failed.", e); + } + + return ret; + } +}
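
For context, here is a minimal sketch of how a job driver might use the utility class above: a map-only job that scans one column from a source table and writes it to a target table through TableOutputFormat, leaving the input format, serializations, dependency jars and credentials to initTableMapperJob/initTableReducerJob. The table names "source" and "target", the column family/qualifier "cf"/"q", and the CopyColumnDriver/CopyMapper classes are invented for illustration only; they are not part of this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class CopyColumnDriver {

  // Hypothetical column family and qualifier used only for this sketch.
  private static final byte[] CF = Bytes.toBytes("cf");
  private static final byte[] QUAL = Bytes.toBytes("q");

  /** Emits one Put per scanned row, copying a single column value. */
  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      byte[] v = value.getValue(CF, QUAL);
      if (v != null) {
        Put put = new Put(row.get());
        put.addColumn(CF, QUAL, v);
        context.write(row, put);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "copy-one-column");
    job.setJarByClass(CopyColumnDriver.class);

    Scan scan = new Scan();
    scan.setCaching(500);        // scanner caching suited to MR, cf. setScannerCaching above
    scan.setCacheBlocks(false);  // avoid polluting the region server block cache
    scan.addColumn(CF, QUAL);

    // Sets TableInputFormat, serializations, dependency jars and credentials.
    TableMapReduceUtil.initTableMapperJob("source", scan, CopyMapper.class,
        ImmutableBytesWritable.class, Put.class, job);
    // Null reducer: a map-only job writing directly through TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob("target", null, job);
    job.setNumReduceTasks(0);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
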
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java new file mode 100644 index 0000000..9a7dcb7 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.Mapper; + +/** + * Extends the base <code>Mapper</code> class to add the required input key + * and value classes. + * + * @param <KEYOUT> The type of the key. + * @param <VALUEOUT> The type of the value. + * @see org.apache.hadoop.mapreduce.Mapper + */ +@InterfaceAudience.Public +public abstract class TableMapper<KEYOUT, VALUEOUT> +extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java new file mode 100644 index 0000000..749fd85 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java @@ -0,0 +1,67 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Small committer class that does not do anything. + */ +@InterfaceAudience.Public +public class TableOutputCommitter extends OutputCommitter { + + @Override + public void abortTask(TaskAttemptContext arg0) throws IOException { + } + + @Override + public void cleanupJob(JobContext arg0) throws IOException { + } + + @Override + public void commitTask(TaskAttemptContext arg0) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException { + return false; + } + + @Override + public void setupJob(JobContext arg0) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext arg0) throws IOException { + } + + public boolean isRecoverySupported() { + return true; + } + + public void recoverTask(TaskAttemptContext taskContext) + throws IOException + { + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java new file mode 100644 index 0000000..604ef00 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -0,0 +1,239 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored + * while the output value <u>must</u> be either a {@link Put} or a + * {@link Delete} instance. + */ +@InterfaceAudience.Public +public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation> +implements Configurable { + + private static final Log LOG = LogFactory.getLog(TableOutputFormat.class); + + /** Job parameter that specifies the output table. */ + public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; + + /** + * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}. + * For keys matching this prefix, the prefix is stripped, and the value is set in the + * configuration with the resulting key, ie. the entry "hbase.mapred.output.key1 = value1" + * would be set in the configuration as "key1 = value1". Use this to set properties + * which should only be applied to the {@code TableOutputFormat} configuration and not the + * input configuration. + */ + public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output."; + + /** + * Optional job parameter to specify a peer cluster. + * Used specifying remote cluster when copying between hbase clusters (the + * source is picked up from <code>hbase-site.xml</code>). + * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String) + */ + public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum"; + + /** Optional job parameter to specify peer cluster's ZK client port */ + public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port"; + + /** Optional specification of the rs class name of the peer cluster */ + public static final String + REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class"; + /** Optional specification of the rs impl name of the peer cluster */ + public static final String + REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl"; + + /** The configuration. */ + private Configuration conf = null; + + /** + * Writes the reducer output to an HBase table. 
+ */ + protected class TableRecordWriter + extends RecordWriter<KEY, Mutation> { + + private Connection connection; + private BufferedMutator mutator; + + /** + * @throws IOException + * + */ + public TableRecordWriter() throws IOException { + String tableName = conf.get(OUTPUT_TABLE); + this.connection = ConnectionFactory.createConnection(conf); + this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName)); + LOG.info("Created table instance for " + tableName); + } + /** + * Closes the writer, in this case flush table commits. + * + * @param context The context. + * @throws IOException When closing the writer fails. + * @see RecordWriter#close(TaskAttemptContext) + */ + @Override + public void close(TaskAttemptContext context) throws IOException { + try { + if (mutator != null) { + mutator.close(); + } + } finally { + if (connection != null) { + connection.close(); + } + } + } + + /** + * Writes a key/value pair into the table. + * + * @param key The key. + * @param value The value. + * @throws IOException When writing fails. + * @see RecordWriter#write(Object, Object) + */ + @Override + public void write(KEY key, Mutation value) + throws IOException { + if (!(value instanceof Put) && !(value instanceof Delete)) { + throw new IOException("Pass a Delete or a Put"); + } + mutator.mutate(value); + } + } + + /** + * Creates a new record writer. + * + * Be aware that the baseline javadoc gives the impression that there is a single + * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new + * RecordWriter per call of this method. You must close the returned RecordWriter when done. + * Failure to do so will drop writes. + * + * @param context The current task context. + * @return The newly created writer instance. + * @throws IOException When creating the writer fails. + * @throws InterruptedException When the jobs is cancelled. + */ + @Override + public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new TableRecordWriter(); + } + + /** + * Checks if the output table exists and is enabled. + * + * @param context The current context. + * @throws IOException When the check fails. + * @throws InterruptedException When the job is aborted. + * @see OutputFormat#checkOutputSpecs(JobContext) + */ + @Override + public void checkOutputSpecs(JobContext context) throws IOException, + InterruptedException { + + try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) { + TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE)); + if (!admin.tableExists(tableName)) { + throw new TableNotFoundException("Can't write, table does not exist:" + + tableName.getNameAsString()); + } + + if (!admin.isTableEnabled(tableName)) { + throw new TableNotEnabledException("Can't write, table is not enabled: " + + tableName.getNameAsString()); + } + } + } + + /** + * Returns the output committer. + * + * @param context The current context. + * @return The committer. + * @throws IOException When creating the committer fails. + * @throws InterruptedException When the job is aborted. 
+ * @see OutputFormat#getOutputCommitter(TaskAttemptContext) + */ + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new TableOutputCommitter(); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration otherConf) { + String tableName = otherConf.get(OUTPUT_TABLE); + if(tableName == null || tableName.length() <= 0) { + throw new IllegalArgumentException("Must specify table name"); + } + + String address = otherConf.get(QUORUM_ADDRESS); + int zkClientPort = otherConf.getInt(QUORUM_PORT, 0); + String serverClass = otherConf.get(REGION_SERVER_CLASS); + String serverImpl = otherConf.get(REGION_SERVER_IMPL); + + try { + this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX); + + if (serverClass != null) { + this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); + } + if (zkClientPort != 0) { + this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); + } + } catch(IOException e) { + LOG.error(e); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java new file mode 100644 index 0000000..f66520b --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java @@ -0,0 +1,147 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Iterate over an HBase table data, return (ImmutableBytesWritable, Result) + * pairs. + */ +@InterfaceAudience.Public +public class TableRecordReader +extends RecordReader<ImmutableBytesWritable, Result> { + + private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl(); + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow The first row to start at. + * @throws IOException When restarting fails. 
+ */ + public void restart(byte[] firstRow) throws IOException { + this.recordReaderImpl.restart(firstRow); + } + + /** + * @param table the {@link Table} to scan. + */ + public void setTable(Table table) { + this.recordReaderImpl.setHTable(table); + } + + /** + * Sets the scan defining the actual details like columns etc. + * + * @param scan The scan to set. + */ + public void setScan(Scan scan) { + this.recordReaderImpl.setScan(scan); + } + + /** + * Closes the split. + * + * @see org.apache.hadoop.mapreduce.RecordReader#close() + */ + @Override + public void close() { + this.recordReaderImpl.close(); + } + + /** + * Returns the current key. + * + * @return The current key. + * @throws IOException + * @throws InterruptedException When the job is aborted. + * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey() + */ + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, + InterruptedException { + return this.recordReaderImpl.getCurrentKey(); + } + + /** + * Returns the current value. + * + * @return The current value. + * @throws IOException When the value is faulty. + * @throws InterruptedException When the job is aborted. + * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue() + */ + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return this.recordReaderImpl.getCurrentValue(); + } + + /** + * Initializes the reader. + * + * @param inputsplit The split to work with. + * @param context The current task context. + * @throws IOException When setting up the reader fails. + * @throws InterruptedException When the job is aborted. + * @see org.apache.hadoop.mapreduce.RecordReader#initialize( + * org.apache.hadoop.mapreduce.InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext) + */ + @Override + public void initialize(InputSplit inputsplit, + TaskAttemptContext context) throws IOException, + InterruptedException { + this.recordReaderImpl.initialize(inputsplit, context); + } + + /** + * Positions the record reader to the next record. + * + * @return <code>true</code> if there was another record. + * @throws IOException When reading the record failed. + * @throws InterruptedException When the job was aborted. + * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue() + */ + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return this.recordReaderImpl.nextKeyValue(); + } + + /** + * The current progress of the record reader through its data. + * + * @return A number between 0.0 and 1.0, the fraction of the data read. + * @see org.apache.hadoop.mapreduce.RecordReader#getProgress() + */ + @Override + public float getProgress() { + return this.recordReaderImpl.getProgress(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java new file mode 100644 index 0000000..5f85537 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.StringUtils; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * Iterate over an HBase table data, return (ImmutableBytesWritable, Result) + * pairs. + */ +@InterfaceAudience.Public +public class TableRecordReaderImpl { + public static final String LOG_PER_ROW_COUNT + = "hbase.mapreduce.log.scanner.rowcount"; + + private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class); + + // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase + @VisibleForTesting + static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters"; + private ResultScanner scanner = null; + private Scan scan = null; + private Scan currentScan = null; + private Table htable = null; + private byte[] lastSuccessfulRow = null; + private ImmutableBytesWritable key = null; + private Result value = null; + private TaskAttemptContext context = null; + private Method getCounter = null; + private long numRestarts = 0; + private long numStale = 0; + private long timestamp; + private int rowcount; + private boolean logScannerActivity = false; + private int logPerRowCount = 100; + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow The first row to start at. + * @throws IOException When restarting fails. 
+ */ + public void restart(byte[] firstRow) throws IOException { + currentScan = new Scan(scan); + currentScan.withStartRow(firstRow); + currentScan.setScanMetricsEnabled(true); + if (this.scanner != null) { + if (logScannerActivity) { + LOG.info("Closing the previously opened scanner object."); + } + this.scanner.close(); + } + this.scanner = this.htable.getScanner(currentScan); + if (logScannerActivity) { + LOG.info("Current scan=" + currentScan.toString()); + timestamp = System.currentTimeMillis(); + rowcount = 0; + } + } + + /** + * In the new mapreduce APIs, TaskAttemptContext has two getCounter methods. + * Check if the getCounter(String, String) method is available. + * @return The getCounter method, or null if not available. + * @throws IOException + */ + protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context) + throws IOException { + Method m = null; + try { + m = context.getClass().getMethod("getCounter", + new Class [] {String.class, String.class}); + } catch (SecurityException e) { + throw new IOException("Failed test for getCounter", e); + } catch (NoSuchMethodException e) { + // Ignore + } + return m; + } + + /** + * Sets the HBase table. + * + * @param htable The {@link org.apache.hadoop.hbase.client.Table} to scan. + */ + public void setHTable(Table htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean( + ScannerCallable.LOG_SCANNER_ACTIVITY, false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); + this.htable = htable; + } + + /** + * Sets the scan defining the actual details like columns etc. + * + * @param scan The scan to set. + */ + public void setScan(Scan scan) { + this.scan = scan; + } + + /** + * Build the scanner. Not done in the constructor to allow for extension. + * + * @throws IOException + * @throws InterruptedException + */ + public void initialize(InputSplit inputsplit, + TaskAttemptContext context) throws IOException, + InterruptedException { + if (context != null) { + this.context = context; + getCounter = retrieveGetCounterWithStringsParams(context); + } + restart(scan.getStartRow()); + } + + /** + * Closes the split. + * + * + */ + public void close() { + if (this.scanner != null) { + this.scanner.close(); + } + try { + this.htable.close(); + } catch (IOException ioe) { + LOG.warn("Error closing table", ioe); + } + } + + /** + * Returns the current key. + * + * @return The current key. + * @throws IOException + * @throws InterruptedException When the job is aborted. + */ + public ImmutableBytesWritable getCurrentKey() throws IOException, + InterruptedException { + return key; + } + + /** + * Returns the current value. + * + * @return The current value. + * @throws IOException When the value is faulty. + * @throws InterruptedException When the job is aborted. + */ + public Result getCurrentValue() throws IOException, InterruptedException { + return value; + } + + + /** + * Positions the record reader to the next record. + * + * @return <code>true</code> if there was another record. + * @throws IOException When reading the record failed. + * @throws InterruptedException When the job was aborted.
+ */ + public boolean nextKeyValue() throws IOException, InterruptedException { + if (key == null) key = new ImmutableBytesWritable(); + if (value == null) value = new Result(); + try { + try { + value = this.scanner.next(); + if (value != null && value.isStale()) numStale++; + if (logScannerActivity) { + rowcount ++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + timestamp = now; + rowcount = 0; + } + } + } catch (IOException e) { + // do not retry if the exception tells us not to do so + if (e instanceof DoNotRetryIOException) { + throw e; + } + // try to handle all other IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown + LOG.info("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restart(scan.getStartRow()); + } else { + restart(lastSuccessfulRow); + scanner.next(); // skip presumed already mapped row + } + value = scanner.next(); + if (value != null && value.isStale()) numStale++; + numRestarts++; + } + if (value != null && value.size() > 0) { + key.set(value.getRow()); + lastSuccessfulRow = key.get(); + return true; + } + + updateCounters(); + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + LOG.info(ioe); + String lastRow = lastSuccessfulRow == null ? + "null" : Bytes.toStringBinary(lastSuccessfulRow); + LOG.info("lastSuccessfulRow=" + lastRow); + } + throw ioe; + } + } + + /** + * If hbase runs on new version of mapreduce, RecordReader has access to + * counters thus can update counters based on scanMetrics. + * If hbase runs on old version of mapreduce, it won't be able to get + * access to counters and TableRecorderReader can't update counter values. + * @throws IOException + */ + private void updateCounters() throws IOException { + ScanMetrics scanMetrics = scanner.getScanMetrics(); + if (scanMetrics == null) { + return; + } + + updateCounters(scanMetrics, numRestarts, getCounter, context, numStale); + } + + protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts, + Method getCounter, TaskAttemptContext context, long numStale) { + // we can get access to counters only if hbase uses new mapreduce APIs + if (getCounter == null) { + return; + } + + try { + for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) { + Counter ct = (Counter)getCounter.invoke(context, + HBASE_COUNTER_GROUP_NAME, entry.getKey()); + + ct.increment(entry.getValue()); + } + ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, + "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts); + ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, + "NUM_SCAN_RESULTS_STALE")).increment(numStale); + } catch (Exception e) { + LOG.debug("can't update counter." + StringUtils.stringifyException(e)); + } + } + + /** + * The current progress of the record reader through its data. + * + * @return A number between 0.0 and 1.0, the fraction of the data read. 
+ */ + public float getProgress() { + // Depends on the total number of tuples + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java new file mode 100644 index 0000000..f0bfc74 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.mapreduce.Reducer; + +/** + * Extends the basic <code>Reducer</code> class to add the required key and + * value input/output classes. While the input key and value as well as the + * output key can be anything handed in from the previous map phase the output + * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put} + * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when + * using the {@link TableOutputFormat} class. + * <p> + * This class is extended by {@link IdentityTableReducer} but can also be + * subclassed to implement similar features or any custom code needed. It has + * the advantage to enforce the output value to a specific basic type. + * + * @param <KEYIN> The type of the input key. + * @param <VALUEIN> The type of the input value. + * @param <KEYOUT> The type of the output key. + * @see org.apache.hadoop.mapreduce.Reducer + */ +@InterfaceAudience.Public +public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> +extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> { +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java new file mode 100644 index 0000000..691f0c5 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
+ The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +/** + * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job + * bypasses HBase servers and directly accesses the underlying files (hfiles, recovered edits, + * WALs, etc.) to provide maximum performance. The snapshot is not required to be + * restored to the live cluster or cloned. This also allows the mapreduce job to be run from an + * online or offline HBase cluster. The snapshot files can be exported with the + * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool to a pure-hdfs cluster, + * and this InputFormat can then be used to run the mapreduce job directly over the snapshot files. + * The snapshot should not be deleted while there are jobs reading from snapshot files. + * <p> + * Usage is similar to TableInputFormat, and + * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)} + * can be used to configure the job. + * <pre>{@code + * Job job = new Job(conf); + * Scan scan = new Scan(); + * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + * scan, MyTableMapper.class, MyMapKeyOutput.class, + * MyMapOutputValueWritable.class, job, true, restoreDir); + * } + * </pre> + * <p> + * Internally, this input format restores the snapshot into the given tmp directory. Similar to + * {@link TableInputFormat}, an InputSplit is created per region. The region is opened for reading + * from each RecordReader. An internal RegionScanner is used to execute the + * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user. + * <p> + * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from + * snapshot files and data files. + * To read the snapshot files directly from the file system, the user running the MR job + * must have sufficient permissions to access the snapshot and reference files.
+ * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase + * user or the user must have group or other privileges in the filesystem (See HBASE-8369). + * Note that, given other users access to read from snapshot/data files will completely circumvent + * the access control enforced by HBase. + * @see org.apache.hadoop.hbase.client.TableSnapshotScanner + */ +@InterfaceAudience.Public +public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> { + + public static class TableSnapshotRegionSplit extends InputSplit implements Writable { + private TableSnapshotInputFormatImpl.InputSplit delegate; + + // constructor for mapreduce framework / Writable + public TableSnapshotRegionSplit() { + this.delegate = new TableSnapshotInputFormatImpl.InputSplit(); + } + + public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) { + this.delegate = delegate; + } + + public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, + List<String> locations, Scan scan, Path restoreDir) { + this.delegate = + new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); + } + + @Override + public long getLength() throws IOException, InterruptedException { + return delegate.getLength(); + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return delegate.getLocations(); + } + + @Override + public void write(DataOutput out) throws IOException { + delegate.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + delegate.readFields(in); + } + + public HRegionInfo getRegionInfo() { + return delegate.getRegionInfo(); + } + + } + + @VisibleForTesting + static class TableSnapshotRegionRecordReader extends + RecordReader<ImmutableBytesWritable, Result> { + private TableSnapshotInputFormatImpl.RecordReader delegate = + new TableSnapshotInputFormatImpl.RecordReader(); + private TaskAttemptContext context; + private Method getCounter; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, + InterruptedException { + this.context = context; + getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context); + delegate.initialize( + ((TableSnapshotRegionSplit) split).delegate, + context.getConfiguration()); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + boolean result = delegate.nextKeyValue(); + if (result) { + ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics(); + if (scanMetrics != null && context != null) { + TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0); + } + } + return result; + } + + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { + return delegate.getCurrentKey(); + } + + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return delegate.getCurrentValue(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return delegate.getProgress(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + } + + @Override + public RecordReader<ImmutableBytesWritable, Result> createRecordReader( + InputSplit split, TaskAttemptContext context) throws IOException { + return new TableSnapshotRegionRecordReader(); + } + + @Override + public List<InputSplit> getSplits(JobContext job) throws IOException, 
InterruptedException { + List<InputSplit> results = new ArrayList<>(); + for (TableSnapshotInputFormatImpl.InputSplit split : + TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) { + results.add(new TableSnapshotRegionSplit(split)); + } + return results; + } + + /** + * Configures the job to use TableSnapshotInputFormat to read from a snapshot. + * @param job the job to configure + * @param snapshotName the name of the snapshot to read from + * @param restoreDir a temporary directory to restore the snapshot into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restoreDir can be deleted. + * @throws IOException if an error occurs + */ + public static void setInput(Job job, String snapshotName, Path restoreDir) + throws IOException { + TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir); + } +}
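
For context, a minimal sketch of how the write-path classes in this patch are typically wired together. Nothing below is part of the patch: the table name "target_table", the column family "cf", and the class names are illustrative. TableMapReduceUtil.initTableReducerJob sets TableOutputFormat as the job's output format and stores the target table name in the job configuration, which is what checkOutputSpecs() above reads to verify that the table exists and is enabled; TableRecordWriter.write() rejects any Mutation that is not a Put or a Delete.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class ExampleSummaryReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    // The emitted value must be a Put or a Delete; TableRecordWriter.write()
    // throws an IOException for any other Mutation. The key is ignored by
    // TableOutputFormat, so null is acceptable here.
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
    context.write(null, put);
  }

  public static Job createSubmittableJob() throws IOException {
    Job job = Job.getInstance(HBaseConfiguration.create(), "example-table-writer");
    job.setJarByClass(ExampleSummaryReducer.class);
    // ... mapper and input format setup elided ...
    // Wires in TableOutputFormat and records the output table name in the job conf.
    TableMapReduceUtil.initTableReducerJob("target_table", ExampleSummaryReducer.class, job);
    return job;
  }
}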
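
A corresponding sketch of the snapshot read path, using the initTableSnapshotMapperJob signature referenced in the TableSnapshotInputFormat javadoc above. The snapshot name "my_snapshot", the restore directory, and MySnapshotMapper are illustrative; the restore directory must be writable by the submitting user and must not sit under the HBase root directory.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class SnapshotScanDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "snapshot-scan");
    job.setJarByClass(SnapshotScanDriver.class);
    Scan scan = new Scan();                               // scan applied to every snapshot region
    Path restoreDir = new Path("/tmp/snapshot-restore");  // snapshot is restored here, not to the live cluster
    TableMapReduceUtil.initTableSnapshotMapperJob(
        "my_snapshot", scan, MySnapshotMapper.class,
        ImmutableBytesWritable.class, Result.class, job,
        true,                                             // ship HBase jars via the distributed cache
        restoreDir);
    // ... reducer / output configuration elided ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}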
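
Finally, the scan metrics published by TableRecordReaderImpl.updateCounters() land in the "HBase Counters" counter group (HBASE_COUNTER_GROUP_NAME above), alongside NUM_SCANNER_RESTARTS and NUM_SCAN_RESULTS_STALE. A driver can read them back once the job has finished; a small fragment, assuming a configured Job object such as the ones sketched above:

import org.apache.hadoop.mapreduce.Counter;

// After job.waitForCompletion(true) has returned successfully:
for (Counter counter : job.getCounters().getGroup("HBase Counters")) {
  // Includes the ScanMetrics entries plus NUM_SCANNER_RESTARTS and NUM_SCAN_RESULTS_STALE.
  System.out.println(counter.getName() + " = " + counter.getValue());
}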