This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new c8c8b83b5b JAMES-4140 Implement adaptative, session-scoped, throttling
for IMAP … (#2766)
c8c8b83b5b is described below
commit c8c8b83b5b9dd66dee1b54d29d1fb4318b8a6d27
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Jul 25 10:06:11 2025 +0200
JAMES-4140 Implement adaptative, session-scoped, throttling for IMAP …
(#2766)
---
docs/modules/servers/partials/configure/imap.adoc | 42 +++++-
.../imapserver/netty/IMAPCommandsThrottler.java | 155 +++++++++++++++++++++
.../apache/james/imapserver/netty/IMAPServer.java | 8 ++
.../netty/IMAPCommandsThrottlerTest.java | 85 +++++++++++
.../src/test/resources/commandsThrottling.xml | 14 ++
5 files changed, 302 insertions(+), 2 deletions(-)
diff --git a/docs/modules/servers/partials/configure/imap.adoc
b/docs/modules/servers/partials/configure/imap.adoc
index 0d3084c121..af6911fe43 100644
--- a/docs/modules/servers/partials/configure/imap.adoc
+++ b/docs/modules/servers/partials/configure/imap.adoc
@@ -161,11 +161,11 @@ It uses the Keycloak OIDC provider, but usage of similar
technologies is definit
== Traffic Shaping
-James ships optional
link:https://netty.io/4.0/api/io/netty/handler/traffic/ChannelTrafficShapingHandler.html[Netty
built in Traffic Shaping] that can be optionally configured.
+James ships an optional
link:https://netty.io/4.0/api/io/netty/handler/traffic/ChannelTrafficShapingHandler.html[Netty
built in Traffic Shaping] that can be optionally configured.
This enables both:
- Record per channel bandwidth consumption
- - Allows defining per channel bandwidth limit, which helps at fairness and
maintaining a good quality of service.
+ - Allows defining a per channel bandwidth limit, which helps with fairness and
maintaining a good quality of service in terms of incoming/outgoing bandwidth.
Example:
@@ -185,6 +185,44 @@ Those tags maps to the corresponding Netty argument.
If omitted no traffic handle is added to the channel pipeline.
+== IMAP command throttler
+
+James ships an optional IMAP command throttler aimed at slowing down
lower-quality clients that generate a high
+volume of requests. It allows per command granularity and is applied at the
scope of an IMAP session.
+
+The user can declare the list of commands on which throttling needs to be
tracked and for each:
+
+ - `thresholdCount`: below this number of occurrences, no throttling is
applied. Integer.
+ - `additionalDelayPerOperation`: delay added for each counted command once the
threshold is reached. The delay is cumulative and thus
+ keeps increasing with every further command, up to `maxDelay`. Duration.
+ - `observationPeriod`: the count of observed commands is reset after this
period thus stopping delays. Duration.
+ - `maxDelay`: upper bound on the delay applied to a single command. Duration.
+
+Sample configuration:
+
+....
+<imapserver>
+ <!-- ... -->
+ <perSessionCommandThrottling>
+ <select>
+ <thresholdCount>25</thresholdCount>
+ <additionalDelayPerOperation>2ms</additionalDelayPerOperation>
+ <observationPeriod>10m</observationPeriod>
+ <maxDelay>1s</maxDelay>
+ </select>
+ <append>
+ <thresholdCount>5</thresholdCount>
+ <additionalDelayPerOperation>10ms</additionalDelayPerOperation>
+ <observationPeriod>5m</observationPeriod>
+ <maxDelay>2s</maxDelay>
+ </append>
+ </perSessionCommandThrottling>
+</imapserver>
+....
+
+Note that commands are delayed prior to their execution and thus are not subject to
the IMAP upper concurrency limit until
+they are executed.
+
== Extending IMAP
IMAP decoders, processors and encoder can be customized.
xref:customization:imap.adoc[Read more].
diff --git
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java
new file mode 100644
index 0000000000..53e2074f00
--- /dev/null
+++
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java
@@ -0,0 +1,155 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import static
org.apache.james.imapserver.netty.NettyConstants.IMAP_SESSION_ATTRIBUTE_KEY;
+
+import java.time.Duration;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.configuration2.HierarchicalConfiguration;
+import org.apache.commons.configuration2.ImmutableHierarchicalConfiguration;
+import org.apache.commons.configuration2.tree.ImmutableNode;
+import org.apache.james.imap.api.message.request.ImapRequest;
+import org.apache.james.imap.api.process.ImapSession;
+import org.apache.james.imap.processor.IdProcessor;
+import org.apache.james.util.DurationParser;
+import org.apache.james.util.MDCStructuredLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import it.unimi.dsi.fastutil.Pair;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class IMAPCommandsThrottler extends ChannelInboundHandlerAdapter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IMAPCommandsThrottler.class);
+
+ public record ThrottlerConfigurationEntry(
+ int thresholdCount,
+ Duration additionalDelayPerOperation,
+ Duration observationPeriod,
+ Duration maxDelay) {
+
+ public static ThrottlerConfigurationEntry
from(ImmutableHierarchicalConfiguration configuration) {
+ return new ThrottlerConfigurationEntry(
+ Optional.ofNullable(configuration.getString("thresholdCount",
null))
+ .map(Integer::parseInt)
+ .orElseThrow(() -> new
IllegalArgumentException("thresholdCount in compulsory for
ThrottlerConfigurationEntry")),
+
Optional.ofNullable(configuration.getString("additionalDelayPerOperation",
null))
+ .map(DurationParser::parse)
+ .orElseThrow(() -> new
IllegalArgumentException("additionalDelayPerOperation in compulsory for
ThrottlerConfigurationEntry")),
+
Optional.ofNullable(configuration.getString("observationPeriod", null))
+ .map(DurationParser::parse)
+ .orElseThrow(() -> new
IllegalArgumentException("observationPeriod in compulsory for
ThrottlerConfigurationEntry")),
+ Optional.ofNullable(configuration.getString("maxDelay", null))
+ .map(DurationParser::parse)
+ .orElseThrow(() -> new IllegalArgumentException("maxDelay
in compulsory for ThrottlerConfigurationEntry")));
+ }
+
+ long delayMSFor(long occurrenceCount) {
+ if (occurrenceCount < thresholdCount) {
+ return 0;
+ }
+
+ return Math.min(maxDelay.toMillis(), occurrenceCount *
additionalDelayPerOperation.toMillis());
+ }
+ }
+
+ public record ThrottlerConfiguration(Map<String,
ThrottlerConfigurationEntry> entryMap) {
+ public static ThrottlerConfiguration
from(HierarchicalConfiguration<ImmutableNode> configuration) {
+ return new ThrottlerConfiguration(configuration.getNodeModel()
+ .getNodeHandler()
+ .getRootNode()
+ .getChildren()
+ .stream()
+ .map(key -> Pair.of(key.getNodeName().toUpperCase(Locale.US),
ThrottlerConfigurationEntry.from(configuration.immutableConfigurationAt(key.getNodeName()))))
+ .collect(ImmutableMap.toImmutableMap(Pair::key, Pair::value)));
+ }
+ }
+
+ private final ThrottlerConfiguration configuration;
+
+ public IMAPCommandsThrottler(ThrottlerConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ if (msg instanceof ImapRequest imapRequest) {
+ String key =
imapRequest.getCommand().getName().toUpperCase(Locale.US);
+ Optional.ofNullable(configuration.entryMap().get(key))
+ .ifPresentOrElse(configurationEntry -> throttle(ctx, msg,
imapRequest, configurationEntry),
+ () -> ctx.fireChannelRead(msg));
+ } else {
+ ctx.fireChannelRead(msg);
+ }
+ }
+
+ private static void throttle(ChannelHandlerContext ctx, Object msg,
ImapRequest imapRequest, ThrottlerConfigurationEntry configurationEntry) {
+ ImapSession session = (ImapSession)
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY);
+
+ AtomicLong atomicLong = retrieveAssociatedCounter(imapRequest,
session, configurationEntry);
+ Duration delay =
Duration.ofMillis(configurationEntry.delayMSFor(atomicLong.getAndIncrement()));
+
+ if (delay.isPositive()) {
+ logDelay(imapRequest, session, delay);
+
+ Mono.delay(delay)
+ .then(Mono.fromRunnable(() -> ctx.fireChannelRead(msg)))
+ .subscribeOn(Schedulers.parallel())
+ .subscribe();
+ } else {
+ ctx.fireChannelRead(msg);
+ }
+ }
+
+ private static AtomicLong retrieveAssociatedCounter(ImapRequest
imapRequest, ImapSession session, ThrottlerConfigurationEntry entry) {
+ String key = "imap-applicative-traffic-shaper-counter-" +
imapRequest.getCommand().getName();
+ return Optional.ofNullable(session.getAttribute(key))
+ .filter(AtomicLong.class::isInstance)
+ .map(AtomicLong.class::cast)
+ .orElseGet(() -> {
+ AtomicLong res = new AtomicLong(0);
+ session.setAttribute(key, res);
+ session.schedule(() -> session.setAttribute(key, new
AtomicLong(0)), entry.observationPeriod());
+ return res;
+ });
+ }
+
+ private static void logDelay(ImapRequest imapRequest, ImapSession session,
Duration delay) {
+ MDCStructuredLogger.forLogger(LOGGER)
+ .field("username", session.getUserName().asString())
+ .field("userAgent",
Optional.ofNullable(session.getAttribute(IdProcessor.USER_AGENT))
+ .filter(String.class::isInstance)
+ .map(String.class::cast)
+ .orElse(""))
+ .log(logger -> logger.info("Delayed command {} on an IMAP session.
Delay {} ms",
+ imapRequest.getCommand().getName(),
+ delay.toMillis()));
+ }
+}
diff --git
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index 115ea2dd08..b66129f04c 100644
---
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -177,6 +177,7 @@ public class IMAPServer extends
AbstractConfigurableAsyncServer implements ImapC
private Optional<TrafficShapingConfiguration> trafficShaping =
Optional.empty();
private Optional<ConnectionLimitUpstreamHandler>
connectionLimitUpstreamHandler = Optional.empty();
private Optional<ConnectionPerIpLimitUpstreamHandler>
connectionPerIpLimitUpstreamHandler = Optional.empty();
+ private Optional<IMAPCommandsThrottler.ThrottlerConfiguration>
throttlerConfiguration = Optional.empty();
private boolean ignoreIDLEUponProcessing;
private Duration heartbeatInterval;
private ReactiveThrottler reactiveThrottler;
@@ -222,6 +223,10 @@ public class IMAPServer extends
AbstractConfigurableAsyncServer implements ImapC
trafficShaping =
Optional.ofNullable(configuration.configurationAt("trafficShaping"))
.map(TrafficShapingConfiguration::from);
}
+ if (configuration.getKeys("perSessionCommandThrottling").hasNext()) {
+ throttlerConfiguration =
Optional.ofNullable(configuration.configurationAt("perSessionCommandThrottling"))
+ .map(IMAPCommandsThrottler.ThrottlerConfiguration::from);
+ }
}
@Override
@@ -337,6 +342,9 @@ public class IMAPServer extends
AbstractConfigurableAsyncServer implements ImapC
pipeline.addLast(REQUEST_DECODER, new
ImapRequestFrameDecoder(decoder, inMemorySizeLimit,
literalSizeLimit, maxLineLength));
+ throttlerConfiguration.map(IMAPCommandsThrottler::new)
+ .ifPresent(handler -> pipeline.addLast("commandThrottler",
handler));
+
pipeline.addLast(CORE_HANDLER, createCoreHandler());
}
diff --git
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java
new file mode 100644
index 0000000000..32fc769b1b
--- /dev/null
+++
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java
@@ -0,0 +1,85 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+
+import org.apache.commons.configuration2.HierarchicalConfiguration;
+import org.apache.commons.configuration2.tree.ImmutableNode;
+import
org.apache.james.imapserver.netty.IMAPCommandsThrottler.ThrottlerConfiguration;
+import
org.apache.james.imapserver.netty.IMAPCommandsThrottler.ThrottlerConfigurationEntry;
+import org.apache.james.protocols.lib.mock.ConfigLoader;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+class IMAPCommandsThrottlerTest {
+ @Nested
+ class ConfigTest {
+ @Test
+ void shouldLoad() throws Exception {
+ HierarchicalConfiguration<ImmutableNode> config =
ConfigLoader.getConfig(ClassLoaderUtils.getSystemResourceAsSharedStream("commandsThrottling.xml"));
+
+ var selectEntry = new ThrottlerConfigurationEntry(25,
Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1));
+ var appendEntry = new ThrottlerConfigurationEntry(5,
Duration.ofMillis(10), Duration.ofMinutes(5), Duration.ofSeconds(2));
+
+ assertThat(ThrottlerConfiguration.from(config))
+ .isEqualTo(new ThrottlerConfiguration(
+ ImmutableMap.of(
+ "SELECT", selectEntry,
+ "APPEND", appendEntry)));
+ }
+ }
+
+ @Nested
+ class DelayTest {
+ @Test
+ void shouldNotDelayWhenBelowThreshold() {
+ var selectEntry = new ThrottlerConfigurationEntry(25,
Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1));
+
+ assertThat(selectEntry.delayMSFor(24)).isZero();
+ }
+
+ @Test
+ void shouldDelayWhenThreshold() {
+ var selectEntry = new ThrottlerConfigurationEntry(25,
Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1));
+
+ assertThat(selectEntry.delayMSFor(25)).isEqualTo(50);
+ }
+
+ @Test
+ void shouldAdditionalDelayWhenAboveThreshold() {
+ var selectEntry = new ThrottlerConfigurationEntry(25,
Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1));
+
+ assertThat(selectEntry.delayMSFor(26)).isEqualTo(52);
+ }
+
+ @Test
+ void shouldNotExceedMaximumDelay() {
+ var selectEntry = new ThrottlerConfigurationEntry(25,
Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1));
+
+ assertThat(selectEntry.delayMSFor(2600)).isEqualTo(1000);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml
b/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml
new file mode 100644
index 0000000000..dc4f57afb4
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml
@@ -0,0 +1,14 @@
+<perSessionCommandThrottling>
+ <select>
+ <thresholdCount>25</thresholdCount>
+ <additionalDelayPerOperation>2ms</additionalDelayPerOperation>
+ <observationPeriod>10m</observationPeriod>
+ <maxDelay>1s</maxDelay>
+ </select>
+ <append>
+ <thresholdCount>5</thresholdCount>
+ <additionalDelayPerOperation>10ms</additionalDelayPerOperation>
+ <observationPeriod>5m</observationPeriod>
+ <maxDelay>2s</maxDelay>
+ </append>
+</perSessionCommandThrottling>
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]