This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b091c40612 HBASE-27222 Purge FutureReturnValueIgnored warnings from 
error prone (#4634)
8b091c40612 is described below

commit 8b091c4061220cc4307ef2ed35291f6fe14bbdb0
Author: Duo Zhang <[email protected]>
AuthorDate: Tue Jul 26 23:42:37 2022 +0800

    HBASE-27222 Purge FutureReturnValueIgnored warnings from error prone (#4634)
    
    Signed-off-by: Andrew Purtell <[email protected]>
---
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java   |  50 ++++++-----
 .../FanOutOneBlockAsyncDFSOutputHelper.java        |  15 ++--
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java    |   9 +-
 .../asyncfs/TestFanOutOneBlockAsyncDFSOutput.java  |   7 +-
 .../TestFanOutOneBlockAsyncDFSOutputHang.java      |   5 +-
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java     |   4 +-
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java      |   4 +-
 .../hadoop/hbase/client/ClusterStatusListener.java |  15 +++-
 .../org/apache/hadoop/hbase/ipc/TestIPCUtil.java   |   4 +-
 .../org/apache/hadoop/hbase/util/FutureUtils.java  |  11 +++
 .../apache/hadoop/hbase/util/NettyFutureUtils.java | 100 +++++++++++++++++++++
 .../hbase/client/example/HttpProxyExample.java     |  10 ++-
 .../hadoop/hbase/io/hfile/MemcachedBlockCache.java |  30 +++----
 .../hadoop/hbase/ShellExecEndpointCoprocessor.java |   2 +-
 .../MergeRandomAdjacentRegionsOfTableAction.java   |   2 +-
 .../hbase/test/IntegrationTestLoadCommonCrawl.java |  10 ++-
 .../trace/IntegrationTestSendTraceRequests.java    |   4 +-
 .../org/apache/hadoop/hbase/client/TestAdmin2.java |   1 +
 .../assignment/TestWakeUpUnexpectedProcedure.java  |   1 +
 .../hadoop/hbase/regionserver/TestHStore.java      |   4 +-
 .../regionserver/wal/TestAsyncFSWALDurability.java |   4 +-
 .../apache/hadoop/hbase/zookeeper/ZKWatcher.java   |   2 +-
 22 files changed, 212 insertions(+), 82 deletions(-)

diff --git 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index a43bed31734..149dec431e0 100644
--- 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -22,23 +22,26 @@ import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
 import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
 import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
 import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static 
org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 import static 
org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
 
 import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
@@ -48,6 +51,8 @@ import 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.Can
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -63,14 +68,13 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
 import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 import 
org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
@@ -252,7 +256,7 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     // disable further write, and fail all pending ack.
     state = State.BROKEN;
     failWaitingAckQueue(channel, errorSupplier);
-    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
+    datanodeInfoMap.keySet().forEach(NettyFutureUtils::safeClose);
   }
 
   private void failWaitingAckQueue(Channel channel, Supplier<Throwable> 
errorSupplier) {
@@ -329,7 +333,7 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
           ByteBuf buf = alloc.buffer(len);
           heartbeat.putInBuffer(buf.nioBuffer(0, len));
           buf.writerIndex(len);
-          ctx.channel().writeAndFlush(buf);
+          safeWriteAndFlush(ctx.channel(), buf);
         }
         return;
       }
@@ -440,9 +444,9 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     // TODO: we should perhaps measure time taken per DN here;
     // we could collect statistics per DN, and/or exclude bad nodes in 
createOutput.
     datanodeInfoMap.keySet().forEach(ch -> {
-      ch.write(headerBuf.retainedDuplicate());
-      ch.write(checksumBuf.retainedDuplicate());
-      ch.writeAndFlush(dataBuf.retainedDuplicate());
+      safeWrite(ch, headerBuf.retainedDuplicate());
+      safeWrite(ch, checksumBuf.retainedDuplicate());
+      safeWriteAndFlush(ch, dataBuf.retainedDuplicate());
     });
     checksumBuf.release();
     headerBuf.release();
