Add javadoc to NettyRemotingAbstract class and several other trivial clean up.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6609c866 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6609c866 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6609c866 Branch: refs/heads/release-4.1.0-incubating Commit: 6609c86650917ebfb5bd12a4bd8b1bcf9c477759 Parents: 6a9628b Author: Zhanhui Li <lizhan...@apache.org> Authored: Tue Apr 25 22:58:14 2017 +0800 Committer: Zhanhui Li <lizhan...@apache.org> Committed: Tue Apr 25 22:58:14 2017 +0800 ---------------------------------------------------------------------- .../remoting/netty/NettyRemotingAbstract.java | 93 ++++++++++++++++++-- .../remoting/netty/NettyRemotingClient.java | 19 ++-- .../remoting/netty/NettyRemotingServer.java | 22 ++--- 3 files changed, 106 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6609c866/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 83eeb02..cddab3d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -48,32 +48,84 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class NettyRemotingAbstract { + + /** + * Remoting logger instance. + */ private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + /** + * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint. + */ protected final Semaphore semaphoreOneway; + /** + * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint. + */ protected final Semaphore semaphoreAsync; + /** + * This map caches all on-going requests. + */ protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable = new ConcurrentHashMap<Integer, ResponseFuture>(256); + /** + * This container holds all processors per request code, aka, for each incoming request, we may look up the + * responding processor in this map to handle the request. + */ protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); - protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter(); + /** + * Executor to feed netty events to user defined {@link ChannelEventListener}. + */ + protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor(); + + /** + * The default request processor to use in case there is no exact match in {@link #processorTable} per request code. + */ protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor; + /** + * Constructor, specifying capacity of one-way and asynchronous semaphores. + * @param permitsOneway Number of permits for one-way requests. + * @param permitsAsync Number of permits for asynchronous requests. + */ public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) { this.semaphoreOneway = new Semaphore(permitsOneway, true); this.semaphoreAsync = new Semaphore(permitsAsync, true); } + /** + * Custom channel event listener. + * @return custom channel event listener if defined; null otherwise. + */ public abstract ChannelEventListener getChannelEventListener(); + /** + * Put a netty event to the executor. + * @param event Netty event instance. + */ public void putNettyEvent(final NettyEvent event) { - this.nettyEventExecuter.putNettyEvent(event); + this.nettyEventExecutor.putNettyEvent(event); } + /** + * Entry of incoming command processing. + * + * <p> + * <strong>Note:</strong> + * The incoming remoting command may be + * <ul> + * <li>An inquiry request from a remote peer component;</li> + * <li>A response to a previous request issued by this very participant.</li> + * </ul> + * </p> + * @param ctx Channel handler context. + * @param msg incoming remoting command. + * @throws Exception if there were any error while processing the incoming command. + */ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { @@ -90,6 +142,11 @@ public abstract class NettyRemotingAbstract { } } + /** + * Process incoming request command issued by remote peer. + * @param ctx channel handler context. + * @param cmd request command. + */ public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; @@ -175,6 +232,11 @@ public abstract class NettyRemotingAbstract { } } + /** + * Process response from remote peer to the previous issued requests. + * @param ctx channel handler context. + * @param cmd response command instance. + */ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); @@ -196,7 +258,10 @@ public abstract class NettyRemotingAbstract { } } - //execute callback in callback executor. If callback executor is null, run directly in current thread + /** + * Execute callback in callback executor. If callback executor is null, run directly in current thread + * @param responseFuture + */ private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); @@ -229,10 +294,24 @@ public abstract class NettyRemotingAbstract { } } + /** + * Custom RPC hook. + * @return RPC hook if specified; null otherwise. + */ public abstract RPCHook getRPCHook(); - abstract public ExecutorService getCallbackExecutor(); - + /** + * This method specifies thread pool to use while invoking callback methods. + * @return Dedicated thread pool instance if specified; or null if the callback is supposed to be executed in the + * netty event-loop thread. + */ + public abstract ExecutorService getCallbackExecutor(); + + /** + * <p> + * This method is periodically invoked to scan and expire deprecated request. + * </p> + */ public void scanResponseTable() { final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>(); Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator(); @@ -386,7 +465,7 @@ public abstract class NettyRemotingAbstract { } } - class NettyEventExecuter extends ServiceThread { + class NettyEventExecutor extends ServiceThread { private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>(); private final int maxSize = 10000; @@ -436,7 +515,7 @@ public abstract class NettyRemotingAbstract { @Override public String getServiceName() { - return NettyEventExecuter.class.getSimpleName(); + return NettyEventExecutor.class.getSimpleName(); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6609c866/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 26088aa..52ca47e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -172,7 +172,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti }, 1000 * 3, 1000); if (this.channelEventListener != null) { - this.nettyEventExecuter.start(); + this.nettyEventExecutor.start(); } } @@ -189,8 +189,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this.eventLoopGroupWorker.shutdownGracefully(); - if (this.nettyEventExecuter != null) { - this.nettyEventExecuter.shutdown(); + if (this.nettyEventExecutor != null) { + this.nettyEventExecutor.shutdown(); } if (this.defaultEventExecutorGroup != null) { @@ -586,7 +586,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); - } } @@ -594,8 +593,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { - final String local = localAddress == null ? "UNKNOWN" : localAddress.toString(); - final String remote = remoteAddress == null ? "UNKNOWN" : remoteAddress.toString(); + final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress); + final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress); log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); super.connect(ctx, remoteAddress, localAddress, promise); @@ -613,7 +612,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti super.disconnect(ctx, promise); if (NettyRemotingClient.this.channelEventListener != null) { - NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @@ -625,7 +624,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti super.close(ctx, promise); if (NettyRemotingClient.this.channelEventListener != null) { - NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @@ -639,7 +638,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti closeChannel(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { NettyRemotingClient.this - .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); + .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel())); } } } @@ -654,7 +653,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause); closeChannel(ctx.channel()); if (NettyRemotingClient.this.channelEventListener != null) { - NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel())); + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel())); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6609c866/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 6a6df37..d8d9b65 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -160,7 +160,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - new NettyConnetManageHandler(), + new NettyConnectManageHandler(), new NettyServerHandler()); } }); @@ -178,7 +178,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } if (this.channelEventListener != null) { - this.nettyEventExecuter.start(); + this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @@ -205,8 +205,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti this.eventLoopGroupSelector.shutdownGracefully(); - if (this.nettyEventExecuter != null) { - this.nettyEventExecuter.shutdown(); + if (this.nettyEventExecutor != null) { + this.nettyEventExecutor.shutdown(); } if (this.defaultEventExecutorGroup != null) { @@ -297,7 +297,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } - class NettyConnetManageHandler extends ChannelDuplexHandler { + class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); @@ -319,7 +319,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti super.channelActive(ctx); if (NettyRemotingServer.this.channelEventListener != null) { - NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel())); + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel())); } } @@ -330,21 +330,21 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti super.channelInactive(ctx); if (NettyRemotingServer.this.channelEventListener != null) { - NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel())); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - IdleStateEvent evnet = (IdleStateEvent) evt; - if (evnet.state().equals(IdleState.ALL_IDLE)) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); RemotingUtil.closeChannel(ctx.channel()); if (NettyRemotingServer.this.channelEventListener != null) { NettyRemotingServer.this - .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); + .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel())); } } } @@ -359,7 +359,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause); if (NettyRemotingServer.this.channelEventListener != null) { - NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel())); + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel())); } RemotingUtil.closeChannel(ctx.channel());