HBASE-18307 Share the same EventLoopGroup for NettyRpcServer, NettyRpcClient 
and AsyncFSWALProvider at RS side


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35170345
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35170345
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35170345

Branch: refs/heads/HBASE-14070.HLC
Commit: 351703455a091171a1abc90f250f52f0a7a0aaab
Parents: 1ddcc07
Author: zhangduo <zhang...@apache.org>
Authored: Mon Jul 10 16:33:37 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Mon Jul 10 21:00:44 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java  |  1 -
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java   |  7 +-
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   |  9 +-
 .../FanOutOneBlockAsyncDFSOutputHelper.java     | 15 ++--
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java | 93 ++++++++------------
 .../hbase/regionserver/HRegionServer.java       | 36 +++++---
 .../hbase/regionserver/wal/AsyncFSWAL.java      |  9 +-
 .../wal/AsyncProtobufLogWriter.java             |  8 +-
 .../wal/SecureAsyncProtobufLogWriter.java       |  5 +-
 .../hbase/util/NettyEventLoopGroupConfig.java   | 82 +++++++++++++++++
 .../hadoop/hbase/wal/AsyncFSWALProvider.java    | 27 ++++--
 .../hbase/wal/NettyAsyncFSWALConfigHelper.java  | 63 +++++++++++++
 .../TestFanOutOneBlockAsyncDFSOutput.java       | 42 +++++----
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java  |  8 +-
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java   |  9 +-
 .../hbase/regionserver/wal/TestAsyncFSWAL.java  |  9 +-
 .../regionserver/wal/TestAsyncProtobufLog.java  |  7 +-
 .../regionserver/wal/TestAsyncWALReplay.java    |  7 +-
 18 files changed, 315 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 7d513db..8dd927e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 7fe86be..57613dc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 
 import java.io.IOException;
@@ -54,11 +55,11 @@ public final class AsyncFSOutputHelper {
    * implementation for other {@link FileSystem} which wraps around a {@link 
FSDataOutputStream}.
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean 
overwrite,
-      boolean createParent, short replication, long blockSize, final EventLoop 
eventLoop)
-      throws IOException {
+      boolean createParent, short replication, long blockSize, EventLoop 
eventLoop,
+      Class<? extends Channel> channelClass) throws IOException {
     if (fs instanceof DistributedFileSystem) {
       return 
FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoop);
+        overwrite, createParent, replication, blockSize, eventLoop, 
channelClass);
     }
     final FSDataOutputStream fsOut;
     int bufferSize = 
fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index c64cdf7..9cc0ae0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -26,6 +26,8 @@ import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
 import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
@@ -37,6 +39,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
 import io.netty.util.concurrent.PromiseCombiner;
 
@@ -71,8 +74,6 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * An asynchronous HDFS output stream implementation which fans out data to 
datanode and only
  * supports writing file with only one block.
