anmolnar commented on a change in pull request #753: ZOOKEEPER-3204: Reconfig 
tests are constantly failing on 3.5 after applying Java 11 fix
URL: https://github.com/apache/zookeeper/pull/753#discussion_r253898138
 
 

 ##########
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ##########
 @@ -61,215 +26,184 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.X509KeyManager;
+import javax.net.ssl.X509TrustManager;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.NettyUtils;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.common.X509Exception.SSLContextException;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NettyServerCnxnFactory extends ServerCnxnFactory {
     private static final Logger LOG = 
LoggerFactory.getLogger(NettyServerCnxnFactory.class);
 
-    ServerBootstrap bootstrap;
-    Channel parentChannel;
-    ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
-    HashMap<InetAddress, Set<NettyServerCnxn>> ipMap =
-        new HashMap<InetAddress, Set<NettyServerCnxn>>( );
-    InetSocketAddress localAddress;
-    int maxClientCnxns = 60;
-    ClientX509Util x509Util;
+    private final ServerBootstrap bootstrap;
+    private Channel parentChannel;
+    private final ChannelGroup allChannels =
+            new DefaultChannelGroup("zkServerCnxns", new 
DefaultEventExecutor());
+    // Access to ipMap or to any Set contained in the map needs to be
+    // protected with synchronized (ipMap) { ... }
+    private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new 
HashMap<>();
+    private InetSocketAddress localAddress;
+    private int maxClientCnxns = 60;
+    private final ClientX509Util x509Util;
+
+    private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE =
+            AttributeKey.valueOf("NettyServerCnxn");
+
+    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+            new AtomicReference<>(null);
 
     /**
-     * This is an inner class since we need to extend SimpleChannelHandler, but
+     * This is an inner class since we need to extend ChannelDuplexHandler, but
      * NettyServerCnxnFactory already extends ServerCnxnFactory. By making it 
inner
      * this class gets access to the member variables and methods.
      */
     @Sharable
-    class CnxnChannelHandler extends SimpleChannelHandler {
+    class CnxnChannelHandler extends ChannelDuplexHandler {
 
         @Override
-        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent 
e)
-            throws Exception
-        {
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel closed " + e);
+                LOG.trace("Channel active {}", ctx.channel());
             }
-            allChannels.remove(ctx.getChannel());
-        }
 
-        @Override
-        public void channelConnected(ChannelHandlerContext ctx,
-                ChannelStateEvent e) throws Exception
-        {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel connected " + e);
-            }
-
-            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(),
+            NettyServerCnxn cnxn = new NettyServerCnxn(ctx.channel(),
                     zkServer, NettyServerCnxnFactory.this);
-            ctx.setAttachment(cnxn);
+            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
             if (secure) {
-                SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-                ChannelFuture handshakeFuture = sslHandler.handshake();
+                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
             } else {
-                allChannels.add(ctx.getChannel());
+                allChannels.add(ctx.channel());
                 addCnxn(cnxn);
             }
         }
 
         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx,
-                ChannelStateEvent e) throws Exception
-        {
+        public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Channel disconnected " + e);
+                LOG.trace("Channel inactive {}", ctx.channel());
             }
-            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+            allChannels.remove(ctx.channel());
+            NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
             if (cnxn != null) {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("Channel disconnect caused close " + e);
+                    LOG.trace("Channel inactive caused close {}", cnxn);
                 }
                 cnxn.close();
             }
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent 
e)
-            throws Exception
-        {
-            LOG.warn("Exception caught " + e, e.getCause());
-            NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
+            LOG.warn("Exception caught", cause);
+            NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
             if (cnxn != null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Closing " + cnxn);
+                    LOG.debug("Closing {}", cnxn);
                 }
                 cnxn.close();
             }
         }
 
         @Override
-        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-            throws Exception
-        {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("message received called " + e.getMessage());
-            }
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
             try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("New message " + e.toString()
-                            + " from " + ctx.getChannel());
-                }
-                NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
-                synchronized(cnxn) {
-                    processMessage(e, cnxn);
+                if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+                    LOG.debug("Received AutoReadEvent.ENABLE");
+                    NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+                    // TODO(ilyam): Not sure if cnxn can be null here. It 
becomes null if channelInactive()
+                    // or exceptionCaught() trigger, but it's unclear to me if 
userEventTriggered() can run
+                    // after either of those. Check for null just to be safe 
...
+                    if (cnxn != null) {
+                        cnxn.processQueuedBuffer();
+                    }
+                    ctx.channel().config().setAutoRead(true);
+                } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
+                    LOG.debug("Received AutoReadEvent.DISABLE");
+                    ctx.channel().config().setAutoRead(false);
                 }
