HBASE-15816 Provide client with ability to set priority on Operations Signed-off-by: Andrew Purtell <apurt...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec3cb196 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec3cb196 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec3cb196 Branch: refs/heads/HBASE-18426 Commit: ec3cb196641498edfa71c4f9e1bde5bc15acd8ed Parents: 70a357d Author: rgidwani <rgidw...@salesforce.com> Authored: Fri Jul 14 10:18:26 2017 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Fri Jul 21 17:12:16 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Action.java | 8 +++++++ .../org/apache/hadoop/hbase/client/Append.java | 6 +++++ .../hadoop/hbase/client/AsyncProcess.java | 17 +++++++++++--- .../hbase/client/AsyncRequestFutureImpl.java | 2 +- .../client/CancellableRegionServerCallable.java | 4 ++-- .../hbase/client/ClientServiceCallable.java | 5 ++-- .../org/apache/hadoop/hbase/client/Delete.java | 6 +++++ .../org/apache/hadoop/hbase/client/Get.java | 5 ++++ .../org/apache/hadoop/hbase/client/HTable.java | 20 ++++++++-------- .../apache/hadoop/hbase/client/Increment.java | 6 +++++ .../apache/hadoop/hbase/client/MultiAction.java | 12 ++++++++++ .../hbase/client/MultiServerCallable.java | 4 ++-- .../apache/hadoop/hbase/client/Mutation.java | 5 +++- .../client/NoncedRegionServerCallable.java | 4 ++-- .../hbase/client/OperationWithAttributes.java | 12 ++++++++++ .../client/RegionCoprocessorRpcChannel.java | 3 ++- .../hbase/client/RegionServerCallable.java | 11 +++++++++ .../hadoop/hbase/client/RowMutations.java | 8 +++++++ .../RpcRetryingCallerWithReadReplicas.java | 4 ++-- .../org/apache/hadoop/hbase/client/Scan.java | 7 ++++++ .../hadoop/hbase/client/ScannerCallable.java | 2 +- .../hbase/client/SecureBulkLoadClient.java | 7 +++--- .../hadoop/hbase/ipc/HBaseRpcController.java | 2 -- .../hbase/ipc/HBaseRpcControllerImpl.java | 7 +++--- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 3 ++- .../org/apache/hadoop/hbase/HConstants.java | 1 + .../hbase/client/TestRpcControllerFactory.java | 24 ++++++++++++++++++-- ...gionServerBulkLoadWithOldSecureEndpoint.java | 5 ++-- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 3 +++ .../hbase/mapreduce/LoadIncrementalHFiles.java | 2 +- .../regionserver/wal/WALEditsReplaySink.java | 2 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 2 +- .../hbase/client/TestReplicaWithCluster.java | 2 +- .../apache/hadoop/hbase/io/TestHeapSize.java | 2 ++ .../TestLoadIncrementalHFilesSplitRecovery.java | 2 +- .../hadoop/hbase/quotas/TestSpaceQuotas.java | 3 ++- .../regionserver/TestHRegionServerBulkLoad.java | 5 ++-- .../TestHRegionServerBulkLoadWithOldClient.java | 5 ++-- 38 files changed, 178 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java index ef05912..f4b696a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java @@ -32,10 +32,16 @@ public class Action implements Comparable<Action> { private final int originalIndex; private long nonce = HConstants.NO_NONCE; private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; + private int priority; public Action(Row action, int originalIndex) { + this(action, originalIndex, HConstants.PRIORITY_UNSET); + } + + public Action(Row action, int originalIndex, int priority) { this.action = action; this.originalIndex = originalIndex; + this.priority = priority; } /** @@ -70,6 +76,8 @@ public class Action implements Comparable<Action> { return replicaId; } + public int getPriority() { return priority; } + @Override public int compareTo(Action other) { return action.compareTo(other.getAction()); http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java index 346eb0e..02ec770 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java @@ -84,6 +84,7 @@ public class Append extends Mutation { for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + this.setPriority(a.getPriority()); } /** Create a Append operation for the specified row. @@ -184,6 +185,11 @@ public class Append extends Mutation { } @Override + public Append setPriority(int priority) { + return (Append) super.setPriority(priority); + } + + @Override public Append setTTL(long ttl) { return (Append) super.setTTL(ttl); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 22efdaa..8693b3c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -291,7 +291,12 @@ class AsyncProcess { LOG.error("Failed to get region location ", ex); // This action failed before creating ars. Retain it, but do not add to submit list. // We will then add it to ars in an already-failed state. - retainedActions.add(new Action(r, ++posInList)); + + int priority = HConstants.NORMAL_QOS; + if (r instanceof Mutation) { + priority = ((Mutation) r).getPriority(); + } + retainedActions.add(new Action(r, ++posInList, priority)); locationErrors.add(ex); locationErrorRows.add(posInList); it.remove(); @@ -302,7 +307,11 @@ class AsyncProcess { break; } if (code == ReturnCode.INCLUDE) { - Action action = new Action(r, ++posInList); + int priority = HConstants.NORMAL_QOS; + if (r instanceof Mutation) { + priority = ((Mutation) r).getPriority(); + } + Action action = new Action(r, ++posInList, priority); setNonce(ng, r, action); retainedActions.add(action); // TODO: replica-get is not supported on this path @@ -372,6 +381,7 @@ class AsyncProcess { // The position will be used by the processBatch to match the object array returned. int posInList = -1; NonceGenerator ng = this.connection.getNonceGenerator(); + int highestPriority = HConstants.PRIORITY_UNSET; for (Row r : rows) { posInList++; if (r instanceof Put) { @@ -379,8 +389,9 @@ class AsyncProcess { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); } + highestPriority = Math.max(put.getPriority(), highestPriority); } - Action action = new Action(r, posInList); + Action action = new Action(r, posInList, highestPriority); setNonce(ng, r, action); actions.add(action); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 710ec91..5a5a3e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -1267,6 +1267,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { private MultiServerCallable createCallable(final ServerName server, TableName tableName, final MultiAction multi) { return new MultiServerCallable(asyncProcess.connection, tableName, server, - multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker); + multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java index a0ff900..c0e64e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java @@ -40,8 +40,8 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable< private final RetryingTimeTracker tracker; private final int rpcTimeout; CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row, - RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) { - super(connection, tableName, row, rpcController); + RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority) { + super(connection, tableName, row, rpcController, priority); this.rpcTimeout = rpcTimeout; this.tracker = tracker; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java index 5fa8de1..00e9558 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java @@ -33,9 +33,10 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; @InterfaceAudience.Private public abstract class ClientServiceCallable<T> extends RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> { + public ClientServiceCallable(Connection connection, TableName tableName, byte [] row, - RpcController rpcController) { - super(connection, tableName, row, rpcController); + RpcController rpcController, int priority) { + super(connection, tableName, row, rpcController, priority); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java index 0b3769d..351d8a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> { for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + super.setPriority(d.getPriority()); } /** @@ -369,4 +370,9 @@ public class Delete extends Mutation implements Comparable<Row> { public Delete setTTL(long ttl) { throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported"); } + + @Override + public Delete setPriority(int priority) { + return (Delete) super.setPriority(priority); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index c3ddc4b..b774a9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -127,6 +127,7 @@ public class Get extends Query TimeRange tr = entry.getValue(); setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } + super.setPriority(get.getPriority()); } /** @@ -552,4 +553,8 @@ public class Get extends Query return (Get) super.setIsolationLevel(level); } + @Override + public Get setPriority(int priority) { + return (Get) super.setPriority(priority); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index a48b9e0..c0d321b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -415,7 +415,7 @@ public class HTable implements Table { if (get.getConsistency() == Consistency.STRONG) { final Get configuredGet = get; ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(), - get.getRow(), this.rpcControllerFactory.newController()) { + get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) { @Override protected Result rpcCall() throws Exception { ClientProtos.GetRequest request = RequestConverter.buildGetRequest( @@ -547,7 +547,7 @@ public class HTable implements Table { CancellableRegionServerCallable<SingleResponse> callable = new CancellableRegionServerCallable<SingleResponse>( connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(), - writeRpcTimeout, new RetryingTimeTracker().start()) { + writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) { @Override protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -624,7 +624,7 @@ public class HTable implements Table { public void mutateRow(final RowMutations rm) throws IOException { CancellableRegionServerCallable<MultiResponse> callable = new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), - rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){ + rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()){ @Override protected MultiResponse rpcCall() throws Exception { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( @@ -668,7 +668,7 @@ public class HTable implements Table { checkHasFamilies(append); NoncedRegionServerCallable<Result> callable = new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(), - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), append.getPriority()) { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -690,7 +690,7 @@ public class HTable implements Table { checkHasFamilies(increment); NoncedRegionServerCallable<Result> callable = new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(), - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), increment.getPriority()) { @Override protected Result rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -734,7 +734,7 @@ public class HTable implements Table { NoncedRegionServerCallable<Long> callable = new NoncedRegionServerCallable<Long>(this.connection, getName(), row, - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Long rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( @@ -758,7 +758,7 @@ public class HTable implements Table { final Put put) throws IOException { ClientServiceCallable<Boolean> callable = new ClientServiceCallable<Boolean>(this.connection, getName(), row, - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), put.getPriority()) { @Override protected Boolean rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -782,7 +782,7 @@ public class HTable implements Table { throws IOException { ClientServiceCallable<Boolean> callable = new ClientServiceCallable<Boolean>(this.connection, getName(), row, - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), put.getPriority()) { @Override protected Boolean rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -817,7 +817,7 @@ public class HTable implements Table { CancellableRegionServerCallable<SingleResponse> callable = new CancellableRegionServerCallable<SingleResponse>( this.connection, getName(), row, this.rpcControllerFactory.newController(), - writeRpcTimeout, new RetryingTimeTracker().start()) { + writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) { @Override protected SingleResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -858,7 +858,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable<MultiResponse> callable = new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), - rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) { + rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()) { @Override protected MultiResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java index 4ba0efa..d323555 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java @@ -84,6 +84,7 @@ public class Increment extends Mutation implements Comparable<Row> { for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) { this.setAttribute(entry.getKey(), entry.getValue()); } + super.setPriority(i.getPriority()); } /** @@ -331,4 +332,9 @@ public class Increment extends Mutation implements Comparable<Row> { public Increment setTTL(long ttl) { return (Increment) super.setTTL(ttl); } + + @Override + public Increment setPriority(int priority) { + return (Increment) super.setPriority(priority); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index a4aa71d..bcec395 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -20,11 +20,16 @@ package org.apache.hadoop.hbase.client; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Predicate; +import com.google.common.collect.Iterables; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -103,4 +108,11 @@ public final class MultiAction { public long getNonceGroup() { return this.nonceGroup; } + + // returns the max priority of all the actions + public int getPriority() { + Optional<Action> result = actions.values().stream().flatMap(List::stream) + .max((action1, action2) -> Math.max(action1.getPriority(), action2.getPriority())); + return result.isPresent() ? result.get().getPriority() : HConstants.PRIORITY_UNSET; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 64dada0..33c9a0b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -55,8 +55,8 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse> MultiServerCallable(final ClusterConnection connection, final TableName tableName, final ServerName location, final MultiAction multi, RpcController rpcController, - int rpcTimeout, RetryingTimeTracker tracker) { - super(connection, tableName, null, rpcController, rpcTimeout, tracker); + int rpcTimeout, RetryingTimeTracker tracker, int priority) { + super(connection, tableName, null, rpcController, rpcTimeout, tracker, priority); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java index f6cb4b1..3b60497 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java @@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C // familyMap ClassSize.REFERENCE + // familyMap - ClassSize.TREEMAP); + ClassSize.TREEMAP + + // priority + ClassSize.INTEGER + ); /** * The attribute for storing the list of clusters that have consumed the change. http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index 52ed263..5dc19f6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -47,8 +47,8 @@ public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallabl * @param row The row we want in <code>tableName</code>. */ public NoncedRegionServerCallable(Connection connection, TableName tableName, byte [] row, - HBaseRpcController rpcController) { - super(connection, tableName, row, rpcController); + HBaseRpcController rpcController, int priority) { + super(connection, tableName, row, rpcController, priority); this.nonce = getConnection().getNonceGenerator().newNonce(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java index ba21cbb..1fb691a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -34,6 +35,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri // used for uniquely identifying an operation public static final String ID_ATRIBUTE = "_operation.attributes.id"; + private int priority = HConstants.PRIORITY_UNSET; @Override public OperationWithAttributes setAttribute(String name, byte[] value) { @@ -108,4 +110,14 @@ public abstract class OperationWithAttributes extends Operation implements Attri byte[] attr = getAttribute(ID_ATRIBUTE); return attr == null? null: Bytes.toString(attr); } + + public OperationWithAttributes setPriority(int priority) { + this.priority = priority; + return this; + } + + public int getPriority() { + return priority; + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java index 3b10549..df7d74f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; @@ -77,7 +78,7 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { } ClientServiceCallable<CoprocessorServiceResponse> callable = new ClientServiceCallable<CoprocessorServiceResponse>(this.conn, - this.table, this.row, this.conn.getRpcControllerFactory().newController()) { + this.table, this.row, this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) { @Override protected CoprocessorServiceResponse rpcCall() throws Exception { byte [] regionName = getLocation().getRegionInfo().getRegionName(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index fb593a3..499685d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; @@ -66,6 +67,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> * Can be null! */ protected final RpcController rpcController; + private int priority = HConstants.NORMAL_QOS; /** * @param connection Connection to use. @@ -75,11 +77,17 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> */ public RegionServerCallable(Connection connection, TableName tableName, byte [] row, RpcController rpcController) { + this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS); + } + + public RegionServerCallable(Connection connection, TableName tableName, byte [] row, + RpcController rpcController, int priority) { super(); this.connection = connection; this.tableName = tableName; this.row = row; this.rpcController = rpcController; + this.priority = priority; } protected RpcController getRpcController() { @@ -111,6 +119,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> // If it is an instance of HBaseRpcController, we can set priority on the controller based // off the tableName. Set call timeout too. hrc.setPriority(tableName); + hrc.setPriority(priority); hrc.setCallTimeout(callTimeout); } } @@ -172,6 +181,8 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> return this.row; } + protected int getPriority() { return this.priority;} + public void throwable(Throwable t, boolean retrying) { if (location != null) { getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java index a9384ac..a6d6d39 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java @@ -118,4 +118,12 @@ public class RowMutations implements Row { public List<Mutation> getMutations() { return Collections.unmodifiableList(mutations); } + + public int getMaxPriority() { + int maxPriority = Integer.MIN_VALUE; + for (Mutation mutation : mutations) { + maxPriority = Math.max(maxPriority, mutation.getPriority()); + } + return maxPriority; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index b5cddde..3cd9b2f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; /** * Caller that goes to replica if the primary region does no answer within a configurable @@ -96,7 +96,7 @@ public class RpcRetryingCallerWithReadReplicas { public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), - rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker()); + rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET); this.id = id; this.location = location; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 639f43e..e84716f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -276,6 +276,7 @@ public class Scan extends Query { this.mvccReadPoint = scan.getMvccReadPoint(); this.limit = scan.getLimit(); this.needCursorResult = scan.isNeedCursorResult(); + setPriority(scan.getPriority()); } /** @@ -306,6 +307,7 @@ public class Scan extends Query { setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } this.mvccReadPoint = -1L; + setPriority(get.getPriority()); } public boolean isGetScan() { @@ -1060,6 +1062,11 @@ public class Scan extends Query { return (Scan) super.setIsolationLevel(level); } + @Override + public Scan setPriority(int priority) { + return (Scan) super.setPriority(priority); + } + /** * Enable collection of {@link ScanMetrics}. For advanced users. * @param enabled Set to true to enable accumulating scan metrics http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 4227e41..bb8b185 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -117,7 +117,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> { */ public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { - super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController()); + super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority()); this.id = id; this.scan = scan; this.scanMetrics = scanMetrics; http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index c8d9738..aa9f645 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -39,6 +38,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.security.token.Token; +import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; + /** * Client proxy for SecureBulkLoadProtocol */ @@ -56,7 +57,7 @@ public class SecureBulkLoadClient { try { ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW, - this.rpcControllerFactory.newController()) { + this.rpcControllerFactory.newController(), PRIORITY_UNSET) { @Override protected String rpcCall() throws Exception { byte[] regionName = getLocation().getRegionInfo().getRegionName(); @@ -79,7 +80,7 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { try { ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, - table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController()) { + table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { byte[] regionName = getLocation().getRegionInfo().getRegionName(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 71ce70a..b925330 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public interface HBaseRpcController extends RpcController, CellScannable { - static final int PRIORITY_UNSET = -1; - /** * Only used to send cells to rpc server, the returned cells should be set by * {@link #setDone(CellScanner)}. http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 8ceac64..64d91f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { * This is the ordained way of setting priorities going forward. We will be undoing the old * annotation-based mechanism. */ - private int priority = PRIORITY_UNSET; + private int priority = HConstants.PRIORITY_UNSET; /** * They are optionally set on construction, cleared after we make the call, and then optionally @@ -95,7 +95,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { @Override public void setPriority(int priority) { - this.priority = priority; + this.priority = Math.max(this.priority, priority); + } @Override @@ -106,7 +107,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { @Override public int getPriority() { - return priority; + return priority < 0 ? HConstants.NORMAL_QOS : priority; } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 6dab3b5..e0636eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -29,6 +29,7 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; @@ -111,7 +112,7 @@ class IPCUtil { builder.setCellBlockMeta(cellBlockMeta); } // Only pass priority if there is one set. - if (call.priority != HBaseRpcController.PRIORITY_UNSET) { + if (call.priority != HConstants.PRIORITY_UNSET) { builder.setPriority(call.priority); } builder.setTimeout(call.timeout); http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index dfc140b..54e0eb8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1113,6 +1113,7 @@ public final class HConstants { * handled by high priority handlers. */ // normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS + public static final int PRIORITY_UNSET = -1; public static final int NORMAL_QOS = 0; public static final int REPLICATION_QOS = 5; public static final int REPLAY_QOS = 6; http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index a7709ee..848934c 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -28,6 +28,8 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.curator.shaded.com.google.common.collect.ConcurrentHashMultiset; +import org.apache.curator.shaded.com.google.common.collect.Multiset; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; @@ -76,6 +78,7 @@ public class TestRpcControllerFactory { public static class CountingRpcController extends DelegatingHBaseRpcController { + private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create(); private static AtomicInteger INT_PRIORITY = new AtomicInteger(); private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); @@ -85,8 +88,13 @@ public class TestRpcControllerFactory { @Override public void setPriority(int priority) { + int oldPriority = getPriority(); super.setPriority(priority); - INT_PRIORITY.incrementAndGet(); + int newPriority = getPriority(); + if (newPriority != oldPriority) { + INT_PRIORITY.incrementAndGet(); + GROUPED_PRIORITY.add(priority); + } } @Override @@ -196,6 +204,14 @@ public class TestRpcControllerFactory { scanInfo.setSmall(false); counter = doScan(table, scanInfo, counter + 1); + // make sure we have no priority count + verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0); + // lets set a custom priority on a get + Get get = new Get(row); + get.setPriority(HConstants.ADMIN_QOS); + table.get(get); + verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1); + table.close(); connection.close(); } @@ -208,11 +224,15 @@ public class TestRpcControllerFactory { } int verifyCount(Integer counter) { - assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue()); + assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter); assertEquals(0, CountingRpcController.INT_PRIORITY.get()); return CountingRpcController.TABLE_PRIORITY.get() + 1; } + void verifyPriorityGroupCount(int priorityLevel, int count) { + assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel)); + } + @Test public void testFallbackToDefaultRpcControllerFactory() { Configuration conf = new Configuration(UTIL.getConfiguration()); http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java index 2c38662..0d5c993 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; import org.apache.hadoop.hbase.TableName; @@ -108,7 +109,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), - rpcControllerFactory.newController()) { + rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " + @@ -128,7 +129,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), - rpcControllerFactory.newController()) { + rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("compacting " + getLocation() + " for row " http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 2ee2d7e..900861b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -155,6 +155,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs public boolean dispatch(CallRunner callTask) throws InterruptedException { RpcCall call = callTask.getRpcCall(); int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser()); + if (level == HConstants.PRIORITY_UNSET) { + level = HConstants.NORMAL_QOS; + } if (priorityExecutor != null && level > highPriorityLevel) { return priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 4191aa8..7b4a353 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -530,7 +530,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } return new ClientServiceCallable<byte[]>(conn, - tableName, first, rpcControllerFactory.newController()) { + tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected byte[] rpcCall() throws Exception { SecureBulkLoadClient secureClient = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index f451207..c616a01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -183,7 +183,7 @@ public class WALEditsReplaySink { ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory, final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) { super(connection, tableName, HConstants.EMPTY_BYTE_ARRAY, - rpcControllerFactory.newController()); + rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET); this.entries = entries; setLocation(regionLoc); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 037a538..1ef6c60 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -668,7 +668,7 @@ public class TestHCM { TEST_UTIL.createTable(tableName, FAM_NAM); ClientServiceCallable<Object> regionServerCallable = new ClientServiceCallable<Object>( TEST_UTIL.getConnection(), tableName, ROW, - new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController()) { + new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override protected Object rpcCall() throws Exception { return null; http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 437afaf..898f629 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -475,7 +475,7 @@ public class TestReplicaWithCluster { new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn); ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0), - new RpcControllerFactory(HTU.getConfiguration()).newController()) { + new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 1af0d88..8ef666d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -494,6 +494,7 @@ public class TestHeapSize { expected = ClassSize.estimateBase(cl, false); //The actual TreeMap is not included in the above calculation expected += ClassSize.align(ClassSize.TREEMAP); + expected += ClassSize.align(ClassSize.INTEGER); // priority if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); @@ -504,6 +505,7 @@ public class TestHeapSize { expected = ClassSize.estimateBase(cl, false); //The actual TreeMap is not included in the above calculation expected += ClassSize.align(ClassSize.TREEMAP); + expected += ClassSize.align(ClassSize.INTEGER); // priority if (expected != actual) { ClassSize.estimateBase(cl, true); assertEquals(expected, actual); http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 32ebbd2..e1aa137 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -354,7 +354,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) { ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>( conn, tableName, first, new RpcControllerFactory( - util.getConfiguration()).newController()) { + util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override public byte[] rpcCall() throws Exception { throw new IOException("Error calling something on RegionServer"); http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java index 83108c6..9f6c9f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -445,7 +446,7 @@ public class TestSpaceQuotas { Table table = conn.getTable(tn); final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn); return new ClientServiceCallable<Void>(conn, - tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController()) { + tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(), HConstants.PRIORITY_UNSET) { @Override public Void rpcCall() throws Exception { SecureBulkLoadClient secureClient = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index c17234e..b5304f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -205,7 +206,7 @@ public class TestHRegionServerBulkLoad { prepareBulkLoad(conn); ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), - new RpcControllerFactory(UTIL.getConfiguration()).newController()) { + new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override public Void rpcCall() throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " @@ -229,7 +230,7 @@ public class TestHRegionServerBulkLoad { // 5 * 50 = 250 open file handles! callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"), - new RpcControllerFactory(UTIL.getConfiguration()).newController()) { + new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("compacting " + getLocation() + " for row " http://git-wip-us.apache.org/repos/asf/hbase/blob/ec3cb196/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java index 2a1655d..7f486e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; import org.apache.hadoop.hbase.TableName; @@ -94,7 +95,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, tableName, - Bytes.toBytes("aaa"), rpcControllerFactory.newController()) { + Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.info("Non-secure old client"); @@ -114,7 +115,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! callable = new ClientServiceCallable<Void>(conn, tableName, - Bytes.toBytes("aaa"), rpcControllerFactory.newController()) { + Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { @Override protected Void rpcCall() throws Exception { LOG.debug("compacting " + getLocation() + " for row "