Github user anmolnar commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/669#discussion_r233289109
--- Diff:
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
---
@@ -103,71 +105,102 @@
boolean isConnected() {
// Assuming that isConnected() is only used to initiate connection,
// not used by some other connection status judgement.
- return channel != null;
+ connectLock.lock();
+ try {
+ return channel != null || connectFuture != null;
+ } finally {
+ connectLock.unlock();
+ }
+ }
+
+ private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
+ ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+ if (testAllocator != null) {
+ return bootstrap.option(ChannelOption.ALLOCATOR,
testAllocator);
+ } else {
+ return bootstrap;
+ }
}
@Override
void connect(InetSocketAddress addr) throws IOException {
firstConnect = new CountDownLatch(1);
- ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
- bootstrap.setPipelineFactory(new
ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
- bootstrap.setOption("soLinger", -1);
- bootstrap.setOption("tcpNoDelay", true);
-
- connectFuture = bootstrap.connect(addr);
- connectFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture)
throws Exception {
- // this lock guarantees that channel won't be assgined
after cleanup().
- connectLock.lock();
- try {
- if (!channelFuture.isSuccess() || connectFuture ==
null) {
- LOG.info("future isn't success, cause: {}",
channelFuture.getCause());
- return;
- }
- // setup channel, variables, connection, etc.
- channel = channelFuture.getChannel();
-
- disconnected.set(false);
- initialized = false;
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
-
- sendThread.primeConnection();
- updateNow();
- updateLastSendAndHeard();
-
- if (sendThread.tunnelAuthInProgress()) {
- waitSasl.drainPermits();
- needSasl.set(true);
- sendPrimePacket();
- } else {
- needSasl.set(false);
- }
+ Bootstrap bootstrap = new Bootstrap()
+ .group(eventLoopGroup)
+ .channel(NettyUtils.nioOrEpollSocketChannel())
+ .option(ChannelOption.SO_LINGER, -1)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .handler(new ZKClientPipelineFactory(addr.getHostString(),
addr.getPort()));
+ bootstrap = configureBootstrapAllocator(bootstrap);
+ bootstrap.validate();
- // we need to wake up on first connect to avoid
timeout.
- wakeupCnxn();
- firstConnect.countDown();
- LOG.info("channel is connected: {}",
channelFuture.getChannel());
- } finally {
- connectLock.unlock();
+ connectLock.lock();
+ try {
+ connectFuture = bootstrap.connect(addr);
+ connectFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture channelFuture)
throws Exception {
+ // this lock guarantees that channel won't be assigned
after cleanup().
+ connectLock.lock();
+ try {
+ if (!channelFuture.isSuccess()) {
+ LOG.info("future isn't success, cause:",
channelFuture.cause());
+ return;
+ } else if (connectFuture == null) {
--- End diff --
How could `connectFuture` be null?
`connectFuture.addListener` call would have already thrown NPE in that case.
---