-            } catch(Exception ex) {
-                LOG.error("Unexpected exception in receive", ex);
-                throw ex;
+            } finally {
+                ReferenceCountUtil.release(evt);
             }
         }
 
-        private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
-                        + cnxn.queuedBuffer);
-            }
-
-            if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
-                LOG.debug("Received ResumeMessageEvent");
-                if (cnxn.queuedBuffer != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("processing queue "
-                                + Long.toHexString(cnxn.sessionId)
-                                + " queuedBuffer 0x"
-                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                    }
-                    cnxn.receiveMessage(cnxn.queuedBuffer);
-                    if (!cnxn.queuedBuffer.readable()) {
-                        LOG.debug("Processed queue - no bytes remaining");
-                        cnxn.queuedBuffer = null;
-                    } else {
-                        LOG.debug("Processed queue - bytes remaining");
-                    }
-                } else {
-                    LOG.debug("queue empty");
-                }
-                cnxn.channel.setReadable(true);
-            } else {
-                ChannelBuffer buf = (ChannelBuffer)e.getMessage();
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+            try {
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace(Long.toHexString(cnxn.sessionId)
-                            + " buf 0x"
-                            + ChannelBuffers.hexDump(buf));
+                    LOG.trace("message received called {}", msg);
                 }
-                
-                if (cnxn.throttled) {
-                    LOG.debug("Received message while throttled");
-                    // we are throttled, so we need to queue
-                    if (cnxn.queuedBuffer == null) {
-                        LOG.debug("allocating queue");
-                        cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
-                    }
-                    cnxn.queuedBuffer.writeBytes(buf);
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace(Long.toHexString(cnxn.sessionId)
-                                + " queuedBuffer 0x"
-                                + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+                try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("New message {} from {}", msg, 
ctx.channel());
                     }
-                } else {
-                    LOG.debug("not throttled");
-                    if (cnxn.queuedBuffer != null) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace(Long.toHexString(cnxn.sessionId)
-                                    + " queuedBuffer 0x"
-                                    + 
ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                        }
-                        cnxn.queuedBuffer.writeBytes(buf);
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace(Long.toHexString(cnxn.sessionId)
-                                    + " queuedBuffer 0x"
-                                    + 
ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                        }
-
-                        cnxn.receiveMessage(cnxn.queuedBuffer);
-                        if (!cnxn.queuedBuffer.readable()) {
-                            LOG.debug("Processed queue - no bytes remaining");
-                            cnxn.queuedBuffer = null;
-                        } else {
-                            LOG.debug("Processed queue - bytes remaining");
-                        }
+                    NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+                    if (cnxn == null) {
+                        LOG.error("channelRead() on a closed or closing 
NettyServerCnxn");
                     } else {
-                        cnxn.receiveMessage(buf);
-                        if (buf.readable()) {
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Before copy " + buf);
-                            }
-                            cnxn.queuedBuffer = 
dynamicBuffer(buf.readableBytes()); 
-                            cnxn.queuedBuffer.writeBytes(buf);
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Copy is " + cnxn.queuedBuffer);
-                                LOG.trace(Long.toHexString(cnxn.sessionId)
-                                        + " queuedBuffer 0x"
-                                        + 
ChannelBuffers.hexDump(cnxn.queuedBuffer));
-                            }
-                        }
+                        cnxn.processMessage((ByteBuf) msg);
                     }
+                } catch (Exception ex) {
+                    LOG.error("Unexpected exception in receive", ex);
+                    throw ex;
                 }
+            } finally {
+                ReferenceCountUtil.release(msg);
             }
         }
 
         @Override
-        public void writeComplete(ChannelHandlerContext ctx,
-                WriteCompletionEvent e) throws Exception
-        {
+        public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise) throws Exception {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("write complete " + e);
+                promise.addListener((future) -> {
+                    LOG.trace("write {}",
+                            future.isSuccess() ? "complete" : "failed");
+                });
             }
+            super.write(ctx, msg, promise);
         }
 
-        private final class CertificateVerifier
-                implements ChannelFutureListener {
+        private final class CertificateVerifier implements 
GenericFutureListener<Future<Channel>> {
 
 Review comment:
   Big thanks for that! Looking forward to committing it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to