@@ -461,8 +462,8 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
       long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
       for (int remaining = dataLen; remaining > 0;) {
         int toWriteDataLen = Math.min(remaining, maxDataLen);
-        combiner.add(flushBuffer(buf.readRetainedSlice(toWriteDataLen), 
nextSubPacketOffsetInBlock,
-          syncBlock));
+        combiner.add((Future<Void>) 
flushBuffer(buf.readRetainedSlice(toWriteDataLen),
+          nextSubPacketOffsetInBlock, syncBlock));
         nextSubPacketOffsetInBlock += toWriteDataLen;
         remaining -= toWriteDataLen;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 3eaacc4..d14d4d8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -46,7 +46,6 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoop;
 import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.timeout.IdleStateEvent;
@@ -68,7 +67,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.Encryptor;
-import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemLinkResolver;
 import org.apache.hadoop.fs.Path;
@@ -607,7 +605,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static List<Future<Channel>> connectToDataNodes(Configuration conf, 
DFSClient client,
       String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long 
latestGS,
-      BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
+      BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop,
+      Class<? extends Channel> channelClass) {
     Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
     boolean connectToDnViaHostname =
@@ -633,7 +632,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       Promise<Channel> promise = eventLoop.newPromise();
       futureList.add(promise);
       String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
-      new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
+      new Bootstrap().group(eventLoop).channel(channelClass)
           .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new 
ChannelInitializer<Channel>() {
 
             @Override
@@ -672,7 +671,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static FanOutOneBlockAsyncDFSOutput 
createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long 
blockSize,
-      EventLoop eventLoop) throws IOException {
+      EventLoop eventLoop, Class<? extends Channel> channelClass) throws 
IOException {
     Configuration conf = dfs.getConf();
     FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
     DFSClient client = dfs.getClient();
@@ -701,7 +700,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         stat.getFileId(), null);
       List<Channel> datanodeList = new ArrayList<>();
       futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 
0L, 0L,
-        PIPELINE_SETUP_CREATE, summer, eventLoop);
+        PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass);
       for (Future<Channel> future : futureList) {
         // fail the creation if there are connection failures since we are 
fail-fast. The upper
         // layer should retry itself if needed.
@@ -741,14 +740,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
    */
   public static FanOutOneBlockAsyncDFSOutput 
createOutput(DistributedFileSystem dfs, Path f,
       boolean overwrite, boolean createParent, short replication, long 
blockSize,
-      EventLoop eventLoop) throws IOException {
+      EventLoop eventLoop, Class<? extends Channel> channelClass) throws 
IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
           throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, 
replication,
-          blockSize, eventLoop);
+          blockSize, eventLoop, channelClass);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 4b06fab..fafc53f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -18,20 +18,19 @@
 package org.apache.hadoop.hbase.ipc;
 
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.ServerChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.FixedLengthFrameDecoder;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.io.IOException;
@@ -47,11 +46,12 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import 
org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.util.JVM;
+import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 
@@ -69,57 +69,44 @@ public class NettyRpcServer extends RpcServer {
   private final Channel serverChannel;
   private final ChannelGroup allChannels = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
-  public NettyRpcServer(final Server server, final String name,
-      final List<BlockingServiceAndInterface> services,
-      final InetSocketAddress bindAddress, Configuration conf,
-      RpcScheduler scheduler) throws IOException {
+  public NettyRpcServer(Server server, String name, 
List<BlockingServiceAndInterface> services,
+      InetSocketAddress bindAddress, Configuration conf, RpcScheduler 
scheduler)
+      throws IOException {
     super(server, name, services, bindAddress, conf, scheduler);
     this.bindAddress = bindAddress;
-    boolean useEpoll = useEpoll(conf);
-    int workerCount = conf.getInt("hbase.netty.rpc.server.worker.count",
-        Runtime.getRuntime().availableProcessors() / 4);
-    EventLoopGroup bossGroup = null;
-    EventLoopGroup workerGroup = null;
-    if (useEpoll) {
-      bossGroup = new EpollEventLoopGroup(1);
-      workerGroup = new EpollEventLoopGroup(workerCount);
+    EventLoopGroup eventLoopGroup;
+    Class<? extends ServerChannel> channelClass;
+    if (server instanceof HRegionServer) {
+      NettyEventLoopGroupConfig config = ((HRegionServer) 
server).getEventLoopGroupConfig();
+      eventLoopGroup = config.group();
+      channelClass = config.serverChannelClass();
     } else {
-      bossGroup = new NioEventLoopGroup(1);
-      workerGroup = new NioEventLoopGroup(workerCount);
+      eventLoopGroup = new NioEventLoopGroup(0,
+          new DefaultThreadFactory("NettyRpcServer", true, 
Thread.MAX_PRIORITY));
+      channelClass = NioServerSocketChannel.class;
     }
-    ServerBootstrap bootstrap = new ServerBootstrap();
-    bootstrap.group(bossGroup, workerGroup);
-    if (useEpoll) {
-      bootstrap.channel(EpollServerSocketChannel.class);
-    } else {
-      bootstrap.channel(NioServerSocketChannel.class);
-    }
-    bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
-    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
-    bootstrap.childOption(ChannelOption.ALLOCATOR,
-        PooledByteBufAllocator.DEFAULT);
-    bootstrap.childHandler(new ChannelInitializer<Channel>() {
-
-      @Override
-      protected void initChannel(Channel ch) throws Exception {
-        ChannelPipeline pipeline = ch.pipeline();
-        FixedLengthFrameDecoder preambleDecoder = new 
FixedLengthFrameDecoder(6);
-        preambleDecoder.setSingleDecode(true);
-        pipeline.addLast("preambleDecoder", preambleDecoder);
-        pipeline.addLast("preambleHandler", new 
NettyRpcServerPreambleHandler(NettyRpcServer.this));
-        pipeline.addLast("frameDecoder",
-          new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
-        pipeline.addLast("decoder", new 
NettyRpcServerRequestDecoder(allChannels, metrics));
-        pipeline.addLast("encoder", new 
NettyRpcServerResponseEncoder(metrics));
-      }
-    });
-
+    ServerBootstrap bootstrap = new 
ServerBootstrap().group(eventLoopGroup).channel(channelClass)
+        .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+        .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
+        .childHandler(new ChannelInitializer<Channel>() {
+
+          @Override
+          protected void initChannel(Channel ch) throws Exception {
+            ChannelPipeline pipeline = ch.pipeline();
+            FixedLengthFrameDecoder preambleDecoder = new 
FixedLengthFrameDecoder(6);
+            preambleDecoder.setSingleDecode(true);
+            pipeline.addLast("preambleDecoder", preambleDecoder);
+            pipeline.addLast("preambleHandler",
+              new NettyRpcServerPreambleHandler(NettyRpcServer.this));
+            pipeline.addLast("frameDecoder",
+              new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, 
true));
+            pipeline.addLast("decoder", new 
NettyRpcServerRequestDecoder(allChannels, metrics));
+            pipeline.addLast("encoder", new 
NettyRpcServerResponseEncoder(metrics));
+          }
+        });
     try {
       serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
-      LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress()
-          + ", hbase.netty.rpc.server.worker.count=" + workerCount
-          + ", useEpoll=" + useEpoll);
-      allChannels.add(serverChannel);
+      LOG.info("NettyRpcServer bind to address=" + 
serverChannel.localAddress());
     } catch (InterruptedException e) {
       throw new InterruptedIOException(e.getMessage());
     }
@@ -127,14 +114,6 @@ public class NettyRpcServer extends RpcServer {
     this.scheduler.init(new RpcSchedulerContext(this));
   }
 
-  private static boolean useEpoll(Configuration conf) {
-    // Config to enable native transport.
-    boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport",
-        true);
-    // Use the faster native epoll transport mechanism on linux if enabled
-    return epollEnabled && JVM.isLinux() && JVM.isAmd64();
-  }
-
   @Override
   public synchronized void start() {
     if (started) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3593ce6..986d6d4 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -36,8 +40,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -106,7 +110,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -137,7 +141,11 @@ import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.*;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -153,9 +161,6 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -179,12 +184,14 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JSONBean;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
@@ -204,10 +211,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
 
@@ -526,6 +529,8 @@ public class HRegionServer extends HasThread implements
 
   protected FileSystemUtilizationChore fsUtilizationChore;
 
+  private final NettyEventLoopGroupConfig eventLoopGroupConfig;
+
   /**
    * Starts a HRegionServer at the default location.
    */
@@ -541,6 +546,13 @@ public class HRegionServer extends HasThread implements
     super("RegionServer");  // thread name
     this.fsOk = true;
     this.conf = conf;
+    // initialize netty event loop group at the very beginning as we may use 
it to start rpc server,
+    // rpc client and WAL.
+    this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, 
"RS-EventLoopGroup");
+    NettyRpcClientConfigHelper.setEventLoopConfig(conf, 
eventLoopGroupConfig.group(),
+      eventLoopGroupConfig.clientChannelClass());
+    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, 
eventLoopGroupConfig.group(),
+      eventLoopGroupConfig.clientChannelClass());
     MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
     HFile.checkHFileVersion(this.conf);
     checkCodecs(this.conf);
@@ -3740,4 +3752,8 @@ public class HRegionServer extends HasThread implements
   public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
     return this.rsSpaceQuotaManager;
   }
