Repository: hbase
Updated Branches:
  refs/heads/master 7442e5bd6 -> 3587fe832
HBASE-12423 Use a non-managed Table in TableOutputFormat (Solomon Duskis)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3587fe83
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3587fe83
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3587fe83

Branch: refs/heads/master
Commit: 3587fe8324a94351fa2f518d3c4bcb15d33d8ab0
Parents: 7442e5b
Author: stack <st...@apache.org>
Authored: Tue Nov 4 16:25:16 2014 -0800
Committer: stack <st...@apache.org>
Committed: Tue Nov 4 16:25:16 2014 -0800

----------------------------------------------------------------------
 .../hbase/mapreduce/TableOutputFormat.java      | 33 ++++++++------------
 .../regionserver/RegionServerServices.java      |  6 ++--
 2 files changed, 16 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3587fe83/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 52b8e45..da40b2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -46,7 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  * while the output value <u>must</u> be either a {@link Put} or a
  * {@link Delete} instance.
  *
- * @param <KEY> The type of the key. Ignored in this class.
+ * <p><KEY> is the type of the key. Ignored in this class.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -79,28 +81,17 @@ implements Configurable {
   /** The configuration. */
   private Configuration conf = null;
 
-  private HTable table;
+  private Table table;
+  private Connection connection;
 
   /**
    * Writes the reducer output to an HBase table.
    *
    * @param <KEY> The type of the key.
    */
-  protected static class TableRecordWriter<KEY>
+  protected class TableRecordWriter
   extends RecordWriter<KEY, Mutation> {
 
-    /** The table to write to. */
-    private Table table;
-
-    /**
-     * Instantiate a TableRecordWriter with the HBase HClient for writing.
-     *
-     * @param table The table to write to.
-     */
-    public TableRecordWriter(Table table) {
-      this.table = table;
-    }
-
     /**
      * Closes the writer, in this case flush table commits.
      *
@@ -112,6 +103,7 @@ implements Configurable {
     public void close(TaskAttemptContext context)
     throws IOException {
       table.close();
+      connection.close();
     }
 
     /**
@@ -125,8 +117,8 @@ implements Configurable {
     @Override
     public void write(KEY key, Mutation value)
    throws IOException {
-      if (value instanceof Put) this.table.put(new Put((Put)value));
-      else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
+      if (value instanceof Put) table.put(new Put((Put)value));
+      else if (value instanceof Delete) table.delete(new Delete((Delete)value));
       else throw new IOException("Pass a Delete or a Put");
     }
   }
@@ -144,7 +136,7 @@ implements Configurable {
   public RecordWriter<KEY, Mutation> getRecordWriter(
       TaskAttemptContext context)
   throws IOException, InterruptedException {
-    return new TableRecordWriter<KEY>(this.table);
+    return new TableRecordWriter();
   }
 
   /**
@@ -205,8 +197,9 @@ implements Configurable {
       if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
       }
-      this.table = new HTable(this.conf, TableName.valueOf(tableName));
-      this.table.setAutoFlush(false, true);
+      this.connection = ConnectionFactory.createConnection(this.conf);
+      this.table = connection.getTable(TableName.valueOf(tableName));
+      ((HTable) this.table).setAutoFlush(false, true);
       LOG.info("Created table instance for " + tableName);
     } catch(IOException e) {
       LOG.error(e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/3587fe83/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 67c4993..f02b8ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -139,12 +139,12 @@ public interface RegionServerServices
    * @return all the online tables in this RS
    */
   Set<TableName> getOnlineTables();
-
-
+
+
   /**
    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to be
    * available for handling
-   * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
+   * @param service the {@code Service} subclass instance to expose as a coprocessor endpoint
    * @return {@code true} if the registration was successful, {@code false}
    */
  boolean registerService(Service service);
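
For context, the change moves TableOutputFormat off the old managed-HTable pattern (new HTable(conf, tableName), with connection sharing handled internally) and onto the non-managed pattern, where the caller owns a Connection created by ConnectionFactory and obtains a light-weight Table from it. Below is a minimal, hypothetical sketch of that client-side lifecycle, assuming an HBase 1.0+ style client API; the table name, row, column family, qualifier, and value are invented for illustration and are not part of the patch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class NonManagedTableExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // The caller owns the Connection; it is not reference-counted behind an HTable,
    // so it must be closed explicitly when the work is done.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf("example_table"))) {
      // Hypothetical row/family/qualifier/value, for illustration only.
      Put put = new Put(Bytes.toBytes("row-1"));
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
      table.put(put);
    } // try-with-resources closes the Table first, then the Connection
  }
}

The same lifecycle shows up in the patch itself: setConf() now creates the Connection and gets the Table from it, and TableRecordWriter.close() closes the Table and then the Connection, instead of relying on a managed HTable to share connections behind the scenes.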