Repository: hbase Updated Branches: refs/heads/master ac2e1c33f -> aeecd2037
HBASE-11572 Add support for doing get/scans against a particular replica_id (Jeffrey Zhong) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aeecd203 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aeecd203 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aeecd203 Branch: refs/heads/master Commit: aeecd20373bd3810ee8430de3beec81679129614 Parents: ac2e1c3 Author: Enis Soztutar <e...@apache.org> Authored: Tue Aug 19 18:23:09 2014 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Aug 19 18:23:09 2014 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Get.java | 17 --------- .../org/apache/hadoop/hbase/client/Query.java | 39 +++++++++++++++++++- .../RpcRetryingCallerWithReadReplicas.java | 39 ++++++++++++-------- .../org/apache/hadoop/hbase/client/Scan.java | 17 --------- .../hbase/regionserver/TestRegionReplicas.java | 26 +++++++++++++ .../apache/hadoop/hbase/util/LoadTestTool.java | 15 +++++++- .../hadoop/hbase/util/MultiThreadedReader.java | 12 +++++- hbase-shell/src/main/ruby/hbase.rb | 1 + hbase-shell/src/main/ruby/hbase/table.rb | 2 + hbase-shell/src/main/ruby/shell/commands/get.rb | 2 + 10 files changed, 115 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index faccf5e..2d1348f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -77,7 +77,6 @@ public class Get extends Query private boolean closestRowBefore = false; private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR); - private Consistency consistency = null; /** * Create a Get operation for the specified row. @@ -343,22 +342,6 @@ public class Get extends Query } /** - * Returns the consistency level for this operation - * @return the consistency level - */ - public Consistency getConsistency() { - return consistency; - } - - /** - * Sets the consistency level for this operation - * @param consistency the consistency level - */ - public void setConsistency(Consistency consistency) { - this.consistency = consistency; - } - - /** * Compile the table and column family (i.e. schema) information * into a String. Useful for parsing and aggregation by debugging, * logging, and administration tools. http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java index fe141c9..bbfa940 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java @@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.security.access.AccessControlConstants; import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.visibility.Authorizations; import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; -import org.apache.hadoop.hbase.util.Bytes; - import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; @@ -37,6 +35,8 @@ import com.google.common.collect.ListMultimap; @InterfaceStability.Evolving public abstract class Query extends OperationWithAttributes { protected Filter filter = null; + protected int targetReplicaId = -1; + protected Consistency consistency = Consistency.STRONG; /** * @return Filter @@ -103,4 +103,39 @@ public abstract class Query extends OperationWithAttributes { setAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL, ProtobufUtil.toUsersAndPermissions(permMap).toByteArray()); } + + /** + * Returns the consistency level for this operation + * @return the consistency level + */ + public Consistency getConsistency() { + return consistency; + } + + /** + * Sets the consistency level for this operation + * @param consistency the consistency level + */ + public void setConsistency(Consistency consistency) { + this.consistency = consistency; + } + + /** + * Specify region replica id where Query will fetch data from. Use this together with + * {@link #setConsistency(Consistency)} passing {@link Consistency#TIMELINE} to read data from + * a specific replicaId. + * <br><b> Expert: </b>This is an advanced API exposed. Only use it if you know what you are doing + * @param Id + */ + public void setReplicaId(int Id) { + this.targetReplicaId = Id; + } + + /** + * Returns region replica id where Query will fetch data from. + * @return region replica id or -1 if not set. + */ + public int getReplicaId() { + return this.targetReplicaId; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 1c733b6..6cd422f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -185,27 +185,34 @@ public class RpcRetryingCallerWithReadReplicas { */ public synchronized Result call() throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { - RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, - cConnection, tableName, get.getRow()); + boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); + + RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId() + : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size()); - addCallsForReplica(cs, rl, 0, 0); - try { - // wait for the timeout to see whether the primary responds back - Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds - if (f != null) { - return f.get(); //great we got a response + if(isTargetReplicaSpecified) { + addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); + } else { + addCallsForReplica(cs, rl, 0, 0); + try { + // wait for the timeout to see whether the primary responds back + Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + if (f != null) { + return f.get(); //great we got a response + } + } catch (ExecutionException e) { + throwEnrichedException(e, retries); + } catch (CancellationException e) { + throw new InterruptedIOException(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); } - } catch (ExecutionException e) { - throwEnrichedException(e, retries); - } catch (CancellationException e) { - throw new InterruptedIOException(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); + + // submit call for the all of the secondaries at once + addCallsForReplica(cs, rl, 1, rl.size() - 1); } - // submit call for the all of the secondaries at once - addCallsForReplica(cs, rl, 1, rl.size() - 1); try { try { Future<Result> f = cs.take(); http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 85681bf..74bf37f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -136,7 +136,6 @@ public class Scan extends Query { private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR); private Boolean loadColumnFamiliesOnDemand = null; - private Consistency consistency = Consistency.STRONG; /** * Set it true for small scan to get better performance @@ -635,22 +634,6 @@ public class Scan extends Query { } /** - * Returns the consistency level for this operation - * @return the consistency level - */ - public Consistency getConsistency() { - return consistency; - } - - /** - * Sets the consistency level for this operation - * @param consistency the consistency level - */ - public void setConsistency(Consistency consistency) { - this.consistency = consistency; - } - - /** * Compile the table and column family (i.e. schema) information * into a String. Useful for parsing and aggregation by debugging, * logging, and administration tools. http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java index a7067f1..d45cda3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TestMetaTableAccessor; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; @@ -198,6 +199,31 @@ public class TestRegionReplicas { } } + @Test(timeout = 60000) + public void testGetOnTargetRegionReplica() throws Exception { + try { + //load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); + // assert that we can read back from primary + Assert.assertEquals(1000, HTU.countRows(table)); + // flush so that region replica can read + getRS().getRegionByEncodedName(hriPrimary.getEncodedName()).flushcache(); + + openRegion(hriSecondary); + + // try directly Get against region replica + byte[] row = Bytes.toBytes(String.valueOf(42)); + Get get = new Get(row); + get.setConsistency(Consistency.TIMELINE); + get.setReplicaId(1); + Result result = table.get(get); + Assert.assertArrayEquals(row, result.getValue(f, null)); + } finally { + HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); + closeRegion(hriSecondary); + } + } + private void assertGet(HRegion region, int value, boolean expect) throws IOException { byte[] row = Bytes.toBytes(String.valueOf(value)); Get get = new Get(row); http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index ce3da34..9e7186a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -161,6 +161,10 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_REGION_REPLICATION_USAGE = "Desired number of replicas per region"; + public static final String OPT_REGION_REPLICA_ID = "region_replica_id"; + protected static final String OPT_REGION_REPLICA_ID_USAGE = + "Region replica id to do the reads from"; + protected static final long DEFAULT_START_KEY = 0; /** This will be removed as we factor out the dependency on command line */ @@ -202,7 +206,6 @@ public class LoadTestTool extends AbstractHBaseTool { private int verifyPercent; private int numTables = 1; - private int regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER; private String superUser; @@ -212,6 +215,7 @@ public class LoadTestTool extends AbstractHBaseTool { private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; private int regionReplication = -1; // not set + private int regionReplicaId = -1; // not set // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, // console tool itself should only be used from console. @@ -334,6 +338,7 @@ public class LoadTestTool extends AbstractHBaseTool { addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE); addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE); addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE); + addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE); } @Override @@ -351,7 +356,7 @@ public class LoadTestTool extends AbstractHBaseTool { if (!isWrite && !isRead && !isUpdate && !isInitOnly) { throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " + - "-" + OPT_UPDATE + "-" + OPT_READ + " has to be specified"); + "-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified"); } if (isInitOnly && (isRead || isWrite || isUpdate)) { @@ -460,6 +465,11 @@ public class LoadTestTool extends AbstractHBaseTool { if (cmd.hasOption(OPT_REGION_REPLICATION)) { regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION)); } + + regionReplicaId = -1; + if (cmd.hasOption(OPT_REGION_REPLICA_ID)) { + regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID)); + } } private void parseColumnFamilyOptions(CommandLine cmd) { @@ -621,6 +631,7 @@ public class LoadTestTool extends AbstractHBaseTool { readerThreads.setMaxErrors(maxReadErrors); readerThreads.setKeyWindow(keyWindow); readerThreads.setMultiGetBatchSize(multiGetBatchSize); + readerThreads.setRegionReplicaId(regionReplicaId); } if (isUpdate && isWrite) { http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index b749e62..cce7130 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; @@ -78,6 +79,7 @@ public class MultiThreadedReader extends MultiThreadedAction private int maxErrors = DEFAULT_MAX_ERRORS; private int keyWindow = DEFAULT_KEY_WINDOW; private int batchSize = DEFAULT_BATCH_SIZE; + private int regionReplicaId = -1; // particular region replica id to do reads against if set public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, double verifyPercent) throws IOException { @@ -102,6 +104,10 @@ public class MultiThreadedReader extends MultiThreadedAction this.batchSize = batchSize; } + public void setRegionReplicaId(int regionReplicaId) { + this.regionReplicaId = regionReplicaId; + } + @Override public void start(long startKey, long endKey, int numThreads) throws IOException { super.start(startKey, endKey, numThreads); @@ -317,6 +323,10 @@ public class MultiThreadedReader extends MultiThreadedAction } } get = dataGenerator.beforeGet(keyToRead, get); + if (regionReplicaId > 0) { + get.setReplicaId(regionReplicaId); + get.setConsistency(Consistency.TIMELINE); + } if (verbose) { LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); } @@ -334,7 +344,7 @@ public class MultiThreadedReader extends MultiThreadedAction public void queryKey(Get get, boolean verify, long keyToRead) throws IOException { // read the data - + long start = System.nanoTime(); // Uses simple get Result result = table.get(get); http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-shell/src/main/ruby/hbase.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase.rb b/hbase-shell/src/main/ruby/hbase.rb index bbff134..ca26137 100644 --- a/hbase-shell/src/main/ruby/hbase.rb +++ b/hbase-shell/src/main/ruby/hbase.rb @@ -58,6 +58,7 @@ module HBaseConstants SPLITALGO = 'SPLITALGO' NUMREGIONS = 'NUMREGIONS' REGION_REPLICATION = 'REGION_REPLICATION' + REGION_REPLICA_ID = 'REGION_REPLICA_ID' CONFIGURATION = org.apache.hadoop.hbase.HConstants::CONFIGURATION ATTRIBUTES="ATTRIBUTES" VISIBILITY="VISIBILITY" http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-shell/src/main/ruby/hbase/table.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb index 14fe1e4..2a350c1 100644 --- a/hbase-shell/src/main/ruby/hbase/table.rb +++ b/hbase-shell/src/main/ruby/hbase/table.rb @@ -288,6 +288,7 @@ EOF attributes = args[ATTRIBUTES] authorizations = args[AUTHORIZATIONS] consistency = args.delete(CONSISTENCY) if args[CONSISTENCY] + replicaId = args.delete(REGION_REPLICA_ID) if args[REGION_REPLICA_ID] unless args.empty? columns = args[COLUMN] || args[COLUMNS] if args[VERSIONS] @@ -346,6 +347,7 @@ EOF end get.setConsistency(org.apache.hadoop.hbase.client.Consistency.valueOf(consistency)) if consistency + get.setReplicaId(replicaId) if replicaId # Call hbase for the results result = @table.get(get) http://git-wip-us.apache.org/repos/asf/hbase/blob/aeecd203/hbase-shell/src/main/ruby/shell/commands/get.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/get.rb b/hbase-shell/src/main/ruby/shell/commands/get.rb index 0035310..1ab13cb 100644 --- a/hbase-shell/src/main/ruby/shell/commands/get.rb +++ b/hbase-shell/src/main/ruby/shell/commands/get.rb @@ -40,6 +40,7 @@ a dictionary of column(s), timestamp, timerange and versions. Examples: hbase> get 't1', 'r1', {COLUMN => 'c1', ATTRIBUTES => {'mykey'=>'myvalue'}} hbase> get 't1', 'r1', {COLUMN => 'c1', AUTHORIZATIONS => ['PRIVATE','SECRET']} hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE'} + hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1} Besides the default 'toStringBinary' format, 'get' also supports custom formatting by column. A user can define a FORMATTER by adding it to the column name in the get @@ -71,6 +72,7 @@ would be: hbase> t.get 'r1', 'c1', 'c2' hbase> t.get 'r1', ['c1', 'c2'] hbase> t.get 'r1', {CONSISTENCY => 'TIMELINE'} + hbase> t.get 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1} EOF end