stoty commented on a change in pull request #92:
URL: https://github.com/apache/phoenix-omid/pull/92#discussion_r597911135



##########
File path: transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
##########
@@ -513,7 +515,7 @@ void decrementRetries() {
         }
 
         public StateMachine.State handleEvent(CloseEvent e) {
-            factory.releaseExternalResources();
+            bootstrap.config().group().shutdownGracefully();

Review comment:
       I checked the code, and shutdownGracefully() is thread-safe.
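   
   For reference, a minimal sketch of what a blocking close could look like with the Netty 4 API, should we ever need to wait for the event loop to actually terminate (the `bootstrap` field name comes from the diff above; the timeout is an arbitrary assumption):
   
   ```java
   import io.netty.bootstrap.Bootstrap;
   import java.util.concurrent.TimeUnit;
   
   class ShutdownSketch {
       // shutdownGracefully() is thread-safe and idempotent; it returns a Future
       // that we can wait on if a synchronous close is ever required.
       static void closeEventLoop(Bootstrap bootstrap) {
           bootstrap.config().group()
                    .shutdownGracefully()
                    .awaitUninterruptibly(10, TimeUnit.SECONDS); // hypothetical timeout
       }
   }
   ```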

##########
File path: transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
##########
@@ -342,7 +344,7 @@ public void nodeChanged() throws Exception {
         setTSOAddress(hp.getHost(), hp.getPort());
         epoch = Long.parseLong(currentTSOAndEpochArray[1]);
         LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
-        if (currentChannel != null && currentChannel.isConnected()) {
+        if (currentChannel != null && currentChannel.isActive()) {

Review comment:
       Which part do you mean?
   This handles the case when the active server changes in ZK, and AFAICT this 
is the only place where it is handled.
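   
   A rough sketch of the intent, just for clarity (field names are from the diff above; the reconnect mechanics are my reading of the code, not a quote):
   
   ```java
   // isActive() is the Netty 4 replacement for Netty 3's isConnected().
   // If the old connection is still live we drop it, and the subsequent
   // channelInactive callback drives the client to reconnect to the new TSO.
   if (currentChannel != null && currentChannel.isActive()) {
       LOG.info("Closing channel to previous TSO {}", currentChannel.remoteAddress());
       currentChannel.close();
   }
   ```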

##########
File path: tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
##########
@@ -17,95 +17,116 @@
  */
 package org.apache.omid.tso;
 
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.ThreadFactory;
+
+import javax.inject.Inject;
+
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.proto.TSOProto;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.Executors;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 /**
  * ChannelHandler for the TSO Server.
  *
  * Incoming requests are processed in this class
  */
-public class TSOChannelHandler extends SimpleChannelHandler implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
+// Marked sharable, as all global members used in callbacks are singletons.
+@Sharable
+public class TSOChannelHandler extends ChannelInboundHandlerAdapter implements Closeable {

-    private final ChannelFactory factory;
+    private final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
 
     private final ServerBootstrap bootstrap;
 
     @VisibleForTesting
     Channel listeningChannel;
     @VisibleForTesting
-    ChannelGroup channelGroup;
+    ChannelGroup allChannels;
 
     private RequestProcessor requestProcessor;
 
     private TSOServerConfig config;
 
     private MetricsRegistry metrics;
 
+    private static final AttributeKey<TSOChannelContext> TSO_CTX =
+            AttributeKey.valueOf("TSO_CTX");
+
     @Inject
     public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {
 
         this.config = config;
         this.metrics = metrics;
         this.requestProcessor = requestProcessor;
-        // Setup netty listener
-        this.factory = new NioServerSocketChannelFactory(
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("boss-%d").build()),
-                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("worker-%d").build()),
-                (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
-
-        this.bootstrap = new ServerBootstrap(factory);
-        bootstrap.setPipelineFactory(new TSOPipelineFactory(this));
 
+        // Setup netty listener
+        int workerThreadCount= (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2;
+        ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreadCount, workerThreadFactory);
+        EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreadFactory);
+
+        this.bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup,  workerGroup);
+        bootstrap.channel(NioServerSocketChannel.class);
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel channel) throws Exception {
+                ChannelPipeline pipeline = channel.pipeline();
+                // Max packet length is 10MB. Transactions with so many cells
+                // that the packet is rejected will receive a ServiceUnavailableException.

Review comment:
       Opened https://issues.apache.org/jira/browse/OMID-204

##########
File path: transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
##########
@@ -1012,37 +1016,34 @@ private void closeChannelAndErrorRequests() {
         }
 
         @Override
-        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
-            currentChannel = e.getChannel();
-            LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
-            fsm.sendEvent(new ConnectedEvent(e.getChannel()));
+        public void channelActive(ChannelHandlerContext ctx) {
+            currentChannel = ctx.channel();
+            LOG.debug("HANDLER (CHANNEL ACTIVE): Connection {}. Sending connected event to FSM", ctx.channel());
+            fsm.sendEvent(new ConnectedEvent(ctx.channel()));
         }

         @Override
-        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-            LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            LOG.debug("HANDLER (CHANNEL INACTIVE): Connection {}. Sending error, then channelClosed event to FSM", ctx.channel());
+            // Netty 3 had separate callbacks, and the FSM expects both events.
+            // Sending both is much easier than rewriting the FSM

Review comment:
       The client has a singleton FSM that tracks the connection state, the outstanding requests, and their timeouts.
   
   The FSM has a separate **ClosingState**, which seems to exist solely to consume the Netty 3 channelClosed event.
   
   As that event no longer exists in Netty 4 (it was consolidated into channelInactive), we either need to synthesize it or rewrite the FSM; otherwise the FSM gets stuck in **ClosingState**.
   
   See https://netty.io/wiki/new-and-noteworthy-in-4.0.html#simplified-channel-state-model
   
   I can have a go at rewriting the FSM, but it's not on a performance-critical path, so I'd rather leave that logic alone for now and concentrate on the Netty API change.
   
   I can open a follow-up ticket for cleaning up the FSM.
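   
   Roughly what I have in mind for synthesizing the events (the event and exception classes are the FSM's own from TSOClient; the exact constructors here are an assumption):
   
   ```java
   @Override
   public void channelInactive(ChannelHandlerContext ctx) {
       // channelDisconnected and channelClosed are folded into a single Netty 4
       // callback, so we hand the FSM both of the events it still expects,
       // in the order it used to receive them.
       fsm.sendEvent(new ErrorEvent(new ConnectionException()));
       fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
   }
   ```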

##########
File path: tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
##########
@@ -58,39 +62,39 @@
     private final Channel channel;
 
     public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
-        // Start client with Nb of active threads = 3 as maximum.
-        ChannelFactory factory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
-                Executors.newCachedThreadPool(
-                        new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
-        // Create the bootstrap
-        ClientBootstrap bootstrap = new ClientBootstrap(factory);
 
         InetSocketAddress addr = new InetSocketAddress(host, port);
 
-        ChannelPipeline pipeline = bootstrap.getPipeline();
-        pipeline.addLast("lengthbaseddecoder",
-                new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
-        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
-        pipeline.addLast("protobufdecoder",
-                new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
-        pipeline.addLast("protobufencoder", new ProtobufEncoder());
-
-        Handler handler = new Handler();
-        pipeline.addLast("handler", handler);
-
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("keepAlive", true);
-        bootstrap.setOption("reuseAddress", true);
-        bootstrap.setOption("connectTimeoutMillis", 100);
+        // Start client with Nb of active threads = 3
+        ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
+        EventLoopGroup workerGroup = new NioEventLoopGroup(3, workerThreadFactory);

Review comment:
       Not sure about Netty 3, but according to the Netty 4 docs, the boss group is not used for clients.
   
   https://netty.io/wiki/user-guide-for-4.x.html#writing-a-time-client
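   
   For illustration, this is roughly the Netty 4 client shape being discussed; a single worker group replaces the old boss/worker pair (pipeline and options mirror the old Netty 3 settings, the rest is a sketch, not the exact patch code):
   
   ```java
   // io.netty.* imports as used elsewhere in this patch
   EventLoopGroup workerGroup = new NioEventLoopGroup(3,
           new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build());
   Bootstrap bootstrap = new Bootstrap()
           .group(workerGroup)                  // clients take one group: no boss group
           .channel(NioSocketChannel.class)
           .option(ChannelOption.TCP_NODELAY, true)
           .option(ChannelOption.SO_KEEPALIVE, true)
           .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100)
           .handler(new ChannelInitializer<SocketChannel>() {
               @Override
               protected void initChannel(SocketChannel ch) {
                   ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
                   ch.pipeline().addLast(new LengthFieldPrepender(4));
                   ch.pipeline().addLast(new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
                   ch.pipeline().addLast(new ProtobufEncoder());
                   // the response handler would be added last
               }
           });
   ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
   ```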




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

