Github user eolivelli commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/669#discussion_r226682998
--- Diff:
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
---
@@ -103,71 +108,95 @@
boolean isConnected() {
// Assuming that isConnected() is only used to initiate connection,
// not used by some other connection status judgement.
- return channel != null;
+ connectLock.lock();
+ try {
+ return connectFuture != null || channel != null;
+ } finally {
+ connectLock.unlock();
+ }
}
@Override
void connect(InetSocketAddress addr) throws IOException {
firstConnect = new CountDownLatch(1);
- ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
- bootstrap.setPipelineFactory(new
ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
- bootstrap.setOption("soLinger", -1);
- bootstrap.setOption("tcpNoDelay", true);
-
- connectFuture = bootstrap.connect(addr);
- connectFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture)
throws Exception {
- // this lock guarantees that channel won't be assgined
after cleanup().
- connectLock.lock();
- try {
- if (!channelFuture.isSuccess() || connectFuture ==
null) {
- LOG.info("future isn't success, cause: {}",
channelFuture.getCause());
- return;
- }
- // setup channel, variables, connection, etc.
- channel = channelFuture.getChannel();
-
- disconnected.set(false);
- initialized = false;
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
-
- sendThread.primeConnection();
- updateNow();
- updateLastSendAndHeard();
-
- if (sendThread.tunnelAuthInProgress()) {
- waitSasl.drainPermits();
- needSasl.set(true);
- sendPrimePacket();
- } else {
- needSasl.set(false);
- }
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(Objects.requireNonNull(eventLoopGroup))
+ .channel(NioSocketChannel.class)
+ .handler(new ZKClientPipelineFactory(addr.getHostString(),
addr.getPort()))
+ .option(ChannelOption.SO_LINGER, -1)
+ .option(ChannelOption.TCP_NODELAY, true);
+ ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+ if (testAllocator != null) {
+ bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
+ }
+ bootstrap.validate();
+
+ connectLock.lock();
+ try {
+ connectFuture = bootstrap.connect(addr);
+ connectFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture channelFuture)
throws Exception {
+ // this lock guarantees that channel won't be assigned
after cleanup().
+ connectLock.lock();
+ try {
+ if (!channelFuture.isSuccess()) {
+ LOG.info("future isn't success, cause:",
channelFuture.cause());
+ return;
+ } else if (connectFuture == null) {
+ LOG.info("connect attempt cancelled");
+ // If the connect attempt was cancelled but
succeeded
+ // anyway, make sure to close the channel,
otherwise
+ // we may leak a file descriptor.
+ channelFuture.channel().close();
--- End diff --
Can this turn into an NPE? As channel() may return null.
---