Repository: gora Updated Branches: refs/heads/GORA-443 [created] 592c4a95d
GORA-443 Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/01856b56 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/01856b56 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/01856b56 Branch: refs/heads/GORA-443 Commit: 01856b565be8b6e890130dedc3bed89f63c96daa Parents: 4192c87 Author: Renato Marroquin <maren...@inf.ethz.ch> Authored: Wed Aug 31 13:26:51 2016 +0200 Committer: Renato Marroquin <maren...@inf.ethz.ch> Committed: Wed Aug 31 13:26:51 2016 +0200 ---------------------------------------------------------------------- .../apache/gora/store/DataStoreTestUtil.java | 5 +- .../org/apache/gora/hbase/store/HBaseStore.java | 47 ++- .../gora/hbase/store/HBaseTableConnection.java | 351 ++++++------------- gora-hbase/src/test/conf/hbase-site.xml | 4 + .../apache/gora/hbase/store/TestHBaseStore.java | 31 +- 5 files changed, 148 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java ---------------------------------------------------------------------- diff --git a/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java b/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java index 551b90a..0b4fed4 100644 --- a/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java +++ b/gora-core/src/test/java/org/apache/gora/store/DataStoreTestUtil.java @@ -1079,12 +1079,13 @@ public class DataStoreTestUtil { store.deleteByQuery(query); store.deleteByQuery(query);//don't you love that HBase sometimes does not delete arbitrarily - store.flush(); - assertNumResults(store.newQuery(), URLS.length); + + //assert that data is deleted for (int i = 0; i < URLS.length; i++) { + store.flush(); WebPage page = store.get(URLS[i]); assertNotNull(page); if( URLS[i].compareTo(startKey) < 0 || URLS[i].compareTo(endKey) >= 0) { http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java ---------------------------------------------------------------------- diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java index 00fe60b..51f33d0 100644 --- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java +++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java @@ -57,13 +57,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.jdom.Document; @@ -88,7 +82,7 @@ implements Configurable { private static final String SCANNER_CACHING_PROPERTIES_KEY = "scanner.caching" ; private static final int SCANNER_CACHING_PROPERTIES_DEFAULT = 0 ; - private volatile HBaseAdmin admin; + private volatile Admin admin; private volatile HBaseTableConnection table; @@ -110,10 +104,10 @@ implements Configurable { public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { try { - super.initialize(keyClass, persistentClass, properties); + this.conf = HBaseConfiguration.create(getConf()); - admin = new HBaseAdmin(this.conf); + admin = ConnectionFactory.createConnection(getConf()).getAdmin(); mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE)); filterUtil = new HBaseFilterUtil<>(this.conf); } catch (FileNotFoundException ex) { @@ -175,8 +169,8 @@ implements Configurable { if(!schemaExists()) { return; } - admin.disableTable(getSchemaName()); - admin.deleteTable(getSchemaName()); + admin.disableTable(mapping.getTable().getTableName()); + admin.deleteTable(mapping.getTable().getTableName()); } catch(IOException ex2){ LOG.error(ex2.getMessage(), ex2); } @@ -185,7 +179,7 @@ implements Configurable { @Override public boolean schemaExists() { try{ - return admin.tableExists(mapping.getTableName()); + return admin.tableExists(mapping.getTable().getTableName()); } catch(IOException ex2){ LOG.error(ex2.getMessage(), ex2); return false; @@ -241,13 +235,14 @@ implements Configurable { addPutsAndDeletes(put, delete, o, field.schema().getType(), field.schema(), hcol, hcol.getQualifier()); } - if (put.size() > 0) { - table.put(put); - } + if (delete.size() > 0) { table.delete(delete); - table.delete(delete); - table.delete(delete); // HBase sometimes does not delete arbitrarily +// table.delete(delete); +// table.delete(delete); // HBase sometimes does not delete arbitrarily + } + if (put.size() > 0) { + table.put(put); } } catch (IOException ex2) { LOG.error(ex2.getMessage(), ex2); @@ -260,16 +255,18 @@ implements Configurable { case UNION: if (isNullable(schema) && o == null) { if (qualifier == null) { - delete.deleteFamily(hcol.getFamily()); +// delete.deleteFamily(hcol.getFamily()); + delete.addFamily(hcol.getFamily()); } else { - delete.deleteColumn(hcol.getFamily(), qualifier); +// delete.deleteColumn(hcol.getFamily(), qualifier); + delete.addColumn(hcol.getFamily(), qualifier); } } else { // int index = GenericData.get().resolveUnion(schema, o); int index = getResolvedUnionIndex(schema); if (index > 1) { //if more than 2 type in union, serialize directly for now byte[] serializedBytes = toBytes(o, schema); - put.add(hcol.getFamily(), qualifier, serializedBytes); + put.addColumn(hcol.getFamily(), qualifier, serializedBytes); } else { Schema resolvedSchema = schema.getTypes().get(index); addPutsAndDeletes(put, delete, o, resolvedSchema.getType(), @@ -281,9 +278,11 @@ implements Configurable { // if it's a map that has been modified, then the content should be replaced by the new one // This is because we don't know if the content has changed or not. if (qualifier == null) { - delete.deleteFamily(hcol.getFamily()); + //delete.deleteFamily(hcol.getFamily()); + delete.addFamily(hcol.getFamily()); } else { - delete.deleteColumn(hcol.getFamily(), qualifier); + //delete.deleteColumn(hcol.getFamily(), qualifier); + delete.addColumn(hcol.getFamily(), qualifier); } @SuppressWarnings({ "rawtypes", "unchecked" }) Set<Entry> set = ((Map) o).entrySet(); @@ -303,7 +302,7 @@ implements Configurable { break; default: byte[] serializedBytes = toBytes(o, schema); - put.add(hcol.getFamily(), qualifier, serializedBytes); + put.addColumn(hcol.getFamily(), qualifier, serializedBytes); break; } } http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java ---------------------------------------------------------------------- diff --git a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java index 6803827..000a8b5 100644 --- a/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java +++ b/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseTableConnection.java @@ -18,63 +18,67 @@ package org.apache.gora.hbase.store; import java.io.IOException; -import java.io.InterruptedIOException; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Pair; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Service; - /** * Thread safe implementation to connect to a HBase table. * */ -public class HBaseTableConnection implements HTableInterface { +public class HBaseTableConnection { /* * The current implementation uses ThreadLocal HTable instances. It keeps * track of the floating instances in order to correctly flush and close * the connection when it is closed. HBase itself provides a utility called - * HTablePool for maintaining a pool of tables, but there are still some + * HTablePool for maintaining a tPool of tables, but there are still some * drawbacks that are only solved in later releases. - * + * */ - + private final Configuration conf; - private final ThreadLocal<HTable> tables; - private final BlockingQueue<HTable> pool = new LinkedBlockingQueue<>(); + private final Connection connection; + private final RegionLocator regionLocator; + // BufferedMutator used for doing async flush i.e. autoflush = false + private final ThreadLocal<ConcurrentLinkedQueue<Mutation>> buffers; + private final ThreadLocal<Table> tables; + + private final BlockingQueue<Table> tPool = new LinkedBlockingQueue<>(); + private final BlockingQueue<ConcurrentLinkedQueue<Mutation>> bPool = new LinkedBlockingQueue<>(); private final boolean autoFlush; private final TableName tableName; - + +// public class MutationPair { +// private Mutation mutation; +// private boolean type; +// +// public void MutationPair(Mutation m, boolean t) { +// this.mutation = m; +// this.type = t; +// } +// +// public boolean isType() { +// return type; +// } +// +// public Mutation getMutation() { +// return mutation; +// } +// } /** * Instantiate new connection. - * + * * @param conf * @param tableName * @param autoflush @@ -83,288 +87,129 @@ public class HBaseTableConnection implements HTableInterface { public HBaseTableConnection(Configuration conf, String tableName, boolean autoflush) throws IOException { this.conf = conf; + this.tables = new ThreadLocal<>(); + this.buffers = new ThreadLocal<>(); + this.connection = ConnectionFactory.createConnection(conf); this.tableName = TableName.valueOf(tableName); + this.regionLocator = this.connection.getRegionLocator(this.tableName); + this.autoFlush = autoflush; } - - private HTable getTable() throws IOException { - HTable table = tables.get(); + + private Table getTable() throws IOException { + Table table = tables.get(); if (table == null) { - table = new HTable(conf, tableName) { - @Override - public synchronized void flushCommits() throws RetriesExhaustedWithDetailsException, InterruptedIOException { - super.flushCommits(); - } - }; - table.setAutoFlushTo(autoFlush); - pool.add(table); //keep track + table = connection.getTable(tableName); +// table.setAutoFlushTo(autoFlush); + tPool.add(table); //keep track tables.set(table); } return table; } - - @Override + + private ConcurrentLinkedQueue<Mutation> getBuffer() throws IOException { + ConcurrentLinkedQueue<Mutation> buffer = buffers.get(); + if (buffer == null) { +// BufferedMutatorParams params = new BufferedMutatorParams(this.tableName).listener(listener); +// buffer = connection.getBufferedMutator(this.tableName); + buffer = new ConcurrentLinkedQueue<>(); + bPool.add(buffer); + buffers.set(buffer); + } + return buffer; + } + + public void flushCommits() throws IOException { + BufferedMutator bufMutator = connection.getBufferedMutator(this.tableName); + for (ConcurrentLinkedQueue<Mutation> buffer : bPool) { + for (Mutation m: buffer) { + bufMutator.mutate(m); + bufMutator.flush(); + } + } + bufMutator.close(); + } + public void close() throws IOException { // Flush and close all instances. // (As an extra safeguard one might employ a shared variable i.e. 'closed' // in order to prevent further table creation but for now we assume that // once close() is called, clients are no longer using it). - for (HTable table : pool) { - table.flushCommits(); + flushCommits(); + + for (Table table : tPool) { table.close(); } } - @Override - public byte[] getTableName() { - return tableName.getName(); - } - - @Override public Configuration getConfiguration() { return conf; } - @Override - public boolean isAutoFlush() { - return autoFlush; - } - /** - * getStartEndKeys provided by {@link HTable} but not {@link HTableInterface}. - * @see HTable#getStartEndKeys() + * getStartEndKeys provided by {@link HRegionLocation}. + * @see RegionLocator#getStartEndKeys() */ public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException { - return getTable().getStartEndKeys(); + return regionLocator.getStartEndKeys(); } /** - * getRegionLocation provided by {@link HTable} but not - * {@link HTableInterface}. - * @see HTable#getRegionLocation(byte[]) + * getRegionLocation provided by {@link HRegionLocation} + * @see RegionLocator#getRegionLocation(byte[]) */ public HRegionLocation getRegionLocation(final byte[] bs) throws IOException { - return getTable().getRegionLocation(bs); - } - - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - return getTable().getTableDescriptor(); + return regionLocator.getRegionLocation(bs); } - @Override public boolean exists(Get get) throws IOException { return getTable().exists(get); } - @Override + public boolean[] existsAll(List<Get> list) throws IOException { + return getTable().existsAll(list); + } + public Result get(Get get) throws IOException { return getTable().get(get); } - @Override public Result[] get(List<Get> gets) throws IOException { return getTable().get(gets); } - @Override - public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { - return getTable().getRowOrBefore(row, family); - } - - @Override public ResultScanner getScanner(Scan scan) throws IOException { return getTable().getScanner(scan); } - @Override - public ResultScanner getScanner(byte[] family) throws IOException { - return getTable().getScanner(family); - } - - @Override - public ResultScanner getScanner(byte[] family, byte[] qualifier) - throws IOException { - return getTable().getScanner(family, qualifier); - } - - @Override public void put(Put put) throws IOException { - getTable().put(put); + getBuffer().add(put); +// getBuffer().flush(); +// getTable().put(put); } - @Override +// @Override public void put(List<Put> puts) throws IOException { - getTable().put(puts); - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Put put) throws IOException { - return getTable().checkAndPut(row, family, qualifier, value, put); +// getTable().put(puts); + getBuffer().addAll(puts); +// getBuffer().flush(); } - @Override +// @Override public void delete(Delete delete) throws IOException { - getTable().delete(delete); + getBuffer().add(delete); +// getBuffer().flush(); +// getTable().delete(delete); } - @Override +// @Override public void delete(List<Delete> deletes) throws IOException { - getTable().delete(deletes); - - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Delete delete) throws IOException { - return getTable().checkAndDelete(row, family, qualifier, value, delete); - } - - @Override - public Result increment(Increment increment) throws IOException { - return getTable().increment(increment); - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount) throws IOException { - return getTable().incrementColumnValue(row, family, qualifier, amount); - } - - @Deprecated - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount, boolean writeToWAL) throws IOException { - return getTable().incrementColumnValue(row, family, qualifier, amount, - writeToWAL); - } - - @Override - public void flushCommits() throws IOException { - for (HTable table : pool) { - table.flushCommits(); - } - } - - @Override - public void batch(List<? extends Row> actions, Object[] results) - throws IOException, InterruptedException { - getTable().batch(actions, results); - - } - - @Override - public void mutateRow(RowMutations rm) throws IOException { - getTable().mutateRow(rm); - } - - @Override - public Result append(Append append) throws IOException { - return getTable().append(append); - } - - @Override - public void setAutoFlush(boolean autoFlush) { - // TODO Auto-generated method stub - +// getTable().delete(deletes); + getBuffer().addAll(deletes); +// getBuffer().flush(); } - @Override - public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail){ - // TODO Auto-generated method stub - } - - @Override - public long getWriteBufferSize() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void setWriteBufferSize(long writeBufferSize) throws IOException { - // TODO Auto-generated method stub - } - - @Override public TableName getName() { return tableName; } - - @Override - public Boolean[] exists(List<Get> gets) throws IOException { - return getTable().exists(gets); - } - - @Override - public <R> void - batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback) - throws IOException, InterruptedException { - getTable().batchCallback(actions, results, callback); - - } - - @Deprecated - @Override - public <R> Object[] batchCallback(List<? extends Row> actions, Callback<R> callback) - throws IOException, InterruptedException { - return getTable().batchCallback(actions, callback); - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, - Durability durability) throws IOException { - return getTable().incrementColumnValue(row, family, qualifier, amount,durability); - } - - @Override - public CoprocessorRpcChannel coprocessorService(byte[] row) { - // TODO Auto-generated method stub - return null; - } - - @Override - public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, - byte[] startKey, byte[] endKey, Call<T, R> callable) throws Throwable { - return getTable().coprocessorService(service, startKey, endKey, callable); - } - - @Override - public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, - byte[] endKey, Call<T, R> callable, Callback<R> callback) throws Throwable { - getTable().coprocessorService(service, startKey, endKey, callable, callback); - } - - @Override - public void setAutoFlushTo(boolean autoFlush) { - // TODO Auto-generated method stub - } - - @Override - public <R extends Message> Map<byte[], R> batchCoprocessorService( - MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, - R responsePrototype) throws Throwable { - return getTable().batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype); - } - - @Override - public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor, - Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback) - throws Throwable { - getTable().batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); - - } - - @Deprecated - @Override - public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { - return getTable().batch(actions); - } - - @Override - public boolean checkAndMutate(byte[] arg0, byte[] arg1, byte[] arg2, CompareOp arg3, byte[] arg4, - RowMutations arg5) throws IOException { - // TODO Auto-generated method stub - return false; - } } http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-hbase/src/test/conf/hbase-site.xml ---------------------------------------------------------------------- diff --git a/gora-hbase/src/test/conf/hbase-site.xml b/gora-hbase/src/test/conf/hbase-site.xml index 51e1346..0b506e2 100644 --- a/gora-hbase/src/test/conf/hbase-site.xml +++ b/gora-hbase/src/test/conf/hbase-site.xml @@ -138,4 +138,8 @@ <value>localhost</value> <description>The directory shared by region servers.</description> </property> + <property> + <name>zookeeper.session.timeout</name> + <value>60000</value> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/gora/blob/01856b56/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java ---------------------------------------------------------------------- diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java b/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java index 41be132..87101ed 100644 --- a/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java +++ b/gora-hbase/src/test/java/org/apache/gora/hbase/store/TestHBaseStore.java @@ -26,9 +26,8 @@ import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; import org.apache.gora.store.DataStoreTestBase; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Ignore; import org.junit.Test; @@ -87,8 +86,10 @@ public class TestHBaseStore extends DataStoreTestBase { } @Override - public void assertPutArray() throws IOException { - HTable table = new HTable(conf,"WebPage"); + public void assertPutArray() throws IOException { + Connection conn = ConnectionFactory.createConnection(conf); + TableName webPageTab = TableName.valueOf("WebPage"); + Table table = conn.getTable(webPageTab); Get get = new Get(Bytes.toBytes("com.example/http")); org.apache.hadoop.hbase.client.Result result = table.get(get); @@ -100,8 +101,7 @@ public class TestHBaseStore extends DataStoreTestBase { ,Bytes.toBytes(3)), Bytes.toBytes("example.com"))); table.close(); } - - + /** * Asserts that writing bytes actually works at low level in HBase. * Checks writing null unions too. @@ -110,7 +110,10 @@ public class TestHBaseStore extends DataStoreTestBase { public void assertPutBytes(byte[] contentBytes) throws IOException { // Check first the parameter "contentBytes" if written+read right. - HTable table = new HTable(conf,"WebPage"); + Connection conn = ConnectionFactory.createConnection(conf); + TableName webPageTab = TableName.valueOf("WebPage"); + Table table = conn.getTable(webPageTab); + Get get = new Get(Bytes.toBytes("com.example/http")); org.apache.hadoop.hbase.client.Result result = table.get(get); @@ -131,7 +134,7 @@ public class TestHBaseStore extends DataStoreTestBase { page = webPageStore.get("com.example/http") ; assertNull(page.getContent()) ; // Check directly with HBase - table = new HTable(conf,"WebPage"); + table = conn.getTable(webPageTab); get = new Get(Bytes.toBytes("com.example/http")); result = table.get(get); actualBytes = result.getValue(Bytes.toBytes("content"), null); @@ -148,6 +151,8 @@ public class TestHBaseStore extends DataStoreTestBase { page = webPageStore.get("com.example/http") ; assertTrue(Arrays.equals("".getBytes(Charset.defaultCharset()),page.getContent().array())) ; // Check directly with HBase + + table = new HTable(conf,"WebPage"); get = new Get(Bytes.toBytes("com.example/http")); result = table.get(get); @@ -206,7 +211,9 @@ public class TestHBaseStore extends DataStoreTestBase { webPageStore.flush() ; // Read directly from HBase - HTable table = new HTable(conf,"WebPage"); + Connection conn = ConnectionFactory.createConnection(conf); + TableName webPageTab = TableName.valueOf("WebPage"); + Table table = conn.getTable(webPageTab); Get get = new Get(Bytes.toBytes("com.example/http")); org.apache.hadoop.hbase.client.Result result = table.get(get); table.close(); @@ -218,7 +225,9 @@ public class TestHBaseStore extends DataStoreTestBase { @Override public void assertPutMap() throws IOException { - HTable table = new HTable(conf,"WebPage"); + Connection conn = ConnectionFactory.createConnection(conf); + TableName webPageTab = TableName.valueOf("WebPage"); + Table table = conn.getTable(webPageTab); Get get = new Get(Bytes.toBytes("com.example/http")); org.apache.hadoop.hbase.client.Result result = table.get(get);