This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 92dcc05469b17f014fc72711ec0c3269b4160b7b
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Mon Mar 7 09:28:31 2022 +0700

    JAMES-3715 Execute core handlers outside of the event loop
    
    WIP SMTP pipelining is currently broken
---
 .../protocols/netty/AbstractChannelPipelineFactory.java  | 13 ++++++++-----
 .../netty/AbstractSSLAwareChannelPipelineFactory.java    | 10 ++++++----
 .../protocols/netty/BasicChannelUpstreamHandler.java     | 11 +++++++----
 .../james/protocols/netty/NettyProtocolTransport.java    |  7 +++++--
 .../org/apache/james/protocols/netty/NettyServer.java    |  8 +++++---
 .../org/apache/james/imapserver/netty/IMAPServer.java    | 10 +++++-----
 .../lib/netty/AbstractConfigurableAsyncServer.java       | 16 ++++++++++++----
 .../AbstractExecutorAwareChannelPipelineFactory.java     |  5 +++--
 .../org/apache/james/lmtpserver/netty/LMTPServer.java    |  2 +-
 .../james/managesieveserver/netty/ManageSieveServer.java | 14 +++++++-------
 .../org/apache/james/pop3server/netty/POP3Server.java    |  2 +-
 .../smtpserver/netty/SMTPChannelUpstreamHandler.java     |  9 +++++----
 .../org/apache/james/smtpserver/netty/SMTPServer.java    |  2 +-
 13 files changed, 66 insertions(+), 43 deletions(-)