+
+  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
+    return eventLoopGroupConfig;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 69ca1c5..997591b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -24,6 +24,7 @@ import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.Sequencer;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.SingleThreadEventExecutor;
 
@@ -144,6 +145,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final EventLoop eventLoop;
 
+  private final Class<? extends Channel> channelClass;
+
   private final Lock consumeLock = new ReentrantLock();
 
   private final Runnable consumer = this::consume;
@@ -192,10 +195,11 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
 
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String 
archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean 
failIfWALExists,
-      String prefix, String suffix, EventLoop eventLoop)
+      String prefix, String suffix, EventLoop eventLoop, Class<? extends 
Channel> channelClass)
       throws FailedLogCloseException, IOException {
     super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, 
prefix, suffix);
     this.eventLoop = eventLoop;
+    this.channelClass = channelClass;
     Supplier<Boolean> hasConsumerTask;
     if (eventLoop instanceof SingleThreadEventExecutor) {
 
@@ -607,7 +611,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     boolean overwrite = false;
     for (int retry = 0;; retry++) {
       try {
-        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, 
eventLoop);
+        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, 
eventLoop,
+          channelClass);
       } catch (RemoteException e) {
         LOG.warn("create wal log writer " + path + " failed, retry = " + 
retry, e);
         if (shouldRetryCreate(e)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e1f7b8f..f020d25 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import com.google.common.base.Throwables;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 
 import java.io.IOException;
@@ -54,6 +55,8 @@ public class AsyncProtobufLogWriter extends 
AbstractProtobufLogWriter
 
   private final EventLoop eventLoop;
 
+  private final Class<? extends Channel> channelClass;
+
   private AsyncFSOutput output;
 
   private static final class OutputStreamWrapper extends OutputStream
@@ -99,8 +102,9 @@ public class AsyncProtobufLogWriter extends 
AbstractProtobufLogWriter
 
   private OutputStream asyncOutputWrapper;
 
-  public AsyncProtobufLogWriter(EventLoop eventLoop) {
+  public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> 
channelClass) {
     this.eventLoop = eventLoop;
+    this.channelClass = channelClass;
   }
 
   @Override
@@ -151,7 +155,7 @@ public class AsyncProtobufLogWriter extends 
AbstractProtobufLogWriter
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, 
int bufferSize,
       short replication, long blockSize) throws IOException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, 
false, replication,
-      blockSize, eventLoop);
+      blockSize, eventLoop, channelClass);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
index 5a54e98..22c8aa8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
@@ -25,6 +25,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.crypto.Encryptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@@ -32,8 +33,8 @@ public class SecureAsyncProtobufLogWriter extends 
AsyncProtobufLogWriter {
 
   private Encryptor encryptor = null;
 
-  public SecureAsyncProtobufLogWriter(EventLoop eventLoop) {
-    super(eventLoop);
+  public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends 
Channel> channelClass) {
+    super(eventLoop, channelClass);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java
new file mode 100644
index 0000000..30caf72
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Event loop group related config.
+ */
+@InterfaceAudience.Private
+public class NettyEventLoopGroupConfig {
+
+  private final EventLoopGroup group;
+
+  private final Class<? extends ServerChannel> serverChannelClass;
+
+  private final Class<? extends Channel> clientChannelClass;
+
+  private static boolean useEpoll(Configuration conf) {
+    // Config to enable native transport.
+    boolean epollEnabled = conf.getBoolean("hbase.netty.nativetransport", 
true);
+    // Use the faster native epoll transport mechanism on linux if enabled
+    return epollEnabled && JVM.isLinux() && JVM.isAmd64();
+  }
+
+  public NettyEventLoopGroupConfig(Configuration conf, String threadPoolName) {
+    boolean useEpoll = useEpoll(conf);
+    int workerCount = conf.getInt("hbase.netty.worker.count", 0);
+    ThreadFactory eventLoopThreadFactory =
+        new DefaultThreadFactory(threadPoolName, true, Thread.MAX_PRIORITY);
+    if (useEpoll) {
+      group = new EpollEventLoopGroup(workerCount, eventLoopThreadFactory);
+      serverChannelClass = EpollServerSocketChannel.class;
+      clientChannelClass = EpollSocketChannel.class;
+    } else {
+      group = new NioEventLoopGroup(workerCount, eventLoopThreadFactory);
+      serverChannelClass = NioServerSocketChannel.class;
+      clientChannelClass = NioSocketChannel.class;
+    }
+  }
+
+  public EventLoopGroup group() {
+    return group;
+  }
+
+  public Class<? extends ServerChannel> serverChannelClass() {
+    return serverChannelClass;
+  }
+
+  public Class<? extends Channel> clientChannelClass() {
+    return clientChannelClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 786f58a..2efa96d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -19,9 +19,12 @@ package org.apache.hadoop.hbase.wal;
 
 import com.google.common.base.Throwables;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
 
@@ -36,7 +39,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A WAL provider that use {@link AsyncFSWAL}.
@@ -52,31 +55,43 @@ public class AsyncFSWALProvider extends 
AbstractFSWALProvider<AsyncFSWAL> {
     void init(FileSystem fs, Path path, Configuration c, boolean overwritable) 
throws IOException;
   }
 
-  private EventLoopGroup eventLoopGroup = null;
+  private EventLoopGroup eventLoopGroup;
 
+  private Class<? extends Channel> channelClass;
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
     return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), 
FSUtils.getWALRootDir(conf),
         getWALDirectoryName(factory.factoryId), 
HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
         true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? 
META_WAL_PROVIDER_ID : null,
-        eventLoopGroup.next());
+        eventLoopGroup.next(), channelClass);
   }
 
   @Override
   protected void doInit(Configuration conf) throws IOException {
-    eventLoopGroup = new NioEventLoopGroup(1, 
Threads.newDaemonThreadFactory("AsyncFSWAL"));
+    Pair<EventLoopGroup, Class<? extends Channel>> 
eventLoopGroupAndChannelClass =
+        NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+    if (eventLoopGroupAndChannelClass != null) {
+      eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+      channelClass = eventLoopGroupAndChannelClass.getSecond();
+    } else {
+      eventLoopGroup = new NioEventLoopGroup(1,
+          new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY));
+      channelClass = NioSocketChannel.class;
+    }
   }
 
   /**
    * public because of AsyncFSWAL. Should be package-private
    */
   public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem 
