This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 6243bf7d4e JAMES-3977 Test for backpressure
6243bf7d4e is described below
commit 6243bf7d4e01d1102eb3728ba3c5c3486b3262d9
Author: Benoit TELLIER <[email protected]>
AuthorDate: Mon Mar 18 12:55:10 2024 +0100
JAMES-3977 Test for backpressure
- Rely on mock to enforce that backpressure is applied
- Ensure that when read FETCH command proceed
---
.../james/imapserver/netty/IMAPServerTest.java | 47 ++++++++++++++++++++--
1 file changed, 43 insertions(+), 4 deletions(-)
diff --git
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
index 2a181282aa..7222755b97 100644
---
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
+++
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
@@ -29,6 +29,11 @@ import static
org.apache.james.mailbox.MessageManager.MailboxMetaData.RecentMode
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import java.io.EOFException;
import java.io.FileNotFoundException;
@@ -44,6 +49,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.IntStream;
@@ -107,6 +113,8 @@ import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
@@ -127,6 +135,7 @@ import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.ssl.SslContextBuilder;
import nl.altindag.ssl.exception.GenericKeyStoreException;
import nl.altindag.ssl.pem.exception.PrivateKeyParseException;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
@@ -145,6 +154,7 @@ class IMAPServerTest {
@RegisterExtension
public TestIMAPClient testIMAPClient = new TestIMAPClient();
+ private InMemoryMailboxManager mailboxManager;
private IMAPServer
createImapServer(HierarchicalConfiguration<ImmutableNode> config,
InMemoryIntegrationResources
inMemoryIntegrationResources) throws Exception {
@@ -152,15 +162,16 @@ class IMAPServerTest {
RecordingMetricFactory metricFactory = new RecordingMetricFactory();
Set<ConnectionCheck> connectionChecks = defaultConnectionChecks();
+ mailboxManager = spy(memoryIntegrationResources.getMailboxManager());
IMAPServer imapServer = new IMAPServer(
DefaultImapDecoderFactory.createDecoder(),
new DefaultImapEncoderFactory().buildImapEncoder(),
DefaultImapProcessorFactory.createXListSupportingProcessor(
- memoryIntegrationResources.getMailboxManager(),
+ mailboxManager,
memoryIntegrationResources.getEventBus(),
- new
StoreSubscriptionManager(memoryIntegrationResources.getMailboxManager().getMapperFactory(),
-
memoryIntegrationResources.getMailboxManager().getMapperFactory(),
-
memoryIntegrationResources.getMailboxManager().getEventBus()),
+ new StoreSubscriptionManager(mailboxManager.getMapperFactory(),
+ mailboxManager.getMapperFactory(),
+ mailboxManager.getEventBus()),
null,
memoryIntegrationResources.getQuotaManager(),
memoryIntegrationResources.getQuotaRootResolver(),
@@ -2501,6 +2512,34 @@ class IMAPServerTest {
// Then the FETCH
readStringUntil(clientConnection, s -> s.contains("A2 OK FETCH
completed."));
}
+
+ @Test
+ void fetchShouldBackPressureWhenNoRead() throws Exception {
+ String msgIn = "MIME-Version: 1.0\r\n\r\nCONTENT\r\n\r\n" +
"0123456789\r\n0123456789\r\n0123456789\r\n".repeat(1024);
+ IntStream.range(0, 500)
+ .forEach(Throwing.intConsumer(i ->
inbox.appendMessage(MessageManager.AppendCommand.builder()
+ .build(msgIn), mailboxSession)));
+ AtomicInteger loaded = new AtomicInteger(0);
+ MessageManager inboxSpy = spy(inbox);
+
doReturn(Mono.just(inboxSpy)).when(mailboxManager).getMailboxReactive(eq(MailboxPath.inbox(USER)),
any());
+
doReturn(Mono.just(inboxSpy)).when(mailboxManager).getMailboxReactive(eq(inbox.getMailboxEntity().getMailboxId()),
any());
+ doAnswer((Answer<Object>) invocationOnMock ->
Flux.from(inbox.getMessagesReactive(invocationOnMock.getArgument(0),
invocationOnMock.getArgument(1), invocationOnMock.getArgument(2)))
+ .doOnNext(any ->
loaded.incrementAndGet())).when(inboxSpy).getMessagesReactive(any(), any(),
any());
+
+ clientConnection.write(ByteBuffer.wrap(String.format("a0 LOGIN %s
%s\r\n", USER.asString(), USER_PASS).getBytes(StandardCharsets.UTF_8)));
+ readBytes(clientConnection);
+
+ clientConnection.write(ByteBuffer.wrap(("A1 SELECT
INBOX\r\n").getBytes(StandardCharsets.UTF_8)));
+ // Select completes first
+ readStringUntil(clientConnection, s -> s.contains("A1 OK
[READ-WRITE] SELECT completed."));
+ clientConnection.write(ByteBuffer.wrap(("A2 UID FETCH 1:500
(BODY[])\r\n").getBytes(StandardCharsets.UTF_8)));
+
+ Thread.sleep(1000);
+
+ assertThat(loaded.get()).isLessThan(500);
+ readStringUntil(clientConnection, s -> s.contains("A2 OK FETCH
completed."));
+ assertThat(loaded.get()).isEqualTo(500);
+ }
}
private byte[] readBytes(SocketChannel channel) throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]