Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226682998 --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -103,71 +108,95 @@ boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. - return channel != null; + connectLock.lock(); + try { + return connectFuture != null || channel != null; + } finally { + connectLock.unlock(); + } } @Override void connect(InetSocketAddress addr) throws IOException { firstConnect = new CountDownLatch(1); - ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - - bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); - bootstrap.setOption("soLinger", -1); - bootstrap.setOption("tcpNoDelay", true); - - connectFuture = bootstrap.connect(addr); - connectFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - // this lock guarantees that channel won't be assgined after cleanup(). - connectLock.lock(); - try { - if (!channelFuture.isSuccess() || connectFuture == null) { - LOG.info("future isn't success, cause: {}", channelFuture.getCause()); - return; - } - // setup channel, variables, connection, etc. 
- channel = channelFuture.getChannel(); - - disconnected.set(false); - initialized = false; - lenBuffer.clear(); - incomingBuffer = lenBuffer; - - sendThread.primeConnection(); - updateNow(); - updateLastSendAndHeard(); - - if (sendThread.tunnelAuthInProgress()) { - waitSasl.drainPermits(); - needSasl.set(true); - sendPrimePacket(); - } else { - needSasl.set(false); - } + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(Objects.requireNonNull(eventLoopGroup)) + .channel(NioSocketChannel.class) + .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())) + .option(ChannelOption.SO_LINGER, -1) + .option(ChannelOption.TCP_NODELAY, true); + ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); + if (testAllocator != null) { + bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); + } + bootstrap.validate(); + + connectLock.lock(); + try { + connectFuture = bootstrap.connect(addr); + connectFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + // this lock guarantees that channel won't be assigned after cleanup(). + connectLock.lock(); + try { + if (!channelFuture.isSuccess()) { + LOG.info("future isn't success, cause:", channelFuture.cause()); + return; + } else if (connectFuture == null) { + LOG.info("connect attempt cancelled"); + // If the connect attempt was cancelled but succeeded + // anyway, make sure to close the channel, otherwise + // we may leak a file descriptor. + channelFuture.channel().close(); --- End diff -- Can this turn into an NPE, given that channel() may return null?
---