fs, Path path,
-      boolean overwritable, EventLoop eventLoop) throws IOException {
+      boolean overwritable, EventLoop eventLoop, Class<? extends Channel> 
channelClass)
+      throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends AsyncWriter> logWriterClass = conf.getClass(
       "hbase.regionserver.hlog.async.writer.impl", 
AsyncProtobufLogWriter.class, AsyncWriter.class);
     try {
-      AsyncWriter writer = 
logWriterClass.getConstructor(EventLoop.class).newInstance(eventLoop);
+      AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, 
Class.class)
+          .newInstance(eventLoop, channelClass);
       writer.init(fs, path, conf, overwritable);
       return writer;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
new file mode 100644
index 0000000..273fc37
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Helper class for passing netty event loop config to {@link 
AsyncFSWALProvider}.
+ */
+public class NettyAsyncFSWALConfigHelper {
+
+  private static final String EVENT_LOOP_CONFIG = 
"hbase.wal.async.event-loop.config";
+
+  private static final String CONFIG_NAME = "global-event-loop";
+
+  private static final Map<String, Pair<EventLoopGroup, Class<? extends 
Channel>>> EVENT_LOOP_CONFIG_MAP =
+      new HashMap<>();
+
+  /**
+   * Set the EventLoopGroup and channel class for {@code AsyncFSWALProvider}.
+   */
+  public static void setEventLoopConfig(Configuration conf, EventLoopGroup 
group,
+      Class<? extends Channel> channelClass) {
+    Preconditions.checkNotNull(group, "group is null");
+    Preconditions.checkNotNull(channelClass, "channel class is null");
+    conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME);
+    EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME,
+      Pair.<EventLoopGroup, Class<? extends Channel>> newPair(group, 
channelClass));
+  }
+
+  static Pair<EventLoopGroup, Class<? extends Channel>> 
getEventLoopConfig(Configuration conf) {
+    String name = conf.get(EVENT_LOOP_CONFIG);
+    if (StringUtils.isBlank(name)) {
+      return null;
+    }
+    return EVENT_LOOP_CONFIG_MAP.get(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index f59133a..43a279e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -18,14 +18,17 @@
 package org.apache.hadoop.hbase.io.asyncfs;
 
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -64,6 +67,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
 
   private static EventLoopGroup EVENT_LOOP_GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   private static int READ_TIMEOUT_MS = 2000;
 
   @Rule
@@ -75,6 +80,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     TEST_UTIL.startMiniDFSCluster(3);
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    CHANNEL_CLASS = NioSocketChannel.class;
   }
 
   @AfterClass
@@ -91,9 +97,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     // will fail.
     for (;;) {
       try {
-        FanOutOneBlockAsyncDFSOutput out =
-            FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new 
Path("/ensureDatanodeAlive"),
-              true, true, (short) 3, FS.getDefaultBlockSize(), 
EVENT_LOOP_GROUP.next());
+        FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+          new Path("/ensureDatanodeAlive"), true, true, (short) 3, 
FS.getDefaultBlockSize(),
+          EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
         out.close();
         break;
       } catch (IOException e) {
@@ -122,8 +128,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void test() throws IOException, InterruptedException, 
ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     writeAndVerify(eventLoop, FS, f, out);
   }
 
@@ -131,8 +137,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testMaxByteBufAllocated() throws Exception {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     out.guess(5 * 1024);
     assertEquals(8 * 1024, out.guess(5 * 1024));
     assertEquals(16 * 1024, out.guess(10 * 1024));
@@ -146,9 +152,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testRecover() throws IOException, InterruptedException, 
ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
-    final byte[] b = new byte[10];
+    FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+    byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b, 0, b.length);
     out.flush(false).get();
@@ -179,8 +185,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testHeartbeat() throws IOException, InterruptedException, 
ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
     writeAndVerify(eventLoop, FS, f, out);
@@ -195,11 +201,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try {
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, 
(short) 3,
-        FS.getDefaultBlockSize(), eventLoop);
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
       LOG.info("expected exception caught", e);
-      assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
+      assertThat(e.unwrapRemoteException(), 
instanceOf(FileNotFoundException.class));
     }
   }
 
