This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 92dcc05469b17f014fc72711ec0c3269b4160b7b Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Mon Mar 7 09:28:31 2022 +0700 JAMES-3715 Execute core handlers outside of the event loop WIP SMTP pipelining is currently brokem --- .../protocols/netty/AbstractChannelPipelineFactory.java | 13 ++++++++----- .../netty/AbstractSSLAwareChannelPipelineFactory.java | 10 ++++++---- .../protocols/netty/BasicChannelUpstreamHandler.java | 11 +++++++---- .../james/protocols/netty/NettyProtocolTransport.java | 7 +++++-- .../org/apache/james/protocols/netty/NettyServer.java | 8 +++++--- .../org/apache/james/imapserver/netty/IMAPServer.java | 10 +++++----- .../lib/netty/AbstractConfigurableAsyncServer.java | 16 ++++++++++++---- .../AbstractExecutorAwareChannelPipelineFactory.java | 5 +++-- .../org/apache/james/lmtpserver/netty/LMTPServer.java | 2 +- .../james/managesieveserver/netty/ManageSieveServer.java | 14 +++++++------- .../org/apache/james/pop3server/netty/POP3Server.java | 2 +- .../smtpserver/netty/SMTPChannelUpstreamHandler.java | 9 +++++---- .../org/apache/james/smtpserver/netty/SMTPServer.java | 2 +- 13 files changed, 66 insertions(+), 43 deletions(-) diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java index 32049af..db4a1bd 100644 --- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java +++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.concurrent.EventExecutorGroup; /** * Abstract base class for {@link ChannelInitializer} implementations @@ -38,19 +39,21 @@ public abstract class AbstractChannelPipelineFactory<C extends SocketChannel> ex private final ChannelGroupHandler groupHandler; private final int timeout; private final ChannelHandlerFactory frameHandlerFactory; + private final EventExecutorGroup eventExecutorGroup; public AbstractChannelPipelineFactory(ChannelGroup channels, - ChannelHandlerFactory frameHandlerFactory) { - this(0, 0, 0, channels, frameHandlerFactory); + ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) { + this(0, 0, 0, channels, frameHandlerFactory, eventExecutorGroup); } public AbstractChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp, ChannelGroup channels, - ChannelHandlerFactory frameHandlerFactory) { + ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) { this.connectionLimitHandler = new ConnectionLimitUpstreamHandler(maxConnections); this.connectionPerIpLimitHandler = new ConnectionPerIpLimitUpstreamHandler(maxConnectsPerIp); this.groupHandler = new ChannelGroupHandler(channels); this.timeout = timeout; this.frameHandlerFactory = frameHandlerFactory; + this.eventExecutorGroup = eventExecutorGroup; } @@ -66,13 +69,13 @@ public abstract class AbstractChannelPipelineFactory<C extends SocketChannel> ex // Add the text line decoder which limit the max line length, don't strip the delimiter and use CRLF as delimiter - pipeline.addLast(HandlerConstants.FRAMER, frameHandlerFactory.create(pipeline)); + pipeline.addLast(eventExecutorGroup, HandlerConstants.FRAMER, frameHandlerFactory.create(pipeline)); // Add the ChunkedWriteHandler to be able to write ChunkInput pipeline.addLast(HandlerConstants.CHUNK_HANDLER, new ChunkedWriteHandler()); pipeline.addLast(HandlerConstants.TIMEOUT_HANDLER, new TimeoutHandler(timeout)); - pipeline.addLast(HandlerConstants.CORE_HANDLER, createHandler()); + pipeline.addLast(eventExecutorGroup, HandlerConstants.CORE_HANDLER, createHandler()); } diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java index e509ee0..f3399c9 100644 --- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java +++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java @@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.group.ChannelGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.EventExecutorGroup; /** @@ -39,14 +40,15 @@ public abstract class AbstractSSLAwareChannelPipelineFactory<C extends SocketCha public AbstractSSLAwareChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp, ChannelGroup group, - ChannelHandlerFactory frameHandlerFactory) { - super(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory); + ChannelHandlerFactory frameHandlerFactory, + EventExecutorGroup eventExecutorGroup) { + super(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory, eventExecutorGroup); } public AbstractSSLAwareChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp, ChannelGroup group, Encryption secure, - ChannelHandlerFactory frameHandlerFactory) { - this(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory); + ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) { + this(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory, eventExecutorGroup); this.secure = secure; } diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java index 08086a6..1cde815 100644 --- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java +++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java @@ -51,6 +51,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.TooLongFrameException; import io.netty.util.AttributeKey; +import io.netty.util.concurrent.EventExecutorGroup; /** @@ -67,16 +68,18 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter { protected final Protocol protocol; protected final ProtocolHandlerChain chain; protected final Encryption secure; + private final EventExecutorGroup eventExecutors; - public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol) { - this(mdcContextFactory, protocol, null); + public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, EventExecutorGroup eventExecutors) { + this(mdcContextFactory, protocol, null, eventExecutors); } - public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure) { + public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure, EventExecutorGroup eventExecutors) { this.mdcContextFactory = mdcContextFactory; this.protocol = protocol; this.chain = protocol.getProtocolChain(); this.secure = secure; + this.eventExecutors = eventExecutors; } @@ -197,7 +200,7 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter { engine = secure.createSSLEngine(); } - return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine)); + return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine, eventExecutors)); } @Override diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java index 36d5ab8..bdeee67 100644 --- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java +++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java @@ -38,6 +38,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.DefaultFileRegion; import io.netty.handler.ssl.SslHandler; import io.netty.handler.stream.ChunkedStream; +import io.netty.util.concurrent.EventExecutorGroup; /** @@ -47,11 +48,13 @@ public class NettyProtocolTransport extends AbstractProtocolTransport { private final Channel channel; private final SSLEngine engine; + private final EventExecutorGroup eventExecutors; private int lineHandlerCount = 0; - public NettyProtocolTransport(Channel channel, SSLEngine engine) { + public NettyProtocolTransport(Channel channel, SSLEngine engine, EventExecutorGroup eventExecutors) { this.channel = channel; this.engine = engine; + this.eventExecutors = eventExecutors; } @Override @@ -156,7 +159,7 @@ public class NettyProtocolTransport extends AbstractProtocolTransport { // it is executed with the same ExecutorHandler as the coreHandler (if one exist) // // See JAMES-1277 - channel.pipeline().addBefore(HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, overrideCommandHandler)); + channel.pipeline().addBefore(eventExecutors, HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, overrideCommandHandler)); } diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java index 35da9a9..e50a498 100644 --- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java +++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java @@ -28,8 +28,9 @@ import org.apache.james.protocols.api.Protocol; import com.google.common.base.Preconditions; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.group.ChannelGroup; - +import io.netty.util.concurrent.DefaultEventExecutorGroup; /** @@ -104,7 +105,7 @@ public class NettyServer extends AbstractAsyncServer { } protected ChannelInboundHandlerAdapter createCoreHandler() { - return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure); + return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure, new DefaultEventExecutorGroup(2)); } @Override @@ -126,7 +127,8 @@ public class NettyServer extends AbstractAsyncServer { maxCurConnectionsPerIP, group, secure, - getFrameHandlerFactory()) { + getFrameHandlerFactory(), + new DefaultEventLoopGroup(16)) { @Override protected ChannelInboundHandlerAdapter createHandler() { diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java index 7896835..649f4a8 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java @@ -211,7 +211,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC @Override protected AbstractChannelPipelineFactory createPipelineFactory(final ChannelGroup group) { - return new AbstractChannelPipelineFactory(group, getFrameHandlerFactory()) { + return new AbstractChannelPipelineFactory(group, getFrameHandlerFactory(), getExecutorGroup()) { @Override protected ChannelInboundHandlerAdapter createHandler() { @@ -235,7 +235,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC // Add the text line decoder which limit the max line length, // don't strip the delimiter and use CRLF as delimiter // Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436 - pipeline.addLast(FRAMER, getFrameHandlerFactory().create(pipeline)); + pipeline.addLast(getExecutorGroup(), FRAMER, getFrameHandlerFactory().create(pipeline)); Encryption secure = getEncryption(); if (secure != null && !secure.isStartTLS()) { @@ -248,11 +248,11 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC } pipeline.addLast(CONNECTION_COUNT_HANDLER, getConnectionCountHandler()); - pipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler()); + pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new ChunkedWriteHandler()); - pipeline.addLast(REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit)); + pipeline.addLast(getExecutorGroup(), REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit)); - pipeline.addLast(CORE_HANDLER, createHandler()); + pipeline.addLast(getExecutorGroup(), CORE_HANDLER, createCoreHandler()); } }; diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java index 1ccbd84..1da9ce6 100644 --- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java +++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java @@ -48,6 +48,7 @@ import org.apache.james.protocols.lib.jmx.ServerMBean; import org.apache.james.protocols.netty.AbstractAsyncServer; import org.apache.james.protocols.netty.AbstractChannelPipelineFactory; import org.apache.james.protocols.netty.ChannelHandlerFactory; +import org.apache.james.util.concurrent.NamedThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,8 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOption; import io.netty.channel.group.ChannelGroup; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; import nl.altindag.ssl.SSLFactory; import nl.altindag.ssl.util.PemUtils; @@ -118,7 +121,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe private ChannelHandlerFactory frameHandlerFactory; - private int maxExecutorThreads; + private EventExecutorGroup executorGroup; private MBeanServer mbeanServer; @@ -189,8 +192,8 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe int ioWorker = config.getInt("ioWorkerCount", DEFAULT_IO_WORKER_COUNT); setIoWorkerCount(ioWorker); - maxExecutorThreads = config.getInt("maxExecutorCount", DEFAULT_MAX_EXECUTOR_COUNT); - + executorGroup = new DefaultEventExecutorGroup(config.getInt("maxExecutorCount", DEFAULT_MAX_EXECUTOR_COUNT), + NamedThreadFactory.withName(jmxName)); configureHelloName(config); @@ -266,6 +269,10 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe } + protected EventExecutorGroup getExecutorGroup() { + return executorGroup; + } + @PostConstruct public final void init() throws Exception { @@ -308,6 +315,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe if (isEnabled()) { unbind(); postDestroy(); + executorGroup.shutdownGracefully(); unregisterMBean(); } @@ -550,7 +558,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe @Override protected AbstractChannelPipelineFactory createPipelineFactory(ChannelGroup group) { return new AbstractExecutorAwareChannelPipelineFactory(getTimeout(), connectionLimit, connPerIP, group, - getEncryption(), getFrameHandlerFactory()) { + getEncryption(), getFrameHandlerFactory(), getExecutorGroup()) { @Override protected ChannelInboundHandlerAdapter createHandler() { diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java index a4271eb..4890f3c 100644 --- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java +++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java @@ -24,6 +24,7 @@ import org.apache.james.protocols.netty.ChannelHandlerFactory; import io.netty.channel.ChannelHandler; import io.netty.channel.group.ChannelGroup; +import io.netty.util.concurrent.EventExecutorGroup; /** * Abstract base class which should get used if you MAY need an {@link ExecutionHandler} @@ -35,8 +36,8 @@ public abstract class AbstractExecutorAwareChannelPipelineFactory extends Abstra public AbstractExecutorAwareChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp, ChannelGroup group, Encryption encryption, - ChannelHandlerFactory frameHandlerFactory) { - super(timeout, maxConnections, maxConnectsPerIp, group, encryption, frameHandlerFactory); + ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) { + super(timeout, maxConnections, maxConnectsPerIp, group, encryption, frameHandlerFactory, eventExecutorGroup); } /** diff --git a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java index 7899a7a..0596813 100644 --- a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java +++ b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java @@ -142,7 +142,7 @@ public class LMTPServer extends AbstractProtocolAsyncServer implements LMTPServe @Override protected ChannelInboundHandlerAdapter createCoreHandler() { SMTPProtocol protocol = new SMTPProtocol(getProtocolHandlerChain(), lmtpConfig); - return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics); + return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics, getExecutorGroup()); } @Override diff --git a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java index e9f35fa..294ddc7 100644 --- a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java +++ b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java @@ -86,7 +86,7 @@ public class ManageSieveServer extends AbstractConfigurableAsyncServer implement @Override protected AbstractChannelPipelineFactory createPipelineFactory(final ChannelGroup group) { - return new AbstractChannelPipelineFactory(group, createFrameHandlerFactory()) { + return new AbstractChannelPipelineFactory(group, createFrameHandlerFactory(), getExecutorGroup()) { @Override protected ChannelInboundHandlerAdapter createHandler() { @@ -113,13 +113,13 @@ public class ManageSieveServer extends AbstractConfigurableAsyncServer implement // Add the text line decoder which limit the max line length, // don't strip the delimiter and use CRLF as delimiter // Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436 - pipeline.addLast(FRAMER, getFrameHandlerFactory().create(pipeline)); - pipeline.addLast(CONNECTION_COUNT_HANDLER, getConnectionCountHandler()); - pipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler()); + pipeline.addLast(getExecutorGroup(), FRAMER, getFrameHandlerFactory().create(pipeline)); + pipeline.addLast(getExecutorGroup(), CONNECTION_COUNT_HANDLER, getConnectionCountHandler()); + pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new ChunkedWriteHandler()); - pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); - pipeline.addLast(CORE_HANDLER, createHandler()); - pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); + pipeline.addLast(getExecutorGroup(), "stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); + pipeline.addLast(getExecutorGroup(), CORE_HANDLER, createHandler()); + pipeline.addLast(getExecutorGroup(), "stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); } }; diff --git a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java index 242874a..f05ec6c 100644 --- a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java +++ b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java @@ -77,7 +77,7 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve protected void preInit() throws Exception { super.preInit(); POP3Protocol protocol = new POP3Protocol(getProtocolHandlerChain(), theConfigData); - coreHandler = new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption()); + coreHandler = new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption(), getExecutorGroup()); } @Override diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java index 8152716..5379db5 100644 --- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java +++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java @@ -29,6 +29,7 @@ import org.apache.james.smtpserver.SMTPConstants; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.EventExecutorGroup; /** * {@link BasicChannelUpstreamHandler} which is used by the SMTPServer @@ -38,13 +39,13 @@ public class SMTPChannelUpstreamHandler extends BasicChannelUpstreamHandler { private final SmtpMetrics smtpMetrics; - public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics) { - super(new SMTPMDCContextFactory(), protocol, encryption); + public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) { + super(new SMTPMDCContextFactory(), protocol, encryption, eventExecutorGroup); this.smtpMetrics = smtpMetrics; } - public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics) { - super(new SMTPMDCContextFactory(), protocol); + public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) { + super(new SMTPMDCContextFactory(), protocol, eventExecutorGroup); this.smtpMetrics = smtpMetrics; } diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java index 65d8904..83f1534 100644 --- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java +++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java @@ -219,7 +219,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe } }; - coreHandler = new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics); + coreHandler = new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup()); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org