Updated Branches: refs/heads/trunk c5f2d61e8 -> 62f9fd3fd
http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java index bfcfb8d..c033746 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java @@ -25,13 +25,10 @@ import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; import org.apache.log4j.Logger; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED; @@ -41,7 +38,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES * @param <R> Request type */ public abstract class RequestServerHandler<R> extends - SimpleChannelUpstreamHandler { + ChannelInboundHandlerAdapter { /** Number of bytes in the encoded response */ public static final int RESPONSE_BYTES = 13; /** Time class to use */ @@ -77,13 +74,13 @@ public abstract class RequestServerHandler<R> extends } @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent e) { + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("messageReceived: Got " + e.getMessage().getClass()); + LOG.trace("messageReceived: Got " + msg.getClass()); } - WritableRequest writableRequest = (WritableRequest) e.getMessage(); + WritableRequest writableRequest = (WritableRequest) msg; // Simulate a closed connection on the first request (if desired) if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) { @@ -91,7 +88,7 @@ public abstract class RequestServerHandler<R> extends "request " + writableRequest.getRequestId() + " from " + writableRequest.getClientId()); setAlreadyClosedFirstRequest(); - ctx.getChannel().close(); + ctx.close(); return; } @@ -121,11 +118,11 @@ public abstract class RequestServerHandler<R> extends } // Send the response with the request id - ChannelBuffer buffer = ChannelBuffers.buffer(RESPONSE_BYTES); + ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES); buffer.writeInt(myTaskInfo.getTaskId()); buffer.writeLong(writableRequest.getRequestId()); buffer.writeByte(alreadyDone); - e.getChannel().write(buffer); + ctx.writeAndFlush(buffer); } /** @@ -143,28 +140,28 @@ public abstract class RequestServerHandler<R> extends public abstract void processRequest(R request); @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("channelConnected: Connected the channel on " + - ctx.getChannel().getRemoteAddress()); + LOG.debug("channelActive: Connected the channel on " + + ctx.channel().remoteAddress()); } + ctx.fireChannelActive(); } @Override - public void channelClosed(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("channelClosed: Closed the channel on " + - ctx.getChannel().getRemoteAddress() + " with event " + - e); + LOG.debug("channelInactive: Closed the channel on " + + ctx.channel().remoteAddress()); } + ctx.fireChannelInactive(); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { LOG.warn("exceptionCaught: Channel failed with " + - "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause()); + "remote address " + ctx.channel().remoteAddress(), cause.getCause()); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java index 1ba06e9..463a47e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java @@ -20,23 +20,20 @@ package org.apache.giraph.comm.netty.handler; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferInputStream; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; - -import java.io.IOException; + import java.util.concurrent.ConcurrentMap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; + import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED; /** * Generic handler of responses. */ -public class ResponseClientHandler extends SimpleChannelUpstreamHandler { +public class ResponseClientHandler extends ChannelInboundHandlerAdapter { /** Class logger */ private static final Logger LOG = Logger.getLogger(ResponseClientHandler.class); @@ -45,8 +42,8 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler { /** Drop first response (used for simulating failure) */ private final boolean dropFirstResponse; /** Outstanding worker request map */ - private final ConcurrentMap<ClientRequestId, RequestInfo> - workerIdOutstandingRequestMap; + private final ConcurrentMap<ClientRequestId, + RequestInfo> workerIdOutstandingRequestMap; /** * Constructor. @@ -64,27 +61,27 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler { } @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent event) { - if (!(event.getMessage() instanceof ChannelBuffer)) { + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (!(msg instanceof ByteBuf)) { throw new IllegalStateException("messageReceived: Got a " + - "non-ChannelBuffer message " + event.getMessage()); + "non-ByteBuf message " + msg); } - ChannelBuffer buffer = (ChannelBuffer) event.getMessage(); - ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer); + ByteBuf buf = (ByteBuf) msg; int senderId = -1; long requestId = -1; int response = -1; try { - senderId = inputStream.readInt(); - requestId = inputStream.readLong(); - response = inputStream.readByte(); - inputStream.close(); - } catch (IOException e) { + senderId = buf.readInt(); + requestId = buf.readLong(); + response = buf.readByte(); + } catch (IndexOutOfBoundsException e) { throw new IllegalStateException( - "messageReceived: Got IOException ", e); + "channelRead: Got IndexOutOfBoundsException ", e); } + ReferenceCountUtil.release(buf); + // Simulate a failed response on the first response (if desired) if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) { @@ -98,7 +95,8 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler { } if (response == 1) { - LOG.info("messageReceived: Already completed request " + requestId); + LOG.info("messageReceived: Already completed request (taskId = " + + senderId + ", requestId = " + requestId + ")"); } else if (response != 0) { throw new IllegalStateException( "messageReceived: Got illegal response " + response); @@ -107,13 +105,13 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler { RequestInfo requestInfo = workerIdOutstandingRequestMap.remove( new ClientRequestId(senderId, requestId)); if (requestInfo == null) { - LOG.info("messageReceived: Already received response for request id = " + - requestId); + LOG.info("messageReceived: Already received response for (taskId = " + + senderId + ", requestId = " + requestId + ")"); } else { if (LOG.isDebugEnabled()) { - LOG.debug("messageReceived: Completed " + requestInfo + - ". Waiting on " + workerIdOutstandingRequestMap.size() + - " requests"); + LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" + + requestInfo + ". Waiting on " + workerIdOutstandingRequestMap + .size() + " requests"); } } @@ -131,17 +129,20 @@ public class ResponseClientHandler extends SimpleChannelUpstreamHandler { } @Override - public void channelClosed(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("channelClosed: Closed the channel on " + - ctx.getChannel().getRemoteAddress()); + ctx.channel().remoteAddress()); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - LOG.warn("exceptionCaught: Channel failed with " + - "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause()); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + if (LOG.isDebugEnabled()) { + LOG.warn("exceptionCaught: Channel failed with " + + "remote address " + ctx.channel().remoteAddress(), cause.getCause()); + } } } + http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java index e148c8c..c0b45fc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java @@ -21,29 +21,28 @@ package org.apache.giraph.comm.netty.handler; import org.apache.giraph.comm.requests.RequestType; import org.apache.giraph.comm.requests.WritableRequest; import org.apache.log4j.Logger; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; /** * How a server should respond to a client. Currently only used for * responding to client's SASL messages, and removed after client * authenticates. */ -public class ResponseEncoder extends OneToOneEncoder { +public class ResponseEncoder extends ChannelOutboundHandlerAdapter { /** Class logger. */ private static final Logger LOG = Logger.getLogger(ResponseEncoder.class); /** Holds the place of the message length until known. */ private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; @Override - protected Object encode(ChannelHandlerContext ctx, - Channel channel, Object msg) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, + ChannelPromise promise) throws Exception { if (LOG.isDebugEnabled()) { - LOG.debug("encode(" + ctx + "," + channel + "," + msg); + LOG.debug("write(" + ctx + "," + msg); } if (!(msg instanceof WritableRequest)) { @@ -54,10 +53,10 @@ public class ResponseEncoder extends OneToOneEncoder { } @SuppressWarnings("unchecked") WritableRequest writableRequest = - (WritableRequest) msg; - ChannelBufferOutputStream outputStream = - new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer( - 10, ctx.getChannel().getConfig().getBufferFactory())); + (WritableRequest) msg; + ByteBuf buf = ctx.alloc().buffer(10); + ByteBufOutputStream outputStream = + new ByteBufOutputStream(buf); if (LOG.isDebugEnabled()) { LOG.debug("encode: Encoding a message of type " + msg.getClass()); @@ -71,14 +70,15 @@ public class ResponseEncoder extends OneToOneEncoder { writableRequest.write(outputStream); outputStream.flush(); + outputStream.close(); // Set the correct size at the end. - ChannelBuffer encodedBuffer = outputStream.buffer(); - encodedBuffer.setInt(0, encodedBuffer.writerIndex() - 4); + buf.setInt(0, buf.writerIndex() - 4); if (LOG.isDebugEnabled()) { LOG.debug("encode: Encoding a message of type " + msg.getClass()); } + ctx.write(buf, promise); /*if[HADOOP_NON_SECURE] else[HADOOP_NON_SECURE]*/ if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) { @@ -88,13 +88,13 @@ else[HADOOP_NON_SECURE]*/ // the ResponseEncoder to remove itself also. if (LOG.isDebugEnabled()) { LOG.debug("encode: Removing RequestEncoder handler: no longer needed," + - " since client: " + ctx.getChannel().getRemoteAddress() + " has " + + " since client: " + ctx.channel().remoteAddress() + " has " + "completed authenticating."); } - ctx.getPipeline().remove(this); + ctx.pipeline().remove(this); } /*end[HADOOP_NON_SECURE]*/ - return encodedBuffer; + ctx.write(buf, promise); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java index b26a314..8921270 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java @@ -27,14 +27,12 @@ import org.apache.giraph.comm.requests.WritableRequest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; import org.apache.log4j.Logger; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferInputStream; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder; -import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.FixedLengthFrameDecoder; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; @@ -42,7 +40,7 @@ import java.io.IOException; * Client-side Netty pipeline component that allows authentication with a * server. */ -public class SaslClientHandler extends OneToOneDecoder { +public class SaslClientHandler extends ChannelInboundHandlerAdapter { /** Class logger */ private static final Logger LOG = Logger.getLogger(SaslClientHandler.class); /** Configuration */ @@ -58,21 +56,14 @@ public class SaslClientHandler extends OneToOneDecoder { } @Override - public void handleUpstream( - ChannelHandlerContext ctx, ChannelEvent evt) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (!(evt instanceof MessageEvent)) { - ctx.sendUpstream(evt); - return; - } - MessageEvent e = (MessageEvent) evt; - Object originalMessage = e.getMessage(); - Object decodedMessage = decode(ctx, ctx.getChannel(), originalMessage); + WritableRequest decodedMessage = decode(ctx, msg); // Generate SASL response to server using Channel-local SASL client. - SaslNettyClient saslNettyClient = NettyClient.SASL.get(ctx.getChannel()); + SaslNettyClient saslNettyClient = ctx.attr(NettyClient.SASL).get(); if (saslNettyClient == null) { throw new Exception("handleUpstream: saslNettyClient was unexpectedly " + - "null for channel: " + ctx.getChannel()); + "null for channel: " + ctx.channel()); } if (decodedMessage.getClass() == SaslCompleteRequest.class) { if (LOG.isDebugEnabled()) { @@ -91,8 +82,8 @@ public class SaslClientHandler extends OneToOneDecoder { } // Remove SaslClientHandler and replace LengthFieldBasedFrameDecoder // from client pipeline. - ctx.getPipeline().remove(this); - ctx.getPipeline().replace("length-field-based-frame-decoder", + ctx.pipeline().remove(this); + ctx.pipeline().replace("length-field-based-frame-decoder", "fixed-length-frame-decoder", new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES)); return; @@ -128,27 +119,34 @@ public class SaslClientHandler extends OneToOneDecoder { // server. SaslTokenMessageRequest saslResponse = new SaslTokenMessageRequest(responseToServer); - ctx.getChannel().write(saslResponse); + ctx.channel().writeAndFlush(saslResponse); } - @Override - protected Object decode(ChannelHandlerContext ctx, - Channel channel, Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer)) { + /** + * Decode the message read by handler + * + * @param ctx channel handler context + * @param msg message to decode into a writable request + * @return decoded writablerequest object + * @throws Exception + */ + protected WritableRequest decode(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (!(msg instanceof ByteBuf)) { throw new IllegalStateException("decode: Got illegal message " + msg); } // Decode msg into an object whose class C implements WritableRequest: // C will be either SaslTokenMessage or SaslComplete. // // 1. Convert message to a stream that can be decoded. - ChannelBuffer buffer = (ChannelBuffer) msg; - ChannelBufferInputStream inputStream = new ChannelBufferInputStream(buffer); + ByteBuf buf = (ByteBuf) msg; + ByteBufInputStream inputStream = new ByteBufInputStream(buf); // 2. Get first byte: message type: int enumValue = inputStream.readByte(); RequestType type = RequestType.values()[enumValue]; if (LOG.isDebugEnabled()) { LOG.debug("decode: Got a response of type " + type + " from server:" + - channel.getRemoteAddress()); + ctx.channel().remoteAddress()); } // 3. Create object of the type determined in step 2. Class<? extends WritableRequest> writableRequestClass = @@ -162,6 +160,7 @@ public class SaslClientHandler extends OneToOneDecoder { } catch (IOException e) { LOG.error("decode: Exception when trying to read server response: " + e); } + ReferenceCountUtil.release(buf); // serverResponse can now be used in the next stage in pipeline. return serverResponse; } http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java index da06334..fff58cc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java @@ -35,11 +35,11 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ReflectionUtils; import org.apache.log4j.Logger; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; @@ -52,7 +52,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES * authenticate themselves with this server. */ public class SaslServerHandler extends - SimpleChannelUpstreamHandler { + ChannelInboundHandlerAdapter { /** Class logger */ private static final Logger LOG = Logger.getLogger(SaslServerHandler.class); @@ -80,13 +80,14 @@ public class SaslServerHandler extends } @Override - public void messageReceived( - ChannelHandlerContext ctx, MessageEvent e) { + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (LOG.isDebugEnabled()) { - LOG.debug("messageReceived: Got " + e.getMessage().getClass()); + LOG.debug("messageReceived: Got " + msg.getClass()); } - WritableRequest writableRequest = (WritableRequest) e.getMessage(); + WritableRequest writableRequest = (WritableRequest) msg; // Simulate a closed connection on the first request (if desired) // TODO: Move out into a separate, dedicated handler. if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) { @@ -94,7 +95,7 @@ public class SaslServerHandler extends "request " + writableRequest.getRequestId() + " from " + writableRequest.getClientId()); setAlreadyClosedFirstRequest(); - ctx.getChannel().close(); + ctx.close(); return; } @@ -103,10 +104,10 @@ public class SaslServerHandler extends // (in which case we are looking at the first SASL message from the // client). SaslNettyServer saslNettyServer = - NettyServer.CHANNEL_SASL_NETTY_SERVERS.get(ctx.getChannel()); + ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).get(); if (saslNettyServer == null) { if (LOG.isDebugEnabled()) { - LOG.debug("No saslNettyServer for " + ctx.getChannel() + + LOG.debug("No saslNettyServer for " + ctx.channel() + " yet; creating now, with secret manager: " + secretManager); } try { @@ -115,19 +116,18 @@ public class SaslServerHandler extends } catch (IOException ioe) { //TODO: throw new RuntimeException(ioe); } - NettyServer.CHANNEL_SASL_NETTY_SERVERS.set(ctx.getChannel(), - saslNettyServer); + ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).set(saslNettyServer); } else { if (LOG.isDebugEnabled()) { LOG.debug("Found existing saslNettyServer on server:" + - ctx.getChannel().getLocalAddress() + " for client " + - ctx.getChannel().getRemoteAddress()); + ctx.channel().localAddress() + " for client " + + ctx.channel().remoteAddress()); } } ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer); // Send response to client. - ctx.getChannel().write(writableRequest); + ctx.write(writableRequest); if (saslNettyServer.isComplete()) { // If authentication of client is complete, we will also send a // SASL-Complete message to the client. @@ -136,13 +136,14 @@ public class SaslServerHandler extends "username: " + saslNettyServer.getUserName()); } SaslCompleteRequest saslComplete = new SaslCompleteRequest(); - ctx.getChannel().write(saslComplete); + ctx.write(saslComplete); if (LOG.isDebugEnabled()) { LOG.debug("Removing SaslServerHandler from pipeline since SASL " + "authentication is complete."); } - ctx.getPipeline().remove(this); + ctx.pipeline().remove(this); } + ctx.flush(); // do not send upstream to other handlers: no further action needs to be // done for SASL_TOKEN_MESSAGE_REQUEST requests. return; @@ -154,7 +155,7 @@ public class SaslServerHandler extends // not completed. LOG.warn("Sending upstream an unexpected non-SASL message : " + writableRequest); - ctx.sendUpstream(e); + ctx.fireChannelRead(msg); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index abc81e8..ea1f12d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -47,6 +47,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.net.DNS; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; + import java.net.UnknownHostException; /** @@ -59,6 +63,9 @@ import java.net.UnknownHostException; */ public class GiraphConfiguration extends Configuration implements GiraphConstants { + /** ByteBufAllocator to be used by netty */ + private ByteBufAllocator nettyBufferAllocator = null; + /** * Constructor that creates the configuration */ @@ -840,6 +847,25 @@ public class GiraphConfiguration extends Configuration } } + /** + * Used by netty client and server to create ByteBufAllocator + * + * @return ByteBufAllocator + */ + public ByteBufAllocator getNettyAllocator() { + if (nettyBufferAllocator == null) { + if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator + nettyBufferAllocator = new PooledByteBufAllocator( + NETTY_USE_DIRECT_MEMORY.get(this)); + } else { // Use un-pooled allocator + // Note: Current default settings create un-pooled heap allocator + nettyBufferAllocator = new UnpooledByteBufAllocator( + NETTY_USE_DIRECT_MEMORY.get(this)); + } + } + return nettyBufferAllocator; + } + public int getZookeeperConnectionAttempts() { return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this); } http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 9271152..61830e8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -469,6 +469,16 @@ public interface GiraphConstants { IntConfOption TCP_BACKLOG = new IntConfOption("giraph.tcpBacklog", 1, "TCP backlog (defaults to number of workers)"); + /** Use netty pooled memory buffer allocator */ + BooleanConfOption NETTY_USE_POOLED_ALLOCATOR = new BooleanConfOption( + "giraph.useNettyPooledAllocator", true, "Should netty use pooled " + + "memory allocator?"); + + /** Use direct memory buffers in netty */ + BooleanConfOption NETTY_USE_DIRECT_MEMORY = new BooleanConfOption( + "giraph.useNettyDirectMemory", true, "Should netty use direct " + + "memory buffers"); + /** How big to make the encoder buffer? */ IntConfOption NETTY_REQUEST_ENCODER_BUFFER_SIZE = new IntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB, @@ -502,7 +512,7 @@ public interface GiraphConstants { /** Where to place the netty client execution handle? */ StrConfOption NETTY_CLIENT_EXECUTION_AFTER_HANDLER = new StrConfOption("giraph.nettyClientExecutionAfterHandler", - "requestEncoder", + "request-encoder", "Where to place the netty client execution handle?"); /** Use the execution handler in netty on the server? */ http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java index 96e3fad..310d38c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java @@ -20,21 +20,21 @@ package org.apache.giraph.utils; import java.io.DataInput; import java.io.IOException; import java.io.UTFDataFormatException; -import org.jboss.netty.buffer.DynamicChannelBuffer; +import io.netty.buffer.ByteBuf; /** * Special input that reads from a DynamicChannelBuffer. */ public class DynamicChannelBufferInputStream implements DataInput { /** Internal dynamic channel buffer */ - private DynamicChannelBuffer buffer; + private ByteBuf buffer; /** * Constructor. * * @param buffer Buffer to read from */ - public DynamicChannelBufferInputStream(DynamicChannelBuffer buffer) { + public DynamicChannelBufferInputStream(ByteBuf buffer) { this.buffer = buffer; } http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java index ca4a7d7..c63627c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java @@ -20,9 +20,9 @@ package org.apache.giraph.utils; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteOrder; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.DirectChannelBufferFactory; -import org.jboss.netty.buffer.DynamicChannelBuffer; + +import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBuf; /** * Special output stream that can grow as needed and dumps to a @@ -30,7 +30,7 @@ import org.jboss.netty.buffer.DynamicChannelBuffer; */ public class DynamicChannelBufferOutputStream implements DataOutput { /** Internal dynamic channel buffer */ - private DynamicChannelBuffer buffer; + private ByteBuf buffer; /** * Constructor @@ -38,9 +38,10 @@ public class DynamicChannelBufferOutputStream implements DataOutput { * @param estimatedLength Estimated length of the buffer */ public DynamicChannelBufferOutputStream(int estimatedLength) { - buffer = (DynamicChannelBuffer) - ChannelBuffers.dynamicBuffer(ByteOrder.LITTLE_ENDIAN, - estimatedLength, DirectChannelBufferFactory.getInstance()); + buffer = Unpooled.unreleasableBuffer(Unpooled.buffer(estimatedLength)) + .order(ByteOrder.LITTLE_ENDIAN); + // -- TODO unresolved what are benefits of using releasable? + // currently nit because it is just used in 1 test file } /** @@ -48,7 +49,7 @@ public class DynamicChannelBufferOutputStream implements DataOutput { * * @param buffer Buffer to be written to (cleared before use) */ - public DynamicChannelBufferOutputStream(DynamicChannelBuffer buffer) { + public DynamicChannelBufferOutputStream(ByteBuf buffer) { this.buffer = buffer; buffer.clear(); } @@ -58,7 +59,7 @@ public class DynamicChannelBufferOutputStream implements DataOutput { * * @return dynamic channel buffer (not a copy) */ - public DynamicChannelBuffer getDynamicChannelBuffer() { + public ByteBuf getDynamicChannelBuffer() { return buffer; } @@ -184,4 +185,3 @@ public class DynamicChannelBufferOutputStream implements DataOutput { } } } - http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/utils/PipelineUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/PipelineUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/PipelineUtils.java new file mode 100644 index 0000000..290e0fa --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/PipelineUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelPipeline; +import io.netty.util.concurrent.EventExecutorGroup; + +/** + * Utility class for ChannelPipeline + */ +public class PipelineUtils { + /** Do not instantiate. */ + private PipelineUtils() { } + + /** + * Add a handler to pipeline if it is not already added + * and configure the handler to use an executor group based on equality + * of compareTo and handlerName + * + * @param handlerName string name of the handler + * @param handler channelhandler object + * @param compareTo string name to compare to for executorgroup usage check + * @param executor executorgroup instance + * @param channel netty channel + */ + public static void addLastWithExecutorCheck(String handlerName, + ChannelHandler handler, String compareTo, EventExecutorGroup executor, + Channel channel) { + ChannelPipeline pipeline = channel.pipeline(); + if (channel.pipeline().get(handlerName) != null) { + return; + } + if (compareTo.equals(handlerName)) { + pipeline.addLast(executor, handlerName, handler); + } else { + pipeline.addLast(handlerName, handler); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java index 78c230a..08a7914 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java @@ -20,8 +20,9 @@ package org.apache.giraph.utils; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.ChannelFuture; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.util.concurrent.EventExecutorGroup; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -74,6 +75,17 @@ public class ProgressableUtils { } /** + * Wait for executorgroup to terminate, while periodically reporting progress + * + * @param group ExecutorGroup whose termination we are awaiting + * @param progressable Progressable for reporting progress (Job context) + */ + public static void awaitTerminationFuture(EventExecutorGroup group, + Progressable progressable) { + waitForever(new FutureWaitable<>(group.terminationFuture()), progressable); + } + + /** * Wait for the result of the future to be ready, while periodically * reporting progress. * http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f06722e..ceaef23 100644 --- a/pom.xml +++ b/pom.xml @@ -299,7 +299,7 @@ under the License. <dep.libthrift.version>0.9.0</dep.libthrift.version> <dep.log4j.version>1.2.17</dep.log4j.version> <dep.mockito.version>1.9.5</dep.mockito.version> - <dep.netty.version>3.5.3.Final</dep.netty.version> + <dep.netty.version>4.0.14.Final</dep.netty.version> <dep.paranamer.version>2.3</dep.paranamer.version> <dep.slf4j.version>1.7.5</dep.slf4j.version> <dep.tinkerpop.rexter.version>2.4.0</dep.tinkerpop.rexter.version> @@ -1410,7 +1410,7 @@ under the License. </exclusion> <exclusion> <groupId>io.netty</groupId> - <artifactId>netty</artifactId> + <artifactId>netty-all</artifactId> </exclusion> </exclusions> </dependency> @@ -1536,7 +1536,7 @@ under the License. </dependency> <dependency> <groupId>io.netty</groupId> - <artifactId>netty</artifactId> + <artifactId>netty-all</artifactId> <version>${dep.netty.version}</version> </dependency> <dependency>
