GitHub user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r226757190
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext 
ctx,
     
                 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
                         zkServer, NettyServerCnxnFactory.this);
    -            ctx.setAttachment(cnxn);
    +            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
     
                 if (secure) {
    -                SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
    -                ChannelFuture handshakeFuture = sslHandler.handshake();
    +                SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
    +                Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
                     handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
                 } else {
    -                allChannels.add(ctx.getChannel());
    +                allChannels.add(ctx.channel());
                     addCnxn(cnxn);
                 }
             }
     
             @Override
    -        public void channelDisconnected(ChannelHandlerContext ctx,
    -                ChannelStateEvent e) throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("Channel disconnected " + e);
    -            }
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
    +            LOG.trace("Channel inactive {}", ctx.channel());
    +            allChannels.remove(ctx.channel());
    +            NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
    -                if (LOG.isTraceEnabled()) {
    -                    LOG.trace("Channel disconnect caused close " + e);
    -                }
    +                LOG.trace("Channel inactive caused close {}", cnxn);
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void exceptionCaught(ChannelHandlerContext ctx, 
ExceptionEvent e)
    -            throws Exception
    -        {
    -            LOG.warn("Exception caught " + e, e.getCause());
    -            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
    +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
    +            LOG.warn("Exception caught", cause);
    +            NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
                 if (cnxn != null) {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("Closing " + cnxn);
    -                }
    +                LOG.debug("Closing {}", cnxn);
                     cnxn.close();
                 }
             }
     
             @Override
    -        public void messageReceived(ChannelHandlerContext ctx, 
MessageEvent e)
    -            throws Exception
    -        {
    -            if (LOG.isTraceEnabled()) {
    -                LOG.trace("message received called " + e.getMessage());
    -            }
    +        public void userEventTriggered(ChannelHandlerContext ctx, Object 
evt) throws Exception {
                 try {
    -                if (LOG.isDebugEnabled()) {
    -                    LOG.debug("New message " + e.toString()
    -                            + " from " + ctx.getChannel());
    -                }
    -                NettyServerCnxn cnxn = 
(NettyServerCnxn)ctx.getAttachment();
    -                synchronized(cnxn) {
    -                    processMessage(e, cnxn);
    +                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
    +                    LOG.debug("Received AutoReadEvent.ENABLE");
    +                    NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
    +                    // TODO(ilyam): Not sure if cnxn can be null here. It 
becomes null if channelInactive()
    +                    // or exceptionCaught() trigger, but it's unclear to 
me if userEventTriggered() can run
    +                    // after either of those. Check for null just to be 
safe ...
    +                    if (cnxn != null) {
    +                        cnxn.processQueuedBuffer();
    +                    }
    +                    ctx.channel().config().setAutoRead(true);
    +                } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
    +                    LOG.debug("Received AutoReadEvent.DISABLE");
    +                    ctx.channel().config().setAutoRead(false);
                     }
    -            } catch(Exception ex) {
    -                LOG.error("Unexpected exception in receive", ex);
    -                throw ex;
    +            } finally {
    +                ReferenceCountUtil.release(evt);
                 }
             }
     
    -        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
    -            if (LOG.isDebugEnabled()) {
    -                LOG.debug(Long.toHexString(cnxn.sessionId) + " 
queuedBuffer: "
    -                        + cnxn.queuedBuffer);
    -            }
    -
    -            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
    -                LOG.debug("Received ResumeMessageEvent");
    -                if (cnxn.queuedBuffer != null) {
    -                    if (LOG.isTraceEnabled()) {
    -                        LOG.trace("processing queue "
    -                                + Long.toHexString(cnxn.sessionId)
    -                                + " queuedBuffer 0x"
    -                                + 
ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                    }
    -                    cnxn.receiveMessage(cnxn.queuedBuffer);
    -                    if (!cnxn.queuedBuffer.readable()) {
    -                        LOG.debug("Processed queue - no bytes remaining");
    -                        cnxn.queuedBuffer = null;
    -                    } else {
    -                        LOG.debug("Processed queue - bytes remaining");
    -                    }
    -                } else {
    -                    LOG.debug("queue empty");
    -                }
    -                cnxn.channel.setReadable(true);
    -            } else {
    -                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
    -                if (LOG.isTraceEnabled()) {
    -                    LOG.trace(Long.toHexString(cnxn.sessionId)
    -                            + " buf 0x"
    -                            + ChannelBuffers.hexDump(buf));
    -                }
    -                
    -                if (cnxn.throttled) {
    -                    LOG.debug("Received message while throttled");
    -                    // we are throttled, so we need to queue
    -                    if (cnxn.queuedBuffer == null) {
    -                        LOG.debug("allocating queue");
    -                        cnxn.queuedBuffer = 
dynamicBuffer(buf.readableBytes());
    -                    }
    -                    cnxn.queuedBuffer.writeBytes(buf);
    -                    if (LOG.isTraceEnabled()) {
    -                        LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                + " queuedBuffer 0x"
    -                                + 
ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                    }
    -                } else {
    -                    LOG.debug("not throttled");
    -                    if (cnxn.queuedBuffer != null) {
    -                        if (LOG.isTraceEnabled()) {
    -                            LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                    + " queuedBuffer 0x"
    -                                    + 
ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                        }
    -                        cnxn.queuedBuffer.writeBytes(buf);
    -                        if (LOG.isTraceEnabled()) {
    -                            LOG.trace(Long.toHexString(cnxn.sessionId)
    -                                    + " queuedBuffer 0x"
    -                                    + 
ChannelBuffers.hexDump(cnxn.queuedBuffer));
    -                        }
    -
    -                        cnxn.receiveMessage(cnxn.queuedBuffer);
    -                        if (!cnxn.queuedBuffer.readable()) {
    -                            LOG.debug("Processed queue - no bytes 
remaining");
    -                            cnxn.queuedBuffer = null;
    -                        } else {
    -                            LOG.debug("Processed queue - bytes remaining");
    -                        }
    +        @Override
    +        public void channelRead(ChannelHandlerContext ctx, Object msg) 
throws Exception {
    +            try {
    +                LOG.trace("message received called {}", msg);
    --- End diff --
    
    as above.


---

Reply via email to