@@ -220,7 +226,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       EventLoop eventLoop = EVENT_LOOP_GROUP.next();
       try {
         FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, 
(short) 3,
-          FS.getDefaultBlockSize(), eventLoop);
+          FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
         fail("should fail with connection error");
       } catch (IOException e) {
         LOG.info("expected exception caught", e);
@@ -239,8 +245,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testWriteLargeChunk() throws IOException, InterruptedException, 
ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
     byte[] b = new byte[50 * 1024 * 1024];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b);

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 6bd2d3c..4da778e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
@@ -31,8 +33,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
-import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.AfterClass;
@@ -44,6 +44,8 @@ public class TestLocalAsyncOutput {
 
   private static EventLoopGroup GROUP = new NioEventLoopGroup();
 
+  private static Class<? extends Channel> CHANNEL_CLASS = 
NioSocketChannel.class;
+
   private static final HBaseCommonTestingUtility TEST_UTIL = new 
HBaseCommonTestingUtility();
 
   @AfterClass
@@ -57,7 +59,7 @@ public class TestLocalAsyncOutput {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
-      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), 
CHANNEL_CLASS);
     byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b);

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index e05d869..7e67a90 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -31,9 +31,11 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.File;
 import java.io.IOException;
@@ -83,6 +85,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
 
   private static EventLoopGroup EVENT_LOOP_GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   private static int READ_TIMEOUT_MS = 200000;
 
   private static final File KEYTAB_FILE =
