HBASE-18307 Share the same EventLoopGroup for NettyRpcServer, NettyRpcClient and AsyncFSWALProvider at RS side
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35170345 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35170345 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35170345 Branch: refs/heads/HBASE-18147 Commit: 351703455a091171a1abc90f250f52f0a7a0aaab Parents: 1ddcc07 Author: zhangduo <zhang...@apache.org> Authored: Mon Jul 10 16:33:37 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Mon Jul 10 21:00:44 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java | 1 - .../hbase/io/asyncfs/AsyncFSOutputHelper.java | 7 +- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 9 +- .../FanOutOneBlockAsyncDFSOutputHelper.java | 15 ++-- .../apache/hadoop/hbase/ipc/NettyRpcServer.java | 93 ++++++++------------ .../hbase/regionserver/HRegionServer.java | 36 +++++--- .../hbase/regionserver/wal/AsyncFSWAL.java | 9 +- .../wal/AsyncProtobufLogWriter.java | 8 +- .../wal/SecureAsyncProtobufLogWriter.java | 5 +- .../hbase/util/NettyEventLoopGroupConfig.java | 82 +++++++++++++++++ .../hadoop/hbase/wal/AsyncFSWALProvider.java | 27 ++++-- .../hbase/wal/NettyAsyncFSWALConfigHelper.java | 63 +++++++++++++ .../TestFanOutOneBlockAsyncDFSOutput.java | 42 +++++---- .../hbase/io/asyncfs/TestLocalAsyncOutput.java | 8 +- .../TestSaslFanOutOneBlockAsyncDFSOutput.java | 9 +- .../hbase/regionserver/wal/TestAsyncFSWAL.java | 9 +- .../regionserver/wal/TestAsyncProtobufLog.java | 7 +- .../regionserver/wal/TestAsyncWALReplay.java | 7 +- 18 files changed, 315 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java index 7d513db..8dd927e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io.asyncfs; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.classification.InterfaceAudience; http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 7fe86be..57613dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import java.io.IOException; @@ -54,11 +55,11 @@ public final class AsyncFSOutputHelper { * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}. */ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, - boolean createParent, short replication, long blockSize, final EventLoop eventLoop) - throws IOException { + boolean createParent, short replication, long blockSize, EventLoop eventLoop, + Class<? extends Channel> channelClass) throws IOException { if (fs instanceof DistributedFileSystem) { return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, - overwrite, createParent, replication, blockSize, eventLoop); + overwrite, createParent, replication, blockSize, eventLoop, channelClass); } final FSDataOutputStream fsOut; int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index c64cdf7..9cc0ae0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -26,6 +26,8 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import com.google.common.annotations.VisibleForTesting; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; @@ -37,6 +39,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.PromiseCombiner; @@ -71,8 +74,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.util.DataChecksum; -import com.google.common.annotations.VisibleForTesting; - /** * An asynchronous HDFS output stream implementation which fans out data to datanode and only * supports writing file with only one block. @@ -461,8 +462,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock; for (int remaining = dataLen; remaining > 0;) { int toWriteDataLen = Math.min(remaining, maxDataLen); - combiner.add(flushBuffer(buf.readRetainedSlice(toWriteDataLen), nextSubPacketOffsetInBlock, - syncBlock)); + combiner.add((Future<Void>) flushBuffer(buf.readRetainedSlice(toWriteDataLen), + nextSubPacketOffsetInBlock, syncBlock)); nextSubPacketOffsetInBlock += toWriteDataLen; remaining -= toWriteDataLen; } http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 3eaacc4..d14d4d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -46,7 +46,6 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.timeout.IdleStateEvent; @@ -68,7 +67,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.Encryptor; -import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemLinkResolver; import org.apache.hadoop.fs.Path; @@ -607,7 +605,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, - BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) { + BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop, + Class<? extends Channel> channelClass) { Enum<?>[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); boolean connectToDnViaHostname = @@ -633,7 +632,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { Promise<Channel> promise = eventLoop.newPromise(); futureList.add(promise); String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); - new Bootstrap().group(eventLoop).channel(NioSocketChannel.class) + new Bootstrap().group(eventLoop).channel(channelClass) .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() { @Override @@ -672,7 +671,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoop eventLoop) throws IOException { + EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException { Configuration conf = dfs.getConf(); FSUtils fsUtils = FSUtils.getInstance(dfs, conf); DFSClient client = dfs.getClient(); @@ -701,7 +700,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { stat.getFileId(), null); List<Channel> datanodeList = new ArrayList<>(); futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, - PIPELINE_SETUP_CREATE, summer, eventLoop); + PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass); for (Future<Channel> future : futureList) { // fail the creation if there are connection failures since we are fail-fast. The upper // layer should retry itself if needed. @@ -741,14 +740,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { */ public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoop eventLoop) throws IOException { + EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException { return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { @Override public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, UnresolvedLinkException { return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, - blockSize, eventLoop); + blockSize, eventLoop, channelClass); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 4b06fab..fafc53f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -18,20 +18,19 @@ package org.apache.hadoop.hbase.ipc; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.ServerChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; @@ -47,11 +46,12 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; -import org.apache.hadoop.hbase.util.JVM; +import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; @@ -69,57 +69,44 @@ public class NettyRpcServer extends RpcServer { private final Channel serverChannel; private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - public NettyRpcServer(final Server server, final String name, - final List<BlockingServiceAndInterface> services, - final InetSocketAddress bindAddress, Configuration conf, - RpcScheduler scheduler) throws IOException { + public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services, + InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) + throws IOException { super(server, name, services, bindAddress, conf, scheduler); this.bindAddress = bindAddress; - boolean useEpoll = useEpoll(conf); - int workerCount = conf.getInt("hbase.netty.rpc.server.worker.count", - Runtime.getRuntime().availableProcessors() / 4); - EventLoopGroup bossGroup = null; - EventLoopGroup workerGroup = null; - if (useEpoll) { - bossGroup = new EpollEventLoopGroup(1); - workerGroup = new EpollEventLoopGroup(workerCount); + EventLoopGroup eventLoopGroup; + Class<? extends ServerChannel> channelClass; + if (server instanceof HRegionServer) { + NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig(); + eventLoopGroup = config.group(); + channelClass = config.serverChannelClass(); } else { - bossGroup = new NioEventLoopGroup(1); - workerGroup = new NioEventLoopGroup(workerCount); + eventLoopGroup = new NioEventLoopGroup(0, + new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY)); + channelClass = NioServerSocketChannel.class; } - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.group(bossGroup, workerGroup); - if (useEpoll) { - bootstrap.channel(EpollServerSocketChannel.class); - } else { - bootstrap.channel(NioServerSocketChannel.class); - } - bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay); - bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive); - bootstrap.childOption(ChannelOption.ALLOCATOR, - PooledByteBufAllocator.DEFAULT); - bootstrap.childHandler(new ChannelInitializer<Channel>() { - - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); - preambleDecoder.setSingleDecode(true); - pipeline.addLast("preambleDecoder", preambleDecoder); - pipeline.addLast("preambleHandler", new NettyRpcServerPreambleHandler(NettyRpcServer.this)); - pipeline.addLast("frameDecoder", - new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true)); - pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); - pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); - } - }); - + ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) + .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) + .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) + .childHandler(new ChannelInitializer<Channel>() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); + preambleDecoder.setSingleDecode(true); + pipeline.addLast("preambleDecoder", preambleDecoder); + pipeline.addLast("preambleHandler", + new NettyRpcServerPreambleHandler(NettyRpcServer.this)); + pipeline.addLast("frameDecoder", + new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true)); + pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics)); + pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); + } + }); try { serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); - LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress() - + ", hbase.netty.rpc.server.worker.count=" + workerCount - + ", useEpoll=" + useEpoll); - allChannels.add(serverChannel); + LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress()); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } @@ -127,14 +114,6 @@ public class NettyRpcServer extends RpcServer { this.scheduler.init(new RpcSchedulerContext(this)); } - private static boolean useEpoll(Configuration conf) { - // Config to enable native transport. - boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport", - true); - // Use the faster native epoll transport mechanism on linux if enabled - return epollEnabled && JVM.isLinux() && JVM.isAmd64(); - } - @Override public synchronized void start() { if (started) { http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3593ce6..986d6d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -18,6 +18,10 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.Thread.UncaughtExceptionHandler; @@ -36,8 +40,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -106,7 +110,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; -import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -137,7 +141,11 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.*; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; @@ -153,9 +161,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; @@ -179,12 +184,14 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JSONBean; import org.apache.hadoop.hbase.util.JvmPauseMonitor; +import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; @@ -204,10 +211,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - import sun.misc.Signal; import sun.misc.SignalHandler; @@ -526,6 +529,8 @@ public class HRegionServer extends HasThread implements protected FileSystemUtilizationChore fsUtilizationChore; + private final NettyEventLoopGroupConfig eventLoopGroupConfig; + /** * Starts a HRegionServer at the default location. */ @@ -541,6 +546,13 @@ public class HRegionServer extends HasThread implements super("RegionServer"); // thread name this.fsOk = true; this.conf = conf; + // initialize netty event loop group at the very beginning as we may use it to start rpc server, + // rpc client and WAL. + this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup"); + NettyRpcClientConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(), + eventLoopGroupConfig.clientChannelClass()); + NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(), + eventLoopGroupConfig.clientChannelClass()); MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf); HFile.checkHFileVersion(this.conf); checkCodecs(this.conf); @@ -3740,4 +3752,8 @@ public class HRegionServer extends HasThread implements public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { return this.rsSpaceQuotaManager; } + + public NettyEventLoopGroupConfig getEventLoopGroupConfig() { + return eventLoopGroupConfig; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 69ca1c5..997591b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -24,6 +24,7 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.Sequencer; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.util.concurrent.SingleThreadEventExecutor; @@ -144,6 +145,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { private final EventLoop eventLoop; + private final Class<? extends Channel> channelClass; + private final Lock consumeLock = new ReentrantLock(); private final Runnable consumer = this::consume; @@ -192,10 +195,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, - String prefix, String suffix, EventLoop eventLoop) + String prefix, String suffix, EventLoop eventLoop, Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.eventLoop = eventLoop; + this.channelClass = channelClass; Supplier<Boolean> hasConsumerTask; if (eventLoop instanceof SingleThreadEventExecutor) { @@ -607,7 +611,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { boolean overwrite = false; for (int retry = 0;; retry++) { try { - return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop); + return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop, + channelClass); } catch (RemoteException e) { LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e); if (shouldRetryCreate(e)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index e1f7b8f..f020d25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import com.google.common.base.Throwables; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import java.io.IOException; @@ -54,6 +55,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private final EventLoop eventLoop; + private final Class<? extends Channel> channelClass; + private AsyncFSOutput output; private static final class OutputStreamWrapper extends OutputStream @@ -99,8 +102,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private OutputStream asyncOutputWrapper; - public AsyncProtobufLogWriter(EventLoop eventLoop) { + public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) { this.eventLoop = eventLoop; + this.channelClass = channelClass; } @Override @@ -151,7 +155,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoop); + blockSize, eventLoop, channelClass); this.asyncOutputWrapper = new OutputStreamWrapper(output); } http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java index 5a54e98..22c8aa8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.crypto.Encryptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @@ -32,8 +33,8 @@ public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter { private Encryptor encryptor = null; - public SecureAsyncProtobufLogWriter(EventLoop eventLoop) { - super(eventLoop); + public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) { + super(eventLoop, channelClass); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java new file mode 100644 index 0000000..30caf72 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Event loop group related config. + */ +@InterfaceAudience.Private +public class NettyEventLoopGroupConfig { + + private final EventLoopGroup group; + + private final Class<? extends ServerChannel> serverChannelClass; + + private final Class<? extends Channel> clientChannelClass; + + private static boolean useEpoll(Configuration conf) { + // Config to enable native transport. + boolean epollEnabled = conf.getBoolean("hbase.netty.nativetransport", true); + // Use the faster native epoll transport mechanism on linux if enabled + return epollEnabled && JVM.isLinux() && JVM.isAmd64(); + } + + public NettyEventLoopGroupConfig(Configuration conf, String threadPoolName) { + boolean useEpoll = useEpoll(conf); + int workerCount = conf.getInt("hbase.netty.worker.count", 0); + ThreadFactory eventLoopThreadFactory = + new DefaultThreadFactory(threadPoolName, true, Thread.MAX_PRIORITY); + if (useEpoll) { + group = new EpollEventLoopGroup(workerCount, eventLoopThreadFactory); + serverChannelClass = EpollServerSocketChannel.class; + clientChannelClass = EpollSocketChannel.class; + } else { + group = new NioEventLoopGroup(workerCount, eventLoopThreadFactory); + serverChannelClass = NioServerSocketChannel.class; + clientChannelClass = NioSocketChannel.class; + } + } + + public EventLoopGroup group() { + return group; + } + + public Class<? extends ServerChannel> serverChannelClass() { + return serverChannelClass; + } + + public Class<? extends Channel> clientChannelClass() { + return clientChannelClass; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 786f58a..2efa96d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -19,9 +19,12 @@ package org.apache.hadoop.hbase.wal; import com.google.common.base.Throwables; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; @@ -36,7 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Pair; /** * A WAL provider that use {@link AsyncFSWAL}. @@ -52,31 +55,43 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException; } - private EventLoopGroup eventLoopGroup = null; + private EventLoopGroup eventLoopGroup; + private Class<? extends Channel> channelClass; @Override protected AsyncFSWAL createWAL() throws IOException { return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, - eventLoopGroup.next()); + eventLoopGroup.next(), channelClass); } @Override protected void doInit(Configuration conf) throws IOException { - eventLoopGroup = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("AsyncFSWAL")); + Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = + NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); + if (eventLoopGroupAndChannelClass != null) { + eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); + channelClass = eventLoopGroupAndChannelClass.getSecond(); + } else { + eventLoopGroup = new NioEventLoopGroup(1, + new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY)); + channelClass = NioSocketChannel.class; + } } /** * public because of AsyncFSWAL. Should be package-private */ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, - boolean overwritable, EventLoop eventLoop) throws IOException { + boolean overwritable, EventLoop eventLoop, Class<? extends Channel> channelClass) + throws IOException { // Configuration already does caching for the Class lookup. Class<? extends AsyncWriter> logWriterClass = conf.getClass( "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class); try { - AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class).newInstance(eventLoop); + AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class) + .newInstance(eventLoop, channelClass); writer.init(fs, path, conf, overwritable); return writer; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java new file mode 100644 index 0000000..273fc37 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Pair; + +/** + * Helper class for passing netty event loop config to {@link AsyncFSWALProvider}. + */ +public class NettyAsyncFSWALConfigHelper { + + private static final String EVENT_LOOP_CONFIG = "hbase.wal.async.event-loop.config"; + + private static final String CONFIG_NAME = "global-event-loop"; + + private static final Map<String, Pair<EventLoopGroup, Class<? extends Channel>>> EVENT_LOOP_CONFIG_MAP = + new HashMap<>(); + + /** + * Set the EventLoopGroup and channel class for {@code AsyncFSWALProvider}. + */ + public static void setEventLoopConfig(Configuration conf, EventLoopGroup group, + Class<? extends Channel> channelClass) { + Preconditions.checkNotNull(group, "group is null"); + Preconditions.checkNotNull(channelClass, "channel class is null"); + conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME); + EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME, + Pair.<EventLoopGroup, Class<? extends Channel>> newPair(group, channelClass)); + } + + static Pair<EventLoopGroup, Class<? extends Channel>> getEventLoopConfig(Configuration conf) { + String name = conf.get(EVENT_LOOP_CONFIG); + if (StringUtils.isBlank(name)) { + return null; + } + return EVENT_LOOP_CONFIG_MAP.get(name); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index f59133a..43a279e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -18,14 +18,17 @@ package org.apache.hadoop.hbase.io.asyncfs; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import java.io.FileNotFoundException; import java.io.IOException; @@ -64,6 +67,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { private static EventLoopGroup EVENT_LOOP_GROUP; + private static Class<? extends Channel> CHANNEL_CLASS; + private static int READ_TIMEOUT_MS = 2000; @Rule @@ -75,6 +80,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { TEST_UTIL.startMiniDFSCluster(3); FS = TEST_UTIL.getDFSCluster().getFileSystem(); EVENT_LOOP_GROUP = new NioEventLoopGroup(); + CHANNEL_CLASS = NioSocketChannel.class; } @AfterClass @@ -91,9 +97,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { // will fail. for (;;) { try { - FanOutOneBlockAsyncDFSOutput out = - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"), - true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP.next()); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, + new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(), + EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); out.close(); break; } catch (IOException e) { @@ -122,8 +128,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void test() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); writeAndVerify(eventLoop, FS, f, out); } @@ -131,8 +137,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void testMaxByteBufAllocated() throws Exception { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); out.guess(5 * 1024); assertEquals(8 * 1024, out.guess(5 * 1024)); assertEquals(16 * 1024, out.guess(10 * 1024)); @@ -146,9 +152,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void testRecover() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); - final byte[] b = new byte[10]; + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b, 0, b.length); out.flush(false).get(); @@ -179,8 +185,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void testHeartbeat() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. writeAndVerify(eventLoop, FS, f, out); @@ -195,11 +201,11 @@ public class TestFanOutOneBlockAsyncDFSOutput { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); fail("should fail with parent does not exist"); } catch (RemoteException e) { LOG.info("expected exception caught", e); - assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException); + assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class)); } } @@ -220,7 +226,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); fail("should fail with connection error"); } catch (IOException e) { LOG.info("expected exception caught", e); @@ -239,8 +245,8 @@ public class TestFanOutOneBlockAsyncDFSOutput { public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, - true, false, (short) 3, 1024 * 1024 * 1024, eventLoop); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, + false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS); byte[] b = new byte[50 * 1024 * 1024]; ThreadLocalRandom.current().nextBytes(b); out.write(b); http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index 6bd2d3c..4da778e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.io.asyncfs; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import java.io.IOException; import java.util.concurrent.ExecutionException; @@ -31,8 +33,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; -import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.AfterClass; @@ -44,6 +44,8 @@ public class TestLocalAsyncOutput { private static EventLoopGroup GROUP = new NioEventLoopGroup(); + private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class; + private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility(); @AfterClass @@ -57,7 +59,7 @@ public class TestLocalAsyncOutput { Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, - fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next()); + fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), CHANNEL_CLASS); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b); http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index e05d869..7e67a90 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -31,9 +31,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY; +import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import java.io.File; import java.io.IOException; @@ -83,6 +85,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { private static EventLoopGroup EVENT_LOOP_GROUP; + private static Class<? extends Channel> CHANNEL_CLASS; + private static int READ_TIMEOUT_MS = 200000; private static final File KEYTAB_FILE = @@ -166,6 +170,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { @BeforeClass public static void setUpBeforeClass() throws Exception { EVENT_LOOP_GROUP = new NioEventLoopGroup(); + CHANNEL_CLASS = NioSocketChannel.class; TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); @@ -242,8 +247,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput { public void test() throws IOException, InterruptedException, ExecutionException { Path f = getTestFile(); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, - true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, + false, (short) 1, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index a55df68..9b28975 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import java.io.IOException; import java.util.List; @@ -41,9 +43,12 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { private static EventLoopGroup GROUP; + private static Class<? extends Channel> CHANNEL_CLASS; + @BeforeClass public static void setUpBeforeClass() throws Exception { GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL")); + CHANNEL_CLASS = NioSocketChannel.class; AbstractTestFSWAL.setUpBeforeClass(); } @@ -58,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix) throws IOException { return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix, GROUP.next()); + suffix, GROUP.next(), CHANNEL_CLASS); } @Override @@ -67,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { boolean failIfWALExists, String prefix, String suffix, final Runnable action) throws IOException { return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix, GROUP.next()) { + suffix, GROUP.next(), CHANNEL_CLASS) { @Override void atHeadOfRingBufferEventHandlerAppend() { http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java index 72fc4b2..a689775 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal; import com.google.common.base.Throwables; +import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import java.io.IOException; import java.io.InterruptedIOException; @@ -42,9 +44,12 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As private static EventLoopGroup EVENT_LOOP_GROUP; + private static Class<? extends Channel> CHANNEL_CLASS; + @BeforeClass public static void setUpBeforeClass() throws Exception { EVENT_LOOP_GROUP = new NioEventLoopGroup(); + CHANNEL_CLASS = NioSocketChannel.class; AbstractTestProtobufLog.setUpBeforeClass(); } @@ -57,7 +62,7 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As @Override protected AsyncWriter createWriter(Path path) throws IOException { return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), fs, path, false, - EVENT_LOOP_GROUP.next()); + EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/35170345/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java index e008b37..17f58f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; import java.io.IOException; @@ -40,9 +42,12 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay { private static EventLoopGroup GROUP; + private static Class<? extends Channel> CHANNEL_CLASS; + @BeforeClass public static void setUpBeforeClass() throws Exception { GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay")); + CHANNEL_CLASS = NioSocketChannel.class; Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); AbstractTestWALReplay.setUpBeforeClass(); @@ -57,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay { @Override protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException { return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName, - HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next()); + HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS); } }