Repository: hbase Updated Branches: refs/heads/master 6f904fe4c -> 332515ed3
HBASE-13028 Cleanup MapReduce InputFormats Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/332515ed Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/332515ed Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/332515ed Branch: refs/heads/master Commit: 332515ed346e1ffc104ce3bac986bb7030747a03 Parents: 6f904fe Author: Sean Busbey <bus...@apache.org> Authored: Fri Feb 13 15:47:11 2015 -0600 Committer: Sean Busbey <bus...@cloudera.com> Committed: Sat Feb 14 14:13:36 2015 -0600 ---------------------------------------------------------------------- .../hadoop/hbase/mapred/TableInputFormat.java | 18 +- .../hbase/mapred/TableInputFormatBase.java | 186 ++++++++++++++++--- .../hbase/mapreduce/TableInputFormat.java | 4 +- .../hbase/mapreduce/TableInputFormatBase.java | 119 +++++++----- .../hbase/mapred/TestTableInputFormat.java | 72 ++++++- .../hbase/mapreduce/TestTableInputFormat.java | 63 +++++-- 6 files changed, 364 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java index 368510f..814daea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; @@ -50,6 +49,15 @@ public class TableInputFormat extends TableInputFormatBase implements public static final String COLUMN_LIST = "hbase.mapred.tablecolumns"; public void configure(JobConf job) { + try { + initialize(job); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + + @Override + protected void initialize(JobConf job) throws IOException { Path[] tableNames = FileInputFormat.getInputPaths(job); String colArg = job.get(COLUMN_LIST); String[] colNames = colArg.split(" "); @@ -58,12 +66,8 @@ public class TableInputFormat extends TableInputFormatBase implements m_cols[i] = Bytes.toBytes(colNames[i]); } setInputColumns(m_cols); - try { - Connection connection = ConnectionFactory.createConnection(job); - setHTable((HTable) connection.getTable(TableName.valueOf(tableNames[0].getName()))); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } + Connection connection = ConnectionFactory.createConnection(job); + initializeTable(connection, TableName.valueOf(tableNames[0].getName())); } public void validateInput(JobConf job) throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java index d98b5f4..b5b79d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.mapred; +import java.io.Closeable; import java.io.IOException; import org.apache.commons.logging.Log; @@ -25,6 +26,8 @@ import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; @@ -40,28 +43,35 @@ import org.apache.hadoop.mapred.Reporter; * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a * byte[] of input columns and optionally a {@link Filter}. * Subclasses may use other TableRecordReader implementations. + * + * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to + * function properly. Each of the entry points to this class used by the MapReduce framework, + * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, + * will call {@link #initialize(JobConf)} as a convenient centralized location to handle + * retrieving the necessary configuration information. If your subclass overrides either of these + * methods, either call the parent version or call initialize yourself. + * * <p> * An example of a subclass: * <pre> - * class ExampleTIF extends TableInputFormatBase implements JobConfigurable { + * class ExampleTIF extends TableInputFormatBase { * * {@literal @}Override - * public void configure(JobConf job) { - * try { - * HTable exampleTable = new HTable(HBaseConfiguration.create(job), - * Bytes.toBytes("exampleTable")); - * // mandatory - * setHTable(exampleTable); - * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), - * Bytes.toBytes("columnB") }; - * // mandatory - * setInputColumns(inputColumns); - * // optional, by default we'll get everything for the given columns. 
- * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); - * setRowFilter(exampleFilter); - * } catch (IOException exception) { - * throw new RuntimeException("Failed to configure for job.", exception); - * } + * protected void initialize(JobConf job) throws IOException { + * // We are responsible for the lifecycle of this connection until we hand it over in + * // initializeTable. + * Connection connection = + * ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + * TableName tableName = TableName.valueOf("exampleTable"); + * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. + * initializeTable(connection, tableName); + * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + * Bytes.toBytes("columnB") }; + * // mandatory + * setInputColumns(inputColumns); + * // optional, by default we'll get everything for the given columns. + * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + * setRowFilter(exampleFilter); * } * } * </pre> @@ -74,9 +84,17 @@ implements InputFormat<ImmutableBytesWritable, Result> { private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class); private byte [][] inputColumns; private HTable table; + private Connection connection; private TableRecordReader tableRecordReader; private Filter rowFilter; + private static final String NOT_INITIALIZED = "The input format instance has not been properly " + + "initialized. Ensure you call initializeTable either in your constructor or initialize " + + "method"; + private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + + " previous error. Please look at the previous logs lines from" + + " the task's full log for more details."; + /** * Builds a TableRecordReader. If no TableRecordReader was provided, uses * the default.
@@ -87,19 +105,63 @@ implements InputFormat<ImmutableBytesWritable, Result> { public RecordReader<ImmutableBytesWritable, Result> getRecordReader( InputSplit split, JobConf job, Reporter reporter) throws IOException { + // In case a subclass uses the deprecated approach or calls initializeTable directly + if (table == null) { + initialize(job); + } + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); + } + TableSplit tSplit = (TableSplit) split; - TableRecordReader trr = this.tableRecordReader; // if no table record reader was provided use default - if (trr == null) { - trr = new TableRecordReader(); - } + final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() : + this.tableRecordReader; trr.setStartRow(tSplit.getStartRow()); trr.setEndRow(tSplit.getEndRow()); trr.setHTable(this.table); trr.setInputColumns(this.inputColumns); trr.setRowFilter(this.rowFilter); trr.init(); - return trr; + return new RecordReader<ImmutableBytesWritable, Result>() { + + @Override + public void close() throws IOException { + trr.close(); + closeTable(); + } + + @Override + public ImmutableBytesWritable createKey() { + return trr.createKey(); + } + + @Override + public Result createValue() { + return trr.createValue(); + } + + @Override + public long getPos() throws IOException { + return trr.getPos(); + } + + @Override + public float getProgress() throws IOException { + return trr.getProgress(); + } + + @Override + public boolean next(ImmutableBytesWritable key, Result value) throws IOException { + return trr.next(key, value); + } + }; } /** @@ -123,8 +185,18 @@ implements InputFormat<ImmutableBytesWritable, Result> { */ public InputSplit[] getSplits(JobConf job, int numSplits) throws 
IOException { if (this.table == null) { - throw new IOException("No table was provided"); + initialize(job); } + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); + } + byte [][] startKeys = this.table.getStartKeys(); if (startKeys == null || startKeys.length == 0) { throw new IOException("Expecting at least one region"); @@ -152,6 +224,22 @@ implements InputFormat<ImmutableBytesWritable, Result> { } /** + * Allows subclasses to initialize the table information. + * + * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close. + * @param tableName The {@link TableName} of the table to process. + * @throws IOException + */ + protected void initializeTable(Connection connection, TableName tableName) throws IOException { + if (table != null || connection != null) { + LOG.warn("initializeTable called multiple times. Overwriting connection and table " + + "reference; TableInputFormatBase will not close these old references when done."); + } + this.table = (HTable) connection.getTable(tableName); + this.connection = connection; + } + + /** * @param inputColumns to be passed in {@link Result} to the map task. */ protected void setInputColumns(byte [][] inputColumns) { @@ -160,8 +248,20 @@ implements InputFormat<ImmutableBytesWritable, Result> { /** * Allows subclasses to get the {@link HTable}. + * @deprecated use {@link #getTable()} + */ + @Deprecated + protected HTable getHTable() { + return (HTable) getTable(); + } + + /** + * Allows subclasses to get the {@link Table}. 
*/ - protected Table getHTable() { + protected Table getTable() { + if (table == null) { + throw new IllegalStateException(NOT_INITIALIZED); + } return this.table; } @@ -169,7 +269,9 @@ implements InputFormat<ImmutableBytesWritable, Result> { * Allows subclasses to set the {@link HTable}. * * @param table to get the data from + * @deprecated use {@link #initializeTable(Connection,TableName)} */ + @Deprecated protected void setHTable(HTable table) { this.table = table; } @@ -192,4 +294,40 @@ implements InputFormat<ImmutableBytesWritable, Result> { protected void setRowFilter(Filter rowFilter) { this.rowFilter = rowFilter; } + + /** + * Handle subclass specific set up. + * Each of the entry points used by the MapReduce framework, + * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)}, + * will call {@link #initialize(JobConf)} as a convenient centralized location to handle + * retrieving the necessary configuration information and calling + * {@link #initializeTable(Connection, TableName)}. + * + * Subclasses should implement their initialize call such that it is safe to call multiple times. + * The current TableInputFormatBase implementation relies on a non-null table reference to decide + * if an initialize call is needed, but this behavior may change in the future. In particular, + * it is critical that initializeTable not be called multiple times since this will leak + * Connection instances. + * + */ + protected void initialize(JobConf job) throws IOException { + } + + /** + * Close the Table and related objects that were initialized via + * {@link #initializeTable(Connection, TableName)}. + * + * @throws IOException + */ + protected void closeTable() throws IOException { + close(table, connection); + table = null; + connection = null; + } + + private void close(Closeable... 
closables) throws IOException { + for (Closeable c : closables) { + if(c != null) { c.close(); } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 8896eb0..bc2537b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -175,7 +175,9 @@ implements Configurable { } @Override - protected void initialize() { + protected void initialize(JobContext context) throws IOException { + // Do we have to worry about mis-matches between the Configuration from setConf and the one + // in this context? TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); try { initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index adfe493..6c42d7f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -64,16 +64,28 @@ import org.apache.hadoop.util.StringUtils; * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName}, * an {@link Scan} instance that defines the input columns etc. 
Subclasses may use * other TableRecordReader implementations. + * + * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to + * function properly. Each of the entry points to this class used by the MapReduce framework, + * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, + * will call {@link #initialize(JobContext)} as a convenient centralized location to handle + * retrieving the necessary configuration information. If your subclass overrides either of these + * methods, either call the parent version or call initialize yourself. + * * <p> * An example of a subclass: * <pre> - * class ExampleTIF extends TableInputFormatBase implements JobConfigurable { - * - * private JobConf job; + * class ExampleTIF extends TableInputFormatBase { * * {@literal @}Override - * public void configure(JobConf job) { - * this.job = job; + * protected void initialize(JobContext context) throws IOException { + * // We are responsible for the lifecycle of this connection until we hand it over in + * // initializeTable. + * Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( + * context.getConfiguration())); + * TableName tableName = TableName.valueOf("exampleTable"); + * // mandatory. once passed here, TableInputFormatBase will handle closing the connection. + * initializeTable(connection, tableName); * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), * Bytes.toBytes("columnB") }; * // optional, by default we'll get everything for the table.
@@ -85,23 +97,6 @@ import org.apache.hadoop.util.StringUtils; * scan.setFilter(exampleFilter); * setScan(scan); * } - * - * {@literal @}Override - * protected void initialize() { - * if (job == null) { - * throw new IllegalStateException("must have already gotten the JobConf before " + - * "initialize is called."); - * } - * try { - * Connection connection = - * ConnectionFactory.createConnection(HBaseConfiguration.create(job)); - * TableName tableName = TableName.valueOf("exampleTable"); - * // mandatory - * initializeTable(connection, tableName); - * } catch (IOException exception) { - * throw new RuntimeException("Failed to initialize.", exception); - * } - * } * } * </pre> */ @@ -122,6 +117,13 @@ extends InputFormat<ImmutableBytesWritable, Result> { final Log LOG = LogFactory.getLog(TableInputFormatBase.class); + private static final String NOT_INITIALIZED = "The input format instance has not been properly " + + "initialized. Ensure you call initializeTable either in your constructor or initialize " + + "method"; + private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" + + " previous error. Please look at the previous logs lines from" + + " the task's full log for more details."; + /** Holds the details for the internal scanner. * * @see Scan */ @@ -158,14 +160,18 @@ extends InputFormat<ImmutableBytesWritable, Result> { public RecordReader<ImmutableBytesWritable, Result> createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException { + // Just in case a subclass is relying on JobConfigurable magic. if (table == null) { - initialize(); + initialize(context); } - if (getTable() == null) { - // initialize() must not have been implemented in the subclass. - throw new IOException("Cannot create a record reader because of a" + - " previous error. Please look at the previous logs lines from" + - " the task's full log for more details."); + // null check in case our child overrides getTable to not throw. 
+ try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); } TableSplit tSplit = (TableSplit) split; LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes."); @@ -230,14 +236,20 @@ extends InputFormat<ImmutableBytesWritable, Result> { public List<InputSplit> getSplits(JobContext context) throws IOException { boolean closeOnFinish = false; + // Just in case a subclass is relying on JobConfigurable magic. if (table == null) { - initialize(); + initialize(context); closeOnFinish = true; } - if (getTable() == null) { - // initialize() wasn't implemented, so the table is null. - throw new IOException("No table was provided."); + // null check in case our child overrides getTable to not throw. + try { + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. + throw new IOException(INITIALIZATION_ERROR); + } + } catch (IllegalStateException exception) { + throw new IOException(INITIALIZATION_ERROR, exception); } try { @@ -334,6 +346,10 @@ extends InputFormat<ImmutableBytesWritable, Result> { } } + /** + * @deprecated mistakenly made public in 0.98.7. 
scope will change to package-private + */ + @Deprecated public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException { String hostName = this.reverseDNSCacheMap.get(ipAddress); if (hostName == null) { @@ -366,7 +382,7 @@ extends InputFormat<ImmutableBytesWritable, Result> { * @see org.apache.hadoop.mapreduce.InputFormat#getSplits( * org.apache.hadoop.mapreduce.JobContext) */ - public List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context, + private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context, long average) throws IOException { List<InputSplit> resultList = new ArrayList<InputSplit>(); Configuration conf = context.getConfiguration(); @@ -440,6 +456,7 @@ extends InputFormat<ImmutableBytesWritable, Result> { * @param isText It determines to use text key mode or binary key mode * @return The split point in the region. */ + @InterfaceAudience.Private public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) { byte upperLimitByte; byte lowerLimitByte; @@ -519,8 +536,6 @@ extends InputFormat<ImmutableBytesWritable, Result> { } /** - * - * * Test if the given region is to be included in the InputSplit while splitting * the regions of a table. * <p> @@ -547,7 +562,7 @@ extends InputFormat<ImmutableBytesWritable, Result> { /** * Allows subclasses to get the {@link HTable}. 
* - * @deprecated + * @deprecated use {@link #getTable()} */ @Deprecated protected HTable getHTable() { @@ -559,7 +574,7 @@ extends InputFormat<ImmutableBytesWritable, Result> { */ protected RegionLocator getRegionLocator() { if (regionLocator == null) { - initialize(); + throw new IllegalStateException(NOT_INITIALIZED); } return regionLocator; } @@ -569,7 +584,7 @@ extends InputFormat<ImmutableBytesWritable, Result> { */ protected Table getTable() { if (table == null) { - initialize(); + throw new IllegalStateException(NOT_INITIALIZED); } return table; } @@ -579,7 +594,7 @@ extends InputFormat<ImmutableBytesWritable, Result> { */ protected Admin getAdmin() { if (admin == null) { - initialize(); + throw new IllegalStateException(NOT_INITIALIZED); } return admin; } @@ -587,6 +602,9 @@ extends InputFormat<ImmutableBytesWritable, Result> { /** * Allows subclasses to set the {@link HTable}. * + * Will attempt to reuse the underlying Connection for our own needs, including + * retrieving an Admin interface to the HBase cluster. + * * @param table The table to get the data from. * @throws IOException * @deprecated Use {@link #initializeTable(Connection, TableName)} instead. @@ -623,6 +641,10 @@ extends InputFormat<ImmutableBytesWritable, Result> { * @throws IOException */ protected void initializeTable(Connection connection, TableName tableName) throws IOException { + if (table != null || connection != null) { + LOG.warn("initializeTable called multiple times. Overwriting connection and table " + + "reference; TableInputFormatBase will not close these old references when done."); + } this.table = connection.getTable(tableName); this.regionLocator = connection.getRegionLocator(tableName); this.admin = connection.getAdmin(); @@ -659,12 +681,21 @@ extends InputFormat<ImmutableBytesWritable, Result> { } /** - * This method will be called when any of the following are referenced, but not yet initialized: - * admin, regionLocator, table.
Subclasses will have the opportunity to call - * {@link #initializeTable(Connection, TableName)} + * Handle subclass specific set up. + * Each of the entry points used by the MapReduce framework, + * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)}, + * will call {@link #initialize(JobContext)} as a convenient centralized location to handle + * retrieving the necessary configuration information and calling + * {@link #initializeTable(Connection, TableName)}. + * + * Subclasses should implement their initialize call such that it is safe to call multiple times. + * The current TableInputFormatBase implementation relies on a non-null table reference to decide + * if an initialize call is needed, but this behavior may change in the future. In particular, + * it is critical that initializeTable not be called multiple times since this will leak + * Connection instances. + * */ - protected void initialize() { - + protected void initialize(JobContext context) throws IOException { } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java index 234a2e8..d7dd8ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java @@ -36,6 +36,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; import 
org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigurable; @@ -322,8 +325,30 @@ public class TestTableInputFormat { LOG.info("testing use of an InputFormat taht extends InputFormatBase"); final Table table = createTable(Bytes.toBytes("exampleTable"), new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleTIF.class); + } + + @Test + public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException { + LOG.info("testing use of an InputFormat taht extends InputFormatBase, " + + "as it was given in 0.98."); + final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleDeprecatedTIF.class); + } + + @Test + public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException { + LOG.info("testing use of an InputFormat taht extends InputFormatBase, " + + "using JobConfigurable."); + final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleJobConfigurableTIF.class); + } + + void testInputFormat(Class<? 
extends InputFormat> clazz) throws IOException { final JobConf job = MapreduceTestingShim.getJobConf(mrCluster); - job.setInputFormat(ExampleTIF.class); + job.setInputFormat(clazz); job.setOutputFormat(NullOutputFormat.class); job.setMapperClass(ExampleVerifier.class); job.setNumReduceTasks(0); @@ -373,13 +398,13 @@ public class TestTableInputFormat { } - public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable { + public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable { @Override public void configure(JobConf job) { try { HTable exampleTable = new HTable(HBaseConfiguration.create(job), - Bytes.toBytes("exampleTable")); + Bytes.toBytes("exampleDeprecatedTable")); // mandatory setHTable(exampleTable); byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), @@ -396,5 +421,46 @@ public class TestTableInputFormat { } + public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable { + + @Override + public void configure(JobConf job) { + try { + initialize(job); + } catch (IOException exception) { + throw new RuntimeException("Failed to initialize.", exception); + } + } + + @Override + protected void initialize(JobConf job) throws IOException { + initialize(job, "exampleJobConfigurableTable"); + } + } + + + public static class ExampleTIF extends TableInputFormatBase { + + @Override + protected void initialize(JobConf job) throws IOException { + initialize(job, "exampleTable"); + } + + protected void initialize(JobConf job, String table) throws IOException { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + TableName tableName = TableName.valueOf(table); + // mandatory + initializeTable(connection, tableName); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + // mandatory + setInputColumns(inputColumns); + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new 
RegexStringComparator("aa.*")); + // optional + setRowFilter(exampleFilter); + } + + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/332515ed/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java index 2602961..566a642 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java @@ -56,6 +56,7 @@ import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.junit.AfterClass; @@ -343,6 +344,16 @@ public class TestTableInputFormat { } @Test + public void testJobConfigurableExtensionOfTableInputFormatBase() + throws IOException, InterruptedException, ClassNotFoundException { + LOG.info("testing use of an InputFormat taht extends InputFormatBase, " + + "using JobConfigurable."); + final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleJobConfigurableTIF.class); + } + + @Test public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException, InterruptedException, ClassNotFoundException { LOG.info("testing use of an InputFormat taht extends InputFormatBase, " + @@ -422,13 +433,43 @@ public class TestTableInputFormat { } - public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable { - private 
JobConf job; + public static class ExampleJobConfigurableTIF extends TableInputFormatBase + implements JobConfigurable { @Override public void configure(JobConf job) { - this.job = job; + try { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + TableName tableName = TableName.valueOf("exampleJobConfigurableTable"); + // mandatory + initializeTable(connection, tableName); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + //optional + Scan scan = new Scan(); + for (byte[] family : inputColumns) { + scan.addFamily(family); + } + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + scan.setFilter(exampleFilter); + setScan(scan); + } catch (IOException exception) { + throw new RuntimeException("Failed to initialize.", exception); + } + } + } + + + public static class ExampleTIF extends TableInputFormatBase { + + @Override + protected void initialize(JobContext job) throws IOException { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( + job.getConfiguration())); + TableName tableName = TableName.valueOf("exampleTable"); + // mandatory + initializeTable(connection, tableName); byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }; //optional @@ -441,22 +482,6 @@ public class TestTableInputFormat { setScan(scan); } - @Override - protected void initialize() { - if (job == null) { - throw new IllegalStateException("must have already gotten the JobConf before initialize " + - "is called."); - } - try { - Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); - TableName tableName = TableName.valueOf("exampleTable"); - // mandatory - initializeTable(connection, tableName); - } catch (IOException exception) { - throw new RuntimeException("Failed to initialize.", exception); - } - } - } }