Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 5306ceb1a -> fc733bd31


HBASE-15593 Time limit of scanning should be offered by client (Phil Yang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fc733bd3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fc733bd3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fc733bd3

Branch: refs/heads/branch-1.3
Commit: fc733bd31fb7d9646187048a8df5bd5a6f88c8f2
Parents: 5306ceb
Author: stack <st...@apache.org>
Authored: Tue May 17 12:51:03 2016 -0700
Committer: stack <st...@apache.org>
Committed: Tue May 17 12:52:06 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/RpcRetryingCaller.java  |   2 +-
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |   3 +-
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |   1 +
 .../hbase/protobuf/generated/RPCProtos.java     | 106 +++++++++++++++++--
 hbase-protocol/src/main/protobuf/RPC.proto      |   1 +
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |   2 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  43 ++++++--
 .../hadoop/hbase/ipc/RpcServerInterface.java    |   5 +
 .../hbase/regionserver/RSRpcServices.java       |   9 ++
 .../TestScannerHeartbeatMessages.java           |  19 ++--
 10 files changed, 162 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index fd74161..76261b2 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -57,7 +57,7 @@ public class RpcRetryingCaller<T> {
   /**
    * Start and end times for a single call.
    */
-  private final static int MIN_RPC_TIMEOUT = 2000;
+  private final static int MIN_RPC_TIMEOUT = 1;
   /** How many retries are allowed before we start to log */
   private final int startLogErrorsCnt;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 37ed413..9ab17f5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -413,7 +413,8 @@ public class AsyncRpcChannel {
       if (call.controller.getPriority() != 
PayloadCarryingRpcController.PRIORITY_UNSET) {
         requestHeaderBuilder.setPriority(call.controller.getPriority());
       }
-
+      requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
+          Integer.MAX_VALUE : (int)call.rpcTimeout);
       RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
 
       int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index e1821e5..ef500d6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -903,6 +903,7 @@ public class RpcClientImpl extends AbstractRpcClient {
       if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
         builder.setPriority(priority);
       }
+      builder.setTimeout(call.timeout);
       RequestHeader header = builder.build();
 
       setupIOstreams();

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
----------------------------------------------------------------------
diff --git 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
 
b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
index 2d4a430..d05eb57 100644
--- 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
+++ 
b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
@@ -3895,6 +3895,16 @@ public final class RPCProtos {
      * </pre>
      */
     int getPriority();
+
+    // optional uint32 timeout = 7;
+    /**
+     * <code>optional uint32 timeout = 7;</code>
+     */
+    boolean hasTimeout();
+    /**
+     * <code>optional uint32 timeout = 7;</code>
+     */
+    int getTimeout();
   }
   /**
    * Protobuf type {@code hbase.pb.RequestHeader}
@@ -3997,6 +4007,11 @@ public final class RPCProtos {
               priority_ = input.readUInt32();
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              timeout_ = input.readUInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4210,6 +4225,22 @@ public final class RPCProtos {
       return priority_;
     }
 
+    // optional uint32 timeout = 7;
+    public static final int TIMEOUT_FIELD_NUMBER = 7;
+    private int timeout_;
+    /**
+     * <code>optional uint32 timeout = 7;</code>
+     */
+    public boolean hasTimeout() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional uint32 timeout = 7;</code>
+     */
+    public int getTimeout() {
+      return timeout_;
+    }
+
     private void initFields() {
       callId_ = 0;
       traceInfo_ = 
org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo.getDefaultInstance();
@@ -4217,6 +4248,7 @@ public final class RPCProtos {
       requestParam_ = false;
       cellBlockMeta_ = 
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance();
       priority_ = 0;
+      timeout_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4248,6 +4280,9 @@ public final class RPCProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeUInt32(6, priority_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeUInt32(7, timeout_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4281,6 +4316,10 @@ public final class RPCProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt32Size(6, priority_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(7, timeout_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4334,6 +4373,11 @@ public final class RPCProtos {
         result = result && (getPriority()
             == other.getPriority());
       }
+      result = result && (hasTimeout() == other.hasTimeout());
+      if (hasTimeout()) {
+        result = result && (getTimeout()
+            == other.getTimeout());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4371,6 +4415,10 @@ public final class RPCProtos {
         hash = (37 * hash) + PRIORITY_FIELD_NUMBER;
         hash = (53 * hash) + getPriority();
       }
+      if (hasTimeout()) {
+        hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
+        hash = (53 * hash) + getTimeout();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4506,6 +4554,8 @@ public final class RPCProtos {
         bitField0_ = (bitField0_ & ~0x00000010);
         priority_ = 0;
         bitField0_ = (bitField0_ & ~0x00000020);
+        timeout_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -4566,6 +4616,10 @@ public final class RPCProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.priority_ = priority_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.timeout_ = timeout_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4602,6 +4656,9 @@ public final class RPCProtos {
         if (other.hasPriority()) {
           setPriority(other.getPriority());
         }
+        if (other.hasTimeout()) {
+          setTimeout(other.getTimeout());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -5124,6 +5181,39 @@ public final class RPCProtos {
         return this;
       }
 
+      // optional uint32 timeout = 7;
+      private int timeout_ ;
+      /**
+       * <code>optional uint32 timeout = 7;</code>
+       */
+      public boolean hasTimeout() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional uint32 timeout = 7;</code>
+       */
+      public int getTimeout() {
+        return timeout_;
+      }
+      /**
+       * <code>optional uint32 timeout = 7;</code>
+       */
+      public Builder setTimeout(int value) {
+        bitField0_ |= 0x00000040;
+        timeout_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional uint32 timeout = 7;</code>
+       */
+      public Builder clearTimeout() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        timeout_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.RequestHeader)
     }
 
@@ -6141,17 +6231,17 @@ public final class RPCProtos {
       "llBlockMeta\022\016\n\006length\030\001 \001(\r\"|\n\021Exception" +
       "Response\022\034\n\024exception_class_name\030\001 \001(\t\022\023",
       "\n\013stack_trace\030\002 \001(\t\022\020\n\010hostname\030\003 
\001(\t\022\014\n" +
-      "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 
\001(\010\"\270\001\n\rRe" +
+      "\004port\030\004 \001(\005\022\024\n\014do_not_retry\030\005 
\001(\010\"\311\001\n\rRe" +
       "questHeader\022\017\n\007call_id\030\001 \001(\r\022&\n\ntrace_in" +
       "fo\030\002 \001(\0132\022.hbase.pb.RPCTInfo\022\023\n\013method_n" +
       "ame\030\003 \001(\t\022\025\n\rrequest_param\030\004 
\001(\010\0220\n\017cell" +
       "_block_meta\030\005 \001(\0132\027.hbase.pb.CellBlockMe" +
-      "ta\022\020\n\010priority\030\006 
\001(\r\"\203\001\n\016ResponseHeader\022" +
-      "\017\n\007call_id\030\001 \001(\r\022.\n\texception\030\002 
\001(\0132\033.hb" +
-      "ase.pb.ExceptionResponse\0220\n\017cell_block_m" +
-      "eta\030\003 \001(\0132\027.hbase.pb.CellBlockMetaB<\n*or",
-      "g.apache.hadoop.hbase.protobuf.generated" +
-      "B\tRPCProtosH\001\240\001\001"
+      "ta\022\020\n\010priority\030\006 \001(\r\022\017\n\007timeout\030\007 
\001(\r\"\203\001" +
+      "\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022.\n\texc" +
+      "eption\030\002 \001(\0132\033.hbase.pb.ExceptionRespons" +
+      "e\0220\n\017cell_block_meta\030\003 \001(\0132\027.hbase.pb.Ce",
+      "llBlockMetaB<\n*org.apache.hadoop.hbase.p" +
+      "rotobuf.generatedB\tRPCProtosH\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6187,7 +6277,7 @@ public final class RPCProtos {
           internal_static_hbase_pb_RequestHeader_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_RequestHeader_descriptor,
-              new java.lang.String[] { "CallId", "TraceInfo", "MethodName", 
"RequestParam", "CellBlockMeta", "Priority", });
+              new java.lang.String[] { "CallId", "TraceInfo", "MethodName", 
"RequestParam", "CellBlockMeta", "Priority", "Timeout", });
           internal_static_hbase_pb_ResponseHeader_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_hbase_pb_ResponseHeader_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-protocol/src/main/protobuf/RPC.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/RPC.proto 
b/hbase-protocol/src/main/protobuf/RPC.proto
index 59bb03d..8413d25 100644
--- a/hbase-protocol/src/main/protobuf/RPC.proto
+++ b/hbase-protocol/src/main/protobuf/RPC.proto
@@ -125,6 +125,7 @@ message RequestHeader {
   // 0 is NORMAL priority.  200 is HIGH.  If no priority, treat it as NORMAL.
   // See HConstants.
   optional uint32 priority = 6;
+  optional uint32 timeout = 7;
 }
 
 message ResponseHeader {

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 3514245..00e08c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -114,7 +114,7 @@ public class CallRunner {
         }
         // make the call
         resultPair = this.rpcServer.call(call.service, call.md, call.param, 
call.cellScanner,
-          call.timestamp, this.status);
+          call.timestamp, this.status, call.timeout);
       } catch (Throwable e) {
         RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + 
call.toShortString(), e);
         errorThrowable = e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index a909352..a359a1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -266,6 +266,13 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
   private static final String WARN_RESPONSE_TIME = 
"hbase.ipc.warn.response.time";
   private static final String WARN_RESPONSE_SIZE = 
"hbase.ipc.warn.response.size";
 
+  /**
+   * Minimum allowable timeout (in milliseconds) in an rpc request's header. This configuration
+   * exists to prevent the rpc service from treating the request as timed out immediately.
+   */
+  private static final String MIN_CLIENT_REQUEST_TIMEOUT = 
"hbase.ipc.min.client.request.timeout";
+  private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
+
   /** Default value for above params */
   private static final int DEFAULT_MAX_REQUEST_SIZE = 
DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
@@ -276,6 +283,9 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
   private final int maxRequestSize;
   private final int warnResponseTime;
   private final int warnResponseSize;
+
+  private final int minClientRequestTimeout;
+
   private final Server server;
   private final List<BlockingServiceAndInterface> services;
 
@@ -304,6 +314,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     protected Connection connection;              // connection to client
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
+    protected int timeout;
     /**
      * Chain of buffers to send as response.
      */
@@ -326,7 +337,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
         justification="Can't figure why this complaint is happening... see 
below")
     Call(int id, final BlockingService service, final MethodDescriptor md, 
RequestHeader header,
          Message param, CellScanner cellScanner, Connection connection, 
Responder responder,
-         long size, TraceInfo tinfo, final InetAddress remoteAddress) {
+         long size, TraceInfo tinfo, final InetAddress remoteAddress, int 
timeout) {
       this.id = id;
       this.service = service;
       this.md = md;
@@ -344,6 +355,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       this.remoteAddress = remoteAddress;
       this.retryImmediatelySupported =
           connection == null? null: connection.retryImmediatelySupported;
+      this.timeout = timeout;
     }
 
     /**
@@ -1261,13 +1273,13 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     // Fake 'call' for failed authorization response
     private static final int AUTHORIZATION_FAILED_CALLID = -1;
     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, 
null, null, null,
-        null, null, this, null, 0, null, null);
+        null, null, this, null, 0, null, null, 0);
     private ByteArrayOutputStream authFailedResponse =
         new ByteArrayOutputStream();
     // Fake 'call' for SASL context setup
     private static final int SASL_CALLID = -33;
     private final Call saslCall = new Call(SASL_CALLID, null, null, null, 
null, null, this, null,
-        0, null, null);
+        0, null, null, 0);
 
     // was authentication allowed with a fallback to simple auth
     private boolean authenticatedWithFallback;
@@ -1685,7 +1697,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
 
     private int doBadPreambleHandling(final String msg, final Exception e) 
throws IOException {
       LOG.warn(msg);
-      Call fakeCall = new Call(-1, null, null, null, null, null, this, 
responder, -1, null, null);
+      Call fakeCall = new Call(-1, null, null, null, null, null, this, 
responder, -1, null, null,0);
       setupResponse(null, fakeCall, e, msg);
       responder.doRespond(fakeCall);
       // Returning -1 closes out the connection.
@@ -1860,7 +1872,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
         final Call callTooBig =
           new Call(id, this.service, null, null, null, null, this,
-            responder, totalRequestSize, null, null);
+            responder, totalRequestSize, null, null, 0);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
@@ -1909,7 +1921,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
 
         final Call readParamsFailedCall =
           new Call(id, this.service, null, null, null, null, this,
-            responder, totalRequestSize, null, null);
+            responder, totalRequestSize, null, null, 0);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
         setupResponse(responseBuffer, readParamsFailedCall, t,
           msg + "; " + t.getMessage());
@@ -1920,8 +1932,12 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       TraceInfo traceInfo = header.hasTraceInfo()
           ? new TraceInfo(header.getTraceInfo().getTraceId(), 
header.getTraceInfo().getParentId())
           : null;
+      int timeout = 0;
+      if (header.hasTimeout()){
+        timeout = Math.max(minClientRequestTimeout, header.getTimeout());
+      }
       Call call = new Call(id, this.service, md, header, param, cellScanner, 
this, responder,
-              totalRequestSize, traceInfo, this.addr);
+              totalRequestSize, traceInfo, this.addr, timeout);
 
       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
         callQueueSize.add(-1 * call.getSize());
@@ -2078,7 +2094,8 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, 
DEFAULT_WARN_RESPONSE_TIME);
     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, 
DEFAULT_WARN_RESPONSE_SIZE);
-
+    this.minClientRequestTimeout = conf.getInt(MIN_CLIENT_REQUEST_TIMEOUT,
+        DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT);
     this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, 
DEFAULT_MAX_REQUEST_SIZE);
 
     // Start the listener here and let it bind to the port
@@ -2219,6 +2236,12 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
   }
 
+  public Pair<Message, CellScanner> call(BlockingService service, 
MethodDescriptor md,
+      Message param, CellScanner cellScanner, long receiveTime, 
MonitoredRPCHandler status)
+      throws IOException {
+    return call(service, md, param, cellScanner, receiveTime, status, 0);
+  }
+
   /**
    * This is a server side method, which is invoked over RPC. On success
    * the return response has protobuf response payload. On failure, the
@@ -2226,7 +2249,8 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
    */
   @Override
   public Pair<Message, CellScanner> call(BlockingService service, 
MethodDescriptor md,
-      Message param, CellScanner cellScanner, long receiveTime, 
MonitoredRPCHandler status)
+      Message param, CellScanner cellScanner, long receiveTime, 
MonitoredRPCHandler status,
+      int timeout)
   throws IOException {
     try {
       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
@@ -2236,6 +2260,7 @@ public class RpcServer implements RpcServerInterface, 
ConfigurationObserver {
       //get an instance of the method arg type
       long startTime = System.currentTimeMillis();
       PayloadCarryingRpcController controller = new 
PayloadCarryingRpcController(cellScanner);
+      controller.setCallTimeout(timeout);
       Message result = service.callBlockingMethod(md, controller, param);
       long endTime = System.currentTimeMillis();
       int processingTime = (int) (endTime - startTime);

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index 013d256..12b158d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -52,6 +52,11 @@ public interface RpcServerInterface {
     Message param, CellScanner cellScanner, long receiveTime, 
MonitoredRPCHandler status)
   throws IOException, ServiceException;
 
+  Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+      Message param, CellScanner cellScanner, long receiveTime, 
MonitoredRPCHandler status,
+      int timeout)
+      throws IOException, ServiceException;
+
   void setErrorHandler(HBaseRPCErrorHandler handler);
   HBaseRPCErrorHandler getErrorHandler();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 4b4a378..78678cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -88,6 +88,7 @@ import 
org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -2577,6 +2578,14 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
                     timeLimitDelta =
                         scannerLeaseTimeoutPeriod > 0 ? 
scannerLeaseTimeoutPeriod : rpcTimeout;
                   }
+                  if (controller instanceof TimeLimitedRpcController) {
+                    TimeLimitedRpcController timeLimitedRpcController =
+                        (TimeLimitedRpcController)controller;
+                    if (timeLimitedRpcController.getCallTimeout() > 0) {
+                      timeLimitDelta = Math.min(timeLimitDelta,
+                          timeLimitedRpcController.getCallTimeout());
+                    }
+                  }
                   // Use half of whichever timeout value was more 
restrictive... But don't allow
                   // the time limit to be less than the allowable minimum 
(could cause an
                   // immediate timeout before scanning any data).

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc733bd3/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index 8b3dac0..58bafce 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -105,16 +106,16 @@ public class TestScannerHeartbeatMessages {
   private static int VALUE_SIZE = 128;
   private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
 
+
+  private static int SERVER_TIMEOUT = 6000;
+
   // Time, in milliseconds, that the client will wait for a response from the 
server before timing
   // out. This value is used server side to determine when it is necessary to 
send a heartbeat
   // message to the client
-  private static int CLIENT_TIMEOUT = 2000;
-
-  // The server limits itself to running for half of the CLIENT_TIMEOUT value.
-  private static int SERVER_TIME_LIMIT = CLIENT_TIMEOUT / 2;
+  private static int CLIENT_TIMEOUT = SERVER_TIMEOUT / 3;
 
   // By default, at most one row's worth of cells will be retrieved before the 
time limit is reached
-  private static int DEFAULT_ROW_SLEEP_TIME = SERVER_TIME_LIMIT / 2;
+  private static int DEFAULT_ROW_SLEEP_TIME = CLIENT_TIMEOUT / 5;
   // By default, at most cells for two column families are retrieved before 
the time limit is
   // reached
   private static int DEFAULT_CF_SLEEP_TIME = DEFAULT_ROW_SLEEP_TIME / 
NUM_FAMILIES;
@@ -127,8 +128,8 @@ public class TestScannerHeartbeatMessages {
 
     conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
     conf.setStrings(HConstants.REGION_SERVER_IMPL, 
HeartbeatHRegionServer.class.getName());
-    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 
CLIENT_TIMEOUT);
-    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
+    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 
SERVER_TIMEOUT);
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
     conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
 
     // Check the timeout condition after every cell
@@ -143,7 +144,7 @@ public class TestScannerHeartbeatMessages {
     Table ht = TEST_UTIL.createTable(name, families);
     List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
     ht.put(puts);
-
+    
ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 
CLIENT_TIMEOUT);
     return ht;
   }
 
@@ -285,7 +286,7 @@ public class TestScannerHeartbeatMessages {
     @Override
     public ReturnCode filterKeyValue(Cell v) throws IOException {
       try {
-        Thread.sleep(SERVER_TIME_LIMIT + 10);
+        Thread.sleep(CLIENT_TIMEOUT/2 + 10);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }

Reply via email to