HBASE-14674 Rpc handler / task monitoring seems to be broken after 0.98 (Heng Chen)
Conflicts: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/68f0fff2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/68f0fff2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/68f0fff2 Branch: refs/heads/branch-1 Commit: 68f0fff281a7da6935fd328b9a692b77c6f559c3 Parents: 87c97c2 Author: Enis Soztutar <e...@apache.org> Authored: Tue Oct 27 14:24:21 2015 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Oct 27 15:06:38 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 19 ++++--------------- .../hadoop/hbase/ipc/FifoRpcScheduler.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 3 +++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 16 +++++++++++++++- .../apache/hadoop/hbase/ipc/TestCallRunner.java | 4 +++- .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 4 ++++ 6 files changed, 30 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/68f0fff2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index ede4b4e..5b52521 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -59,20 +58,22 @@ public class CallRunner { this.rpcServer = rpcServer; // Add size of the call to queue size. this.rpcServer.addCallSize(call.getSize()); - this.status = getStatus(); } public Call getCall() { return call; } + public void setStatus(MonitoredRPCHandler status) { + this.status = status; + } + /** * Cleanup after ourselves... let go of references. */ private void cleanup() { this.call = null; this.rpcServer = null; - this.status = null; } public void run() { @@ -160,16 +161,4 @@ public class CallRunner { cleanup(); } } - - MonitoredRPCHandler getStatus() { - // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. - MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); - if (status != null) { - return status; - } - status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); - status.pause("Waiting for a call"); - RpcServer.MONITORED_RPC.set(status); - return status; - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/68f0fff2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index 8140c1c..621a8ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.ipc.CallRunner; /** * A very simple {@code }RpcScheduler} that serves incoming requests in order. @@ -70,6 +69,7 @@ public class FifoRpcScheduler extends RpcScheduler { executor.submit(new Runnable() { @Override public void run() { + task.setStatus(RpcServer.getStatus()); task.run(); } }); http://git-wip-us.apache.org/repos/asf/hbase/blob/68f0fff2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 709429d..27750a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; @@ -124,7 +125,9 @@ public abstract class RpcExecutor { try { while (running) { try { + MonitoredRPCHandler status = RpcServer.getStatus(); CallRunner task = myQueue.take(); + task.setStatus(status); try { activeHandlerCount.incrementAndGet(); task.run(); http://git-wip-us.apache.org/repos/asf/hbase/blob/68f0fff2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 72ebda2..af422bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; @@ -2300,7 +2301,8 @@ public class RpcServer implements RpcServerInterface { * @param user client user * @param connection incoming connection * @param addr InetAddress of incoming connection - * @throws org.apache.hadoop.security.authorize.AuthorizationException when the client isn't authorized to talk the protocol + * @throws org.apache.hadoop.security.authorize.AuthorizationException + * when the client isn't authorized to talk the protocol */ public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr) throws AuthorizationException { @@ -2484,6 +2486,18 @@ public class RpcServer implements RpcServerInterface { return bsasi == null? null: bsasi.getBlockingService(); } + static MonitoredRPCHandler getStatus() { + // It is ugly the way we park status up in RpcServer. Let it be for now. TODO. + MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get(); + if (status != null) { + return status; + } + status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); + status.pause("Waiting for a call"); + RpcServer.MONITORED_RPC.set(status); + return status; + } + /** Returns the remote side ip address when invoked inside an RPC * Returns null incase of an error. * @return InetAddress http://git-wip-us.apache.org/repos/asf/hbase/blob/68f0fff2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java index b328e57..9691602 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.ipc; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -34,6 +35,7 @@ public class TestCallRunner { RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class); mockCall.connection = Mockito.mock(RpcServer.Connection.class); CallRunner cr = new CallRunner(mockRpcServer, mockCall); + cr.setStatus(new MonitoredRPCHandlerImpl()); cr.run(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/68f0fff2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 6645442..fa0727a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.ipc.RpcServer.Call; @@ -86,6 +87,7 @@ public class TestSimpleRpcScheduler { scheduler.init(CONTEXT); scheduler.start(); CallRunner task = createMockTask(); + task.setStatus(new MonitoredRPCHandlerImpl()); scheduler.dispatch(task); verify(task, timeout(1000)).run(); scheduler.stop(); @@ -120,6 +122,7 @@ public class TestSimpleRpcScheduler { } }; for (CallRunner task : tasks) { + task.setStatus(new MonitoredRPCHandlerImpl()); doAnswer(answerToRun).when(task).run(); } @@ -302,6 +305,7 @@ public class TestSimpleRpcScheduler { private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results, final int value, final int sleepInterval) { + callTask.setStatus(new MonitoredRPCHandlerImpl()); doAnswer(new Answer<Object>() { @Override public Object answer(InvocationOnMock invocation) {