Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226757190 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); - ctx.setAttachment(cnxn); + ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { - SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); - ChannelFuture handshakeFuture = sslHandler.handshake(); + SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + Future<Channel> handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { - allChannels.add(ctx.getChannel()); + allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception - { - if (LOG.isTraceEnabled()) { - LOG.trace("Channel disconnected " + e); - } - NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + LOG.trace("Channel inactive {}", ctx.channel()); + allChannels.remove(ctx.channel()); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("Channel disconnect caused close " + e); - } + LOG.trace("Channel inactive caused close {}", cnxn); cnxn.close(); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception - { - LOG.warn("Exception caught " + e, e.getCause()); - NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.warn("Exception caught", cause); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing " + cnxn); - } + LOG.debug("Closing {}", cnxn); cnxn.close(); } } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception - { - if (LOG.isTraceEnabled()) { - LOG.trace("message received called " + e.getMessage()); - } + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { - if (LOG.isDebugEnabled()) { - LOG.debug("New message " + e.toString() - + " from " + ctx.getChannel()); - } - NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); - synchronized(cnxn) { - processMessage(e, cnxn); + if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { + LOG.debug("Received AutoReadEvent.ENABLE"); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() + // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run + // after either of those. Check for null just to be safe ... + if (cnxn != null) { + cnxn.processQueuedBuffer(); + } + ctx.channel().config().setAutoRead(true); + } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) { + LOG.debug("Received AutoReadEvent.DISABLE"); + ctx.channel().config().setAutoRead(false); } - } catch(Exception ex) { - LOG.error("Unexpected exception in receive", ex); - throw ex; + } finally { + ReferenceCountUtil.release(evt); } } - private void processMessage(MessageEvent e, NettyServerCnxn cnxn) { - if (LOG.isDebugEnabled()) { - LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: " - + cnxn.queuedBuffer); - } - - if (e instanceof NettyServerCnxn.ResumeMessageEvent) { - LOG.debug("Received ResumeMessageEvent"); - if (cnxn.queuedBuffer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("processing queue " - + Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - cnxn.receiveMessage(cnxn.queuedBuffer); - if (!cnxn.queuedBuffer.readable()) { - LOG.debug("Processed queue - no bytes remaining"); - cnxn.queuedBuffer = null; - } else { - LOG.debug("Processed queue - bytes remaining"); - } - } else { - LOG.debug("queue empty"); - } - cnxn.channel.setReadable(true); - } else { - ChannelBuffer buf = (ChannelBuffer)e.getMessage(); - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " buf 0x" - + ChannelBuffers.hexDump(buf)); - } - - if (cnxn.throttled) { - LOG.debug("Received message while throttled"); - // we are throttled, so we need to queue - if (cnxn.queuedBuffer == null) { - LOG.debug("allocating queue"); - cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes()); - } - cnxn.queuedBuffer.writeBytes(buf); - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - } else { - LOG.debug("not throttled"); - if (cnxn.queuedBuffer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - cnxn.queuedBuffer.writeBytes(buf); - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - - cnxn.receiveMessage(cnxn.queuedBuffer); - if (!cnxn.queuedBuffer.readable()) { - LOG.debug("Processed queue - no bytes remaining"); - cnxn.queuedBuffer = null; - } else { - LOG.debug("Processed queue - bytes remaining"); - } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + LOG.trace("message received called {}", msg); --- End diff -- as above.
---