http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java new file mode 100644 index 0000000..9811a97 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java @@ -0,0 +1,313 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * A Base for {@link TableInputFormat}s. Receives a {@link Table}, a + * byte[] of input columns and optionally a {@link Filter}. + * Subclasses may use other TableRecordReader implementations. + * + * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to + * function properly. Each of the entry points to this class used by the MapReduce framework, + * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, + * will call {@link #initialize(JobConf)} as a convenient centralized location to handle + * retrieving the necessary configuration information. If your subclass overrides either of these + * methods, either call the parent version or call initialize yourself. + * + * <p> + * An example of a subclass: + * <pre> + * class ExampleTIF extends TableInputFormatBase { + * + * {@literal @}Override + * protected void initialize(JobConf context) throws IOException { + * // We are responsible for the lifecycle of this connection until we hand it over in + * // initializeTable. + * Connection connection = + * ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + * TableName tableName = TableName.valueOf("exampleTable"); + * // mandatory. 
once passed here, TableInputFormatBase will handle closing the connection. + * initializeTable(connection, tableName); + * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + * Bytes.toBytes("columnB") }; + * // mandatory + * setInputColumns(inputColumns); + * // optional, by default we'll get everything for the given columns. + * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + * setRowFilter(exampleFilter); + * } + * } + * </pre> + */ + +@InterfaceAudience.Public +public abstract class TableInputFormatBase +implements InputFormat<ImmutableBytesWritable, Result> { + private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); + private byte [][] inputColumns; + private Table table; + private RegionLocator regionLocator; + private Connection connection; + private TableRecordReader tableRecordReader; + private Filter rowFilter; + + private static final String NOT_INITIALIZED = "The input format instance has not been properly " + + "initialized. Ensure you call initializeTable either in your constructor or initialize " + + "method"; + private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + + " previous error. Please look at the previous logs lines from" + + " the task's full log for more details."; + + /** + * Builds a TableRecordReader. If no TableRecordReader was provided, uses + * the default. + * + * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, + * JobConf, Reporter) + */ + public RecordReader<ImmutableBytesWritable, Result> getRecordReader( + InputSplit split, JobConf job, Reporter reporter) + throws IOException { + // In case a subclass uses the deprecated approach or calls initializeTable directly + if (table == null) { + initialize(job); + } + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); + } + + TableSplit tSplit = (TableSplit) split; + // if no table record reader was provided use default + final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() : + this.tableRecordReader; + trr.setStartRow(tSplit.getStartRow()); + trr.setEndRow(tSplit.getEndRow()); + trr.setHTable(this.table); + trr.setInputColumns(this.inputColumns); + trr.setRowFilter(this.rowFilter); + trr.init(); + return new RecordReader<ImmutableBytesWritable, Result>() { + + @Override + public void close() throws IOException { + trr.close(); + closeTable(); + } + + @Override + public ImmutableBytesWritable createKey() { + return trr.createKey(); + } + + @Override + public Result createValue() { + return trr.createValue(); + } + + @Override + public long getPos() throws IOException { + return trr.getPos(); + } + + @Override + public float getProgress() throws IOException { + return trr.getProgress(); + } + + @Override + public boolean next(ImmutableBytesWritable key, Result value) throws IOException { + return trr.next(key, value); + } + }; + } + + /** + * Calculates the splits that will serve as input for the map tasks. + * + * Splits are created in number equal to the smallest between numSplits and + * the number of {@link org.apache.hadoop.hbase.regionserver.HRegion}s in the table. 
+ * If the number of splits is smaller than the number of + * {@link org.apache.hadoop.hbase.regionserver.HRegion}s then splits are spanned across + * multiple {@link org.apache.hadoop.hbase.regionserver.HRegion}s + * and are grouped the most evenly possible. In the + * case splits are uneven the bigger splits are placed first in the + * {@link InputSplit} array. + * + * @param job the map task {@link JobConf} + * @param numSplits a hint to calculate the number of splits (mapred.map.tasks). + * + * @return the input splits + * + * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int) + */ + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + if (this.table == null) { + initialize(job); + } + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); + } + + byte [][] startKeys = this.regionLocator.getStartKeys(); + if (startKeys == null || startKeys.length == 0) { + throw new IOException("Expecting at least one region"); + } + if (this.inputColumns == null || this.inputColumns.length == 0) { + throw new IOException("Expecting at least one column"); + } + int realNumSplits = numSplits > startKeys.length? startKeys.length: + numSplits; + InputSplit[] splits = new InputSplit[realNumSplits]; + int middle = startKeys.length / realNumSplits; + int startPos = 0; + for (int i = 0; i < realNumSplits; i++) { + int lastPos = startPos + middle; + lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos; + String regionLocation = regionLocator.getRegionLocation(startKeys[startPos]). + getHostname(); + splits[i] = new TableSplit(this.table.getName(), + startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]: + HConstants.EMPTY_START_ROW, regionLocation); + LOG.info("split: " + i + "->" + splits[i]); + startPos = lastPos; + } + return splits; + } + + /** + * Allows subclasses to initialize the table information. + * + * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. + * @param tableName The {@link TableName} of the table to process. + * @throws IOException + */ + protected void initializeTable(Connection connection, TableName tableName) throws IOException { + if (this.table != null || this.connection != null) { + LOG.warn("initializeTable called multiple times. Overwriting connection and table " + + "reference; TableInputFormatBase will not close these old references when done."); + } + this.table = connection.getTable(tableName); + this.regionLocator = connection.getRegionLocator(tableName); + this.connection = connection; + } + + /** + * @param inputColumns to be passed in {@link Result} to the map task. + */ + protected void setInputColumns(byte [][] inputColumns) { + this.inputColumns = inputColumns; + } + + /** + * Allows subclasses to get the {@link Table}. + */ + protected Table getTable() { + if (table == null) { + throw new IllegalStateException(NOT_INITIALIZED); + } + return this.table; + } + + /** + * Allows subclasses to set the {@link TableRecordReader}. + * + * @param tableRecordReader + * to provide other {@link TableRecordReader} implementations. 
+ */ + protected void setTableRecordReader(TableRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } + + /** + * Allows subclasses to set the {@link Filter} to be used. + * + * @param rowFilter + */ + protected void setRowFilter(Filter rowFilter) { + this.rowFilter = rowFilter; + } + + /** + * Handle subclass specific set up. + * Each of the entry points used by the MapReduce framework, + * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, + * will call {@link #initialize(JobConf)} as a convenient centralized location to handle + * retrieving the necessary configuration information and calling + * {@link #initializeTable(Connection, TableName)}. + * + * Subclasses should implement their initialize call such that it is safe to call multiple times. + * The current TableInputFormatBase implementation relies on a non-null table reference to decide + * if an initialize call is needed, but this behavior may change in the future. In particular, + * it is critical that initializeTable not be called multiple times since this will leak + * Connection instances. + * + */ + protected void initialize(JobConf job) throws IOException { + } + + /** + * Close the Table and related objects that were initialized via + * {@link #initializeTable(Connection, TableName)}. + * + * @throws IOException + */ + protected void closeTable() throws IOException { + close(table, connection); + table = null; + connection = null; + } + + private void close(Closeable... closables) throws IOException { + for (Closeable c : closables) { + if(c != null) { c.close(); } + } + } +}
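For orientation before the next file: a JobConf-based driver typically wires the classes in this patch together through TableMapReduceUtil.initTableMapJob, which is defined later in this same patch. Below is a minimal sketch; the driver and mapper class names, the table "exampleTable", and the column "f:col" are illustrative placeholders, not part of the patch.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class ExampleScanDriver {

  // Emits each row key once; TableMap fixes the input types to
  // (ImmutableBytesWritable, Result) for us.
  public static class RowKeyMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    public void map(ImmutableBytesWritable row, Result result,
        OutputCollector<ImmutableBytesWritable, NullWritable> collector,
        Reporter reporter) throws IOException {
      collector.collect(row, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), ExampleScanDriver.class);
    job.setJobName("example-scan");
    // Columns are a space-delimited list of family:qualifier names.
    TableMapReduceUtil.initTableMapJob("exampleTable", "f:col",
        RowKeyMapper.class, ImmutableBytesWritable.class, NullWritable.class, job);
    job.setNumReduceTasks(0);                    // map-only job
    job.setOutputFormat(NullOutputFormat.class); // discard map output
    JobClient.runJob(job);
  }
}

Setting zero reduce tasks makes the job map-only, and NullOutputFormat simply discards the mapper output, which keeps the sketch self-contained.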
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java new file mode 100644 index 0000000..a9f1e61 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMap.java @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Mapper; + +/** + * Scan an HBase table to sort by a specified sort column. + * If the column does not exist, the record is not passed to Reduce. + * + * @param <K> WritableComparable key class + * @param <V> Writable value class + */ +@InterfaceAudience.Public +public interface TableMap<K extends WritableComparable<? super K>, V> +extends Mapper<ImmutableBytesWritable, Result, K, V> { + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java new file mode 100644 index 0000000..63ec418 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java @@ -0,0 +1,376 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MutationSerialization; +import org.apache.hadoop.hbase.mapreduce.ResultSerialization; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.TokenUtil; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +/** + * Utility for {@link TableMap} and {@link TableReduce} + */ +@InterfaceAudience.Public +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class TableMapReduceUtil { + + /** + * Use this before submitting a TableMap job. It will + * appropriately set up the JobConf. + * + * @param table The table name to read from. + * @param columns The columns to scan. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job configuration to adjust. + */ + public static void initTableMapJob(String table, String columns, + Class<? extends TableMap> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, JobConf job) { + initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, + true, TableInputFormat.class); + } + + public static void initTableMapJob(String table, String columns, + Class<? extends TableMap> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, JobConf job, boolean addDependencyJars) { + initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, TableInputFormat.class); + } + + /** + * Use this before submitting a TableMap job. It will + * appropriately set up the JobConf. + * + * @param table The table name to read from. + * @param columns The columns to scan. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job configuration to adjust. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + */ + public static void initTableMapJob(String table, String columns, + Class<? extends TableMap> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, JobConf job, boolean addDependencyJars, + Class<? 
extends InputFormat> inputFormat) { + + job.setInputFormat(inputFormat); + job.setMapOutputValueClass(outputValueClass); + job.setMapOutputKeyClass(outputKeyClass); + job.setMapperClass(mapper); + job.setStrings("io.serializations", job.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName()); + FileInputFormat.addInputPaths(job, table); + job.set(TableInputFormat.COLUMN_LIST, columns); + if (addDependencyJars) { + try { + addDependencyJars(job); + } catch (IOException e) { + e.printStackTrace(); + } + } + try { + initCredentials(job); + } catch (IOException ioe) { + // just spit out the stack trace? really? + ioe.printStackTrace(); + } + } + + /** + * Sets up the job for reading from one or more table snapshots, with one or more scans + * per snapshot. + * It bypasses HBase servers and reads directly from snapshot files. + * + * @param snapshotScans map of snapshot name to scans on that snapshot. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @param tmpRestoreDir a temporary directory to copy the snapshot files into. + */ + public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans, + Class<? extends TableMap> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, + JobConf job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { + MultiTableSnapshotInputFormat.setInput(job, snapshotScans, tmpRestoreDir); + + job.setInputFormat(MultiTableSnapshotInputFormat.class); + if (outputValueClass != null) { + job.setMapOutputValueClass(outputValueClass); + } + if (outputKeyClass != null) { + job.setMapOutputKeyClass(outputKeyClass); + } + job.setMapperClass(mapper); + if (addDependencyJars) { + addDependencyJars(job); + } + + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); + } + + /** + * Sets up the job for reading from a table snapshot. It bypasses HBase servers + * and reads directly from snapshot files. + * + * @param snapshotName The name of the snapshot (of a table) to read from. + * @param columns The columns to scan. + * @param mapper The mapper class to use. + * @param outputKeyClass The class of the output key. + * @param outputValueClass The class of the output value. + * @param job The current job to adjust. Make sure the passed job is + * carrying all necessary HBase configuration. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, the restore directory can be deleted. + * @throws IOException When setting up the details fails. + * @see TableSnapshotInputFormat + */ + public static void initTableSnapshotMapJob(String snapshotName, String columns, + Class<?
extends TableMap> mapper, + Class<?> outputKeyClass, + Class<?> outputValueClass, JobConf job, + boolean addDependencyJars, Path tmpRestoreDir) + throws IOException { + TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); + initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, job, + addDependencyJars, TableSnapshotInputFormat.class); + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(job); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job configuration to adjust. + * @throws IOException When determining the region count fails. + */ + public static void initTableReduceJob(String table, + Class<? extends TableReduce> reducer, JobConf job) + throws IOException { + initTableReduceJob(table, reducer, job, null); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job configuration to adjust. + * @param partitioner Partitioner to use. Pass <code>null</code> to use + * default partitioner. + * @throws IOException When determining the region count fails. + */ + public static void initTableReduceJob(String table, + Class<? extends TableReduce> reducer, JobConf job, Class partitioner) + throws IOException { + initTableReduceJob(table, reducer, job, partitioner, true); + } + + /** + * Use this before submitting a TableReduce job. It will + * appropriately set up the JobConf. + * + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job configuration to adjust. + * @param partitioner Partitioner to use. Pass <code>null</code> to use + * default partitioner. + * @param addDependencyJars upload HBase jars and jars for any of the configured + * job classes via the distributed cache (tmpjars). + * @throws IOException When determining the region count fails. + */ + public static void initTableReduceJob(String table, + Class<? 
extends TableReduce> reducer, JobConf job, Class partitioner, + boolean addDependencyJars) throws IOException { + job.setOutputFormat(TableOutputFormat.class); + job.setReducerClass(reducer); + job.set(TableOutputFormat.OUTPUT_TABLE, table); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Put.class); + job.setStrings("io.serializations", job.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName()); + if (partitioner == HRegionPartitioner.class) { + job.setPartitionerClass(HRegionPartitioner.class); + int regions = + MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)); + if (job.getNumReduceTasks() > regions) { + job.setNumReduceTasks(regions); + } + } else if (partitioner != null) { + job.setPartitionerClass(partitioner); + } + if (addDependencyJars) { + addDependencyJars(job); + } + initCredentials(job); + } + + public static void initCredentials(JobConf job) throws IOException { + UserProvider userProvider = UserProvider.instantiate(job); + if (userProvider.isHadoopSecurityEnabled()) { + // propagate delegation related props from launcher job to MR job + if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { + job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); + } + } + + if (userProvider.isHBaseSecurityEnabled()) { + Connection conn = ConnectionFactory.createConnection(job); + try { + // login the server principal (if using secure Hadoop) + User user = userProvider.getCurrent(); + TokenUtil.addTokenForJob(conn, job, user); + } catch (InterruptedException ie) { + ie.printStackTrace(); + Thread.currentThread().interrupt(); + } finally { + conn.close(); + } + } + } + + /** + * Ensures that the given number of reduce tasks for the given job + * configuration does not exceed the number of regions for the given table. + * + * @param table The table to get the region count for. + * @param job The current job configuration to adjust. + * @throws IOException When retrieving the table details fails. + */ + // Used by tests. + public static void limitNumReduceTasks(String table, JobConf job) + throws IOException { + int regions = + MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)); + if (job.getNumReduceTasks() > regions) + job.setNumReduceTasks(regions); + } + + /** + * Ensures that the given number of map tasks for the given job + * configuration does not exceed the number of regions for the given table. + * + * @param table The table to get the region count for. + * @param job The current job configuration to adjust. + * @throws IOException When retrieving the table details fails. + */ + // Used by tests. + public static void limitNumMapTasks(String table, JobConf job) + throws IOException { + int regions = + MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table)); + if (job.getNumMapTasks() > regions) + job.setNumMapTasks(regions); + } + + /** + * Sets the number of reduce tasks for the given job configuration to the + * number of regions the given table has. + * + * @param table The table to get the region count for. + * @param job The current job configuration to adjust. + * @throws IOException When retrieving the table details fails. 
+ */ + public static void setNumReduceTasks(String table, JobConf job) + throws IOException { + job.setNumReduceTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), + TableName.valueOf(table))); + } + + /** + * Sets the number of map tasks for the given job configuration to the + * number of regions the given table has. + * + * @param table The table to get the region count for. + * @param job The current job configuration to adjust. + * @throws IOException When retrieving the table details fails. + */ + public static void setNumMapTasks(String table, JobConf job) + throws IOException { + job.setNumMapTasks(MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), + TableName.valueOf(table))); + } + + /** + * Sets the number of rows to return and cache with each scanner iteration. + * Higher caching values will enable faster mapreduce jobs at the expense of + * requiring more heap to contain the cached rows. + * + * @param job The current job configuration to adjust. + * @param batchSize The number of rows to return in batch with each scanner + * iteration. + */ + public static void setScannerCaching(JobConf job, int batchSize) { + job.setInt("hbase.client.scanner.caching", batchSize); + } + + /** + * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) + */ + public static void addDependencyJars(JobConf job) throws IOException { + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addHBaseDependencyJars(job); + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses( + job, + // when making changes here, consider also mapreduce.TableMapReduceUtil + // pull job classes + job.getMapOutputKeyClass(), + job.getMapOutputValueClass(), + job.getOutputKeyClass(), + job.getOutputValueClass(), + job.getPartitionerClass(), + job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class), + job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class), + job.getCombinerClass()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java new file mode 100644 index 0000000..06b28ed --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -0,0 +1,134 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; + +/** + * Convert Map/Reduce output and write it to an HBase table + */ +@InterfaceAudience.Public +public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> { + + /** JobConf parameter that specifies the output table */ + public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; + + /** + * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable) + * and write to an HBase table. + */ + protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> { + private BufferedMutator m_mutator; + private Connection conn; + + + /** + * Instantiate a TableRecordWriter with the HBase HClient for writing. + * + * @deprecated Please use {@code #TableRecordWriter(JobConf)} This version does not clean up + * connections and will leak connections (removed in 2.0) + */ + @Deprecated + public TableRecordWriter(final BufferedMutator mutator) throws IOException { + this.m_mutator = mutator; + this.conn = null; + } + + /** + * Instantiate a TableRecordWriter with a BufferedMutator for batch writing. + */ + public TableRecordWriter(JobConf job) throws IOException { + // expecting exactly one path + TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE)); + try { + this.conn = ConnectionFactory.createConnection(job); + this.m_mutator = conn.getBufferedMutator(tableName); + } finally { + if (this.m_mutator == null) { + conn.close(); + conn = null; + } + } + } + + public void close(Reporter reporter) throws IOException { + try { + if (this.m_mutator != null) { + this.m_mutator.close(); + } + } finally { + if (conn != null) { + this.conn.close(); + } + } + } + + public void write(ImmutableBytesWritable key, Put value) throws IOException { + m_mutator.mutate(new Put(value)); + } + } + + /** + * Creates a new record writer. + * + * Be aware that the baseline javadoc gives the impression that there is a single + * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new + * RecordWriter per call of this method. You must close the returned RecordWriter when done. + * Failure to do so will drop writes. + * + * @param ignored Ignored filesystem + * @param job Current JobConf + * @param name Name of the job + * @param progress + * @return The newly created writer instance. + * @throws IOException When creating the writer fails. + */ + @Override + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, + Progressable progress) + throws IOException { + // Clear write buffer on fail is true by default so no need to reset it. 
+ return new TableRecordWriter(job); + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) + throws FileAlreadyExistsException, InvalidJobConfException, IOException { + String tableName = job.get(OUTPUT_TABLE); + if (tableName == null) { + throw new IOException("Must specify table name"); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java new file mode 100644 index 0000000..cecef7d --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java @@ -0,0 +1,139 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.RecordReader; + + +/** + * Iterate over HBase table data and return (ImmutableBytesWritable, Result) pairs. + */ +@InterfaceAudience.Public +public class TableRecordReader +implements RecordReader<ImmutableBytesWritable, Result> { + + private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl(); + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow the first row to scan from + * @throws IOException + */ + public void restart(byte[] firstRow) throws IOException { + this.recordReaderImpl.restart(firstRow); + } + + /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + public void init() throws IOException { + this.recordReaderImpl.restart(this.recordReaderImpl.getStartRow()); + } + + /** + * @param htable the {@link org.apache.hadoop.hbase.client.Table} to scan. + */ + public void setHTable(Table htable) { + this.recordReaderImpl.setHTable(htable); + } + + /** + * @param inputColumns the columns to be placed in {@link Result}.
+ */ + public void setInputColumns(final byte [][] inputColumns) { + this.recordReaderImpl.setInputColumns(inputColumns); + } + + /** + * @param startRow the first row in the split + */ + public void setStartRow(final byte [] startRow) { + this.recordReaderImpl.setStartRow(startRow); + } + + /** + * + * @param endRow the last row in the split + */ + public void setEndRow(final byte [] endRow) { + this.recordReaderImpl.setEndRow(endRow); + } + + /** + * @param rowFilter the {@link Filter} to be used. + */ + public void setRowFilter(Filter rowFilter) { + this.recordReaderImpl.setRowFilter(rowFilter); + } + + public void close() { + this.recordReaderImpl.close(); + } + + /** + * @return ImmutableBytesWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + public ImmutableBytesWritable createKey() { + return this.recordReaderImpl.createKey(); + } + + /** + * @return RowResult + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + public Result createValue() { + return this.recordReaderImpl.createValue(); + } + + public long getPos() { + + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return this.recordReaderImpl.getPos(); + } + + public float getProgress() { + // Depends on the total number of tuples and getPos + return this.recordReaderImpl.getPos(); + } + + /** + * @param key HStoreKey as input key. + * @param value MapWritable as input value + * @return true if there was more data + * @throws IOException + */ + public boolean next(ImmutableBytesWritable key, Result value) + throws IOException { + return this.recordReaderImpl.next(key, value); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java new file mode 100644 index 0000000..f6b79c3 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java @@ -0,0 +1,259 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; + +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; + +/** + * Iterate over HBase table data and return (ImmutableBytesWritable, Result) pairs. + */ +@InterfaceAudience.Public +public class TableRecordReaderImpl { + private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class); + + private byte [] startRow; + private byte [] endRow; + private byte [] lastSuccessfulRow; + private Filter trrRowFilter; + private ResultScanner scanner; + private Table htable; + private byte [][] trrInputColumns; + private long timestamp; + private int rowcount; + private boolean logScannerActivity = false; + private int logPerRowCount = 100; + + /** + * Restart from survivable exceptions by creating a new scanner. + * + * @param firstRow the first row to scan from + * @throws IOException + */ + public void restart(byte[] firstRow) throws IOException { + Scan currentScan; + if ((endRow != null) && (endRow.length > 0)) { + if (trrRowFilter != null) { + Scan scan = new Scan(firstRow, endRow); + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + scan.setCacheBlocks(false); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } else { + LOG.debug("TIFB.restart, firstRow: " + + Bytes.toStringBinary(firstRow) + ", endRow: " + + Bytes.toStringBinary(endRow)); + Scan scan = new Scan(firstRow, endRow); + TableInputFormat.addColumns(scan, trrInputColumns); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + } else { + LOG.debug("TIFB.restart, firstRow: " + + Bytes.toStringBinary(firstRow) + ", no endRow"); + + Scan scan = new Scan(firstRow); + TableInputFormat.addColumns(scan, trrInputColumns); + scan.setFilter(trrRowFilter); + this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + if (logScannerActivity) { + LOG.info("Current scan=" + currentScan.toString()); + timestamp = System.currentTimeMillis(); + rowcount = 0; + } + } + + /** + * Build the scanner. Not done in constructor to allow for extension. + * + * @throws IOException + */ + public void init() throws IOException { + restart(startRow); + } + + byte[] getStartRow() { + return this.startRow; + } + /** + * @param htable the {@link org.apache.hadoop.hbase.client.Table} to scan. + */ + public void setHTable(Table htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean( + ScannerCallable.LOG_SCANNER_ACTIVITY, false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); + this.htable = htable; + } + + /** + * @param inputColumns the columns to be placed in {@link Result}.
+ */ + public void setInputColumns(final byte [][] inputColumns) { + this.trrInputColumns = inputColumns; + } + + /** + * @param startRow the first row in the split + */ + public void setStartRow(final byte [] startRow) { + this.startRow = startRow; + } + + /** + * + * @param endRow the last row in the split + */ + public void setEndRow(final byte [] endRow) { + this.endRow = endRow; + } + + /** + * @param rowFilter the {@link Filter} to be used. + */ + public void setRowFilter(Filter rowFilter) { + this.trrRowFilter = rowFilter; + } + + public void close() { + if (this.scanner != null) { + this.scanner.close(); + } + try { + this.htable.close(); + } catch (IOException ioe) { + LOG.warn("Error closing table", ioe); + } + } + + /** + * @return ImmutableBytesWritable + * + * @see org.apache.hadoop.mapred.RecordReader#createKey() + */ + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + /** + * @return RowResult + * + * @see org.apache.hadoop.mapred.RecordReader#createValue() + */ + public Result createValue() { + return new Result(); + } + + public long getPos() { + // This should be the ordinal tuple in the range; + // not clear how to calculate... + return 0; + } + + public float getProgress() { + // Depends on the total number of tuples and getPos + return 0; + } + + /** + * @param key HStoreKey as input key. + * @param value MapWritable as input value + * @return true if there was more data + * @throws IOException + */ + public boolean next(ImmutableBytesWritable key, Result value) + throws IOException { + Result result; + try { + try { + result = this.scanner.next(); + if (logScannerActivity) { + rowcount ++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + timestamp = now; + rowcount = 0; + } + } + } catch (IOException e) { + // do not retry if the exception tells us not to do so + if (e instanceof DoNotRetryIOException) { + throw e; + } + // try to handle all other IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restart(startRow); + } else { + restart(lastSuccessfulRow); + this.scanner.next(); // skip presumed already mapped row + } + result = this.scanner.next(); + } + + if (result != null && result.size() > 0) { + key.set(result.getRow()); + lastSuccessfulRow = key.get(); + value.copyFrom(result); + return true; + } + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + LOG.info(ioe); + String lastRow = lastSuccessfulRow == null ? 
+ "null" : Bytes.toStringBinary(lastSuccessfulRow); + LOG.info("lastSuccessfulRow=" + lastRow); + } + throw ioe; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java new file mode 100644 index 0000000..91fb4a1 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableReduce.java @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Reducer; + +/** + * Write a table, sorting by the input key + * + * @param <K> key class + * @param <V> value class + */ +@InterfaceAudience.Public +@SuppressWarnings("unchecked") +public interface TableReduce<K extends WritableComparable, V> +extends Reducer<K, V, ImmutableBytesWritable, Put> { + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java new file mode 100644 index 0000000..d7b49ff --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +/** + * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further + * documentation available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}. + * + * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + */ +@InterfaceAudience.Public +public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> { + + public static class TableSnapshotRegionSplit implements InputSplit { + private TableSnapshotInputFormatImpl.InputSplit delegate; + + // constructor for mapreduce framework / Writable + public TableSnapshotRegionSplit() { + this.delegate = new TableSnapshotInputFormatImpl.InputSplit(); + } + + public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) { + this.delegate = delegate; + } + + public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, + List<String> locations, Scan scan, Path restoreDir) { + this.delegate = + new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); + } + + @Override + public long getLength() throws IOException { + return delegate.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return delegate.getLocations(); + } + + @Override + public void write(DataOutput out) throws IOException { + delegate.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + delegate.readFields(in); + } + } + + static class TableSnapshotRecordReader + implements RecordReader<ImmutableBytesWritable, Result> { + + private TableSnapshotInputFormatImpl.RecordReader delegate; + + public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job) + throws IOException { + delegate = new TableSnapshotInputFormatImpl.RecordReader(); + delegate.initialize(split.delegate, job); + } + + @Override + public boolean next(ImmutableBytesWritable key, Result value) throws IOException { + if (!delegate.nextKeyValue()) { + return false; + } + ImmutableBytesWritable currentKey = delegate.getCurrentKey(); + key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength()); + value.copyFrom(delegate.getCurrentValue()); + return true; + } + + @Override + public ImmutableBytesWritable createKey() { + return new ImmutableBytesWritable(); + } + + @Override + public Result createValue() { + return new Result(); + } + + @Override + public long getPos() throws IOException { + return delegate.getPos(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public float getProgress() throws IOException { + return delegate.getProgress(); + } + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws 
IOException { + List<TableSnapshotInputFormatImpl.InputSplit> splits = + TableSnapshotInputFormatImpl.getSplits(job); + InputSplit[] results = new InputSplit[splits.size()]; + for (int i = 0; i < splits.size(); i++) { + results[i] = new TableSnapshotRegionSplit(splits.get(i)); + } + return results; + } + + @Override + public RecordReader<ImmutableBytesWritable, Result> + getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job); + } + + /** + * Configures the job to use TableSnapshotInputFormat to read from a snapshot. + * @param job the job to configure + * @param snapshotName the name of the snapshot to read from + * @param restoreDir a temporary directory to restore the snapshot into. Current user should + * have write permissions to this directory, and this should not be a subdirectory of rootdir. + * After the job is finished, restoreDir can be deleted. + * @throws IOException if an error occurs + */ + public static void setInput(JobConf job, String snapshotName, Path restoreDir) + throws IOException { + TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java new file mode 100644 index 0000000..0784e5e --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java @@ -0,0 +1,154 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
new file mode 100644
index 0000000..0784e5e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/TableSplit.java
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A table split corresponds to a key range [low, high).
+ */
+@InterfaceAudience.Public
+public class TableSplit implements InputSplit, Comparable<TableSplit> {
+  private TableName m_tableName;
+  private byte [] m_startRow;
+  private byte [] m_endRow;
+  private String m_regionLocation;
+
+  /** Default constructor. */
+  public TableSplit() {
+    this((TableName)null, HConstants.EMPTY_BYTE_ARRAY,
+      HConstants.EMPTY_BYTE_ARRAY, "");
+  }
+
+  /**
+   * Constructor
+   * @param tableName the table this split scans
+   * @param startRow the first row (inclusive) covered by this split
+   * @param endRow the last row (exclusive) covered by this split
+   * @param location the hostname of the region serving this key range
+   */
+  public TableSplit(TableName tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this.m_tableName = tableName;
+    this.m_startRow = startRow;
+    this.m_endRow = endRow;
+    this.m_regionLocation = location;
+  }
+
+  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this(TableName.valueOf(tableName), startRow, endRow,
+      location);
+  }
+
+  /** @return table name */
+  public TableName getTable() {
+    return this.m_tableName;
+  }
+
+  /** @return table name as bytes */
+  public byte [] getTableName() {
+    return this.m_tableName.getName();
+  }
+
+  /** @return starting row key */
+  public byte [] getStartRow() {
+    return this.m_startRow;
+  }
+
+  /** @return end row key */
+  public byte [] getEndRow() {
+    return this.m_endRow;
+  }
+
+  /** @return the region's hostname */
+  public String getRegionLocation() {
+    return this.m_regionLocation;
+  }
+
+  public String[] getLocations() {
+    return new String[] {this.m_regionLocation};
+  }
+
+  public long getLength() {
+    // Not clear how to obtain this... seems to be used only for sorting splits
+    return 0;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.m_tableName = TableName.valueOf(Bytes.readByteArray(in));
+    this.m_startRow = Bytes.readByteArray(in);
+    this.m_endRow = Bytes.readByteArray(in);
+    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, this.m_tableName.getName());
+    Bytes.writeByteArray(out, this.m_startRow);
+    Bytes.writeByteArray(out, this.m_endRow);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("HBase table split(");
+    sb.append("table name: ").append(m_tableName);
+    sb.append(", start row: ").append(Bytes.toStringBinary(m_startRow));
+    sb.append(", end row: ").append(Bytes.toStringBinary(m_endRow));
+    sb.append(", region location: ").append(m_regionLocation);
+    sb.append(")");
+    return sb.toString();
+  }
+
+  @Override
+  public int compareTo(TableSplit o) {
+    return Bytes.compareTo(getStartRow(), o.getStartRow());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !(o instanceof TableSplit)) {
+      return false;
+    }
+    TableSplit other = (TableSplit)o;
+    return m_tableName.equals(other.m_tableName) &&
+      Bytes.equals(m_startRow, other.m_startRow) &&
+      Bytes.equals(m_endRow, other.m_endRow) &&
+      m_regionLocation.equals(other.m_regionLocation);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = m_tableName != null ? m_tableName.hashCode() : 0;
+    result = 31 * result + Arrays.hashCode(m_startRow);
+    result = 31 * result + Arrays.hashCode(m_endRow);
+    result = 31 * result + (m_regionLocation != null ? m_regionLocation.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
new file mode 100644
index 0000000..1da3a52
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapred/package-info.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
+Input/OutputFormats, a table indexing MapReduce job, and utility methods.
+
+<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
+in the HBase Reference Guide for documentation on MapReduce over HBase.
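+
+<p>For illustration, a minimal read-only job over this package's TableInputFormat
+might be set up as in the following sketch; the table name, column family, and
+<code>MyMap</code> class are hypothetical examples, not classes shipped here:
+<pre>
+  JobConf job = new JobConf(HBaseConfiguration.create());
+  job.setJobName("exampleScan");
+  // Scan "exampleTable", handing rows of family "f" to the hypothetical MyMap,
+  // a TableMap implementation. initTableMapJob sets TableInputFormat for us.
+  TableMapReduceUtil.initTableMapJob("exampleTable", "f:",
+    MyMap.class, ImmutableBytesWritable.class, Result.class, job);
+  job.setNumReduceTasks(0);
+  job.setOutputFormat(NullOutputFormat.class);
+  JobClient.runJob(job);
+</pre>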
+*/
+package org.apache.hadoop.hbase.mapred;

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
new file mode 100644
index 0000000..078033e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
@@ -0,0 +1,333 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+
+
+/**
+ * A job with a map and reduce phase to count cells in a table.
+ * The counter lists the following stats for a given table:
+ * <pre>
+ * 1. Total number of rows in the table
+ * 2. Total number of CFs across all rows
+ * 3. Total qualifiers across all rows
+ * 4. Total occurrence of each CF
+ * 5. Total occurrence of each qualifier
+ * 6. Total number of versions of each qualifier.
+ * </pre>
+ *
+ * CellCounter takes the following optional parameters: a user-supplied separator
+ * string, used between the row key, column family name and qualifier in the
+ * report; a regex-based or prefix-based row filter to restrict the count to a
+ * subset of the table's rows; and a start time and/or end time to limit the
+ * count to a time range.
+ */
+@InterfaceAudience.Public
+public class CellCounter extends Configured implements Tool {
+  private static final Log LOG =
+    LogFactory.getLog(CellCounter.class.getName());
+
+
+  /**
+   * Name of this 'program'.
+   */
+  static final String NAME = "CellCounter";
+
+  private static final String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  /**
+   * Mapper that runs the count.
+   */
+  static class CellCounterMapper
+  extends TableMapper<Text, IntWritable> {
+    /**
+     * Counter enumeration to count the actual rows.
+     */
+    public static enum Counters {
+      ROWS,
+      CELLS
+    }
+
+    private Configuration conf;
+    private String separator;
+
+    // State of the current row, family and column needs to persist across map()
+    // invocations in order to properly handle scanner batching, where a single
+    // qualifier may have too many versions for a single map() call.
+    private byte[] lastRow;
+    private String currentRowKey;
+    byte[] currentFamily = null;
+    String currentFamilyName = null;
+    byte[] currentQualifier = null;
+    // family + qualifier
+    String currentQualifierName = null;
+    // rowkey + family + qualifier
+    String currentRowQualifierName = null;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      conf = context.getConfiguration();
+      separator = conf.get("ReportSeparator", ":");
+    }
+
+    /**
+     * Maps the data.
+     *
+     * @param row The current table row key.
+     * @param values The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+
+    @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
+      justification="Findbugs is blind to the Precondition null check")
+    public void map(ImmutableBytesWritable row, Result values,
+        Context context)
+        throws IOException {
+      Preconditions.checkState(values != null,
+        "values passed to the map is null");
+
+      try {
+        byte[] currentRow = values.getRow();
+        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
+          lastRow = currentRow;
+          currentRowKey = Bytes.toStringBinary(currentRow);
+          currentFamily = null;
+          currentQualifier = null;
+          context.getCounter(Counters.ROWS).increment(1);
+          context.write(new Text("Total ROWS"), new IntWritable(1));
+        }
+        if (!values.isEmpty()) {
+          int cellCount = 0;
+          for (Cell value : values.listCells()) {
+            cellCount++;
+            if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
+              currentFamily = CellUtil.cloneFamily(value);
+              currentFamilyName = Bytes.toStringBinary(currentFamily);
+              currentQualifier = null;
+              context.getCounter("CF", currentFamilyName).increment(1);
+              if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
+                context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
+                context.write(new Text(currentFamily), new IntWritable(1));
+              }
+            }
+            if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
+              currentQualifier = CellUtil.cloneQualifier(value);
+              currentQualifierName = currentFamilyName + separator +
+                Bytes.toStringBinary(currentQualifier);
+              currentRowQualifierName = currentRowKey + separator + currentQualifierName;
+
+              context.write(new Text("Total Qualifiers across all Rows"),
+                new IntWritable(1));
+              context.write(new Text(currentQualifierName), new IntWritable(1));
+            }
+            // Increment versions
+            context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
+          }
+          context.getCounter(Counters.CELLS).increment(cellCount);
+        }
+      } catch (InterruptedException e) {
+        // Rethrow instead of swallowing the interrupt, so the task is marked
+        // failed and no cell counts are silently dropped.
+        throw (InterruptedIOException) new InterruptedIOException(
+          "Interrupted while emitting cell counts").initCause(e);
+      }
+    }
+  }
+
+  static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
+      Key, IntWritable> {
+
+    private IntWritable result = new IntWritable();
+
+    @Override
+    public void reduce(Key key, Iterable<IntWritable> values,
+        Context context)
+        throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException {
+    String tableName = args[0];
+    Path outputDir = new Path(args[1]);
+    String reportSeparatorString = (args.length > 2) ? args[2] : ":";
+    conf.set("ReportSeparator", reportSeparatorString);
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    job.setJarByClass(CellCounter.class);
+    Scan scan = getConfiguredScanForJob(conf, args);
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+      CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
+    job.setNumReduceTasks(1);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setReducerClass(IntSumReducer.class);
+    return job;
+  }
+
+  private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
+      throws IOException {
+    // create scan with any properties set from TableInputFormat
+    Scan s = TableInputFormat.createScanFromConfiguration(conf);
+    // Set Scan Versions
+    if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) {
+      // default to all versions unless explicitly set
+      s.setMaxVersions(Integer.MAX_VALUE);
+    }
+    s.setCacheBlocks(false);
+    // Set RowFilter or Prefix Filter if applicable.
+    Filter rowFilter = getRowFilter(args);
+    if (rowFilter != null) {
+      LOG.info("Setting Row Filter for counter.");
+      s.setFilter(rowFilter);
+    }
+    // Set TimeRange if defined
+    long[] timeRange = getTimeRange(args);
+    if (timeRange != null) {
+      LOG.info("Setting TimeRange for counter.");
+      s.setTimeRange(timeRange[0], timeRange[1]);
+    }
+    return s;
+  }
+
+
+  private static Filter getRowFilter(String[] args) {
+    Filter rowFilter = null;
+    String filterCriteria = (args.length > 3) ? args[3] : null;
+    if (filterCriteria == null) {
+      return null;
+    }
+    if (filterCriteria.startsWith("^")) {
+      String regexPattern = filterCriteria.substring(1);
+      rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,
+        new RegexStringComparator(regexPattern));
+    } else {
+      rowFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
+    }
+    return rowFilter;
+  }
+
+  private static long[] getTimeRange(String[] args) throws IOException {
+    final String startTimeArgKey = "--starttime=";
+    final String endTimeArgKey = "--endtime=";
+    long startTime = 0L;
+    long endTime = 0L;
+
+    for (int i = 1; i < args.length; i++) {
+      System.out.println("i: " + i + " arg[i]: " + args[i]);
+      if (args[i].startsWith(startTimeArgKey)) {
+        startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
+      }
+      if (args[i].startsWith(endTimeArgKey)) {
+        endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
+      }
+    }
+
+    if (startTime == 0 && endTime == 0) {
+      return null;
+    }
+
+    endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
+    return new long[] {startTime, endTime};
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("ERROR: Wrong number of parameters: " + args.length);
+      System.err.println("Usage: CellCounter ");
+      System.err.println("       <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
+        "[Prefix] for row filter] --starttime=[starttime] --endtime=[endtime]");
+      System.err.println("  Note: -D properties will be applied to the conf used. ");
"); + System.err.println(" Additionally, all of the SCAN properties from TableInputFormat"); + System.err.println(" can be specified to get fine grained control on what is counted.."); + System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>"); + System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>"); + System.err.println(" -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\""); + System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ..."); + System.err.println(" -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>"); + System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>"); + System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>"); + System.err.println(" -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>"); + System.err.println(" -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>"); + System.err.println(" -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>"); + System.err.println(" <reportSeparator> parameter can be used to override the default report separator " + + "string : used to separate the rowId/column family name and qualifier name."); + System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " + + "operation to a limited subset of rows from the table based on regex or prefix pattern."); + return -1; + } + Job job = createSubmittableJob(getConf(), args); + return (job.waitForCompletion(true) ? 0 : 1); + } + + /** + * Main entry point. + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + int errCode = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), args); + System.exit(errCode); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java new file mode 100644 index 0000000..1d4d37b --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
new file mode 100644
index 0000000..1d4d37b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Facade to create Cells for HFileOutputFormat. The created Cells are of <code>Put</code> type.
+ */
+@InterfaceAudience.Public
+public class CellCreator {
+
+  public static final String VISIBILITY_EXP_RESOLVER_CLASS =
+      "hbase.mapreduce.visibility.expression.resolver.class";
+
+  private VisibilityExpressionResolver visExpResolver;
+
+  public CellCreator(Configuration conf) {
+    Class<? extends VisibilityExpressionResolver> clazz = conf.getClass(
+        VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class,
+        VisibilityExpressionResolver.class);
+    this.visExpResolver = ReflectionUtils.newInstance(clazz, conf);
+    this.visExpResolver.init();
+  }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @return created Cell
+   * @throws IOException when the cell cannot be created
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength) throws IOException {
+    return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
+        timestamp, value, voffset, vlength, (List<Tag>) null);
+  }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param visExpression visibility expression to be associated with cell
+   * @return created Cell
+   * @throws IOException when the cell cannot be created
+   * @deprecated use the overload taking a {@code List<Tag>} instead
+   */
+  @Deprecated
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, String visExpression) throws IOException {
+    List<Tag> visTags = null;
+    if (visExpression != null) {
+      visTags = this.visExpResolver.createVisibilityExpTags(visExpression);
+    }
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
+  }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param tags tags to be attached to the cell, may be null
+   * @return created Cell
+   * @throws IOException when the cell cannot be created
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, List<Tag> tags) throws IOException {
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+  }
+
+  /**
+   * @return Visibility expression resolver
+   */
+  public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+    return this.visExpResolver;
+  }
+}
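For illustration, a typical caller (for example, a bulk-load mapper) would obtain
cells from CellCreator as in the sketch below; the row, family, qualifier, and
value literals are hypothetical:

  Configuration conf = HBaseConfiguration.create();
  CellCreator creator = new CellCreator(conf);
  byte[] row = Bytes.toBytes("row1");
  byte[] fam = Bytes.toBytes("f");
  byte[] qual = Bytes.toBytes("q");
  byte[] val = Bytes.toBytes("v");
  // Creates a Put-type KeyValue over the full byte ranges, with no tags.
  Cell cell = creator.create(row, 0, row.length, fam, 0, fam.length,
      qual, 0, qual.length, System.currentTimeMillis(), val, 0, val.length,
      (List<Tag>) null);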