diff --git 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
index 32049af..db4a1bd 100644
--- 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
+++ 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 /**
  * Abstract base class for {@link ChannelInitializer} implementations
@@ -38,19 +39,21 @@ public abstract class AbstractChannelPipelineFactory<C 
extends SocketChannel> ex
     private final ChannelGroupHandler groupHandler;
     private final int timeout;
     private final ChannelHandlerFactory frameHandlerFactory;
+    private final EventExecutorGroup eventExecutorGroup;
 
     public AbstractChannelPipelineFactory(ChannelGroup channels,
-                                          ChannelHandlerFactory 
frameHandlerFactory) {
-        this(0, 0, 0, channels, frameHandlerFactory);
+                                          ChannelHandlerFactory 
frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
+        this(0, 0, 0, channels, frameHandlerFactory, eventExecutorGroup);
     }
 
     public AbstractChannelPipelineFactory(int timeout, int maxConnections, int 
maxConnectsPerIp, ChannelGroup channels,
-                                          ChannelHandlerFactory 
frameHandlerFactory) {
+                                          ChannelHandlerFactory 
frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
         this.connectionLimitHandler = new 
ConnectionLimitUpstreamHandler(maxConnections);
         this.connectionPerIpLimitHandler = new 
ConnectionPerIpLimitUpstreamHandler(maxConnectsPerIp);
         this.groupHandler = new ChannelGroupHandler(channels);
         this.timeout = timeout;
         this.frameHandlerFactory = frameHandlerFactory;
+        this.eventExecutorGroup = eventExecutorGroup;
     }
     
     
@@ -66,13 +69,13 @@ public abstract class AbstractChannelPipelineFactory<C 
extends SocketChannel> ex
 
         
         // Add the text line decoder which limit the max line length, don't 
strip the delimiter and use CRLF as delimiter
-        pipeline.addLast(HandlerConstants.FRAMER, 
frameHandlerFactory.create(pipeline));
+        pipeline.addLast(eventExecutorGroup, HandlerConstants.FRAMER, 
frameHandlerFactory.create(pipeline));
        
         // Add the ChunkedWriteHandler to be able to write ChunkInput
         pipeline.addLast(HandlerConstants.CHUNK_HANDLER, new 
ChunkedWriteHandler());
         pipeline.addLast(HandlerConstants.TIMEOUT_HANDLER, new 
TimeoutHandler(timeout));
 
-        pipeline.addLast(HandlerConstants.CORE_HANDLER, createHandler());
+        pipeline.addLast(eventExecutorGroup, HandlerConstants.CORE_HANDLER, 
createHandler());
     }
 
     
diff --git 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
index e509ee0..f3399c9 100644
--- 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
+++ 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 
 /**
@@ -39,14 +40,15 @@ public abstract class 
AbstractSSLAwareChannelPipelineFactory<C extends SocketCha
 
     public AbstractSSLAwareChannelPipelineFactory(int timeout,
                                                   int maxConnections, int 
maxConnectsPerIp, ChannelGroup group,
-                                                  ChannelHandlerFactory 
frameHandlerFactory) {
-        super(timeout, maxConnections, maxConnectsPerIp, group, 
frameHandlerFactory);
+                                                  ChannelHandlerFactory 
frameHandlerFactory,
+                                                  EventExecutorGroup 
eventExecutorGroup) {
+        super(timeout, maxConnections, maxConnectsPerIp, group, 
frameHandlerFactory, eventExecutorGroup);
     }
 
     public AbstractSSLAwareChannelPipelineFactory(int timeout,
             int maxConnections, int maxConnectsPerIp, ChannelGroup group, 
Encryption secure,
-            ChannelHandlerFactory frameHandlerFactory) {
-        this(timeout, maxConnections, maxConnectsPerIp, group, 
frameHandlerFactory);
+            ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup 
eventExecutorGroup) {
+        this(timeout, maxConnections, maxConnectsPerIp, group, 
frameHandlerFactory, eventExecutorGroup);
 
         this.secure = secure;
     }
diff --git 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
index 08086a6..1cde815 100644
--- 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
+++ 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
@@ -51,6 +51,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 
 /**
@@ -67,16 +68,18 @@ public class BasicChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter {
     protected final Protocol protocol;
     protected final ProtocolHandlerChain chain;
     protected final Encryption secure;
+    private final EventExecutorGroup eventExecutors;
 
-    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory 
mdcContextFactory, Protocol protocol) {
-        this(mdcContextFactory, protocol, null);
+    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory 
mdcContextFactory, Protocol protocol, EventExecutorGroup eventExecutors) {
+        this(mdcContextFactory, protocol, null, eventExecutors);
     }
 
-    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory 
mdcContextFactory, Protocol protocol, Encryption secure) {
+    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory 
mdcContextFactory, Protocol protocol, Encryption secure, EventExecutorGroup 
eventExecutors) {
         this.mdcContextFactory = mdcContextFactory;
         this.protocol = protocol;
         this.chain = protocol.getProtocolChain();
         this.secure = secure;
+        this.eventExecutors = eventExecutors;
     }
 
 
@@ -197,7 +200,7 @@ public class BasicChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter {
             engine = secure.createSSLEngine();
         }
 
-        return protocol.newSession(new NettyProtocolTransport(ctx.channel(), 
engine));
+        return protocol.newSession(new NettyProtocolTransport(ctx.channel(), 
engine, eventExecutors));
     }
 
     @Override
diff --git 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
index 36d5ab8..bdeee67 100644
--- 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
+++ 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
@@ -38,6 +38,7 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.DefaultFileRegion;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedStream;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 
 /**
@@ -47,11 +48,13 @@ public class NettyProtocolTransport extends 
AbstractProtocolTransport {
     
     private final Channel channel;
     private final SSLEngine engine;
+    private final EventExecutorGroup eventExecutors;
     private int lineHandlerCount = 0;
     
-    public NettyProtocolTransport(Channel channel, SSLEngine engine) {
+    public NettyProtocolTransport(Channel channel, SSLEngine engine, 
EventExecutorGroup eventExecutors) {
         this.channel = channel;
         this.engine = engine;
+        this.eventExecutors = eventExecutors;
     }
 
     @Override
@@ -156,7 +159,7 @@ public class NettyProtocolTransport extends 
AbstractProtocolTransport {
         // it is executed with the same ExecutorHandler as the coreHandler (if 
one exist)
         // 
         // See JAMES-1277
-        channel.pipeline().addBefore(HandlerConstants.CORE_HANDLER, 
"lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, 
overrideCommandHandler));
+        channel.pipeline().addBefore(eventExecutors, 
HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new 
LineHandlerUpstreamHandler(session, overrideCommandHandler));
     }
     
    
diff --git 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
index 35da9a9..e50a498 100644
--- 
a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
+++ 
b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
@@ -28,8 +28,9 @@ import org.apache.james.protocols.api.Protocol;
 import com.google.common.base.Preconditions;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.DefaultEventLoopGroup;
 import io.netty.channel.group.ChannelGroup;
-
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
 
 
 /**
@@ -104,7 +105,7 @@ public class NettyServer extends AbstractAsyncServer {
     }
 
     protected ChannelInboundHandlerAdapter createCoreHandler() {
-        return new BasicChannelUpstreamHandler(new 
ProtocolMDCContextFactory.Standard(), protocol, secure);
+        return new BasicChannelUpstreamHandler(new 
ProtocolMDCContextFactory.Standard(), protocol, secure, new 
DefaultEventExecutorGroup(2));
     }
     
     @Override
@@ -126,7 +127,8 @@ public class NettyServer extends AbstractAsyncServer {
             maxCurConnectionsPerIP,
             group,
             secure,
-            getFrameHandlerFactory()) {
+            getFrameHandlerFactory(),
+            new DefaultEventLoopGroup(16)) {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index 7896835..649f4a8 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -211,7 +211,7 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
     @Override
     protected AbstractChannelPipelineFactory createPipelineFactory(final 
ChannelGroup group) {
         
-        return new AbstractChannelPipelineFactory(group, 
getFrameHandlerFactory()) {
+        return new AbstractChannelPipelineFactory(group, 
getFrameHandlerFactory(), getExecutorGroup()) {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
@@ -235,7 +235,7 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
                 // Add the text line decoder which limit the max line length,
                 // don't strip the delimiter and use CRLF as delimiter
                 // Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436
-                pipeline.addLast(FRAMER, 
getFrameHandlerFactory().create(pipeline));
+                pipeline.addLast(getExecutorGroup(), FRAMER, 
getFrameHandlerFactory().create(pipeline));
                
                 Encryption secure = getEncryption();
                 if (secure != null && !secure.isStartTLS()) {
@@ -248,11 +248,11 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
                 }
                 pipeline.addLast(CONNECTION_COUNT_HANDLER, 
getConnectionCountHandler());
 
-                pipeline.addLast(CHUNK_WRITE_HANDLER, new 
ChunkedWriteHandler());
+                pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new 
ChunkedWriteHandler());
 
-                pipeline.addLast(REQUEST_DECODER, new 
ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit));
+                pipeline.addLast(getExecutorGroup(), REQUEST_DECODER, new 
ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit));
 
-                pipeline.addLast(CORE_HANDLER, createHandler());
+                pipeline.addLast(getExecutorGroup(), CORE_HANDLER, 
createCoreHandler());
             }
 
         };
diff --git 
a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
 
b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
index 1ccbd84..1da9ce6 100644
--- 
a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
+++ 
b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
@@ -48,6 +48,7 @@ import org.apache.james.protocols.lib.jmx.ServerMBean;
 import org.apache.james.protocols.netty.AbstractAsyncServer;
 import org.apache.james.protocols.netty.AbstractChannelPipelineFactory;
 import org.apache.james.protocols.netty.ChannelHandlerFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +56,8 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
 import nl.altindag.ssl.SSLFactory;
 import nl.altindag.ssl.util.PemUtils;
 
@@ -118,7 +121,7 @@ public abstract class AbstractConfigurableAsyncServer 
extends AbstractAsyncServe
 
     private ChannelHandlerFactory frameHandlerFactory;
 
-    private int maxExecutorThreads;
+    private EventExecutorGroup executorGroup;
 
     private MBeanServer mbeanServer;
 
@@ -189,8 +192,8 @@ public abstract class AbstractConfigurableAsyncServer 
extends AbstractAsyncServe
         int ioWorker = config.getInt("ioWorkerCount", DEFAULT_IO_WORKER_COUNT);
         setIoWorkerCount(ioWorker);
 
-        maxExecutorThreads = config.getInt("maxExecutorCount", 
DEFAULT_MAX_EXECUTOR_COUNT);
-
+        executorGroup = new 
DefaultEventExecutorGroup(config.getInt("maxExecutorCount", 
DEFAULT_MAX_EXECUTOR_COUNT),
+            NamedThreadFactory.withName(jmxName));
         
         configureHelloName(config);
 
@@ -266,6 +269,10 @@ public abstract class AbstractConfigurableAsyncServer 
extends AbstractAsyncServe
 
     }
 
+    protected EventExecutorGroup getExecutorGroup() {
+        return executorGroup;
+    }
+
     @PostConstruct
     public final void init() throws Exception {
 
@@ -308,6 +315,7 @@ public abstract class AbstractConfigurableAsyncServer 
extends AbstractAsyncServe
         if (isEnabled()) {
             unbind();
             postDestroy();
+            executorGroup.shutdownGracefully();
 
             unregisterMBean();
         }
@@ -550,7 +558,7 @@ public abstract class AbstractConfigurableAsyncServer 
extends AbstractAsyncServe
     @Override
     protected AbstractChannelPipelineFactory 
createPipelineFactory(ChannelGroup group) {
         return new AbstractExecutorAwareChannelPipelineFactory(getTimeout(), 
connectionLimit, connPerIP, group,
-            getEncryption(), getFrameHandlerFactory()) {
+            getEncryption(), getFrameHandlerFactory(), getExecutorGroup()) {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
diff --git 
a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
 
b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
index a4271eb..4890f3c 100644
--- 
a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
+++ 
b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
@@ -24,6 +24,7 @@ import org.apache.james.protocols.netty.ChannelHandlerFactory;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 /**
  * Abstract base class which should get used if you MAY need an {@link 
ExecutionHandler}
@@ -35,8 +36,8 @@ public abstract class 
AbstractExecutorAwareChannelPipelineFactory extends Abstra
 
     public AbstractExecutorAwareChannelPipelineFactory(int timeout, int 
maxConnections, int maxConnectsPerIp,
                                                        ChannelGroup group, 
Encryption encryption,
-                                                       ChannelHandlerFactory 
frameHandlerFactory) {
-        super(timeout, maxConnections, maxConnectsPerIp, group, encryption, 
frameHandlerFactory);
+                                                       ChannelHandlerFactory 
frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
+        super(timeout, maxConnections, maxConnectsPerIp, group, encryption, 
frameHandlerFactory, eventExecutorGroup);
     }
     
     /**
diff --git 
a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
 
b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
index 7899a7a..0596813 100644
--- 
a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
+++ 
b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
@@ -142,7 +142,7 @@ public class LMTPServer extends AbstractProtocolAsyncServer 
implements LMTPServe
     @Override
     protected ChannelInboundHandlerAdapter createCoreHandler() {
         SMTPProtocol protocol = new SMTPProtocol(getProtocolHandlerChain(), 
lmtpConfig);
-        return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics);
+        return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics, 
getExecutorGroup());
     }
 
     @Override
diff --git 
a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
 
b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
index e9f35fa..294ddc7 100644
--- 
a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
+++ 
b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
@@ -86,7 +86,7 @@ public class ManageSieveServer extends 
AbstractConfigurableAsyncServer implement
     @Override
     protected AbstractChannelPipelineFactory createPipelineFactory(final 
ChannelGroup group) {
 
-        return new AbstractChannelPipelineFactory(group, 
createFrameHandlerFactory()) {
+        return new AbstractChannelPipelineFactory(group, 
createFrameHandlerFactory(), getExecutorGroup()) {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
@@ -113,13 +113,13 @@ public class ManageSieveServer extends 
AbstractConfigurableAsyncServer implement
                 // Add the text line decoder which limit the max line length,
                 // don't strip the delimiter and use CRLF as delimiter
                 // Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436
-                pipeline.addLast(FRAMER, 
getFrameHandlerFactory().create(pipeline));
-                pipeline.addLast(CONNECTION_COUNT_HANDLER, 
getConnectionCountHandler());
-                pipeline.addLast(CHUNK_WRITE_HANDLER, new 
ChunkedWriteHandler());
+                pipeline.addLast(getExecutorGroup(), FRAMER, 
getFrameHandlerFactory().create(pipeline));
+                pipeline.addLast(getExecutorGroup(), CONNECTION_COUNT_HANDLER, 
getConnectionCountHandler());
+                pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new 
ChunkedWriteHandler());
 
-                pipeline.addLast("stringDecoder", new 
StringDecoder(CharsetUtil.UTF_8));
-                pipeline.addLast(CORE_HANDLER, createHandler());
-                pipeline.addLast("stringEncoder", new 
StringEncoder(CharsetUtil.UTF_8));
+                pipeline.addLast(getExecutorGroup(), "stringDecoder", new 
StringDecoder(CharsetUtil.UTF_8));
+                pipeline.addLast(getExecutorGroup(), CORE_HANDLER, 
createHandler());
+                pipeline.addLast(getExecutorGroup(), "stringEncoder", new 
StringEncoder(CharsetUtil.UTF_8));
             }
 
         };
diff --git 
a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
 
b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
index 242874a..f05ec6c 100644
--- 
a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
+++ 
b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
@@ -77,7 +77,7 @@ public class POP3Server extends AbstractProtocolAsyncServer 
implements POP3Serve
     protected void preInit() throws Exception {
         super.preInit();
         POP3Protocol protocol = new POP3Protocol(getProtocolHandlerChain(), 
theConfigData);
-        coreHandler = new BasicChannelUpstreamHandler(new 
ProtocolMDCContextFactory.Standard(), protocol, getEncryption());
+        coreHandler = new BasicChannelUpstreamHandler(new 
ProtocolMDCContextFactory.Standard(), protocol, getEncryption(), 
getExecutorGroup());
     }
 
     @Override
diff --git 
a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
 
b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
index 8152716..5379db5 100644
--- 
a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
+++ 
b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
@@ -29,6 +29,7 @@ import org.apache.james.smtpserver.SMTPConstants;
 
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 /**
  * {@link BasicChannelUpstreamHandler} which is used by the SMTPServer
@@ -38,13 +39,13 @@ public class SMTPChannelUpstreamHandler extends 
BasicChannelUpstreamHandler {
 
     private final SmtpMetrics smtpMetrics;
 
-    public SMTPChannelUpstreamHandler(Protocol protocol, Encryption 
encryption, SmtpMetrics smtpMetrics) {
-        super(new SMTPMDCContextFactory(), protocol, encryption);
+    public SMTPChannelUpstreamHandler(Protocol protocol, Encryption 
encryption, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
+        super(new SMTPMDCContextFactory(), protocol, encryption, 
eventExecutorGroup);
         this.smtpMetrics = smtpMetrics;
     }
 
-    public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics 
smtpMetrics) {
-        super(new SMTPMDCContextFactory(), protocol);
+    public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics 
smtpMetrics, EventExecutorGroup eventExecutorGroup) {
+        super(new SMTPMDCContextFactory(), protocol, eventExecutorGroup);
         this.smtpMetrics = smtpMetrics;
     }
 
diff --git 
a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
 
b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
index 65d8904..83f1534 100644
--- 
a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
+++ 
b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
@@ -219,7 +219,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer 
implements SMTPServe
             }
             
         };
-        coreHandler = new SMTPChannelUpstreamHandler(transport, 
getEncryption(), smtpMetrics);
+        coreHandler = new SMTPChannelUpstreamHandler(transport, 
getEncryption(), smtpMetrics, getExecutorGroup());
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to