This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ccb910fa93 HBASE-27947 RegionServer OOM when outbound channel backed up (#5350)
8ccb910fa93 is described below

commit 8ccb910fa93c10dd84aac5e38ac0c0e44db935bb
Author: Bryan Beaudreault <bbeaudrea...@apache.org>
AuthorDate: Fri Aug 18 09:59:38 2023 -0400

    HBASE-27947 RegionServer OOM when outbound channel backed up (#5350)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Reviewed-by: Norman Maurer <norman_mau...@apple.com>
---
 .../apache/hadoop/hbase/util/NettyUnsafeUtils.java |  61 +++++++
 .../hadoop/hbase/ipc/MetricsHBaseServerSource.java |  16 ++
 .../hbase/ipc/MetricsHBaseServerSourceImpl.java    |  24 +++
 .../hbase/ipc/MetricsHBaseServerWrapper.java       |   7 +
 .../hadoop/hbase/ipc/MetricsHBaseServer.java       |   8 +
 .../hbase/ipc/MetricsHBaseServerWrapperImpl.java   |  13 ++
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java    | 161 +++++++++++++++++-
 .../NettyRpcServerChannelWritabilityHandler.java   | 125 ++++++++++++++
 .../hbase/ipc/NettyRpcServerPreambleHandler.java   |  11 +-
 .../apache/hadoop/hbase/ipc/NettyServerCall.java   |   2 +-
 .../hadoop/hbase/ipc/FailingNettyRpcServer.java    |   9 +-
 .../hbase/ipc/MetricsHBaseServerWrapperStub.java   |   7 +
 .../hbase/ipc/TestNettyChannelWritability.java     | 182 +++++++++++++++++++++
 .../apache/hadoop/hbase/ipc/TestRpcMetrics.java    |   9 +
 .../hbase/ipc/TestRpcSkipInitialSaslHandshake.java |  28 +---
 15 files changed, 619 insertions(+), 44 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
new file mode 100644
index 00000000000..8b246e978ea
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundBuffer;
+
+/**
+ * Wraps some usages of netty's unsafe API, for ease of maintainability.
+ */
+@InterfaceAudience.Private
+public final class NettyUnsafeUtils {
+
+  private NettyUnsafeUtils() {
+  }
+
+  /**
+   * Directly closes the channel, setting SO_LINGER to 0 and skipping any handlers in the pipeline.
+   * This is useful for cases where it's important to immediately close without any delay.
+   * Otherwise, pipeline handlers and even general TCP flows can cause a normal close to take
+   * upwards of a few seconds or more. This will likely cause the client side to see either a
+   * "Connection reset by peer" or unexpected ConnectionClosedException.
+   * <p>
+   * <b>It's necessary to call this from within the channel's eventLoop!</b>
+   */
+  public static void closeImmediately(Channel channel) {
+    assert channel.eventLoop().inEventLoop();
+    channel.config().setOption(ChannelOption.SO_LINGER, 0);
+    channel.unsafe().close(channel.voidPromise());
+  }
+
+  /**
+   * Get total bytes pending write to socket
+   */
+  public static long getTotalPendingOutboundBytes(Channel channel) {
+    ChannelOutboundBuffer outboundBuffer = channel.unsafe().outboundBuffer();
+    // can be null when the channel is closing
+    if (outboundBuffer == null) {
+      return 0;
+    }
+    return outboundBuffer.totalPendingWriteBytes();
+  }
+}
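
For context, here is a minimal usage sketch of the new utility. It uses netty's
EmbeddedChannel purely for illustration (the EmbeddedChannel setup is not part of this
commit); EmbeddedChannel runs handler work inline on its embedded event loop, which
satisfies the inEventLoop() assertion in closeImmediately:

    import org.apache.hbase.thirdparty.io.netty.channel.embedded.EmbeddedChannel;

    EmbeddedChannel ch = new EmbeddedChannel();
    // Nothing has been written yet, so the outbound buffer reports 0 pending bytes.
    long pending = NettyUnsafeUtils.getTotalPendingOutboundBytes(ch);
    System.out.println("pending outbound bytes: " + pending); // prints 0
    // Force-close: sets SO_LINGER=0 and closes via the unsafe API, bypassing
    // any close handling in the pipeline.
    NettyUnsafeUtils.closeImmediately(ch);
    System.out.println("open after close: " + ch.isOpen()); // prints false
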
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 98ecf8b8d92..df2e335a718 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -46,6 +46,14 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
   String PROCESS_CALL_TIME_DESC = "Processing call time.";
   String TOTAL_CALL_TIME_NAME = "totalCallTime";
   String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
+
+  String UNWRITABLE_TIME_NAME = "unwritableTime";
+  String UNWRITABLE_TIME_DESC =
+    "Time where an channel was unwritable due to having too many outbound 
bytes";
+  String MAX_OUTBOUND_BYTES_EXCEEDED_NAME = "maxOutboundBytesExceeded";
+  String MAX_OUTBOUND_BYTES_EXCEEDED_DESC =
+    "Number of times a connection was closed because the channel outbound "
+      + "bytes exceeded the configured max.";
   String QUEUE_SIZE_NAME = "queueSize";
   String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and "
     + "parsed and is waiting to run or is currently being executed.";
@@ -97,6 +105,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
   String NETTY_DM_USAGE_NAME = "nettyDirectMemoryUsage";
 
   String NETTY_DM_USAGE_DESC = "Current Netty direct memory usage.";
+  String NETTY_TOTAL_PENDING_OUTBOUND_NAME = "nettyTotalPendingOutboundBytes";
+  String NETTY_TOTAL_PENDING_OUTBOUND_DESC = "Current total bytes pending write to all channels";
+  String NETTY_MAX_PENDING_OUTBOUND_NAME = "nettyMaxPendingOutboundBytes";
+  String NETTY_MAX_PENDING_OUTBOUND_DESC = "Current maximum bytes pending write to any channel";
 
   void authorizationSuccess();
 
@@ -121,4 +133,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
   void processedCall(int processingTime);
 
   void queuedAndProcessedCall(int totalTime);
+
+  void unwritableTime(long unwritableTime);
+
+  void maxOutboundBytesExceeded();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 9c75f4e6bcb..1a6d557d8ad 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl;
 import org.apache.hadoop.hbase.metrics.Interns;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics2.MetricHistogram;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -36,10 +37,12 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
   private final MutableFastCounter authenticationFallbacks;
   private final MutableFastCounter sentBytes;
   private final MutableFastCounter receivedBytes;
+  private final MutableFastCounter maxOutboundBytesExceeded;
 
   private MetricHistogram queueCallTime;
   private MetricHistogram processCallTime;
   private MetricHistogram totalCallTime;
+  private MetricHistogram unwritableTime;
   private MetricHistogram requestSize;
   private MetricHistogram responseSize;
 
@@ -67,6 +70,10 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
      this.getMetricsRegistry().newTimeHistogram(PROCESS_CALL_TIME_NAME, PROCESS_CALL_TIME_DESC);
     this.totalCallTime =
      this.getMetricsRegistry().newTimeHistogram(TOTAL_CALL_TIME_NAME, TOTAL_CALL_TIME_DESC);
+    this.unwritableTime =
+      this.getMetricsRegistry().newTimeHistogram(UNWRITABLE_TIME_NAME, UNWRITABLE_TIME_DESC);
+    this.maxOutboundBytesExceeded = this.getMetricsRegistry()
+      .newCounter(MAX_OUTBOUND_BYTES_EXCEEDED_NAME, MAX_OUTBOUND_BYTES_EXCEEDED_DESC, 0);
    this.requestSize =
      this.getMetricsRegistry().newSizeHistogram(REQUEST_SIZE_NAME, REQUEST_SIZE_DESC);
     this.responseSize =
@@ -133,6 +140,16 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
     totalCallTime.add(totalTime);
   }
 
+  @Override
+  public void unwritableTime(long unwritableTime) {
+    this.unwritableTime.add(unwritableTime);
+  }
+
+  @Override
+  public void maxOutboundBytesExceeded() {
+    maxOutboundBytesExceeded.incr();
+  }
+
   @Override
   public void getMetrics(MetricsCollector metricsCollector, boolean all) {
     MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
@@ -177,6 +194,13 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
           wrapper.getActiveScanRpcHandlerCount())
         .addGauge(Interns.info(NETTY_DM_USAGE_NAME, NETTY_DM_USAGE_DESC),
           wrapper.getNettyDmUsage());
+
+      Pair<Long, Long> totalAndMax = wrapper.getTotalAndMaxNettyOutboundBytes();
+      mrb.addGauge(
+        Interns.info(NETTY_TOTAL_PENDING_OUTBOUND_NAME, NETTY_TOTAL_PENDING_OUTBOUND_DESC),
+        totalAndMax.getFirst());
+      mrb.addGauge(Interns.info(NETTY_MAX_PENDING_OUTBOUND_NAME, NETTY_MAX_PENDING_OUTBOUND_DESC),
+        totalAndMax.getSecond());
     }
 
     metricsRegistry.snapshot(mrb, all);
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index 1a8980bbc7b..bb376cba930 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
@@ -64,4 +65,10 @@ public interface MetricsHBaseServerWrapper {
   int getActiveScanRpcHandlerCount();
 
   long getNettyDmUsage();
+
+  /**
+   * These two metrics are calculated together, so we want to return them in one call
+   * @return pair containing total (first) and max (second) pending outbound bytes.
+   */
+  Pair<Long, Long> getTotalAndMaxNettyOutboundBytes();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index a4c73f925d3..b5fbb5c43d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -97,6 +97,14 @@ public class MetricsHBaseServer {
     source.queuedAndProcessedCall(totalTime);
   }
 
+  void unwritableTime(long unwritableTime) {
+    source.unwritableTime(unwritableTime);
+  }
+
+  void maxOutboundBytesExceeded() {
+    source.maxOutboundBytesExceeded();
+  }
+
   public void exception(Throwable throwable) {
     source.exception();
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 857315568c5..1fc1806265d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import org.apache.hadoop.hbase.util.DirectMemoryUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
@@ -209,4 +210,16 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
 
     return DirectMemoryUtils.getNettyDirectMemoryUsage();
   }
+
+  @Override
+  public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
+    if (
+      !isServerStarted() || this.server.getScheduler() == null
+        || !(this.server instanceof NettyRpcServer)
+    ) {
+      return Pair.newPair(0L, 0L);
+    }
+
+    return ((NettyRpcServer) server).getTotalAndMaxNettyOutboundBytes();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 0b7badf7d81..722ee1d28c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.io.FileChangeWatcher;
 import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
+import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -53,6 +55,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
+import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
 import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
@@ -84,6 +87,38 @@ public class NettyRpcServer extends RpcServer {
   static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
   static final String HEAP_ALLOCATOR_TYPE = "heap";
 
+  /**
+   * Low watermark for pending outbound bytes of a single netty channel. Once the high watermark
+   * has been exceeded and pending outbound bytes drop back below this value, setAutoRead will be
+   * set to true again for the channel, and the server will resume reading incoming bytes
+   * (requests) from the client channel.
+   */
+  public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY =
+    "hbase.server.netty.writable.watermark.low";
+  private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0;
+
+  /**
+   * High watermark for pending outbound bytes of a single netty channel. If the number of pending
+   * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server
+   * will stop reading incoming requests from the client channel.
+   * <p>
+   * Note: any requests already in the call queue will still be processed.
+   */
+  public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY =
+    "hbase.server.netty.writable.watermark.high";
+  private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0;
+
+  /**
+   * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending
+   * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory
+   * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight
+   * requests.
+   * <p>
+   * Note: must be higher than the high watermark, otherwise it's ignored.
+   */
+  public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY =
+    "hbase.server.netty.writable.watermark.fatal";
+  private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0;
+
   private final InetSocketAddress bindAddress;
 
   private final CountDownLatch closed = new CountDownLatch(1);
@@ -94,6 +129,9 @@ public class NettyRpcServer extends RpcServer {
   private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
   private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
 
+  private volatile int writeBufferFatalThreshold;
+  private volatile WriteBufferWaterMark writeBufferWaterMark;
+
   public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
     InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
     boolean reservoirEnabled) throws IOException {
@@ -108,6 +146,10 @@ public class NettyRpcServer extends RpcServer {
     if (config == null) {
       config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
     }
+
+    // call before creating bootstrap below so that the necessary configs can be set
+    configureNettyWatermarks(conf);
+
     EventLoopGroup eventLoopGroup = config.group();
     Class<? extends ServerChannel> channelClass = config.serverChannelClass();
     ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
@@ -117,6 +159,7 @@ public class NettyRpcServer extends RpcServer {
       .childHandler(new ChannelInitializer<Channel>() {
         @Override
         protected void initChannel(Channel ch) throws Exception {
+          ch.config().setWriteBufferWaterMark(writeBufferWaterMark);
           ch.config().setAllocator(channelAllocator);
           ChannelPipeline pipeline = ch.pipeline();
           FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
@@ -124,12 +167,18 @@ public class NettyRpcServer extends RpcServer {
           if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
             initSSL(pipeline, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
           }
+          NettyServerRpcConnection conn = createNettyServerRpcConnection(ch);
           pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder)
-            .addLast(createNettyRpcServerPreambleHandler())
+            .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn))
             // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may
             // send RpcResponse to client.
-            .addLast(NettyRpcServerResponseEncoder.NAME,
-              new NettyRpcServerResponseEncoder(metrics));
+            .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics))
+            // Add writability handler after the response encoder, so we can abort writes before
+            // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so
+            // that the handler configs can be live updated via update_config.
+            .addLast(NettyRpcServerChannelWritabilityHandler.NAME,
+              new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold,
+                () -> isWritabilityBackpressureEnabled()));
         }
       });
     try {
@@ -142,6 +191,91 @@ public class NettyRpcServer extends RpcServer {
     this.scheduler.init(new RpcSchedulerContext(this));
   }
 
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    super.onConfigurationChange(newConf);
+    configureNettyWatermarks(newConf);
+  }
+
+  private void configureNettyWatermarks(Configuration conf) {
+    int watermarkLow =
+      conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT);
+    int watermarkHigh =
+      conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT);
+    int fatalThreshold =
+      conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT);
+
+    WriteBufferWaterMark oldWaterMark = writeBufferWaterMark;
+    int oldFatalThreshold = writeBufferFatalThreshold;
+
+    boolean disabled = false;
+    if (watermarkHigh == 0 && watermarkLow == 0) {
+      // if both are 0, use the netty default, which we will treat as "disabled".
+      // when disabled, we won't manage autoRead in response to writability changes.
+      writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
+      disabled = true;
+    } else {
+      // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to
+      // 1 to avoid confusing behavior.
+      if (watermarkLow == 0) {
+        LOG.warn(
+          "Detected a {} value of 0, which is impossible to achieve "
+            + "due to how netty evaluates these thresholds, setting to 1",
+          CHANNEL_WRITABLE_LOW_WATERMARK_KEY);
+        watermarkLow = 1;
+      }
+
+      // netty validates the watermarks and throws an exception if high < low, fail more gracefully
+      // by disabling the watermarks and warning.
+      if (watermarkHigh <= watermarkLow) {
+        LOG.warn(
+          "Detected {} value {}, lower than {} value {}. This will fail netty validation, "
+            + "so disabling",
+          CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY,
+          watermarkLow);
+        writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
+      } else {
+        writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh);
+      }
+
+      // only apply this check when watermark is enabled. this way we give the operator some
+      // flexibility if they want to try enabling fatal threshold without backpressure.
+      if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) {
+        LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.",
+          CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
+          watermarkHigh);
+        fatalThreshold = 0;
+      }
+    }
+
+    writeBufferFatalThreshold = fatalThreshold;
+
+    if (
+      oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low()
+        || oldWaterMark.high() != writeBufferWaterMark.high()
+        || oldFatalThreshold != writeBufferFatalThreshold)
+    ) {
+      LOG.info("Updated netty outbound write buffer watermarks: low={}, 
high={}, fatal={}",
+        disabled ? "disabled" : writeBufferWaterMark.low(),
+        disabled ? "disabled" : writeBufferWaterMark.high(),
+        writeBufferFatalThreshold <= 0 ? "disabled" : 
writeBufferFatalThreshold);
+    }
+
+    // update any existing channels
+    for (Channel channel : allChannels) {
+      channel.config().setWriteBufferWaterMark(writeBufferWaterMark);
+      // if disabling watermark, set auto read to true in case channel had been exceeding
+      // previous watermark
+      if (disabled) {
+        channel.config().setAutoRead(true);
+      }
+    }
+  }
+
+  public boolean isWritabilityBackpressureEnabled() {
+    return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT;
+  }
+
   private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
     final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
     if (value != null) {
@@ -172,10 +306,10 @@ public class NettyRpcServer extends RpcServer {
     }
   }
 
-  // will be overriden in tests
+  // will be overridden in tests
   @InterfaceAudience.Private
-  protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
-    return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
+  protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
+    return new NettyServerRpcConnection(NettyRpcServer.this, channel);
   }
 
   @Override
@@ -296,4 +430,19 @@ public class NettyRpcServer extends RpcServer {
     }
     return result;
   }
+
+  public int getWriteBufferFatalThreshold() {
+    return writeBufferFatalThreshold;
+  }
+
+  public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
+    long total = 0;
+    long max = 0;
+    for (Channel channel : allChannels) {
+      long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+      total += outbound;
+      max = Math.max(max, outbound);
+    }
+    return Pair.newPair(total, max);
+  }
 }
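
As a usage note, the watermark keys added above are ordinary Configuration integers, so
an operator could enable backpressure with something like the following sketch. The byte
thresholds are illustrative assumptions only, not recommendations from this commit; and
because onConfigurationChange() re-runs configureNettyWatermarks(), they can also be
adjusted on a live server via update_config:

    Configuration conf = HBaseConfiguration.create();
    // Resume reading client requests once pending outbound bytes drain below 1 MB.
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1024 * 1024);
    // Pause reading client requests once pending outbound bytes exceed 4 MB.
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 4 * 1024 * 1024);
    // Forcibly close any connection whose pending outbound bytes reach 64 MB.
    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 64 * 1024 * 1024);
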
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java
new file mode 100644
index 00000000000..4b0b3878da8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.function.BooleanSupplier;
+import java.util.function.IntSupplier;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
+
+/**
+ * Handler to enforce writability protections on our server channels: <br>
+ * - Responds to channel writability events, which are triggered when the total pending bytes for a
+ * channel passes configured high and low watermarks. When high watermark is exceeded, the channel
+ * is setAutoRead(false). This way, we won't accept new requests from the client until some pending
+ * outbound bytes are successfully received by the client.<br>
+ * - Pre-processes any channel write requests. If the total pending outbound bytes exceeds a fatal
+ * threshold, the channel is forcefully closed and the write is set to failed. This handler should
+ * be the last handler in the pipeline so that it's the first handler to receive any messages sent
+ * to channel.write() or channel.writeAndFlush().
+ */
+@InterfaceAudience.Private
+public class NettyRpcServerChannelWritabilityHandler extends ChannelDuplexHandler {
+
+  static final String NAME = "NettyRpcServerChannelWritabilityHandler";
+
+  private final MetricsHBaseServer metrics;
+  private final IntSupplier pendingBytesFatalThreshold;
+  private final BooleanSupplier isWritabilityBackpressureEnabled;
+
+  private boolean writable = true;
+  private long unwritableStartTime;
+
+  NettyRpcServerChannelWritabilityHandler(MetricsHBaseServer metrics,
+    IntSupplier pendingBytesFatalThreshold, BooleanSupplier isWritabilityBackpressureEnabled) {
+    this.metrics = metrics;
+    this.pendingBytesFatalThreshold = pendingBytesFatalThreshold;
+    this.isWritabilityBackpressureEnabled = isWritabilityBackpressureEnabled;
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+    throws Exception {
+    if (handleFatalThreshold(ctx)) {
+      promise.setFailure(
+        new ConnectionClosedException("Channel outbound bytes exceeded fatal threshold"));
+      if (msg instanceof RpcResponse) {
+        ((RpcResponse) msg).done();
+      } else {
+        ReferenceCountUtil.release(msg);
+      }
+      return;
+    }
+    ctx.write(msg, promise);
+  }
+
+  @Override
+  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+    if (isWritabilityBackpressureEnabled.getAsBoolean()) {
+      handleWritabilityChanged(ctx);
+    }
+    ctx.fireChannelWritabilityChanged();
+  }
+
+  private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
+    int fatalThreshold = pendingBytesFatalThreshold.getAsInt();
+    if (fatalThreshold <= 0) {
+      return false;
+    }
+
+    Channel channel = ctx.channel();
+    long outboundBytes = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+    if (outboundBytes < fatalThreshold) {
+      return false;
+    }
+
+    if (channel.isOpen()) {
+      metrics.maxOutboundBytesExceeded();
+      RpcServer.LOG.warn(
+        "{}: Closing connection because outbound buffer size of {} exceeds 
fatal threshold of {}",
+        channel.remoteAddress(), outboundBytes, fatalThreshold);
+      NettyUnsafeUtils.closeImmediately(channel);
+    }
+
+    return true;
+  }
+
+  private void handleWritabilityChanged(ChannelHandlerContext ctx) {
+    boolean oldWritableValue = this.writable;
+
+    this.writable = ctx.channel().isWritable();
+    ctx.channel().config().setAutoRead(this.writable);
+
+    if (!oldWritableValue && this.writable) {
+      // changing from not writable to writable, update metrics
+      metrics.unwritableTime(EnvironmentEdgeManager.currentTime() - unwritableStartTime);
+      unwritableStartTime = 0;
+    } else if (oldWritableValue && !this.writable) {
+      // changing from writable to non-writable, set start time
+      unwritableStartTime = EnvironmentEdgeManager.currentTime();
+    }
+  }
+}
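
A minimal sketch of the ordering point made in the javadoc above, standalone and not part
of this commit: netty dispatches outbound write() events from the tail of the pipeline
toward the head, so the last-added handler (like the writability handler here) is the
first to see a write and can fail it before the encoder ever runs:

    import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
    import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
    import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
    import org.apache.hbase.thirdparty.io.netty.channel.embedded.EmbeddedChannel;

    ChannelOutboundHandlerAdapter encoder = new ChannelOutboundHandlerAdapter() {
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("encoder sees the write second");
        ctx.write(msg, promise);
      }
    };
    ChannelOutboundHandlerAdapter writabilityGuard = new ChannelOutboundHandlerAdapter() {
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("guard sees the write first"); // added last => runs first outbound
        ctx.write(msg, promise);
      }
    };
    EmbeddedChannel ch = new EmbeddedChannel(encoder, writabilityGuard);
    ch.writeOutbound("response"); // prints the guard line, then the encoder line
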
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
index 8269bbc60d8..b79a67f986e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
 import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
@@ -38,14 +37,15 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler<ByteBuf>
   static final String DECODER_NAME = "preambleDecoder";
 
   private final NettyRpcServer rpcServer;
+  private final NettyServerRpcConnection conn;
 
-  public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) {
+  public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer, NettyServerRpcConnection conn) {
     this.rpcServer = rpcServer;
+    this.conn = conn;
   }
 
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
-    NettyServerRpcConnection conn = createNettyServerRpcConnection(ctx.channel());
     ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes());
     msg.readBytes(buf);
     buf.flip();
@@ -76,9 +76,4 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler<ByteBuf>
       ctx.channel().remoteAddress(), cause);
     NettyFutureUtils.safeClose(ctx);
   }
-
-  // will be overridden in tests
-  protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
-    return new NettyServerRpcConnection(rpcServer, channel);
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index fd0c6d75d88..4f0540da80a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -54,6 +54,6 @@ class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
   public synchronized void sendResponseIfReady() throws IOException {
     // set param null to reduce memory pressure
     this.param = null;
-    connection.channel.writeAndFlush(this);
+    connection.doRespond(this);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
index d5c408c2387..da4f70e3a24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
@@ -49,12 +49,7 @@ public class FailingNettyRpcServer extends NettyRpcServer {
   }
 
   @Override
-  protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
-    return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) {
-      @Override
-      protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
-        return new FailingConnection(FailingNettyRpcServer.this, channel);
-      }
-    };
+  protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
+    return new FailingConnection(FailingNettyRpcServer.this, channel);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index 6e5dfe87fc7..7170413bee9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.util.Pair;
+
 public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper {
   @Override
   public long getTotalQueueSize() {
@@ -127,4 +129,9 @@ public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper
   public int getActiveMetaPriorityRpcHandlerCount() {
     return 1;
   }
+
+  @Override
+  public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
+    return Pair.newPair(100L, 5L);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java
new file mode 100644
index 00000000000..001f6dbd22c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+@Category({ RPCTests.class, MediumTests.class })
+public class TestNettyChannelWritability {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestNettyChannelWritability.class);
+
+  private static final MetricsAssertHelper METRICS_ASSERT =
+    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
+  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+
+  /**
+   * Test that we properly send configured watermarks to netty, and trigger setWritable when
+   * necessary.
+   */
+  @Test
+  public void testNettyWritableWatermarks() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1);
+    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2);
+
+    NettyRpcServer rpcServer = createRpcServer(conf, 0);
+    try {
+      sendAndReceive(conf, rpcServer, 5);
+      METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0,
+        rpcServer.metrics.getMetricsSource());
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  /**
+   * Test that our fatal watermark is honored, which requires artificially causing some queueing so
+   * that pendingOutboundBytes increases.
+   */
+  @Test
+  public void testNettyWritableFatalThreshold() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1);
+
+    // flushAfter is 3 here, with requestCount 5 below. If we never flush, the WriteTasks will sit
+    // in the eventloop. So we flush a few at once, which will ensure that we hit fatal threshold
+    NettyRpcServer rpcServer = createRpcServer(conf, 3);
+    try {
+      CompletionException exception =
+        assertThrows(CompletionException.class, () -> sendAndReceive(conf, rpcServer, 5));
+      assertTrue(exception.getCause().getCause() instanceof ServiceException);
+      METRICS_ASSERT.assertCounterGt("maxOutboundBytesExceeded", 0,
+        rpcServer.metrics.getMetricsSource());
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  private void sendAndReceive(Configuration conf, NettyRpcServer rpcServer, int requestCount)
+    throws Exception {
+    List<Cell> cells = new ArrayList<>();
+    int count = 3;
+    for (int i = 0; i < count; i++) {
+      cells.add(CELL);
+    }
+
+    try (NettyRpcClient client = new NettyRpcClient(conf)) {
+      rpcServer.start();
+      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+        newBlockingStub(client, rpcServer.getListenerAddress());
+      CompletableFuture<Void>[] futures = new CompletableFuture[requestCount];
+      for (int i = 0; i < requestCount; i++) {
+        futures[i] = CompletableFuture.runAsync(() -> {
+          try {
+            sendMessage(cells, stub);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+      }
+      CompletableFuture.allOf(futures).join();
+    }
+  }
+
+  private void sendMessage(List<Cell> cells,
+    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) throws Exception {
+    HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
+    String message = "hello";
+    assertEquals(message,
+      stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+        .getMessage());
+    int index = 0;
+    CellScanner cellScanner = pcrc.cellScanner();
+    assertNotNull(cellScanner);
+    while (cellScanner.advance()) {
+      assertEquals(CELL, cellScanner.current());
+      index++;
+    }
+    assertEquals(cells.size(), index);
+  }
+
+  private NettyRpcServer createRpcServer(Configuration conf, int flushAfter) throws IOException {
+    String name = "testRpcServer";
+    ArrayList<RpcServer.BlockingServiceAndInterface> services =
+      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null));
+
+    InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
+    FifoRpcScheduler scheduler = new FifoRpcScheduler(conf, 1);
+
+    AtomicInteger writeCount = new AtomicInteger(0);
+
+    return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true) {
+      @Override
+      protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
+        return new NettyServerRpcConnection(this, channel) {
+          @Override
+          protected void doRespond(RpcResponse resp) {
+            if (writeCount.incrementAndGet() >= flushAfter) {
+              super.doRespond(resp);
+            } else {
+              channel.write(resp);
+            }
+          }
+        };
+      }
+    };
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
index 288bb3fe262..c55568d392a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
@@ -89,6 +89,9 @@ public class TestRpcMetrics {
     HELPER.assertGauge("numCallsInWriteQueue", 50, serverSource);
     HELPER.assertGauge("numCallsInReadQueue", 50, serverSource);
     HELPER.assertGauge("numCallsInScanQueue", 2, serverSource);
+    HELPER.assertGauge("nettyDirectMemoryUsage", 100, serverSource);
+    HELPER.assertGauge("nettyTotalPendingOutboundBytes", 100, serverSource);
+    HELPER.assertGauge("nettyMaxPendingOutboundBytes", 5, serverSource);
   }
 
   /**
@@ -100,6 +103,12 @@ public class TestRpcMetrics {
       new MetricsHBaseServer("HMaster", new MetricsHBaseServerWrapperStub());
     MetricsHBaseServerSource serverSource = mrpc.getMetricsSource();
 
+    mrpc.unwritableTime(100);
+    mrpc.maxOutboundBytesExceeded();
+    mrpc.maxOutboundBytesExceeded();
+    HELPER.assertCounter("maxOutboundBytesExceeded", 2, serverSource);
+    HELPER.assertCounter("unwritableTime_NumOps", 1, serverSource);
+
     for (int i = 0; i < 12; i++) {
       mrpc.authenticationFailure();
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
index 9f6b7d54430..bc791754a12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
@@ -28,7 +28,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -49,9 +49,7 @@ import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
@@ -131,29 +129,15 @@ public class TestRpcSkipInitialSaslHandshake {
       .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
     SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
 
-    final AtomicBoolean useSaslRef = new AtomicBoolean(false);
+    final AtomicReference<NettyServerRpcConnection> conn = new AtomicReference<>(null);
     NettyRpcServer rpcServer = new NettyRpcServer(null, getClass().getSimpleName(),
       Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
       new InetSocketAddress(HOST, 0), serverConf, new FifoRpcScheduler(serverConf, 1), true) {
 
       @Override
-      protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
-        return new NettyRpcServerPreambleHandler(this) {
-          private NettyServerRpcConnection conn;
-
-          @Override
-          protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
-            super.channelRead0(ctx, msg);
-            useSaslRef.set(conn.useSasl);
-
-          }
-
-          @Override
-          protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
-            conn = super.createNettyServerRpcConnection(channel);
-            return conn;
-          }
-        };
+      protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
+        conn.set(super.createNettyServerRpcConnection(channel));
+        return conn.get();
       }
     };
 
@@ -167,7 +151,7 @@ public class TestRpcSkipInitialSaslHandshake {
         stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage("test").build())
           .getMessage();
       assertTrue("test".equals(response));
-      assertFalse(useSaslRef.get());
+      assertFalse(conn.get().useSasl);
 
     } finally {
       rpcServer.stop();

