[ https://issues.apache.org/jira/browse/HBASE-21973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Balazs Meszaros updated HBASE-21973:
------------------------------------
    Component/s:     (was: hbase-connectors)
                 master

> NettyRpcServer performance improvement based on Netty
> -----------------------------------------------------
>
>                 Key: HBASE-21973
>                 URL: https://issues.apache.org/jira/browse/HBASE-21973
>             Project: HBase
>          Issue Type: Improvement
>          Components: master
>    Affects Versions: 2.1.0, 2.0.0, 2.0.1, 2.1.1, 2.0.2, 2.0.3, 2.1.2, 2.0.4, 2.1.3
>            Reporter: Nicholas Jiang
>            Priority: Major
>
> In the NettyRpcServer#NettyRpcServer constructor method, we have the following:
> {code:java}
> ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
>     .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
>     .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
>     .childHandler(new ChannelInitializer<Channel>() {
>
>       @Override
>       protected void initChannel(Channel ch) throws Exception {
>         ChannelPipeline pipeline = ch.pipeline();
>         FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
>         preambleDecoder.setSingleDecode(true);
>         pipeline.addLast("preambleDecoder", preambleDecoder);
>         pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
>         pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
>         pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
>         pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
>       }
>     });
> try {
>   serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
>   LOG.info("Bind to {}", serverChannel.localAddress());
> } catch (InterruptedException e) {
>   throw new InterruptedIOException(e.getMessage());
> }
> {code}
> When building the ServerBootstrap, we would configure ServerSocketChannel options and SocketChannel child options to improve RPC performance. These options and child options are as follows:
> {code:java}
> .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
> .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
> .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
> .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
> .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
> .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
> .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
> .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
> .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
> .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
>     transportConfig.getBufferMin(), transportConfig.getBufferMax()))
> {code}
> What's more, the ChannelPipeline includes NettyRpcFrameDecoder, and this decoder extends ByteToMessageDecoder. The ChannelPipeline is as follows:
> {code:java}
> .childHandler(new ChannelInitializer<Channel>() {
>
>   @Override
>   protected void initChannel(Channel ch) throws Exception {
>     ChannelPipeline pipeline = ch.pipeline();
>     FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
>     preambleDecoder.setSingleDecode(true);
>     pipeline.addLast("preambleDecoder", preambleDecoder);
>     pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
>     pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
>     pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
>     pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
>   }
> });
> {code}
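> Putting the two snippets above together, a rough sketch of how the bootstrap in the constructor might look with these options applied is shown below. This is for illustration only: the backlog, socket buffer sizes, water marks and allocator choices are placeholder values, not proposed defaults, and the fields (eventLoopGroup, channelClass, tcpNoDelay, tcpKeepAlive) are the ones already used in the constructor above.
> {code:java}
> // Illustrative sketch only: existing child options plus the proposed
> // server/child options, with placeholder values.
> ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
>     // ServerSocketChannel (parent) options
>     .option(ChannelOption.SO_BACKLOG, 1024)
>     .option(ChannelOption.SO_REUSEADDR, true)
>     .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())
>     .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
>     // SocketChannel (per-connection child) options
>     .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
>     .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
>     .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
>     .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
>     .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
>     .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
>         new WriteBufferWaterMark(32 * 1024, 128 * 1024))
>     .childHandler(new ChannelInitializer<Channel>() {
>       @Override
>       protected void initChannel(Channel ch) throws Exception {
>         // pipeline unchanged from the snippet above
>       }
>     });
> {code}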
> Netty provides a convenient decoding tool class, ByteToMessageDecoder. This class accumulates incoming bytes, reading as much as possible from the socket, then synchronously calls the decode method to decode business objects and collect them into a List. Finally, it traverses the List and submits each element to the ChannelPipeline for processing.
> Here we can make a small change: instead of submitting a single message at a time, submit the entire List, which reduces the number of pipeline executions and increases throughput. This mode has no advantage in low-concurrency scenarios, but gives a significant throughput boost in high-concurrency scenarios.
> Will provide a patch and some perf comparison for this.
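> As an illustration of the idea (hypothetical names, not the actual patch), a minimal sketch of a frame decoder that drains all complete frames it has accumulated and hands the whole batch downstream in a single channelRead could look roughly like this, assuming a simple 4-byte length-prefixed framing:
> {code:java}
> import io.netty.buffer.ByteBuf;
> import io.netty.channel.ChannelHandlerContext;
> import io.netty.channel.SimpleChannelInboundHandler;
> import io.netty.handler.codec.ByteToMessageDecoder;
> import io.netty.handler.codec.TooLongFrameException;
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical sketch: decode every complete frame currently buffered into one
> // batch and emit the batch as a single message, so downstream handlers run once
> // per read instead of once per frame.
> public class BatchingRpcFrameDecoder extends ByteToMessageDecoder {
>
>   private final int maxFrameLength;
>
>   public BatchingRpcFrameDecoder(int maxFrameLength) {
>     this.maxFrameLength = maxFrameLength;
>   }
>
>   @Override
>   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
>       throws Exception {
>     List<ByteBuf> batch = new ArrayList<>();
>     // Drain every complete length-prefixed frame that has already been accumulated.
>     while (in.readableBytes() >= 4) {
>       in.markReaderIndex();
>       int frameLength = in.readInt();
>       if (frameLength > maxFrameLength) {
>         throw new TooLongFrameException("frame length " + frameLength + " exceeds " + maxFrameLength);
>       }
>       if (in.readableBytes() < frameLength) {
>         in.resetReaderIndex();
>         break; // wait for more bytes
>       }
>       batch.add(in.readRetainedSlice(frameLength));
>     }
>     if (!batch.isEmpty()) {
>       // One element in 'out' means one channelRead for the whole batch downstream.
>       out.add(batch);
>     }
>   }
> }
>
> // Downstream handler receives the whole batch in a single channelRead call.
> class BatchedRequestHandler extends SimpleChannelInboundHandler<List<ByteBuf>> {
>   @Override
>   protected void channelRead0(ChannelHandlerContext ctx, List<ByteBuf> batch) {
>     try {
>       for (ByteBuf frame : batch) {
>         // dispatch each request here (details omitted)
>       }
>     } finally {
>       batch.forEach(ByteBuf::release);
>     }
>   }
> }
> {code}
> Under low concurrency there is usually only one frame per read, so the batch has size one and nothing changes; under high concurrency many frames arrive per read and the pipeline is traversed once per batch instead of once per message.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)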