@@ -166,6 +170,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    CHANNEL_CLASS = NioSocketChannel.class;
     TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 
READ_TIMEOUT_MS);
     KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
     USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
@@ -242,8 +247,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   public void test() throws IOException, InterruptedException, 
ExecutionException {
     Path f = getTestFile();
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 1, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index a55df68..9b28975 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 import java.util.List;
@@ -41,9 +43,12 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
 
   private static EventLoopGroup GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     GROUP = new NioEventLoopGroup(1, 
Threads.newDaemonThreadFactory("TestAsyncFSWAL"));
+    CHANNEL_CLASS = NioSocketChannel.class;
     AbstractTestFSWAL.setUpBeforeClass();
   }
 
@@ -58,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
       Configuration conf, List<WALActionsListener> listeners, boolean 
failIfWALExists,
       String prefix, String suffix) throws IOException {
     return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, 
failIfWALExists, prefix,
-        suffix, GROUP.next());
+        suffix, GROUP.next(), CHANNEL_CLASS);
   }
 
   @Override
@@ -67,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
       boolean failIfWALExists, String prefix, String suffix, final Runnable 
action)
       throws IOException {
     return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, 
failIfWALExists, prefix,
-        suffix, GROUP.next()) {
+        suffix, GROUP.next(), CHANNEL_CLASS) {
 
       @Override
       void atHeadOfRingBufferEventHandlerAppend() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
index 72fc4b2..a689775 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import com.google.common.base.Throwables;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -42,9 +44,12 @@ public class TestAsyncProtobufLog extends 
AbstractTestProtobufLog<WALProvider.As
 
   private static EventLoopGroup EVENT_LOOP_GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    CHANNEL_CLASS = NioSocketChannel.class;
     AbstractTestProtobufLog.setUpBeforeClass();
   }
 
@@ -57,7 +62,7 @@ public class TestAsyncProtobufLog extends 
AbstractTestProtobufLog<WALProvider.As
   @Override
   protected AsyncWriter createWriter(Path path) throws IOException {
     return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), 
fs, path, false,
-      EVENT_LOOP_GROUP.next());
+      EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
index e008b37..17f58f8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 
@@ -40,9 +42,12 @@ public class TestAsyncWALReplay extends 
AbstractTestWALReplay {
 
   private static EventLoopGroup GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     GROUP = new NioEventLoopGroup(1, 
Threads.newDaemonThreadFactory("TestAsyncWALReplay"));
+    CHANNEL_CLASS = NioSocketChannel.class;
     Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
     AbstractTestWALReplay.setUpBeforeClass();
@@ -57,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay 
{
   @Override
   protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) 
throws IOException {
     return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
-        HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, 
GROUP.next());
+        HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, 
GROUP.next(), CHANNEL_CLASS);
   }
 }

Reply via email to