@@ -562,16 +566,18 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     headerBuf.writerIndex(headerLen);
     CompletableFuture<Long> future = new CompletableFuture<>();
     waitingAckQueue.add(new Callback(future, finalizedLength, 
datanodeInfoMap.keySet(), 0));
-    datanodeInfoMap.keySet().forEach(ch -> 
ch.writeAndFlush(headerBuf.retainedDuplicate()));
+    datanodeInfoMap.keySet().forEach(ch -> safeWriteAndFlush(ch, 
headerBuf.retainedDuplicate()));
     headerBuf.release();
-    try {
-      future.get();
-    } catch (InterruptedException e) {
-      throw (IOException) new InterruptedIOException().initCause(e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      Throwables.propagateIfPossible(cause, IOException.class);
-      throw new IOException(cause);
+    FutureUtils.get(future);
+  }
+
+  private void closeDataNodeChannelsAndAwait() {
+    List<ChannelFuture> futures = new ArrayList<>();
+    for (Channel ch : datanodeInfoMap.keySet()) {
+      futures.add(ch.close());
+    }
+    for (ChannelFuture future : futures) {
+      consume(future.awaitUninterruptibly());
     }
   }
 
@@ -579,14 +585,12 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
    * The close method when error occurred. Now we just call recoverFileLease.
    */
   @Override
-  @SuppressWarnings("FutureReturnValueIgnored")
   public void recoverAndClose(CancelableProgressable reporter) throws 
IOException {
     if (buf != null) {
       buf.release();
       buf = null;
     }
-    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
-    datanodeInfoMap.keySet().forEach(ch -> 
ch.closeFuture().awaitUninterruptibly());
+    closeDataNodeChannelsAndAwait();
     endFileLease(client, fileId);
     RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
@@ -597,12 +601,10 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
    * {@link #recoverAndClose(CancelableProgressable)} if this method throws an 
exception.
    */
   @Override
-  @SuppressWarnings("FutureReturnValueIgnored")
   public void close() throws IOException {
     endBlock();
     state = State.CLOSED;
-    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
-    datanodeInfoMap.keySet().forEach(ch -> 
ch.closeFuture().awaitUninterruptibly());
+    closeDataNodeChannelsAndAwait();
     block.setNumBytes(ackedBlockLength);
     completeFile(client, namenode, src, clientName, block, fileId);
   }
diff --git 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 2517f2d2c01..9c66c53b8bf 100644
--- 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.io.asyncfs;
 
 import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
 import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@@ -351,7 +354,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
     buffer.writeByte(Op.WRITE_BLOCK.code);
     proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
-    channel.writeAndFlush(buffer);
+    safeWriteAndFlush(channel, buffer);
   }
 
   private static void initialize(Configuration conf, Channel channel, 
DatanodeInfo dnInfo,
@@ -360,7 +363,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     throws IOException {
     Promise<Void> saslPromise = channel.eventLoop().newPromise();
     trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, 
saslPromise);
-    saslPromise.addListener(new FutureListener<Void>() {
+    addListener(saslPromise, new FutureListener<Void>() {
 
       @Override
       public void operationComplete(Future<Void> future) throws Exception {
@@ -404,7 +407,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       Promise<Channel> promise = eventLoopGroup.next().newPromise();
       futureList.add(promise);
       String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
-      new Bootstrap().group(eventLoopGroup).channel(channelClass)
+      addListener(new Bootstrap().group(eventLoopGroup).channel(channelClass)
         .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new 
ChannelInitializer<Channel>() {
 
           @Override
@@ -413,7 +416,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
             // channel connected. Leave an empty implementation here because 
netty does not allow
             // a null handler.
           }
-        }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new 
ChannelFutureListener() {
+        }).connect(NetUtils.createSocketAddr(dnAddr)), new 
ChannelFutureListener() {
 
           @Override
           public void operationComplete(ChannelFuture future) throws Exception 
{
@@ -533,12 +536,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         if (!succ) {
           if (futureList != null) {
             for (Future<Channel> f : futureList) {
-              f.addListener(new FutureListener<Channel>() {
+              addListener(f, new FutureListener<Channel>() {
 
                 @Override
                 public void operationComplete(Future<Channel> future) throws 
Exception {
                   if (future.isSuccess()) {
-                    future.getNow().close();
+                    safeClose(future.getNow());
                   }
                 }
               });
diff --git 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index ee02d42d2d3..4ac46e8cc5d 100644
--- 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
 import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static 
org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 
@@ -448,12 +449,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper 
{
       size += CodedOutputStream.computeRawVarint32Size(size);
       ByteBuf buf = ctx.alloc().buffer(size);
       proto.writeDelimitedTo(new ByteBufOutputStream(buf));
-      ctx.write(buf);
+      safeWrite(ctx, buf);
     }
 
     @Override
     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+      safeWrite(ctx, 
ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
       sendSaslMessage(ctx, new byte[0]);
       ctx.flush();
       step++;
@@ -642,7 +643,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
         cBuf.addComponent(buf);
         cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
       } else {
-        ctx.write(msg);
+        safeWrite(ctx, msg);
       }
     }
 
@@ -656,7 +657,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
         ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
         buf.writeInt(wrapped.length);
         buf.writeBytes(wrapped);
-        ctx.write(buf);
+        safeWrite(ctx, buf);
       }
       ctx.flush();
     }
diff --git 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 26cbbe034a5..3c385283103 100644
--- 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.consume;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -93,9 +94,9 @@ public class TestFanOutOneBlockAsyncDFSOutput extends 
AsyncFSTestBase {
   }
 
   @AfterClass
-  public static void tearDown() throws IOException, InterruptedException {
+  public static void tearDown() throws Exception {
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().get();
     }
     shutdownMiniDFSCluster();
   }
@@ -262,7 +263,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends 
AsyncFSTestBase {
     byte[] b = new byte[50 * 1024 * 1024];
     Bytes.random(b);
     out.write(b);
-    out.flush(false);
+    consume(out.flush(false));
     assertEquals(b.length, out.flush(false).get().longValue());
     out.close();
     assertEquals(b.length, FS.getFileStatus(f).getLen());
diff --git 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
index 53fb37a8e0b..77752789dbb 100644
--- 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
+++ 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -103,12 +102,12 @@ public class TestFanOutOneBlockAsyncDFSOutputHang extends 
AsyncFSTestBase {
   }
 
   @AfterClass
-  public static void tearDown() throws IOException, InterruptedException {
+  public static void tearDown() throws Exception {
     if (OUT != null) {
       OUT.recoverAndClose(null);
     }
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().get();
     }
     shutdownMiniDFSCluster();
   }
diff --git 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index cb936a4e7c6..d1ce128b118 100644
--- 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -53,9 +53,9 @@ public class TestLocalAsyncOutput {
   private static StreamSlowMonitor MONITOR;
 
   @AfterClass
-  public static void tearDownAfterClass() throws IOException {
+  public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.cleanupTestDir();
-    GROUP.shutdownGracefully();
+    GROUP.shutdownGracefully().get();
     MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), 
"testMonitor");
   }
 
diff --git 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index cb5fb4006d3..479b8f4e603 100644
--- 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -193,9 +193,9 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends 
AsyncFSTestBase {
   }
 
   @AfterClass
-  public static void tearDownAfterClass() throws IOException, 
InterruptedException {
+  public static void tearDownAfterClass() throws Exception {
     if (EVENT_LOOP_GROUP != null) {
-      EVENT_LOOP_GROUP.shutdownGracefully().sync();
+      EVENT_LOOP_GROUP.shutdownGracefully().get();
     }
     if (KDC != null) {
       KDC.stop();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
index cc34d59c732..26c5d98b46d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -183,7 +186,6 @@ class ClusterStatusListener implements Closeable {
 
     @Override
     public void connect(Configuration conf) throws IOException {
-
       String mcAddress =
         conf.get(HConstants.STATUS_MULTICAST_ADDRESS, 
HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
       String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
@@ -218,16 +220,21 @@ class ClusterStatusListener implements Closeable {
       }
 
       LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", 
bindAddress, ni, ina);
-      channel.joinGroup(ina, ni, null, channel.newPromise());
+      try {
+        consume(channel.joinGroup(ina, ni, null).sync());
+      } catch (InterruptedException e) {
+        close();
+        throw ExceptionUtil.asInterrupt(e);
+      }
     }
 
     @Override
     public void close() {
       if (channel != null) {
-        channel.close();
+        safeClose(channel);
         channel = null;
       }
-      group.shutdownGracefully();
+      consume(group.shutdownGracefully());
     }
 
     /**
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index c327896f72a..67a8d15c1d0 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -121,7 +121,7 @@ public class TestIPCUtil {
   }
 
   @Test
-  public void testExecute() throws IOException {
+  public void testExecute() throws Exception {
     EventLoop eventLoop = new DefaultEventLoop();
     MutableInt executed = new MutableInt(0);
     MutableInt numStackTraceElements = new MutableInt(0);
@@ -156,7 +156,7 @@ public class TestIPCUtil {
       });
       FutureUtils.get(future);
     } finally {
-      eventLoop.shutdownGracefully();
+      eventLoop.shutdownGracefully().get();
     }
   }
 }
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
index dc70edd0905..4f8a7320fb4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java
@@ -93,6 +93,17 @@ public final class FutureUtils {
     }, executor);
   }
 
+  /**
+   * Log the error if the future indicates any failure.
+   */
+  public static void consume(CompletableFuture<?> future) {
+    addListener(future, (r, e) -> {
+      if (e != null) {
+        LOG.warn("Async operation fails", e);
+      }
+    });
+  }
+
   /**
    * Return a {@link CompletableFuture} which is same with the given {@code 
future}, but execute all
    * the callbacks in the given {@code executor}.
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyFutureUtils.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyFutureUtils.java
new file mode 100644
index 00000000000..1613569fe94
--- /dev/null
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyFutureUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
+import 
org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * Helper class for processing netty futures.
+ */
[email protected]
+public final class NettyFutureUtils {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NettyFutureUtils.class);
+
+  private NettyFutureUtils() {
+  }
+
+  /**
+   * This is method is used when you just want to add a listener to the given 
netty future. Ignoring
+   * the return value of a Future is considered as a bad practice as it may 
suppress exceptions
+   * thrown from the code that completes the future, and this method will 
catch all the exception
+   * thrown from the {@code listener} to catch possible code bugs.
+   * <p/>
+   * And the error phone check will always report FutureReturnValueIgnored 
because every method in
+   * the {@link Future} class will return a new {@link Future}, so you always 
have one future that
+   * has not been checked. So we introduce this method and add a suppress 
warnings annotation here.
+   */
+  @SuppressWarnings({ "FutureReturnValueIgnored", "rawtypes", "unchecked" })
+  public static <V> void addListener(Future<V> future,
+    GenericFutureListener<? extends Future<? super V>> listener) {
+    future.addListener(f -> {
+      try {
+        // the ? operator in template makes it really hard to pass compile, so 
here we just cast the
+        // listener to raw type.
+        ((GenericFutureListener) listener).operationComplete(f);
+      } catch (Throwable t) {
+        LOG.error("Unexpected error caught when processing netty", t);
+      }
+    });
+  }
+
+  private static void loggingWhenError(Future<?> future) {
+    if (!future.isSuccess()) {
+      LOG.warn("IO operation failed", future.cause());
+    }
+  }
+
+  /**
+   * Log the error if the future indicates any failure.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  public static void consume(Future<?> future) {
+    future.addListener(NettyFutureUtils::loggingWhenError);
+  }
+
+  /**
+   * Close the channel and eat the returned future by logging the error when 
the future is completed
+   * with error.
+   */
+  public static void safeClose(ChannelOutboundInvoker channel) {
+    consume(channel.close());
+  }
+
+  /**
+   * Call write on the channel and eat the returned future by logging the 
error when the future is
+   * completed with error.
+   */
+  public static void safeWrite(ChannelOutboundInvoker channel, Object msg) {
+    consume(channel.write(msg));
+  }
+
+  /**
+   * Call writeAndFlush on the channel and eat the returned future by logging 
the error when the
+   * future is completed with error.
+   */
+  public static void safeWriteAndFlush(ChannelOutboundInvoker channel, Object 
msg) {
+    consume(channel.writeAndFlush(msg));
+  }
+}
diff --git 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
index d731ac779bf..9ecd266b5dd 100644
--- 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
+++ 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client.example;
 
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -71,6 +72,11 @@ import 
org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
  *
  * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value 
as the request content
  * when doing PUT.
+ * <p>
+ * Notice that, future class methods will all return a new Future, so you 
always have one future
+ * that will not been checked, so we need to suppress error-prone 
"FutureReturnValueIgnored"
+ * warnings on the methods such as join and stop. In your real production 
code, you should use your
+ * own convenient way to address the warning.
  */
 @InterfaceAudience.Private
 public class HttpProxyExample {
@@ -148,7 +154,7 @@ public class HttpProxyExample {
         resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
       }
       resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; 
charset=UTF-8");
-      ctx.writeAndFlush(resp);
+      safeWriteAndFlush(ctx, resp);
     }
 
     private Params parse(FullHttpRequest req) {
@@ -239,6 +245,7 @@ public class HttpProxyExample {
         }).bind(port).syncUninterruptibly().channel();
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void join() {
     serverChannel.closeFuture().awaitUninterruptibly();
   }
@@ -251,6 +258,7 @@ public class HttpProxyExample {
     }
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void stop() throws IOException {
     serverChannel.close().syncUninterruptibly();
     serverChannel = null;
diff --git 
a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
 
b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
index 1129921ef35..d5c6bff8cd0 100644
--- 
a/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
+++ 
b/hbase-external-blockcache/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -80,23 +80,10 @@ public class MemcachedBlockCache implements BlockCache {
     boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, 
MEMCACHED_OPTIMIZE_DEFAULT);
 
     ConnectionFactoryBuilder builder =
-      new 
ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
 // Cap
-                                                                               
                   // the
-                                                                               
                   // max
-                                                                               
                   // time
-                                                                               
                   // before
-                                                                               
                   // anything
-                                                                               
                   // times
-                                                                               
                   // out
-        
.setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
 // Don't
-                                                                               
               // keep
-                                                                               
               // threads
-                                                                               
               // around
-                                                                               
               // past
-                                                                               
               // the
-                                                                               
               // end
-                                                                               
               // of
-                                                                               
               // days.
+      // Cap the max time before anything times out
+      new 
ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
+        // Don't keep threads around past the end of days.
+        
.setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
         .setUseNagleAlgorithm(false) // Ain't nobody got time for that
         .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much 
larger just in case
 
@@ -124,10 +111,17 @@ public class MemcachedBlockCache implements BlockCache {
     cacheBlock(cacheKey, buf);
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
     if (buf instanceof HFileBlock) {
-      client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
+      client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, 
tc).addListener(f -> {
+        try {
+          f.get();
+        } catch (ExecutionException e) {
+          LOG.warn("Failed to cache block", e);
+        }
+      });
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug(
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java
 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java
index edaa2a7e40f..e80989174c4 100644
--- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/ShellExecEndpointCoprocessor.java
@@ -108,7 +108,7 @@ public class ShellExecEndpointCoprocessor extends 
ShellExecEndpoint.ShellExecSer
   private void runBackgroundTask(final Shell.ShellCommandExecutor shell,
     final RpcCallback<ShellExecResponse> done) {
     final long sleepDuration = conf.getLong(BACKGROUND_DELAY_MS_KEY, 
DEFAULT_BACKGROUND_DELAY_MS);
-    backgroundExecutor.submit(() -> {
+    backgroundExecutor.execute(() -> {
       try {
         // sleep first so that the RPC can ACK. race condition here as we have 
no means of blocking
         // until the IPC response has been acknowledged by the client.
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java
 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java
index 4dfb1af0196..b019197b7d0 100644
--- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/MergeRandomAdjacentRegionsOfTableAction.java
@@ -72,7 +72,7 @@ public class MergeRandomAdjacentRegionsOfTableAction extends 
Action {
     }
 
     try {
-      admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), 
b.getEncodedNameAsBytes(), false);
+      admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), 
b.getEncodedNameAsBytes(), false).get();
     } catch (Exception ex) {
       getLogger().warn("Merge failed, might be caused by other chaos: " + 
ex.getMessage());
     }
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
index bdb1c719af2..52a88743f55 100644
--- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.test;
 
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
@@ -684,9 +686,9 @@ public class IntegrationTestLoadCommonCrawl extends 
IntegrationTestBase {
             }
             final long putStartTime = System.currentTimeMillis();
             final CompletableFuture<Void> putFuture = table.put(put);
-            putFuture.thenRun(() -> {
+            addListener(putFuture, (r, e) -> {
               inflight.decrementAndGet();
-              if (!putFuture.isCompletedExceptionally()) {
+              if (e == null) {
                 output.getCounter(Counts.RPC_TIME_MS)
                   .increment(System.currentTimeMillis() - putStartTime);
                 
output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(put.heapSize());
@@ -732,9 +734,9 @@ public class IntegrationTestLoadCommonCrawl extends 
IntegrationTestBase {
                   }
                   final long incrStartTime = System.currentTimeMillis();
                   final CompletableFuture<Result> incrFuture = 
table.increment(increment);
-                  incrFuture.thenRun(() -> {
+                  addListener(incrFuture, (r, e) -> {
                     inflight.decrementAndGet();
-                    if (!incrFuture.isCompletedExceptionally()) {
+                    if (e == null) {
                       output.getCounter(Counts.RPC_TIME_MS)
                         .increment(System.currentTimeMillis() - incrStartTime);
                       
output.getCounter(Counts.RPC_BYTES_WRITTEN).increment(increment.heapSize());
diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
index 35b70120993..49ac9f1a239 100644
--- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
+++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java
@@ -150,7 +150,7 @@ public class IntegrationTestSendTraceRequests extends 
AbstractHBaseTool {
           }
         }
       };
-      service.submit(runnable);
+      service.execute(runnable);
     }
   }
 
@@ -194,7 +194,7 @@ public class IntegrationTestSendTraceRequests extends 
AbstractHBaseTool {
 
         }
       };
-      service.submit(runnable);
+      service.execute(runnable);
     }
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index 1807fcd9e88..49abb3035a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -722,6 +722,7 @@ public class TestAdmin2 extends TestAdminBase {
     assertEquals(11111111, ADMIN.getDescriptor(tableName).getMaxFileSize());
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Test
   public void testTableMergeFollowedByModify() throws Exception {
     final TableName tableName = TableName.valueOf(name.getMethodName());
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java
index 824ccb49eab..f0c8cf58abe 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestWakeUpUnexpectedProcedure.java
@@ -236,6 +236,7 @@ public class TestWakeUpUnexpectedProcedure {
     UTIL.shutdownMiniCluster();
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   @Test
   public void test() throws Exception {
     RegionInfo region = 
UTIL.getMiniHBaseCluster().getRegions(NAME).get(0).getRegionInfo();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 72d073ea3d5..3e25f4aa61d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -1284,7 +1284,7 @@ public class TestHStore {
       storeFlushCtx.prepare();
     };
     ExecutorService service = Executors.newSingleThreadExecutor();
-    service.submit(flush);
+    service.execute(flush);
     // we get scanner from pipeline and snapshot but they are empty. -- phase 
(2/5)
     // this is blocked until we recreate the active memstore -- phase (3/5)
     // we get scanner from active memstore but it is empty -- phase (5/5)
@@ -1321,7 +1321,7 @@ public class TestHStore {
       public void getScanners(MyStore store) throws IOException {
         final long tmpId = id++;
         ExecutorService s = Executors.newSingleThreadExecutor();
-        s.submit(() -> {
+        s.execute(() -> {
           try {
             // flush the store before storescanner updates the scanners from 
store.
             // The current data will be flushed into files, and the memstore 
will
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
index 58ec1a79cf2..8402617c44b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
@@ -52,8 +52,8 @@ public class TestAsyncFSWALDurability extends 
WALDurabilityTestBase<CustomAsyncF
   }
 
   @AfterClass
-  public static void tearDownAfterClass() {
-    GROUP.shutdownGracefully();
+  public static void tearDownAfterClass() throws Exception {
+    GROUP.shutdownGracefully().get();
   }
 
   @Override
diff --git 
a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
 
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
index 786c78d5614..9608ffc17c6 100644
--- 
a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
+++ 
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
@@ -604,7 +604,7 @@ public class ZKWatcher implements Watcher, Abortable, 
Closeable {
     LOG.debug(prefix("Received ZooKeeper Event, " + "type=" + event.getType() 
+ ", " + "state="
       + event.getState() + ", " + "path=" + event.getPath()));
     final String spanName = ZKWatcher.class.getSimpleName() + "-" + identifier;
-    zkEventProcessor.submit(TraceUtil.tracedRunnable(() -> 
processEvent(event), spanName));
+    zkEventProcessor.execute(TraceUtil.tracedRunnable(() -> 
processEvent(event), spanName));
   }
 
   // Connection management

Reply via email to