Github user ivmaykov commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/669#discussion_r226757190
--- Diff:
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
---
@@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext
ctx,
NettyServerCnxn cnxn = new NettyServerCnxn(channel,
zkServer, NettyServerCnxnFactory.this);
- ctx.setAttachment(cnxn);
+ ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
if (secure) {
- SslHandler sslHandler =
ctx.getPipeline().get(SslHandler.class);
- ChannelFuture handshakeFuture = sslHandler.handshake();
+ SslHandler sslHandler =
ctx.pipeline().get(SslHandler.class);
+ Future<Channel> handshakeFuture =
sslHandler.handshakeFuture();
handshakeFuture.addListener(new
CertificateVerifier(sslHandler, cnxn));
} else {
- allChannels.add(ctx.getChannel());
+ allChannels.add(ctx.channel());
addCnxn(cnxn);
}
}
@Override
- public void channelDisconnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception
- {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Channel disconnected " + e);
- }
- NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+ public void channelInactive(ChannelHandlerContext ctx) throws
Exception {
+ LOG.trace("Channel inactive {}", ctx.channel());
+ allChannels.remove(ctx.channel());
+ NettyServerCnxn cnxn =
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
if (cnxn != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Channel disconnect caused close " + e);
- }
+ LOG.trace("Channel inactive caused close {}", cnxn);
cnxn.close();
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx,
ExceptionEvent e)
- throws Exception
- {
- LOG.warn("Exception caught " + e, e.getCause());
- NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) throws Exception {
+ LOG.warn("Exception caught", cause);
+ NettyServerCnxn cnxn =
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
if (cnxn != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing " + cnxn);
- }
+ LOG.debug("Closing {}", cnxn);
cnxn.close();
}
}
@Override
- public void messageReceived(ChannelHandlerContext ctx,
MessageEvent e)
- throws Exception
- {
- if (LOG.isTraceEnabled()) {
- LOG.trace("message received called " + e.getMessage());
- }
+ public void userEventTriggered(ChannelHandlerContext ctx, Object
evt) throws Exception {
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("New message " + e.toString()
- + " from " + ctx.getChannel());
- }
- NettyServerCnxn cnxn =
(NettyServerCnxn)ctx.getAttachment();
- synchronized(cnxn) {
- processMessage(e, cnxn);
+ if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+ LOG.debug("Received AutoReadEvent.ENABLE");
+ NettyServerCnxn cnxn =
ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+ // TODO(ilyam): Not sure if cnxn can be null here. It
becomes null if channelInactive()
+ // or exceptionCaught() trigger, but it's unclear to
me if userEventTriggered() can run
+ // after either of those. Check for null just to be
safe ...
+ if (cnxn != null) {
+ cnxn.processQueuedBuffer();
+ }
+ ctx.channel().config().setAutoRead(true);
+ } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
+ LOG.debug("Received AutoReadEvent.DISABLE");
+ ctx.channel().config().setAutoRead(false);
}
- } catch(Exception ex) {
- LOG.error("Unexpected exception in receive", ex);
- throw ex;
+ } finally {
+ ReferenceCountUtil.release(evt);
}
}
- private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(Long.toHexString(cnxn.sessionId) + "
queuedBuffer: "
- + cnxn.queuedBuffer);
- }
-
- if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
- LOG.debug("Received ResumeMessageEvent");
- if (cnxn.queuedBuffer != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("processing queue "
- + Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- +
ChannelBuffers.hexDump(cnxn.queuedBuffer));
- }
- cnxn.receiveMessage(cnxn.queuedBuffer);
- if (!cnxn.queuedBuffer.readable()) {
- LOG.debug("Processed queue - no bytes remaining");
- cnxn.queuedBuffer = null;
- } else {
- LOG.debug("Processed queue - bytes remaining");
- }
- } else {
- LOG.debug("queue empty");
- }
- cnxn.channel.setReadable(true);
- } else {
- ChannelBuffer buf = (ChannelBuffer)e.getMessage();
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " buf 0x"
- + ChannelBuffers.hexDump(buf));
- }
-
- if (cnxn.throttled) {
- LOG.debug("Received message while throttled");
- // we are throttled, so we need to queue
- if (cnxn.queuedBuffer == null) {
- LOG.debug("allocating queue");
- cnxn.queuedBuffer =
dynamicBuffer(buf.readableBytes());
- }
- cnxn.queuedBuffer.writeBytes(buf);
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- +
ChannelBuffers.hexDump(cnxn.queuedBuffer));
- }
- } else {
- LOG.debug("not throttled");
- if (cnxn.queuedBuffer != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- +
ChannelBuffers.hexDump(cnxn.queuedBuffer));
- }
- cnxn.queuedBuffer.writeBytes(buf);
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- +
ChannelBuffers.hexDump(cnxn.queuedBuffer));
- }
-
- cnxn.receiveMessage(cnxn.queuedBuffer);
- if (!cnxn.queuedBuffer.readable()) {
- LOG.debug("Processed queue - no bytes
remaining");
- cnxn.queuedBuffer = null;
- } else {
- LOG.debug("Processed queue - bytes remaining");
- }
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
+ try {
+ LOG.trace("message received called {}", msg);
--- End diff --
as above.
---