[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user asfgit closed the pull request at: https://github.com/apache/zookeeper/pull/669 ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
GitHub user ivmaykov reopened a pull request:

    https://github.com/apache/zookeeper/pull/669

    ZOOKEEPER-3152: Port ZK netty stack to netty4

    Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).

    Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.

    FB Reviewers: nixon

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ivmaykov/zookeeper ZOOKEEPER-3152

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/zookeeper/pull/669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #669

commit 4fb8eb6ebe69a4f9a0852624d652d0893d6ba625
Author: Ilya Maykov
Date:   2018-08-31T23:26:55Z

    port ZK netty stack from netty3 to netty4

    Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).

    Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.

    Reviewers: nixon, nwolchko, nedelchev

    Subscribers:

    Differential Revision: https://phabricator.intern.facebook.com/D9646262

    Tasks:

    Tags:

    Blame Revision:

---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov closed the pull request at: https://github.com/apache/zookeeper/pull/669 ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233664524 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); 
if (cnxn != null) { if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); +LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() --- End diff -- yep! ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233663529 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); 
if (cnxn != null) { if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); +LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() --- End diff -- Ah, right. I think I misunderstood. So the code is good as-is, yes? ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233663197 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); 
if (cnxn != null) { if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); +LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() --- End diff -- That's why I'm saying keep the removing of the connection attribute in both methods. :) ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233661987 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); 
if (cnxn != null) { if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); +LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() --- End diff -- It will not be called twice, since removing the connection attribute means the second time we will get null and there is a null check in both places. ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233659698 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); 
if (cnxn != null) { if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); +LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() --- End diff -- If `cnxn.close();` triggers channelInactive event too, you need to remove it, otherwise `close()` will be called twice. ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233659921

    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -316,16 +251,17 @@ public void operationComplete(ChannelFuture future)
     if (KeeperException.Code.OK != authProvider.handleAuthentication(cnxn, null)) {
     LOG.error("Authentication failed for session 0x{}",
    -Long.toHexString(cnxn.sessionId));
    +Long.toHexString(cnxn.getSessionId()));
     cnxn.close();
     return;
     }
    -allChannels.add(future.getChannel());
    +final Channel futureChannel = future.getNow();
    --- End diff --

    It's fine.

---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233651445

    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
    @@ -316,16 +251,17 @@ public void operationComplete(ChannelFuture future)
     if (KeeperException.Code.OK != authProvider.handleAuthentication(cnxn, null)) {
     LOG.error("Authentication failed for session 0x{}",
    -Long.toHexString(cnxn.sessionId));
    +Long.toHexString(cnxn.getSessionId()));
     cnxn.close();
     return;
     }
    -allChannels.add(future.getChannel());
    +final Channel futureChannel = future.getNow();
    --- End diff --

    I think they are equivalent here, since we know the future has completed (we are inside an `if (future.isSuccess())` block). I can change it to `get()` if you prefer.

---
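The `getNow()` versus `get()` point above can be illustrated with a small stdlib-only sketch. It uses `java.util.concurrent.CompletableFuture` as a stand-in for Netty's future type, which is an assumption made purely for illustration: unlike Netty's no-argument `getNow()`, the stdlib method takes a fallback value, and `CompletedFutureAccess` and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical helper comparing a non-blocking read with a blocking read.
class CompletedFutureAccess {
    // Non-blocking: returns null (the fallback) if the future is not done yet.
    static String readNow(CompletableFuture<String> future) {
        return future.getNow(null);
    }

    // Blocking: waits for completion; returns immediately when already done.
    static String readBlocking(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Once the future is known to have succeeded, both reads return the same value; they only differ while the future is still pending, where the non-blocking read yields the fallback instead of waiting.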
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233651730 --- Diff: zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + +/** + * This is a custom ByteBufAllocator that tracks outstanding allocations and + * crashes the program if any of them are leaked. + * + * Never use this class in production, it will cause your server to run out + * of memory! This is because it holds strong references to all allocated + * buffers and doesn't release them until checkForLeaks() is called at the + * end of a unit test. + * + * Note: the original code was copied from https://github.com/airlift/drift, + * with the permission and encouragement of airlift's author (dain). 
Airlift + * uses the same apache 2.0 license as Zookeeper so this should be ok. + * + * However, the code was modified to take advantage of Netty's built-in + * leak tracking and make a best effort to print details about buffer leaks. + * + */ +public class TestByteBufAllocator extends PooledByteBufAllocator { +private static AtomicReference INSTANCE = +new AtomicReference<>(null); + +/** + * Get the singleton testing allocator. + * @return the singleton allocator, creating it if one does not exist. + */ +public static TestByteBufAllocator getInstance() { +TestByteBufAllocator result = INSTANCE.get(); +if (result == null) { +ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel(); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); +INSTANCE.compareAndSet(null, new TestByteBufAllocator(oldLevel)); +result = INSTANCE.get(); +} +return result; +} + +/** + * Destroys the singleton testing allocator and throws an error if any of the + * buffers allocated by it have been leaked. Attempts to print leak details to + * standard error before throwing, by using netty's built-in leak tracking. + * Note that this might not always work, since it only triggers when a buffer + * is garbage-collected and calling System.gc() does not guarantee that a buffer + * will actually be GC'ed. + * + * This should be called at the end of a unit test's tearDown() method. 
+ */ +public static void checkForLeaks() { +TestByteBufAllocator result = INSTANCE.getAndSet(null); +if (result != null) { +result.checkInstanceForLeaks(); +} +} + +private final List trackedBuffers = new ArrayList<>(); +private final ResourceLeakDetector.Level oldLevel; + +private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel) +{ +super(false); +this.oldLevel = oldLevel; +} + +@Override +protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) +{ +return track(super.newHeapBuffer(initialCapacity, maxCapacity)); +} + +@Override +protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) +{ +return track(super.newDirectBuffer(initialCapacity, maxCapacity)); +} + +@Override +public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) +{ +return track(super.compositeHeapBuffer(maxNumComponents)); +} + +
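The class comment in the diff above describes an allocator that keeps strong references to everything it hands out and fails the test if anything is still unreleased at teardown. A minimal stdlib-only sketch of that idea follows; `TrackedBuffer` and `TrackingAllocator` are hypothetical names, the reference count is a simplified stand-in for Netty's `ByteBuf` reference counting, and the rest of the real class is cut off in the diff, so this is not its actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer with a simple reference count, starting at 1.
class TrackedBuffer {
    private int refCnt = 1;

    synchronized int refCnt() {
        return refCnt;
    }

    synchronized void release() {
        if (refCnt == 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCnt--;
    }
}

// Hypothetical test-only allocator: remembers every buffer it creates.
class TrackingAllocator {
    private final List<TrackedBuffer> tracked = new ArrayList<>();

    synchronized TrackedBuffer allocate() {
        TrackedBuffer buffer = new TrackedBuffer();
        tracked.add(buffer); // strong reference, so never use this in production
        return buffer;
    }

    // Intended for a test's tearDown(): fails if any buffer was not released.
    synchronized void checkForLeaks() {
        long leaked = tracked.stream().filter(b -> b.refCnt() > 0).count();
        tracked.clear();
        if (leaked > 0) {
            throw new AssertionError(leaked + " buffer(s) leaked");
        }
    }
}
```

Holding strong references is what makes the check deterministic, and it is also why the class comment warns against production use: nothing is eligible for garbage collection until `checkForLeaks()` clears the list.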
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233650367 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); 
if (cnxn != null) { if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); +LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() --- End diff -- I do remove it in both places, by calling `ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null)`. The getAndSet(null) will atomically return the old value of the attribute and set the new value to null. Now that I think about it, I'm not sure if we need to remove the attribute in `exceptionCaught()` ... we can probably leave it and let `channelInactive()` take care of removing the attribute. Let me know if you want me to make this change, I think it probably doesn't matter too much either way. ---
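The `getAndSet(null)` argument above can be sketched without Netty. In the hedged example below, an `AtomicReference` plays the role of `ctx.channel().attr(CONNECTION_ATTRIBUTE)`, and `CountingCnxn` and `CloseOnceHandler` are hypothetical stand-ins for `NettyServerCnxn` and the channel handler.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for NettyServerCnxn: it only counts close() calls.
class CountingCnxn {
    final AtomicInteger closeCalls = new AtomicInteger();

    void close() {
        closeCalls.incrementAndGet();
    }
}

// Hypothetical stand-in for the handler and its channel attribute.
class CloseOnceHandler {
    // Plays the role of ctx.channel().attr(CONNECTION_ATTRIBUTE).
    private final AtomicReference<CountingCnxn> attr = new AtomicReference<>();

    void channelActive(CountingCnxn cnxn) {
        attr.set(cnxn);
    }

    void exceptionCaught() {
        closeIfPresent();
    }

    void channelInactive() {
        closeIfPresent();
    }

    private void closeIfPresent() {
        // Atomically take the connection and leave null behind, so only the
        // first caller sees a non-null value and closes it.
        CountingCnxn cnxn = attr.getAndSet(null);
        if (cnxn != null) {
            cnxn.close();
        }
    }
}
```

Whichever callback runs first closes the connection; the other one reads null and does nothing, which is why clearing the attribute in both places does not lead to a second `close()`.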
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233652712

    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
    @@ -103,71 +105,102 @@ boolean isConnected() {
     // Assuming that isConnected() is only used to initiate connection,
     // not used by some other connection status judgement.
    -return channel != null;
    +connectLock.lock();
    +try {
    +return channel != null || connectFuture != null;
    --- End diff --

    As the comment above says, the `isConnected()` method is only used in the main loop inside `ClientCnxn$SendThread.run()` to see if a new connection should be initiated. So this method should return true if a connection attempt is already in progress, so that the loop does not start a second one. This is the case when `connectFuture` is not null. Arguably the method should be called `isConnectedOrConnecting()`, but I didn't want to go around refactoring APIs in this diff. I can do it if you like.

---
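The connection-state check discussed above can be sketched as a tiny state holder. This is a simplified model under stated assumptions: `ConnectState` and its methods are hypothetical, plain `Object` fields stand in for the Netty channel and connect future, and clearing the future on success is assumed rather than taken from the diff.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of the client's connection state.
class ConnectState {
    private final ReentrantLock connectLock = new ReentrantLock();
    private Object channel;        // non-null once a connection is established
    private Object connectFuture;  // non-null while a connect attempt is pending

    // True when connected OR when an attempt is already in flight, so the
    // caller's loop does not start a second attempt.
    boolean isConnectedOrConnecting() {
        connectLock.lock();
        try {
            return channel != null || connectFuture != null;
        } finally {
            connectLock.unlock();
        }
    }

    void startConnect() {
        connectLock.lock();
        try {
            connectFuture = new Object();
        } finally {
            connectLock.unlock();
        }
    }

    // Assumption: the pending future is cleared once the channel is set.
    void connectSucceeded() {
        connectLock.lock();
        try {
            channel = new Object();
            connectFuture = null;
        } finally {
            connectLock.unlock();
        }
    }

    void cleanup() {
        connectLock.lock();
        try {
            channel = null;
            connectFuture = null;
        } finally {
            connectLock.unlock();
        }
    }
}
```

Reading both fields under the same lock that guards their updates keeps the answer consistent even if a connect attempt finishes while the check is running.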
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233653584 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -103,71 +105,102 @@ boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. -return channel != null; +connectLock.lock(); +try { +return channel != null || connectFuture != null; +} finally { +connectLock.unlock(); +} +} + +private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) { +ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); +if (testAllocator != null) { +return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); +} else { +return bootstrap; +} } @Override void connect(InetSocketAddress addr) throws IOException { firstConnect = new CountDownLatch(1); -ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - -bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); -bootstrap.setOption("soLinger", -1); -bootstrap.setOption("tcpNoDelay", true); - -connectFuture = bootstrap.connect(addr); -connectFuture.addListener(new ChannelFutureListener() { -@Override -public void operationComplete(ChannelFuture channelFuture) throws Exception { -// this lock guarantees that channel won't be assgined after cleanup(). -connectLock.lock(); -try { -if (!channelFuture.isSuccess() || connectFuture == null) { -LOG.info("future isn't success, cause: {}", channelFuture.getCause()); -return; -} -// setup channel, variables, connection, etc. 
-channel = channelFuture.getChannel(); - -disconnected.set(false); -initialized = false; -lenBuffer.clear(); -incomingBuffer = lenBuffer; - -sendThread.primeConnection(); -updateNow(); -updateLastSendAndHeard(); - -if (sendThread.tunnelAuthInProgress()) { -waitSasl.drainPermits(); -needSasl.set(true); -sendPrimePacket(); -} else { -needSasl.set(false); -} +Bootstrap bootstrap = new Bootstrap() +.group(eventLoopGroup) +.channel(NettyUtils.nioOrEpollSocketChannel()) +.option(ChannelOption.SO_LINGER, -1) +.option(ChannelOption.TCP_NODELAY, true) +.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); +bootstrap = configureBootstrapAllocator(bootstrap); +bootstrap.validate(); -// we need to wake up on first connect to avoid timeout. -wakeupCnxn(); -firstConnect.countDown(); -LOG.info("channel is connected: {}", channelFuture.getChannel()); -} finally { -connectLock.unlock(); +connectLock.lock(); +try { +connectFuture = bootstrap.connect(addr); +connectFuture.addListener(new ChannelFutureListener() { +@Override +public void operationComplete(ChannelFuture channelFuture) throws Exception { +// this lock guarantees that channel won't be assigned after cleanup(). +connectLock.lock(); +try { +if (!channelFuture.isSuccess()) { +LOG.info("future isn't success, cause:", channelFuture.cause()); +return; +} else if (connectFuture == null) { --- End diff -- As the comment below says, there could be a race if the connect attempt was cancelled right around the time the listener callback fired. `cleanup()` below will cancel an in-progress connection attempt and clear the `connectFuture` variable. If this happens, `connectFuture` will be null here. ---
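The race described above, where `cleanup()` clears `connectFuture` just before the listener callback runs, can be replayed deterministically in a stdlib-only sketch. `GuardedConnector` is a hypothetical name, `CompletableFuture` stands in for Netty's `ChannelFuture`, and a `String` stands in for the channel. The sketch's `cleanup()` deliberately skips cancelling the attempt so that the "success arrives late" ordering can be reproduced on a single thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of connect() and cleanup() sharing one lock.
class GuardedConnector {
    private final ReentrantLock connectLock = new ReentrantLock();
    private CompletableFuture<String> connectFuture;
    private String channel;

    CompletableFuture<String> connect() {
        CompletableFuture<String> attempt = new CompletableFuture<>();
        connectLock.lock();
        try {
            connectFuture = attempt;
        } finally {
            connectLock.unlock();
        }
        attempt.whenComplete((connected, error) -> {
            connectLock.lock();
            try {
                if (error != null) {
                    return; // the attempt failed
                }
                if (connectFuture == null) {
                    return; // cleanup() won the race; do not assign the channel
                }
                channel = connected;
                connectFuture = null;
            } finally {
                connectLock.unlock();
            }
        });
        return attempt;
    }

    // Simplification: the real code also cancels the in-progress attempt.
    void cleanup() {
        connectLock.lock();
        try {
            connectFuture = null;
            channel = null;
        } finally {
            connectLock.unlock();
        }
    }

    String channel() {
        connectLock.lock();
        try {
            return channel;
        } finally {
            connectLock.unlock();
        }
    }
}
```

Calling `connect()`, then `cleanup()`, then completing the returned future leaves `channel()` null, which is the outcome the null check is there to guarantee.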
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233649908 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { --- End diff -- We call `cnxn.close()` at the end of `exceptionCaught()`, which will end up closing the channel so 
`channelInactive()` will get called, so I think it would be redundant to remove from `allChannels` here. ---
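The argument above (closing the connection closes the channel, which in turn triggers `channelInactive()`, which does the untracking) can be sketched without Netty. This is only a minimal model under stated assumptions: `FakeChannel` and the static handler methods are hypothetical stand-ins, not ZooKeeper or Netty classes, and the sketch assumes close fires the inactive callback exactly once.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class CloseTriggersInactiveSketch {

    /** Hypothetical stand-in for a channel: closing it fires the inactive callback once. */
    static final class FakeChannel {
        private final Consumer<FakeChannel> inactiveCallback;
        private boolean open = true;

        FakeChannel(Consumer<FakeChannel> inactiveCallback) {
            this.inactiveCallback = inactiveCallback;
        }

        void close() {
            if (open) {
                open = false;
                inactiveCallback.accept(this);
            }
        }
    }

    static final Set<FakeChannel> allChannels = ConcurrentHashMap.newKeySet();
    static int inactiveCalls = 0;

    /** Plays the role of channelInactive(): the only place that untracks a channel. */
    static void channelInactive(FakeChannel ch) {
        inactiveCalls++;
        allChannels.remove(ch);
    }

    /** Plays the role of exceptionCaught(): it only closes and relies on the callback. */
    static void exceptionCaught(FakeChannel ch) {
        ch.close();
    }

    /** Returns how many channels are still tracked after an exception on one channel. */
    static int trackedAfterException() {
        allChannels.clear();
        inactiveCalls = 0;
        FakeChannel ch = new FakeChannel(CloseTriggersInactiveSketch::channelInactive);
        allChannels.add(ch);
        exceptionCaught(ch);
        return allChannels.size();
    }

    public static void main(String[] args) {
        System.out.println("tracked after exception: " + trackedAfterException());
        System.out.println("channelInactive calls: " + inactiveCalls);
    }
}
```

In this model an explicit `allChannels.remove(...)` inside `exceptionCaught()` would change nothing, which is the redundancy being described. Whether the real close path always reaches `channelInactive()` is for the PR itself to establish.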
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233289109 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -103,71 +105,102 @@ boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. -return channel != null; +connectLock.lock(); +try { +return channel != null || connectFuture != null; +} finally { +connectLock.unlock(); +} +} + +private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) { +ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); +if (testAllocator != null) { +return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); +} else { +return bootstrap; +} } @Override void connect(InetSocketAddress addr) throws IOException { firstConnect = new CountDownLatch(1); -ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - -bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); -bootstrap.setOption("soLinger", -1); -bootstrap.setOption("tcpNoDelay", true); - -connectFuture = bootstrap.connect(addr); -connectFuture.addListener(new ChannelFutureListener() { -@Override -public void operationComplete(ChannelFuture channelFuture) throws Exception { -// this lock guarantees that channel won't be assgined after cleanup(). -connectLock.lock(); -try { -if (!channelFuture.isSuccess() || connectFuture == null) { -LOG.info("future isn't success, cause: {}", channelFuture.getCause()); -return; -} -// setup channel, variables, connection, etc. 
-channel = channelFuture.getChannel(); - -disconnected.set(false); -initialized = false; -lenBuffer.clear(); -incomingBuffer = lenBuffer; - -sendThread.primeConnection(); -updateNow(); -updateLastSendAndHeard(); - -if (sendThread.tunnelAuthInProgress()) { -waitSasl.drainPermits(); -needSasl.set(true); -sendPrimePacket(); -} else { -needSasl.set(false); -} +Bootstrap bootstrap = new Bootstrap() +.group(eventLoopGroup) +.channel(NettyUtils.nioOrEpollSocketChannel()) +.option(ChannelOption.SO_LINGER, -1) +.option(ChannelOption.TCP_NODELAY, true) +.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); +bootstrap = configureBootstrapAllocator(bootstrap); +bootstrap.validate(); -// we need to wake up on first connect to avoid timeout. -wakeupCnxn(); -firstConnect.countDown(); -LOG.info("channel is connected: {}", channelFuture.getChannel()); -} finally { -connectLock.unlock(); +connectLock.lock(); +try { +connectFuture = bootstrap.connect(addr); +connectFuture.addListener(new ChannelFutureListener() { +@Override +public void operationComplete(ChannelFuture channelFuture) throws Exception { +// this lock guarantees that channel won't be assigned after cleanup(). +connectLock.lock(); +try { +if (!channelFuture.isSuccess()) { +LOG.info("future isn't success, cause:", channelFuture.cause()); +return; +} else if (connectFuture == null) { --- End diff -- How could `connectFuture` be null? `connectFuture.addListener` call would have already thrown NPE in that case. ---
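One possible reading of the `connectFuture == null` branch: the listener does not run at the point where `addListener` is called, but later, when the connect attempt completes. If a cleanup path clears the field under the same lock in between, the listener can observe `null` without any NPE at registration time. The sketch below models that ordering with `CompletableFuture` from the JDK instead of Netty. The class name, the `outcome` field and the `cleanup()` body are assumptions made for illustration, not the code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class CancelledConnectSketch {
    private final Lock connectLock = new ReentrantLock();
    private CompletableFuture<String> connectFuture; // guarded by connectLock
    private String channel;                          // guarded by connectLock
    private String outcome = "pending";

    /** Starts a connect attempt and registers a completion callback on it. */
    CompletableFuture<String> connect() {
        CompletableFuture<String> future = new CompletableFuture<>();
        connectLock.lock();
        try {
            connectFuture = future; // non-null here, so registering the callback cannot NPE
            future.whenComplete((ch, error) -> {
                // Runs later, in whichever thread completes the future.
                connectLock.lock();
                try {
                    if (error != null) {
                        outcome = "failed";
                    } else if (connectFuture == null) {
                        // cleanup() ran between connect() and completion.
                        outcome = "cancelled";
                    } else {
                        channel = ch;
                        outcome = "connected";
                    }
                } finally {
                    connectLock.unlock();
                }
            });
        } finally {
            connectLock.unlock();
        }
        return future;
    }

    /** Hypothetical cleanup: clears the shared state under the same lock. */
    void cleanup() {
        connectLock.lock();
        try {
            connectFuture = null;
            channel = null;
        } finally {
            connectLock.unlock();
        }
    }

    /** Runs one connect attempt, optionally cleaning up before it completes. */
    static String run(boolean cleanupBeforeCompletion) {
        CancelledConnectSketch sketch = new CancelledConnectSketch();
        CompletableFuture<String> future = sketch.connect();
        if (cleanupBeforeCompletion) {
            sketch.cleanup();
        }
        future.complete("channel-1");
        return sketch.outcome;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

The same ordering would also explain why `isConnected()` might want to look at `connectFuture`: between `connect()` and completion there is a pending attempt but no channel yet.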
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233288683 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -103,71 +105,102 @@ boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. -return channel != null; +connectLock.lock(); +try { +return channel != null || connectFuture != null; --- End diff -- Why would you like to check `connectFuture` too? ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233279457 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); 
if (cnxn != null) { if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); +LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() --- End diff -- Do you need to remove `cnxn` from the channel in the mentioned two events? Null check wouldn't do any harm though. ---
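A rough sketch of what detaching the connection with `getAndSet(null)` buys, and why a later event then needs the null check. It uses `AtomicReference` from the JDK as a stand-in for the channel attribute; `FakeCnxn` and the handler method names are hypothetical and only mirror the roles of the handlers in the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class DetachOnceSketch {

    /** Hypothetical connection object that counts how often it is closed. */
    static final class FakeCnxn {
        final AtomicInteger closeCalls = new AtomicInteger();

        void close() {
            closeCalls.incrementAndGet();
        }
    }

    // Stand-in for the per-channel attribute holding the connection.
    private final AtomicReference<FakeCnxn> attribute = new AtomicReference<>();

    void attach(FakeCnxn cnxn) {
        attribute.set(cnxn);
    }

    void channelInactive() {
        closeIfAttached();
    }

    void exceptionCaught() {
        closeIfAttached();
    }

    /** Atomically detaches the connection so that only one caller gets to close it. */
    private void closeIfAttached() {
        FakeCnxn cnxn = attribute.getAndSet(null);
        if (cnxn != null) {
            cnxn.close();
        }
    }

    /** A later event only reads the attribute, so it may see null. */
    boolean userEventSeesConnection() {
        return attribute.get() != null;
    }

    /** Fires both events on one connection and reports how often close() ran. */
    static int closeCallsAfterBothEvents() {
        DetachOnceSketch handler = new DetachOnceSketch();
        FakeCnxn cnxn = new FakeCnxn();
        handler.attach(cnxn);
        handler.exceptionCaught();
        handler.channelInactive();
        return cnxn.closeCalls.get();
    }

    /** Reports whether an event arriving after the detach still sees a connection. */
    static boolean laterEventSeesConnection() {
        DetachOnceSketch handler = new DetachOnceSketch();
        handler.attach(new FakeCnxn());
        handler.channelInactive();
        return handler.userEventSeesConnection();
    }

    public static void main(String[] args) {
        System.out.println("close calls: " + closeCallsAfterBothEvents());
        System.out.println("later event sees cnxn: " + laterEventSeesConnection());
    }
}
```

If the two handlers only read the attribute without clearing it, `close()` would run twice in this model. Whether a user event can really arrive after either handler in Netty is the open question from the TODO, and the sketch does not settle it.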
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233282137 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -316,16 +251,17 @@ public void operationComplete(ChannelFuture future) if (KeeperException.Code.OK != authProvider.handleAuthentication(cnxn, null)) { LOG.error("Authentication failed for session 0x{}", -Long.toHexString(cnxn.sessionId)); +Long.toHexString(cnxn.getSessionId())); cnxn.close(); return; } -allChannels.add(future.getChannel()); +final Channel futureChannel = future.getNow(); --- End diff -- I think `get()` would be enough, but the check is harmless anyway. ---
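For readers weighing `get()` against `getNow()`: a non-blocking `getNow()` can hand back an empty result when the future is still pending, which is presumably why a check follows it in the PR (the diff excerpt is cut off before that point). Inside a completion listener the future is already done, so both calls return the same value immediately. The sketch below shows this with the JDK's `CompletableFuture`, whose `getNow(valueIfAbsent)` and `join()` are used as analogues. It is not Netty's `Future` API, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class GetNowSketch {

    /** A pending future yields the fallback (null here) from the non-blocking getNow. */
    static boolean pendingIsNull() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        return pending.getNow(null) == null;
    }

    /** Inside a completion callback, the non-blocking and blocking reads agree. */
    static String insideListener() {
        CompletableFuture<String> future = new CompletableFuture<>();
        String[] seen = new String[2];
        future.thenRun(() -> {
            seen[0] = future.getNow(null); // non-blocking read
            seen[1] = future.join();       // blocking read, returns at once because the future is done
        });
        future.complete("channel-1");      // the callback runs here, in the completing thread
        return seen[0] + "/" + seen[1];
    }

    public static void main(String[] args) {
        System.out.println("pending getNow is null: " + pendingIsNull());
        System.out.println("inside listener: " + insideListener());
    }
}
```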
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233278670 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ +public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); +LOG.trace("Channel inactive {}", ctx.channel()); } -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); +LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { --- End diff -- You remove `ctx.channel()` from `allChannels` in the Inactive method. 
Which was actually not the case in the original impl, but I think it makes perfect sense. Don't you wanna do the same in here? ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233286512 --- Diff: zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + +/** + * This is a custom ByteBufAllocator that tracks outstanding allocations and + * crashes the program if any of them are leaked. + * + * Never use this class in production, it will cause your server to run out + * of memory! This is because it holds strong references to all allocated + * buffers and doesn't release them until checkForLeaks() is called at the + * end of a unit test. + * + * Note: the original code was copied from https://github.com/airlift/drift, + * with the permission and encouragement of airlift's author (dain). 
Airlift + * uses the same apache 2.0 license as Zookeeper so this should be ok. + * + * However, the code was modified to take advantage of Netty's built-in + * leak tracking and make a best effort to print details about buffer leaks. + * + */ +public class TestByteBufAllocator extends PooledByteBufAllocator { +private static AtomicReference INSTANCE = +new AtomicReference<>(null); + +/** + * Get the singleton testing allocator. + * @return the singleton allocator, creating it if one does not exist. + */ +public static TestByteBufAllocator getInstance() { +TestByteBufAllocator result = INSTANCE.get(); +if (result == null) { +ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel(); + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); +INSTANCE.compareAndSet(null, new TestByteBufAllocator(oldLevel)); +result = INSTANCE.get(); +} +return result; +} + +/** + * Destroys the singleton testing allocator and throws an error if any of the + * buffers allocated by it have been leaked. Attempts to print leak details to + * standard error before throwing, by using netty's built-in leak tracking. + * Note that this might not always work, since it only triggers when a buffer + * is garbage-collected and calling System.gc() does not guarantee that a buffer + * will actually be GC'ed. + * + * This should be called at the end of a unit test's tearDown() method. 
+ */ +public static void checkForLeaks() { +TestByteBufAllocator result = INSTANCE.getAndSet(null); +if (result != null) { +result.checkInstanceForLeaks(); +} +} + +private final List trackedBuffers = new ArrayList<>(); +private final ResourceLeakDetector.Level oldLevel; + +private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel) +{ +super(false); +this.oldLevel = oldLevel; +} + +@Override +protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) +{ +return track(super.newHeapBuffer(initialCapacity, maxCapacity)); +} + +@Override +protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) +{ +return track(super.newDirectBuffer(initialCapacity, maxCapacity)); +} + +@Override +public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) +{ +return track(super.compositeHeapBuffer(maxNumComponents)); +} + +
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
GitHub user ivmaykov reopened a pull request: https://github.com/apache/zookeeper/pull/669 ZOOKEEPER-3152: Port ZK netty stack to netty4 Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty). Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble. FB Reviewers: nixon You can merge this pull request into a Git repository by running: $ git pull https://github.com/ivmaykov/zookeeper ZOOKEEPER-3152 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/zookeeper/pull/669.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #669 commit 94c4516bea9b46f5428ef29ff51490f9647eaac3 Author: Ilya Maykov Date: 2018-08-31T23:26:55Z port ZK netty stack from netty3 to netty4 Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty). Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble. Reviewers: nixon, nwolchko, nedelchev Subscribers: Differential Revision: https://phabricator.intern.facebook.com/D9646262 Tasks: Tags: Blame Revision: ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov closed the pull request at: https://github.com/apache/zookeeper/pull/669 ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226849723 --- Diff: ivy.xml --- @@ -59,9 +59,11 @@ - - - + --- End diff -- I forgot about this. I think it is better to use netty-all because it bundles all of the native artifacts. In BookKeeper, for instance, we have netty-all, and this is common practice in many other projects. ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226845944 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); -} -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +LOG.trace("Channel inactive {}", ctx.channel()); --- End diff -- Do you want a similar check for LOG.debug() calls as well, or only LOG.trace()? ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226839154 --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -267,7 +298,7 @@ private void sendPkt(Packet p) { p.createBB(); updateLastSend(); sentCount++; -channel.write(ChannelBuffers.wrappedBuffer(p.bb)); +channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb)); --- End diff -- Essentially you will save allocations if you don't need listeners ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226839114 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); -} -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +LOG.trace("Channel inactive {}", ctx.channel()); --- End diff -- It is better to add the 'if' because in general you will skip calling the logger method, with everything that comes with it: evaluating expressions for parameters, passing parameters for the method call, and calling the method. You trade a single cheap method call for a potential waste of resources and useless allocations. IMHO it is better to use this pattern consistently throughout the code. ---
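The cost being discussed is that Java evaluates a method's arguments before the call, so a logger that checks its level internally still pays for building them. The following is a small sketch of that effect with a hand-rolled logger. `TinyLogger` and `hexDump` are hypothetical helpers written for this example; they only imitate the shape of a parameterized `trace` call and are not the SLF4J API.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LogGuardSketch {

    /** Hypothetical logger that checks the level internally, like a parameterized trace call. */
    static final class TinyLogger {
        private final boolean traceEnabled;
        final StringBuilder out = new StringBuilder();

        TinyLogger(boolean traceEnabled) {
            this.traceEnabled = traceEnabled;
        }

        boolean isTraceEnabled() {
            return traceEnabled;
        }

        void trace(String format, Object arg) {
            if (traceEnabled) {
                out.append(format.replace("{}", String.valueOf(arg))).append('\n');
            }
        }
    }

    static final AtomicInteger expensiveCalls = new AtomicInteger();

    /** An argument that costs real work to build, e.g. a hex dump of a buffer. */
    static String hexDump(byte[] data) {
        expensiveCalls.incrementAndGet();
        StringBuilder sb = new StringBuilder();
        for (byte b : data) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    /** Without a guard the argument is built even though trace is disabled. */
    static int unguardedCost() {
        expensiveCalls.set(0);
        TinyLogger log = new TinyLogger(false);
        log.trace("payload {}", hexDump(new byte[] {1, 2, 3}));
        return expensiveCalls.get();
    }

    /** With the guard the argument expression is never evaluated. */
    static int guardedCost() {
        expensiveCalls.set(0);
        TinyLogger log = new TinyLogger(false);
        if (log.isTraceEnabled()) {
            log.trace("payload {}", hexDump(new byte[] {1, 2, 3}));
        }
        return expensiveCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded hexDump calls: " + unguardedCost());
        System.out.println("guarded hexDump calls: " + guardedCost());
    }
}
```

When the argument is just an existing reference, such as the channel object, there is nothing expensive to skip, and the choice comes down to the consistency argument.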
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226838972 --- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.common; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This is a custom ByteBufAllocator that tracks outstanding allocations and + * crashes the program if any of them are leaked. + * + * Never use this class in production, it will cause your server to run out + * of memory! This is because it holds strong references to all allocated + * buffers and doesn't release them until checkForLeaks() is called at the + * end of a unit test. + * + * Note: the original code was copied from https://github.com/airlift/drift, + * with the permission and encouragement of airlift's author (dain). 
Airlift + * uses the same apache 2.0 license as Zookeeper so this should be ok. + * + * However, the code was modified to take advantage of Netty's built-in + * leak tracking and make a best effort to print details about buffer leaks. + */ +public class TestByteBufAllocator extends PooledByteBufAllocator { --- End diff -- Okay, very good ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226836043 --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx, } } wakeupCnxn(); +// Note: SimpleChannelInboundHandler releases the ByteBuf for us +// so we don't need to do it. } @Override -public void exceptionCaught(ChannelHandlerContext ctx, -ExceptionEvent e) throws Exception { -LOG.warn("Exception caught: {}", e, e.getCause()); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { +LOG.warn("Exception caught", cause); cleanup(); } } + +/** + * Sets the test ByteBufAllocator. This allocator will be used by all + * future instances of this class. + * It is not recommended to use this method outside of testing. + * @param allocator the ByteBufAllocator to use for all netty buffer + * allocations. + */ +public static void setTestAllocator(ByteBufAllocator allocator) { +TEST_ALLOCATOR.set(allocator); --- End diff -- Yes, in my case this is a hole, because the client code runs in a potentially insecure JVM where user code can call public methods. ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226835980 --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -267,7 +298,7 @@ private void sendPkt(Packet p) { p.createBB(); updateLastSend(); sentCount++; -channel.write(ChannelBuffers.wrappedBuffer(p.bb)); +channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb)); --- End diff -- See these very interesting slides http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#8.0 ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226835913 --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -103,71 +108,95 @@ boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. -return channel != null; +connectLock.lock(); +try { +return connectFuture != null || channel != null; +} finally { +connectLock.unlock(); +} } @Override void connect(InetSocketAddress addr) throws IOException { firstConnect = new CountDownLatch(1); -ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - -bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); -bootstrap.setOption("soLinger", -1); -bootstrap.setOption("tcpNoDelay", true); - -connectFuture = bootstrap.connect(addr); -connectFuture.addListener(new ChannelFutureListener() { -@Override -public void operationComplete(ChannelFuture channelFuture) throws Exception { -// this lock guarantees that channel won't be assgined after cleanup(). -connectLock.lock(); -try { -if (!channelFuture.isSuccess() || connectFuture == null) { -LOG.info("future isn't success, cause: {}", channelFuture.getCause()); -return; -} -// setup channel, variables, connection, etc. 
-channel = channelFuture.getChannel(); - -disconnected.set(false); -initialized = false; -lenBuffer.clear(); -incomingBuffer = lenBuffer; - -sendThread.primeConnection(); -updateNow(); -updateLastSendAndHeard(); - -if (sendThread.tunnelAuthInProgress()) { -waitSasl.drainPermits(); -needSasl.set(true); -sendPrimePacket(); -} else { -needSasl.set(false); -} +Bootstrap bootstrap = new Bootstrap(); +bootstrap.group(Objects.requireNonNull(eventLoopGroup)) +.channel(NioSocketChannel.class) +.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())) +.option(ChannelOption.SO_LINGER, -1) +.option(ChannelOption.TCP_NODELAY, true); +ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); +if (testAllocator != null) { +bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); +} +bootstrap.validate(); + +connectLock.lock(); +try { +connectFuture = bootstrap.connect(addr); +connectFuture.addListener(new ChannelFutureListener() { +@Override +public void operationComplete(ChannelFuture channelFuture) throws Exception { +// this lock guarantees that channel won't be assigned after cleanup(). +connectLock.lock(); +try { +if (!channelFuture.isSuccess()) { +LOG.info("future isn't success, cause:", channelFuture.cause()); +return; +} else if (connectFuture == null) { +LOG.info("connect attempt cancelled"); +// If the connect attempt was cancelled but succeeded +// anyway, make sure to close the channel, otherwise +// we may leak a file descriptor. +channelFuture.channel().close(); --- End diff -- That was my guess too. Let's keep it single. So okay to me ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226835891 --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -68,18 +70,21 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class); -ChannelFactory channelFactory = new NioClientSocketChannelFactory( -Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); -Channel channel; -CountDownLatch firstConnect; -ChannelFuture connectFuture; -Lock connectLock = new ReentrantLock(); -AtomicBoolean disconnected = new AtomicBoolean(); -AtomicBoolean needSasl = new AtomicBoolean(); -Semaphore waitSasl = new Semaphore(0); +private final EventLoopGroup eventLoopGroup; +private Channel channel; +private CountDownLatch firstConnect; +private ChannelFuture connectFuture; +private final Lock connectLock = new ReentrantLock(); +private final AtomicBoolean disconnected = new AtomicBoolean(); +private final AtomicBoolean needSasl = new AtomicBoolean(); +private final Semaphore waitSasl = new Semaphore(0); + +private static final AtomicReference TEST_ALLOCATOR = +new AtomicReference<>(null); ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException { this.clientConfig = clientConfig; +eventLoopGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool()); --- End diff -- In almost all of the projects I know that use Netty, you try to use Epoll if available and then fall back to NIO. I will be happy to create a JIRA and send a diff once we get this merged. ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226757240 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -335,29 +260,34 @@ public void operationComplete(ChannelFuture future) CnxnChannelHandler channelHandler = new CnxnChannelHandler(); NettyServerCnxnFactory() { -bootstrap = new ServerBootstrap( -new NioServerSocketChannelFactory( -Executors.newCachedThreadPool(), -Executors.newCachedThreadPool())); -// parent channel -bootstrap.setOption("reuseAddress", true); -// child channels -bootstrap.setOption("child.tcpNoDelay", true); -/* set socket linger to off, so that socket close does not block */ -bootstrap.setOption("child.soLinger", -1); -bootstrap.setPipelineFactory(new ChannelPipelineFactory() { -@Override -public ChannelPipeline getPipeline() throws Exception { -ChannelPipeline p = Channels.pipeline(); -if (secure) { -initSSL(p); -} -p.addLast("servercnxnfactory", channelHandler); - -return p; -} -}); x509Util = new ClientX509Util(); + +EventLoopGroup bossGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool()); --- End diff -- See comment above about making epoll optional based on a config option. ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226757190 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); -} -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +LOG.trace("Channel inactive {}", ctx.channel()); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { -if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); -} +LOG.trace("Channel inactive caused close {}", cnxn); cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { -if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); -} +LOG.debug("Closing {}", cnxn); cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() +// or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run +// after either of those. Check for null just to be safe ... +if (cnxn != null) { +cnxn.processQueuedBuffer(); +} +ctx.channel().config().setAutoRead(true); +} else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) { +LOG.debug("Received AutoReadEvent.DISABLE"); +ctx.channel().config().setAutoRead(false); } -} catch(Exception ex) { -LOG.error("Unexpected exception in
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226757057 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); -} -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +LOG.trace("Channel inactive {}", ctx.channel()); --- End diff -- LOG.trace() does an isTraceEnabled check internally. If the additional parameters passed to the log method don't do any work (such as converting the contents of a buffer to a hex string), then the enclosing isTraceEnabled check is redundant. ---
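The point about redundant guards can be illustrated with a small JDK-only sketch. `TinyLogger` and `LoggingGuardDemo` are hypothetical stand-ins written for this example, not the SLF4J API: the logger checks the level inside `trace()`, so a cheap argument needs no guard, while an argument that is expensive to build is still computed at the call site and benefits from one.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a parameterized logger; not the SLF4J API itself. */
final class TinyLogger {
    private final boolean traceEnabled;
    private final StringBuilder out = new StringBuilder();

    TinyLogger(boolean traceEnabled) {
        this.traceEnabled = traceEnabled;
    }

    boolean isTraceEnabled() {
        return traceEnabled;
    }

    /** Formats the message only when trace is enabled. */
    void trace(String format, Object arg) {
        if (!traceEnabled) {
            return; // the level check happens inside the call
        }
        out.append(format.replace("{}", String.valueOf(arg))).append('\n');
    }

    String output() {
        return out.toString();
    }
}

final class LoggingGuardDemo {
    /** Counts how often the "expensive" conversion actually runs. */
    static final AtomicInteger hexDumps = new AtomicInteger();

    static String toHex(byte[] data) {
        hexDumps.incrementAndGet();
        StringBuilder sb = new StringBuilder();
        for (byte b : data) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    /** Cheap argument: no guard needed, formatting is deferred by the logger. */
    static void logChannel(TinyLogger log, Object channel) {
        log.trace("Channel inactive {}", channel);
    }

    /** Expensive argument: built at the call site, so the guard still pays off. */
    static void logPayload(TinyLogger log, byte[] payload) {
        if (log.isTraceEnabled()) {
            log.trace("payload {}", toHex(payload));
        }
    }
}
```

With trace disabled, neither call produces output and `toHex` never runs; without the guard in `logPayload`, the hex conversion would run even though the message is then discarded.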
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226756739

--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java ---
@@ -200,24 +186,13 @@ public void setSessionId(long sessionId) {
 this.sessionId = sessionId;
 }
-@Override
-public void enableRecv() {
-if (throttled) {
-throttled = false;
-if (LOG.isDebugEnabled()) {
-LOG.debug("Sending unthrottle event " + this);
-}
-channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel));
-}
-}
-
 @Override
 public void sendBuffer(ByteBuffer sendBuffer) {
 if (sendBuffer == ServerCnxnFactory.closeConn) {
 close();
 return;
 }
-channel.write(wrappedBuffer(sendBuffer));
+channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer));
--- End diff --

As above, I'm not sure what that provides. I am still learning about netty so please excuse my ignorance :)
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226756353

--- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
@@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx,
 }
 }
 wakeupCnxn();
+// Note: SimpleChannelInboundHandler releases the ByteBuf for us
+// so we don't need to do it.
 }
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx,
-ExceptionEvent e) throws Exception {
-LOG.warn("Exception caught: {}", e, e.getCause());
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+LOG.warn("Exception caught", cause);
 cleanup();
 }
 }
+
+/**
+ * Sets the test ByteBufAllocator. This allocator will be used by all
+ * future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ * @param allocator the ByteBufAllocator to use for all netty buffer
+ * allocations.
+ */
+public static void setTestAllocator(ByteBufAllocator allocator) {
+TEST_ALLOCATOR.set(allocator);
--- End diff --

Sure, but that would only affect that client and would have no effect on the server. All clients are untrusted by default since they run on a computer you don't own :) I can look into using a mocking framework instead if you feel strongly about it.
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226756565 --- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.common; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This is a custom ByteBufAllocator that tracks outstanding allocations and + * crashes the program if any of them are leaked. + * + * Never use this class in production, it will cause your server to run out + * of memory! This is because it holds strong references to all allocated + * buffers and doesn't release them until checkForLeaks() is called at the + * end of a unit test. + * + * Note: the original code was copied from https://github.com/airlift/drift, + * with the permission and encouragement of airlift's author (dain). 
Airlift + * uses the same apache 2.0 license as Zookeeper so this should be ok. + * + * However, the code was modified to take advantage of Netty's built-in + * leak tracking and make a best effort to print details about buffer leaks. + */ +public class TestByteBufAllocator extends PooledByteBufAllocator { --- End diff -- Paranoid leak detection will just print details about leaks, but the test will still pass and it takes a human to examine the test's stderr output to see that there was a problem. Using this allocator will make the test fail if there are buffer leaks. ---
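The difference between printing leak details and failing the test can be sketched without Netty. `TrackedBuffer` and `TrackingAllocator` below are hypothetical names invented for this example and only mimic reference counting; this is not the actual `TestByteBufAllocator` code. The allocator keeps a strong reference to everything it hands out and throws at the end of the test if any buffer was never released.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reference-counted buffer, standing in for a Netty ByteBuf. */
final class TrackedBuffer {
    private int refCnt = 1;

    synchronized void release() {
        if (refCnt == 0) {
            throw new IllegalStateException("already released");
        }
        refCnt--;
    }

    synchronized int refCnt() {
        return refCnt;
    }
}

/** Hypothetical allocator that remembers every buffer it hands out. */
final class TrackingAllocator {
    // Strong references: fine in a unit test, a memory hog anywhere else.
    private final List<TrackedBuffer> tracked = new ArrayList<>();

    synchronized TrackedBuffer allocate() {
        TrackedBuffer buf = new TrackedBuffer();
        tracked.add(buf);
        return buf;
    }

    /** Call from tearDown(): throws if any buffer is still referenced. */
    synchronized void checkForLeaks() {
        int leaked = 0;
        for (TrackedBuffer buf : tracked) {
            if (buf.refCnt() > 0) {
                leaked++;
            }
        }
        tracked.clear();
        if (leaked > 0) {
            throw new AssertionError(leaked + " buffer(s) leaked");
        }
    }
}
```

Because `checkForLeaks()` throws, a leak turns into a test failure instead of a line on stderr that someone has to notice.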
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226756052

--- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
@@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
 p.createBB();
 updateLastSend();
 sentCount++;
-channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
--- End diff --

Can you explain what the purpose of that would be? According to the documentation, voidPromise() returns a promise that will never be notified of success or failure.
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226755668

--- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
@@ -184,7 +213,9 @@ void cleanup() {
 @Override
 void close() {
-channelFactory.releaseExternalResources();
+if (!eventLoopGroup.isShuttingDown()) {
--- End diff --

I'm not sure if calling `shutdownGracefully` more than once is allowed, which is why I added the check. It might not be necessary.
---
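One way to sidestep the question of whether a shutdown call may be repeated is to make `close()` itself run its body at most once. The sketch below is JDK-only and uses a plain `ExecutorService` in place of the Netty event loop group; the class name `CloseOnce` and its counter are assumptions made for illustration, not part of the patch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical owner of a thread pool whose close() is safe to call repeatedly. */
final class CloseOnce implements AutoCloseable {
    private final ExecutorService workers = Executors.newCachedThreadPool();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicInteger shutdownCalls = new AtomicInteger();

    @Override
    public void close() {
        // compareAndSet lets exactly one caller through, even under contention.
        if (closed.compareAndSet(false, true)) {
            shutdownCalls.incrementAndGet();
            workers.shutdown();
        }
    }

    /** Number of times the underlying shutdown was actually invoked. */
    int shutdownCalls() {
        return shutdownCalls.get();
    }

    boolean isShutdown() {
        return workers.isShutdown();
    }
}
```

Unlike a check-then-act test on the pool's state, the atomic flag also covers two threads calling `close()` at the same moment.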
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226755419 --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -103,71 +108,95 @@ boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. -return channel != null; +connectLock.lock(); +try { +return connectFuture != null || channel != null; +} finally { +connectLock.unlock(); +} } @Override void connect(InetSocketAddress addr) throws IOException { firstConnect = new CountDownLatch(1); -ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - -bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); -bootstrap.setOption("soLinger", -1); -bootstrap.setOption("tcpNoDelay", true); - -connectFuture = bootstrap.connect(addr); -connectFuture.addListener(new ChannelFutureListener() { -@Override -public void operationComplete(ChannelFuture channelFuture) throws Exception { -// this lock guarantees that channel won't be assgined after cleanup(). -connectLock.lock(); -try { -if (!channelFuture.isSuccess() || connectFuture == null) { -LOG.info("future isn't success, cause: {}", channelFuture.getCause()); -return; -} -// setup channel, variables, connection, etc. 
-channel = channelFuture.getChannel(); - -disconnected.set(false); -initialized = false; -lenBuffer.clear(); -incomingBuffer = lenBuffer; - -sendThread.primeConnection(); -updateNow(); -updateLastSendAndHeard(); - -if (sendThread.tunnelAuthInProgress()) { -waitSasl.drainPermits(); -needSasl.set(true); -sendPrimePacket(); -} else { -needSasl.set(false); -} +Bootstrap bootstrap = new Bootstrap(); +bootstrap.group(Objects.requireNonNull(eventLoopGroup)) +.channel(NioSocketChannel.class) +.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())) +.option(ChannelOption.SO_LINGER, -1) +.option(ChannelOption.TCP_NODELAY, true); +ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); +if (testAllocator != null) { +bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); +} +bootstrap.validate(); + +connectLock.lock(); +try { +connectFuture = bootstrap.connect(addr); +connectFuture.addListener(new ChannelFutureListener() { +@Override +public void operationComplete(ChannelFuture channelFuture) throws Exception { +// this lock guarantees that channel won't be assigned after cleanup(). +connectLock.lock(); +try { +if (!channelFuture.isSuccess()) { +LOG.info("future isn't success, cause:", channelFuture.cause()); +return; +} else if (connectFuture == null) { +LOG.info("connect attempt cancelled"); +// If the connect attempt was cancelled but succeeded +// anyway, make sure to close the channel, otherwise +// we may leak a file descriptor. +channelFuture.channel().close(); --- End diff -- I don't think so, since this code can only trigger if the connect future is successful. If the future is not successful, the previous if branch will be taken. ---
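The ordering of the two branches in the listener is what rules out a missing channel: the cancelled branch is only reached after the failure branch has been skipped. A minimal JDK-only sketch of that handshake follows, using `CompletableFuture` in place of Netty's `ChannelFuture`. `FakeChannel` and `Connector` are hypothetical names for this example, and the identity check against the current attempt is a simplification of the null check in the patch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a Netty Channel; only tracks open/closed. */
final class FakeChannel {
    private volatile boolean open = true;

    boolean isOpen() {
        return open;
    }

    void close() {
        open = false;
    }
}

/** Sketch of the connect/cleanup handshake; names are illustrative. */
final class Connector {
    private final ReentrantLock connectLock = new ReentrantLock();
    private CompletableFuture<FakeChannel> connectFuture;
    private FakeChannel channel;

    void connect(CompletableFuture<FakeChannel> attempt) {
        connectLock.lock();
        try {
            connectFuture = attempt;
            attempt.whenComplete((ch, err) -> {
                // The lock guarantees channel is never assigned after cleanup().
                connectLock.lock();
                try {
                    boolean current = (connectFuture == attempt);
                    if (current) {
                        connectFuture = null;
                    }
                    if (err != null) {
                        return; // failed: there is no channel to close
                    }
                    if (!current) {
                        ch.close(); // cancelled but connected anyway: avoid a leak
                        return;
                    }
                    channel = ch;
                } finally {
                    connectLock.unlock();
                }
            });
        } finally {
            connectLock.unlock();
        }
    }

    /** Cancels any pending attempt and closes the current channel. */
    void cleanup() {
        connectLock.lock();
        try {
            connectFuture = null;
            if (channel != null) {
                channel.close();
                channel = null;
            }
        } finally {
            connectLock.unlock();
        }
    }

    FakeChannel channel() {
        connectLock.lock();
        try {
            return channel;
        } finally {
            connectLock.unlock();
        }
    }
}
```

If `cleanup()` runs before the attempt completes, a late success closes the new channel instead of publishing it, which is the file-descriptor leak the comment in the diff is guarding against.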
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226755285

--- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
@@ -68,18 +70,21 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
-ChannelFactory channelFactory = new NioClientSocketChannelFactory(
-Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-Channel channel;
-CountDownLatch firstConnect;
-ChannelFuture connectFuture;
-Lock connectLock = new ReentrantLock();
-AtomicBoolean disconnected = new AtomicBoolean();
-AtomicBoolean needSasl = new AtomicBoolean();
-Semaphore waitSasl = new Semaphore(0);
+private final EventLoopGroup eventLoopGroup;
+private Channel channel;
+private CountDownLatch firstConnect;
+private ChannelFuture connectFuture;
+private final Lock connectLock = new ReentrantLock();
+private final AtomicBoolean disconnected = new AtomicBoolean();
+private final AtomicBoolean needSasl = new AtomicBoolean();
+private final Semaphore waitSasl = new Semaphore(0);
+
+private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+new AtomicReference<>(null);
 ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
 this.clientConfig = clientConfig;
+eventLoopGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
--- End diff --

I'd like to do it in a follow-up diff. I was thinking we default to NIO (since it works on all OSes), and have a config option to use Epoll instead.
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226687650

--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java ---
@@ -200,24 +186,13 @@ public void setSessionId(long sessionId) {
 this.sessionId = sessionId;
 }
-@Override
-public void enableRecv() {
-if (throttled) {
-throttled = false;
-if (LOG.isDebugEnabled()) {
-LOG.debug("Sending unthrottle event " + this);
-}
-channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel));
-}
-}
-
 @Override
 public void sendBuffer(ByteBuffer sendBuffer) {
 if (sendBuffer == ServerCnxnFactory.closeConn) {
 close();
 return;
 }
-channel.write(wrappedBuffer(sendBuffer));
+channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer));
--- End diff --

Consider using voidPromise()
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226684843

--- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
@@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx,
 }
 }
 wakeupCnxn();
+// Note: SimpleChannelInboundHandler releases the ByteBuf for us
+// so we don't need to do it.
 }
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx,
-ExceptionEvent e) throws Exception {
-LOG.warn("Exception caught: {}", e, e.getCause());
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+LOG.warn("Exception caught", cause);
 cleanup();
 }
 }
+
+/**
+ * Sets the test ByteBufAllocator. This allocator will be used by all
+ * future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ * @param allocator the ByteBufAllocator to use for all netty buffer
+ * allocations.
+ */
+public static void setTestAllocator(ByteBufAllocator allocator) {
+TEST_ALLOCATOR.set(allocator);
--- End diff --

I think this is a security hole. We are in the client, so untrusted code may call this public method. We should use mockito/powermock for this stuff. Is there another way?
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226685450 --- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.common; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This is a custom ByteBufAllocator that tracks outstanding allocations and + * crashes the program if any of them are leaked. + * + * Never use this class in production, it will cause your server to run out + * of memory! This is because it holds strong references to all allocated + * buffers and doesn't release them until checkForLeaks() is called at the + * end of a unit test. + * + * Note: the original code was copied from https://github.com/airlift/drift, + * with the permission and encouragement of airlift's author (dain). 
Airlift + * uses the same apache 2.0 license as Zookeeper so this should be ok. + * + * However, the code was modified to take advantage of Netty's built-in + * leak tracking and make a best effort to print details about buffer leaks. + */ +public class TestByteBufAllocator extends PooledByteBufAllocator { --- End diff -- This is interesting. Netty already has built-in support for this kind of stuff. I see that this class is smarter. Isn't running the tests with paranoid leak detection enough? ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226690017 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); -} -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +LOG.trace("Channel inactive {}", ctx.channel()); +allChannels.remove(ctx.channel()); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { -if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnect caused close " + e); -} +LOG.trace("Channel inactive caused close {}", cnxn); cnxn.close(); } } @Override -public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) -throws Exception -{ -LOG.warn("Exception caught " + e, e.getCause()); -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +LOG.warn("Exception caught", cause); +NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { -if (LOG.isDebugEnabled()) { -LOG.debug("Closing " + cnxn); -} +LOG.debug("Closing {}", cnxn); cnxn.close(); } } @Override -public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) -throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("message received called " + e.getMessage()); -} +public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { -if (LOG.isDebugEnabled()) { -LOG.debug("New message " + e.toString() -+ " from " + ctx.getChannel()); -} -NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); -synchronized(cnxn) { -processMessage(e, cnxn); +if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { +LOG.debug("Received AutoReadEvent.ENABLE"); +NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); +// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() +// or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run +// after either of those. Check for null just to be safe ... +if (cnxn != null) { +cnxn.processQueuedBuffer(); +} +ctx.channel().config().setAutoRead(true); +} else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) { +LOG.debug("Received AutoReadEvent.DISABLE"); +ctx.channel().config().setAutoRead(false); } -} catch(Exception ex) { -LOG.error("Unexpected exception
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226684209

--- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
@@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
 p.createBB();
 updateLastSend();
 sentCount++;
-channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
--- End diff --

What about adding ', channel.voidPromise()'?
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226683922

--- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
@@ -184,7 +213,9 @@ void cleanup() {
 @Override
 void close() {
-channelFactory.releaseExternalResources();
+if (!eventLoopGroup.isShuttingDown()) {
--- End diff --

Is this really needed?
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226681741

--- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java ---
@@ -68,18 +70,21 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
-ChannelFactory channelFactory = new NioClientSocketChannelFactory(
-Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-Channel channel;
-CountDownLatch firstConnect;
-ChannelFuture connectFuture;
-Lock connectLock = new ReentrantLock();
-AtomicBoolean disconnected = new AtomicBoolean();
-AtomicBoolean needSasl = new AtomicBoolean();
-Semaphore waitSasl = new Semaphore(0);
+private final EventLoopGroup eventLoopGroup;
+private Channel channel;
+private CountDownLatch firstConnect;
+private ChannelFuture connectFuture;
+private final Lock connectLock = new ReentrantLock();
+private final AtomicBoolean disconnected = new AtomicBoolean();
+private final AtomicBoolean needSasl = new AtomicBoolean();
+private final Semaphore waitSasl = new Semaphore(0);
+
+private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+new AtomicReference<>(null);
 ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
 this.clientConfig = clientConfig;
+eventLoopGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool());
--- End diff --

Let's move to Epoll. It can be a follow-up change (I can send it if you don't already have it on your stack of changes).
---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226690548 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -335,29 +260,34 @@ public void operationComplete(ChannelFuture future) CnxnChannelHandler channelHandler = new CnxnChannelHandler(); NettyServerCnxnFactory() { -bootstrap = new ServerBootstrap( -new NioServerSocketChannelFactory( -Executors.newCachedThreadPool(), -Executors.newCachedThreadPool())); -// parent channel -bootstrap.setOption("reuseAddress", true); -// child channels -bootstrap.setOption("child.tcpNoDelay", true); -/* set socket linger to off, so that socket close does not block */ -bootstrap.setOption("child.soLinger", -1); -bootstrap.setPipelineFactory(new ChannelPipelineFactory() { -@Override -public ChannelPipeline getPipeline() throws Exception { -ChannelPipeline p = Channels.pipeline(); -if (secure) { -initSSL(p); -} -p.addLast("servercnxnfactory", channelHandler); - -return p; -} -}); x509Util = new ClientX509Util(); + +EventLoopGroup bossGroup = new NioEventLoopGroup(0, Executors.newCachedThreadPool()); --- End diff -- Consider EPoll ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226682998 --- Diff: zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -103,71 +108,95 @@ boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. -return channel != null; +connectLock.lock(); +try { +return connectFuture != null || channel != null; +} finally { +connectLock.unlock(); +} } @Override void connect(InetSocketAddress addr) throws IOException { firstConnect = new CountDownLatch(1); -ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - -bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); -bootstrap.setOption("soLinger", -1); -bootstrap.setOption("tcpNoDelay", true); - -connectFuture = bootstrap.connect(addr); -connectFuture.addListener(new ChannelFutureListener() { -@Override -public void operationComplete(ChannelFuture channelFuture) throws Exception { -// this lock guarantees that channel won't be assgined after cleanup(). -connectLock.lock(); -try { -if (!channelFuture.isSuccess() || connectFuture == null) { -LOG.info("future isn't success, cause: {}", channelFuture.getCause()); -return; -} -// setup channel, variables, connection, etc. 
-channel = channelFuture.getChannel(); - -disconnected.set(false); -initialized = false; -lenBuffer.clear(); -incomingBuffer = lenBuffer; - -sendThread.primeConnection(); -updateNow(); -updateLastSendAndHeard(); - -if (sendThread.tunnelAuthInProgress()) { -waitSasl.drainPermits(); -needSasl.set(true); -sendPrimePacket(); -} else { -needSasl.set(false); -} +Bootstrap bootstrap = new Bootstrap(); +bootstrap.group(Objects.requireNonNull(eventLoopGroup)) +.channel(NioSocketChannel.class) +.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())) +.option(ChannelOption.SO_LINGER, -1) +.option(ChannelOption.TCP_NODELAY, true); +ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); +if (testAllocator != null) { +bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); +} +bootstrap.validate(); + +connectLock.lock(); +try { +connectFuture = bootstrap.connect(addr); +connectFuture.addListener(new ChannelFutureListener() { +@Override +public void operationComplete(ChannelFuture channelFuture) throws Exception { +// this lock guarantees that channel won't be assigned after cleanup(). +connectLock.lock(); +try { +if (!channelFuture.isSuccess()) { +LOG.info("future isn't success, cause:", channelFuture.cause()); +return; +} else if (connectFuture == null) { +LOG.info("connect attempt cancelled"); +// If the connect attempt was cancelled but succeeded +// anyway, make sure to close the channel, otherwise +// we may leak a file descriptor. +channelFuture.channel().close(); --- End diff -- Can this turn into an NPE? As channel() may return null. ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user eolivelli commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r226689302 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java --- @@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx, NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); -ctx.setAttachment(cnxn); +ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { -SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); -ChannelFuture handshakeFuture = sslHandler.handshake(); +SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); +Future handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { -allChannels.add(ctx.getChannel()); +allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override -public void channelDisconnected(ChannelHandlerContext ctx, -ChannelStateEvent e) throws Exception -{ -if (LOG.isTraceEnabled()) { -LOG.trace("Channel disconnected " + e); -} -NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); +public void channelInactive(ChannelHandlerContext ctx) throws Exception { +LOG.trace("Channel inactive {}", ctx.channel()); --- End diff -- isTraceEnabled is missing here? ---
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
Github user dain commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r225332620 --- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.common; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ResourceLeakDetector; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This is a custom ByteBufAllocator that tracks outstanding allocations and + * crashes the program if any of them are leaked. + * + * Never use this class in production, it will cause your server to run out + * of memory! This is because it holds strong references to all allocated + * buffers and doesn't release them until checkForLeaks() is called at the + * end of a unit test. + * + * Note: the original code was copied from https://github.com/airlift/drift, + * with the permission and encouragement of airlift's author (dain). Airlift + * uses the same apache 2.0 license as Zookeeper so this should be ok. 
+ * + * However, the code was modified to take advantage of Netty's built-in + * leak tracking and make a best effort to print details about buffer leaks. + */ +public class TestByteBufAllocator extends PooledByteBufAllocator { +private static AtomicReference<TestByteBufAllocator> INSTANCE = +new AtomicReference<>(null); + +/** + * Get the singleton testing allocator. + * @return the singleton allocator, creating it if one does not exist. + */ +public static TestByteBufAllocator getInstance() { +TestByteBufAllocator result = INSTANCE.get(); +if (result == null) { +// Note: the leak detector level never gets reset after this, +// but that's probably ok since this is only used by test code. + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); +INSTANCE.compareAndSet(null, new TestByteBufAllocator()); +result = INSTANCE.get(); +} +return result; +} + +/** + * Destroys the singleton testing allocator and throws an error if any of the + * buffers allocated by it have been leaked. Attempts to print leak details to + * standard error before throwing, by using netty's built-in leak tracking. + * Note that this might not always work, since it only triggers when a buffer + * is garbage-collected and calling System.gc() does not guarantee that a buffer + * will actually be GC'ed. + * + * This should be called at the end of a unit test's tearDown() method. 
+ */ +public static void checkForLeaks() { +TestByteBufAllocator result = INSTANCE.getAndSet(null); +if (result != null) { +result.checkInstanceForLeaks(); +} + +} + +private final List<ByteBuf> trackedBuffers = new ArrayList<>(); + +public TestByteBufAllocator() +{ +super(false); +} + +@Override +protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) +{ +return track(super.newHeapBuffer(initialCapacity, maxCapacity)); +} + +@Override +protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) +{ +return track(super.newDirectBuffer(initialCapacity, maxCapacity)); +} + +@Override +public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) +{ +return track(super.compositeHeapBuffer(maxNumComponents)); +} + +@Override +public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) +{ +
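The quoted diff is cut off before track() and checkInstanceForLeaks() appear, so their bodies are not visible here. As a rough illustration of the pattern the class comment describes (hold a strong reference to everything handed out, then check at teardown that each item was released), here is a minimal sketch in plain Java. It does not use Netty; TrackedResource, LeakTrackingSketch, allocate() and findLeaks() are hypothetical names invented for this example and are not part of the patch or of Netty's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reference-counted buffer; NOT a Netty class.
final class TrackedResource {
    private final String name;
    private int refCnt = 1; // a freshly allocated resource holds one reference

    TrackedResource(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    int refCnt() {
        return refCnt;
    }

    // Drops one reference; releasing an already-released resource is an error.
    void release() {
        if (refCnt == 0) {
            throw new IllegalStateException(name + " already released");
        }
        refCnt--;
    }
}

// Hypothetical tracker: remembers every allocation and reports the ones
// that still hold a reference when the check runs.
final class LeakTrackingSketch {
    // Strong references on purpose, so nothing is collected before the check.
    private final List<TrackedResource> tracked = new ArrayList<>();

    synchronized TrackedResource allocate(String name) {
        TrackedResource resource = new TrackedResource(name);
        tracked.add(resource);
        return resource;
    }

    // Returns the names of unreleased resources and forgets all tracked ones.
    synchronized List<String> findLeaks() {
        List<String> leaks = new ArrayList<>();
        for (TrackedResource resource : tracked) {
            if (resource.refCnt() > 0) {
                leaks.add(resource.name());
            }
        }
        tracked.clear();
        return leaks;
    }

    // Intended for the end of a test's tearDown(): fails if anything leaked.
    synchronized void checkForLeaks() {
        List<String> leaks = findLeaks();
        if (!leaks.isEmpty()) {
            throw new AssertionError("Leaked resources: " + leaks);
        }
    }
}
```

Because the tracker keeps strong references, memory grows until the check runs, which is the same reason the class comment warns against using the real allocator in production.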
[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4
GitHub user ivmaykov opened a pull request: https://github.com/apache/zookeeper/pull/669 ZOOKEEPER-3152: Port ZK netty stack to netty4 Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty). Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble. FB Reviewers: nixon You can merge this pull request into a Git repository by running: $ git pull https://github.com/ivmaykov/zookeeper ZOOKEEPER-3152 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/zookeeper/pull/669.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #669 commit 34c3e275d012bd14c243633a638c85f1ca4a36c4 Author: Ilya Maykov Date: 2018-08-31T23:26:55Z port ZK netty stack from netty3 to netty4 Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty). Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble. Reviewers: nixon, nwolchko, nedelchev Subscribers: Differential Revision: https://phabricator.intern.facebook.com/D9646262 Tasks: Tags: Blame Revision: ---