MAILBOX-372 handle retrying for event group consumers
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/1f390f4f Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/1f390f4f Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/1f390f4f Branch: refs/heads/master Commit: 1f390f4f3f69db1c073b5e018b25cdf52acb3647 Parents: bd656cf Author: tran tien duc <dt...@linagora.com> Authored: Tue Jan 15 14:58:45 2019 +0700 Committer: tran tien duc <dt...@linagora.com> Committed: Wed Jan 16 14:21:28 2019 +0700 ---------------------------------------------------------------------- .../james/mailbox/events/RetryBackoff.java | 110 --------------- .../events/RetryBackoffConfiguration.java | 125 +++++++++++++++++ .../mailbox/events/ErrorHandlingContract.java | 45 ++++-- .../mailbox/events/EventBusTestFixture.java | 4 +- .../events/RetryBackoffConfigurationTest.java | 140 +++++++++++++++++++ .../mailbox/events/GroupConsumerRetry.java | 133 ++++++++++++++++++ .../james/mailbox/events/GroupRegistration.java | 62 +++++--- .../events/GroupRegistrationHandler.java | 5 +- .../james/mailbox/events/RabbitMQEventBus.java | 8 +- .../mailbox/events/WaitDelayGenerator.java | 18 ++- .../mailbox/events/RabbitMQEventBusTest.java | 26 +++- .../mailbox/events/WaitDelayGeneratorTest.java | 49 ++++++- 12 files changed, 565 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java ---------------------------------------------------------------------- diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java deleted file mode 100644 index 2fa15ad..0000000 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoff.java +++ /dev/null @@ -1,110 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.events; - -import java.time.Duration; - -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; - -class RetryBackoff { - - @FunctionalInterface - interface RequireMaxRetries { - RequireFirstBackoff maxRetries(int maxRetries); - } - - @FunctionalInterface - interface RequireFirstBackoff { - RequireJitterFactor firstBackoff(Duration firstBackoff); - } - - @FunctionalInterface - interface RequireJitterFactor { - ReadyToBuild jitterFactor(double jitterFactor); - } - - static class ReadyToBuild { - private final int maxRetries; - private final Duration firstBackoff; - private final double jitterFactor; - - private ReadyToBuild(int maxRetries, Duration firstBackoff, double jitterFactor) { - this.maxRetries = maxRetries; - this.firstBackoff = firstBackoff; - this.jitterFactor = jitterFactor; - } - - RetryBackoff build() { - return new RetryBackoff(maxRetries, firstBackoff, jitterFactor); - } - } - - static RequireMaxRetries builder() { - return maxRetries -> firstBackoff -> jitterFactor -> new ReadyToBuild(maxRetries, firstBackoff, jitterFactor); - } - - static RetryBackoff defaultRetryBackoff() { - return builder() - .maxRetries(DEFAULT_MAX_RETRIES) - .firstBackoff(DEFAULT_FIRST_BACKOFF) - .jitterFactor(DEFAULT_JITTER_FACTOR) - .build(); - } - - private static final double DEFAULT_JITTER_FACTOR = 0.5; - private static final int DEFAULT_MAX_RETRIES = 3; - private static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100); - - private final int maxRetries; - private final Duration firstBackoff; - private final double jitterFactor; - - RetryBackoff(int maxRetries, Duration firstBackoff, double jitterFactor) { - Preconditions.checkArgument(!firstBackoff.isNegative() && !firstBackoff.isZero(), "firstBackoff has to be strictly positive"); - Preconditions.checkArgument(maxRetries > 0, "maxRetries has to be strictly positive"); - Preconditions.checkArgument(jitterFactor > 0, "jitterFactor has to be strictly positive"); - - this.maxRetries = maxRetries; - this.firstBackoff = firstBackoff; - this.jitterFactor = jitterFactor; - } - - public int getMaxRetries() { - return maxRetries; - } - - public Duration getFirstBackoff() { - return firstBackoff; - } - - public double getJitterFactor() { - return jitterFactor; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("maxRetries", maxRetries) - .add("firstBackoff", firstBackoff) - .add("jitterFactor", jitterFactor) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java ---------------------------------------------------------------------- diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java new file mode 100644 index 0000000..ef16efb --- /dev/null +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java @@ -0,0 +1,125 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import java.time.Duration; +import java.util.Objects; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; + +class RetryBackoffConfiguration { + + @FunctionalInterface + interface RequireMaxRetries { + RequireFirstBackoff maxRetries(int maxRetries); + } + + @FunctionalInterface + interface RequireFirstBackoff { + RequireJitterFactor firstBackoff(Duration firstBackoff); + } + + @FunctionalInterface + interface RequireJitterFactor { + ReadyToBuild jitterFactor(double jitterFactor); + } + + static class ReadyToBuild { + private final int maxRetries; + private final Duration firstBackoff; + private final double jitterFactor; + + private ReadyToBuild(int maxRetries, Duration firstBackoff, double jitterFactor) { + this.maxRetries = maxRetries; + this.firstBackoff = firstBackoff; + this.jitterFactor = jitterFactor; + } + + RetryBackoffConfiguration build() { + return new RetryBackoffConfiguration(maxRetries, firstBackoff, jitterFactor); + } + } + + static RequireMaxRetries builder() { + return maxRetries -> firstBackoff -> jitterFactor -> new ReadyToBuild(maxRetries, firstBackoff, jitterFactor); + } + + static final double DEFAULT_JITTER_FACTOR = 0.5; + static final int DEFAULT_MAX_RETRIES = 3; + static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100); + public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration( + DEFAULT_MAX_RETRIES, + DEFAULT_FIRST_BACKOFF, + DEFAULT_JITTER_FACTOR); + + private final int maxRetries; + private final Duration firstBackoff; + private final double jitterFactor; + + private RetryBackoffConfiguration(int maxRetries, Duration firstBackoff, double jitterFactor) { + Preconditions.checkArgument(!firstBackoff.isNegative(), "firstBackoff is not allowed to be negative"); + Preconditions.checkArgument(maxRetries >= 0, "maxRetries is not allowed to be negative"); + Preconditions.checkArgument(jitterFactor >= 0 && jitterFactor <= 1.0, "jitterFactor is not " + + "allowed to be negative or greater than 1"); + + this.maxRetries = maxRetries; + this.firstBackoff = firstBackoff; + this.jitterFactor = jitterFactor; + } + + public int getMaxRetries() { + return maxRetries; + } + + public Duration getFirstBackoff() { + return firstBackoff; + } + + public double getJitterFactor() { + return jitterFactor; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof RetryBackoffConfiguration) { + RetryBackoffConfiguration that = (RetryBackoffConfiguration) o; + + return Objects.equals(this.maxRetries, that.maxRetries) + && Objects.equals(this.jitterFactor, that.jitterFactor) + && Objects.equals(this.firstBackoff, that.firstBackoff); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(maxRetries, firstBackoff, jitterFactor); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("maxRetries", maxRetries) + .add("firstBackoff", firstBackoff) + .add("jitterFactor", jitterFactor) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java ---------------------------------------------------------------------- diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java index 4b17a6f..3f0cdf7 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java @@ -21,6 +21,7 @@ package org.apache.james.mailbox.events; import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT; import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS; +import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; @@ -86,8 +87,8 @@ interface ErrorHandlingContract extends EventBusContract { eventBus().register(eventCollector, new EventBusTestFixture.GroupA()); eventBus().dispatch(EVENT, NO_KEYS).block(); - assertThat(eventCollector.getEvents()) - .hasSize(1); + WAIT_CONDITION + .until(() -> assertThat(eventCollector.getEvents()).hasSize(1)); } @Test @@ -103,12 +104,12 @@ interface ErrorHandlingContract extends EventBusContract { eventBus().register(eventCollector, new EventBusTestFixture.GroupA()); eventBus().dispatch(EVENT, NO_KEYS).block(); - assertThat(eventCollector.getEvents()) - .hasSize(1); + WAIT_CONDITION + .until(() -> assertThat(eventCollector.getEvents()).hasSize(1)); } @Test - default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() { + default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() throws Exception { EventCollector eventCollector = eventCollector(); doThrow(new RuntimeException()) @@ -121,32 +122,50 @@ interface ErrorHandlingContract extends EventBusContract { eventBus().register(eventCollector, new EventBusTestFixture.GroupA()); eventBus().dispatch(EVENT, NO_KEYS).block(); + TimeUnit.SECONDS.sleep(1); assertThat(eventCollector.getEvents()) .isEmpty(); } @Test - default void retriesBackOffShouldDelayByExponentialGrowth() { + default void exceedingMaxRetriesShouldStopConsumingFailedEvent() throws Exception { + ThrowingListener throwingListener = throwingListener(); + + eventBus().register(throwingListener, new EventBusTestFixture.GroupA()); + eventBus().dispatch(EVENT, NO_KEYS).block(); + + TimeUnit.SECONDS.sleep(5); + int numberOfCallsAfterExceedMaxRetries = throwingListener.timeElapsed.size(); + TimeUnit.SECONDS.sleep(5); + + assertThat(throwingListener.timeElapsed.size()) + .isEqualTo(numberOfCallsAfterExceedMaxRetries); + } + + @Test + default void retriesBackOffShouldDelayByExponentialGrowth() throws Exception { ThrowingListener throwingListener = throwingListener(); eventBus().register(throwingListener, new EventBusTestFixture.GroupA()); eventBus().dispatch(EVENT, NO_KEYS).block(); + TimeUnit.SECONDS.sleep(5); SoftAssertions.assertSoftly(softly -> { - softly.assertThat(throwingListener.timeElapsed).hasSize(4); + List<Instant> timeElapsed = throwingListener.timeElapsed; + softly.assertThat(timeElapsed).hasSize(4); long minFirstDelayAfter = 100; // first backOff long minSecondDelayAfter = 100; // 200 * jitter factor (200 * 0.5) long minThirdDelayAfter = 200; // 400 * jitter factor (400 * 0.5) - softly.assertThat(throwingListener.timeElapsed.get(1)) - .isAfterOrEqualTo(throwingListener.timeElapsed.get(0).plusMillis(minFirstDelayAfter)); + softly.assertThat(timeElapsed.get(1)) + .isAfterOrEqualTo(timeElapsed.get(0).plusMillis(minFirstDelayAfter)); - softly.assertThat(throwingListener.timeElapsed.get(2)) - .isAfterOrEqualTo(throwingListener.timeElapsed.get(1).plusMillis(minSecondDelayAfter)); + softly.assertThat(timeElapsed.get(2)) + .isAfterOrEqualTo(timeElapsed.get(1).plusMillis(minSecondDelayAfter)); - softly.assertThat(throwingListener.timeElapsed.get(3)) - .isAfterOrEqualTo(throwingListener.timeElapsed.get(2).plusMillis(minThirdDelayAfter)); + softly.assertThat(timeElapsed.get(3)) + .isAfterOrEqualTo(timeElapsed.get(2).plusMillis(minThirdDelayAfter)); }); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java ---------------------------------------------------------------------- diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java index e6920c0..85c798b 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java @@ -37,6 +37,7 @@ import org.apache.james.mailbox.model.TestId; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.jayway.awaitility.Duration; import com.jayway.awaitility.core.ConditionFactory; public interface EventBusTestFixture { @@ -100,8 +101,7 @@ public interface EventBusTestFixture { List<Class<? extends Group>> ALL_GROUPS = ImmutableList.of(GroupA.class, GroupB.class, GroupC.class); GroupA GROUP_A = new GroupA(); - - ConditionFactory WAIT_CONDITION = await().timeout(com.jayway.awaitility.Duration.ONE_SECOND); + ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS); static MailboxListener newListener() { MailboxListener listener = mock(MailboxListener.class); http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java new file mode 100644 index 0000000..2253cd7 --- /dev/null +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/RetryBackoffConfigurationTest.java @@ -0,0 +1,140 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_FIRST_BACKOFF; +import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_JITTER_FACTOR; +import static org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT_MAX_RETRIES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; + +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +class RetryBackoffConfigurationTest { + + @Test + void shouldMatchBeanContract() { + EqualsVerifier.forClass(RetryBackoffConfiguration.class) + .verify(); + } + + @Test + void buildShouldThrowWhenNegativeFirstBackoff() { + assertThatThrownBy(() -> RetryBackoffConfiguration.builder() + .maxRetries(DEFAULT_MAX_RETRIES) + .firstBackoff(Duration.ofMillis(-1000L)) + .jitterFactor(DEFAULT_JITTER_FACTOR) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("firstBackoff is not allowed to be negative"); + } + + @Test + void buildShouldThrowWhenNegativeMaxRetries() { + assertThatThrownBy(() -> RetryBackoffConfiguration.builder() + .maxRetries(-6) + .firstBackoff(DEFAULT_FIRST_BACKOFF) + .jitterFactor(DEFAULT_JITTER_FACTOR) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxRetries is not allowed to be negative"); + } + + @Test + void buildShouldThrowWhenNegativeJitterFactor() { + assertThatThrownBy(() -> RetryBackoffConfiguration.builder() + .maxRetries(DEFAULT_MAX_RETRIES) + .firstBackoff(DEFAULT_FIRST_BACKOFF) + .jitterFactor(-2.5) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("jitterFactor is not allowed to be negative or greater than 1"); + } + + @Test + void buildShouldThrowWhenGreaterThanOneJitterFactor() { + assertThatThrownBy(() -> RetryBackoffConfiguration.builder() + .maxRetries(DEFAULT_MAX_RETRIES) + .firstBackoff(DEFAULT_FIRST_BACKOFF) + .jitterFactor(1.000001) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("jitterFactor is not allowed to be negative or greater than 1"); + } + + @Test + void buildShouldSuccessWhenZeroFirstBackoff() { + RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder() + .maxRetries(DEFAULT_MAX_RETRIES) + .firstBackoff(Duration.ZERO) + .jitterFactor(DEFAULT_JITTER_FACTOR) + .build(); + + assertThat(retryBackoff.getFirstBackoff().toMillis()) + .isEqualTo(0L); + } + + @Test + void buildShouldSuccessWhenZeroMaxRetries() { + RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder() + .maxRetries(0) + .firstBackoff(DEFAULT_FIRST_BACKOFF) + .jitterFactor(DEFAULT_JITTER_FACTOR) + .build(); + + assertThat(retryBackoff.getMaxRetries()) + .isEqualTo(0L); + } + + @Test + void buildShouldSuccessWhenZeroJitterFactor() { + RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder() + .maxRetries(DEFAULT_MAX_RETRIES) + .firstBackoff(DEFAULT_FIRST_BACKOFF) + .jitterFactor(0) + .build(); + + assertThat(retryBackoff.getJitterFactor()) + .isEqualTo(0); + } + @Test + void buildShouldReturnCorrespondingValues() { + RetryBackoffConfiguration retryBackoff = RetryBackoffConfiguration.builder() + .maxRetries(5) + .firstBackoff(Duration.ofMillis(200)) + .jitterFactor(0.6) + .build(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(retryBackoff.getJitterFactor()) + .isEqualTo(0.6); + softly.assertThat(retryBackoff.getMaxRetries()) + .isEqualTo(5); + softly.assertThat(retryBackoff.getFirstBackoff()) + .isEqualTo(Duration.ofMillis(200)); + }); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java new file mode 100644 index 0000000..04a513a --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java @@ -0,0 +1,133 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE; +import static org.apache.james.backend.rabbitmq.Constants.DURABLE; +import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY; +import static org.apache.james.mailbox.events.GroupRegistration.RETRY_COUNT; +import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; + +import org.apache.james.mailbox.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.rabbitmq.client.AMQP; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.OutboundMessage; +import reactor.rabbitmq.Sender; + +class GroupConsumerRetry { + + static class RetryExchangeName { + + static RetryExchangeName of(Group group) { + return new RetryExchangeName(GroupRegistration.groupName(group.getClass())); + } + + static final String MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX = MAILBOX_EVENT + "-retryExchange-"; + + private final String name; + + private RetryExchangeName(String name) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Exchange name must be specified"); + this.name = name; + } + + String asString() { + return MAILBOX_EVENT_RETRY_EXCHANGE_PREFIX + name; + } + } + + static class RetryPublisher { + + private final Sender sender; + private final RetryExchangeName retryExchangeName; + private final RetryBackoffConfiguration retryBackoff; + + RetryPublisher(Sender sender, RetryExchangeName retryExchangeName, RetryBackoffConfiguration retryBackoff) { + this.sender = sender; + this.retryExchangeName = retryExchangeName; + this.retryBackoff = retryBackoff; + } + + Mono<Void> publish(Event event, byte[] eventAsByte, int currentRetryCount) { + return sender.send(createRetryMessage(eventAsByte, currentRetryCount)) + .doOnError(throwable -> LOGGER.error("Exception happens when publishing event of user {} to retry exchange," + + "this event will be lost forever", + event.getUser().asString(), throwable)); + } + + private Mono<OutboundMessage> createRetryMessage(byte[] eventAsByte, int currentRetryCount) { + if (currentRetryCount >= retryBackoff.getMaxRetries()) { + return Mono.empty(); // will store event to deadletter latter + } + + return Mono.just(new OutboundMessage( + retryExchangeName.asString(), + EMPTY_ROUTING_KEY, + new AMQP.BasicProperties.Builder() + .headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1)) + .build(), + eventAsByte)); + } + } + + private static final Logger LOGGER = LoggerFactory.getLogger(GroupConsumerRetry.class); + + private final Sender sender; + private final GroupRegistration.WorkQueueName queueName; + private final RetryExchangeName retryExchangeName; + private final RetryPublisher retryPublisher; + + GroupConsumerRetry(Sender sender, GroupRegistration.WorkQueueName queueName, Group group, + RetryBackoffConfiguration retryBackoff) { + this.sender = sender; + this.queueName = queueName; + this.retryExchangeName = RetryExchangeName.of(group); + this.retryPublisher = new RetryPublisher(sender, retryExchangeName, retryBackoff); + } + + Mono<Void> createRetryExchange() { + return Flux.concat( + sender.declareExchange(ExchangeSpecification.exchange(retryExchangeName.asString()) + .durable(DURABLE) + .type(DIRECT_EXCHANGE)), + sender.bind(BindingSpecification.binding() + .exchange(retryExchangeName.asString()) + .queue(queueName.asString()) + .routingKey(EMPTY_ROUTING_KEY))) + .then(); + } + + Mono<Void> handleRetry(byte[] eventAsBytes, Event event, int currentRetryCount, Throwable throwable) { + LOGGER.error("Exception happens when handling event {} of user {}", + event.getEventId().getId().toString(), event.getUser().asString(), throwable); + + return retryPublisher.publish(event, eventAsBytes, currentRetryCount); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index 36db6ae..b382dee 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -34,20 +34,18 @@ import java.util.Optional; import org.apache.james.event.json.EventSerializer; import org.apache.james.mailbox.Event; import org.apache.james.mailbox.MailboxListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Delivery; -import play.api.libs.json.JsResult; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Schedulers; +import reactor.rabbitmq.AcknowledgableDelivery; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.RabbitFlux; @@ -56,10 +54,11 @@ import reactor.rabbitmq.ReceiverOptions; import reactor.rabbitmq.Sender; class GroupRegistration implements Registration { + static class WorkQueueName { @VisibleForTesting static WorkQueueName of(Class<? extends Group> clazz) { - return new WorkQueueName(clazz.getName()); + return new WorkQueueName(groupName(clazz)); } static WorkQueueName of(Group group) { @@ -80,7 +79,12 @@ class GroupRegistration implements Registration { } } - private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class); + static String groupName(Class<? extends Group> clazz) { + return clazz.getName(); + } + + static final String RETRY_COUNT = "retry-count"; + static final int DEFAULT_RETRY_COUNT = 0; private final MailboxListener mailboxListener; private final WorkQueueName queueName; @@ -88,10 +92,14 @@ class GroupRegistration implements Registration { private final Runnable unregisterGroup; private final Sender sender; private final EventSerializer eventSerializer; + private final GroupConsumerRetry retryHandler; + private final WaitDelayGenerator delayGenerator; + private final RetryBackoffConfiguration retryBackoff; private Optional<Disposable> receiverSubscriber; GroupRegistration(Mono<Connection> connectionSupplier, Sender sender, EventSerializer eventSerializer, - MailboxListener mailboxListener, Group group, Runnable unregisterGroup) { + MailboxListener mailboxListener, Group group, RetryBackoffConfiguration retryBackoff, + Runnable unregisterGroup) { this.eventSerializer = eventSerializer; this.mailboxListener = mailboxListener; this.queueName = WorkQueueName.of(group); @@ -99,10 +107,14 @@ class GroupRegistration implements Registration { this.receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connectionSupplier)); this.receiverSubscriber = Optional.empty(); this.unregisterGroup = unregisterGroup; + this.retryHandler = new GroupConsumerRetry(sender, queueName, group, retryBackoff); + this.retryBackoff = retryBackoff; + this.delayGenerator = WaitDelayGenerator.of(retryBackoff); } GroupRegistration start() { createGroupWorkQueue() + .then(retryHandler.createRetryExchange()) .doOnSuccess(any -> this.subscribeWorkQueue()) .block(); return this; @@ -123,23 +135,33 @@ class GroupRegistration implements Registration { } private void subscribeWorkQueue() { - receiverSubscriber = Optional.of(receiver.consumeAutoAck(queueName.asString()) + receiverSubscriber = Optional.of(receiver.consumeManualAck(queueName.asString()) .subscribeOn(Schedulers.parallel()) - .map(Delivery::getBody) - .filter(Objects::nonNull) - .map(eventInBytes -> new String(eventInBytes, StandardCharsets.UTF_8)) - .map(eventSerializer::fromJson) - .map(JsResult::get) + .filter(delivery -> Objects.nonNull(delivery.getBody())) + .flatMap(this::deliver) .subscribeOn(Schedulers.elastic()) - .subscribe(event -> deliverEvent(mailboxListener, event))); + .subscribe()); } - private void deliverEvent(MailboxListener mailboxListener, Event event) { - try { - mailboxListener.event(event); - } catch (Exception e) { - LOGGER.error("Exception happens when handling event of user {}", event.getUser().asString(), e); - } + private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) { + byte[] eventAsBytes = acknowledgableDelivery.getBody(); + Event event = eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get(); + int currentRetryCount = getRetryCount(acknowledgableDelivery); + + return delayGenerator.delayIfHaveTo(currentRetryCount) + .flatMap(any -> Mono.fromRunnable(() -> mailboxListener.event(event))) + .onErrorResume(throwable -> retryHandler.handleRetry(eventAsBytes, event, currentRetryCount, throwable)) + .then(Mono.fromRunnable(acknowledgableDelivery::ack)) + .subscribeWith(MonoProcessor.create()) + .then(); + } + + static int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) { + return Optional.ofNullable(acknowledgableDelivery.getProperties().getHeaders()) + .flatMap(headers -> Optional.ofNullable(headers.get(RETRY_COUNT))) + .filter(object -> object instanceof Integer) + .map(object -> (Integer) object) + .orElse(DEFAULT_RETRY_COUNT); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java index c0f4339..33ba13b 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistrationHandler.java @@ -35,11 +35,13 @@ class GroupRegistrationHandler { private final EventSerializer eventSerializer; private final Sender sender; private final Mono<Connection> connectionMono; + private final RetryBackoffConfiguration retryBackoff; - GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono) { + GroupRegistrationHandler(EventSerializer eventSerializer, Sender sender, Mono<Connection> connectionMono, RetryBackoffConfiguration retryBackoff) { this.eventSerializer = eventSerializer; this.sender = sender; this.connectionMono = connectionMono; + this.retryBackoff = retryBackoff; this.groupRegistrations = new ConcurrentHashMap<>(); } @@ -65,6 +67,7 @@ class GroupRegistrationHandler { eventSerializer, listener, group, + retryBackoff, () -> groupRegistrations.remove(group)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 447b345..e9e594e 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -44,24 +44,28 @@ class RabbitMQEventBus implements EventBus { private final EventSerializer eventSerializer; private final AtomicBoolean isRunning; private final RoutingKeyConverter routingKeyConverter; + private final RetryBackoffConfiguration retryBackoff; private GroupRegistrationHandler groupRegistrationHandler; private KeyRegistrationHandler keyRegistrationHandler; private EventDispatcher eventDispatcher; private Sender sender; - RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer, RoutingKeyConverter routingKeyConverter) { + RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer, + RetryBackoffConfiguration retryBackoff, + RoutingKeyConverter routingKeyConverter) { this.connectionMono = Mono.fromSupplier(rabbitMQConnectionFactory::create).cache(); this.eventSerializer = eventSerializer; this.routingKeyConverter = routingKeyConverter; + this.retryBackoff = retryBackoff; this.isRunning = new AtomicBoolean(false); } public void start() { if (!isRunning.get()) { sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)); - groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono); keyRegistrationHandler = new KeyRegistrationHandler(eventSerializer, sender, connectionMono, routingKeyConverter); + groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, connectionMono, retryBackoff); eventDispatcher = new EventDispatcher(eventSerializer, sender); eventDispatcher.start(); http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java index fdcf9cc..93a20df 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/WaitDelayGenerator.java @@ -29,37 +29,39 @@ import reactor.core.publisher.Mono; class WaitDelayGenerator { - static WaitDelayGenerator of(RetryBackoff retryBackoff) { + static WaitDelayGenerator of(RetryBackoffConfiguration retryBackoff) { return new WaitDelayGenerator(retryBackoff); } private static int randomBetween(int lowest, int highest) { Preconditions.checkArgument(lowest <= highest, "lowest always has to be less than or equals highest"); + if (lowest == highest) { + return lowest; + } return SECURE_RANDOM.nextInt(highest - lowest) + lowest; } private static final SecureRandom SECURE_RANDOM = new SecureRandom(); - private final RetryBackoff retryBackoff; + private final RetryBackoffConfiguration retryBackoff; - private WaitDelayGenerator(RetryBackoff retryBackoff) { + private WaitDelayGenerator(RetryBackoffConfiguration retryBackoff) { this.retryBackoff = retryBackoff; } Mono<Integer> delayIfHaveTo(int retryCount) { Mono<Integer> countRetryMono = Mono.just(retryCount); - if (retryCount < 1) { + if (!shouldDelay(retryCount)) { return countRetryMono; } return countRetryMono - .filter(count -> count <= retryBackoff.getMaxRetries()) .delayElement(generateDelay(retryCount)); } @VisibleForTesting Duration generateDelay(int retryCount) { - if (retryCount < 1) { + if (!shouldDelay(retryCount)) { return Duration.ZERO; } int exponentialFactor = Double.valueOf(Math.pow(2, retryCount - 1)).intValue(); @@ -68,4 +70,8 @@ class WaitDelayGenerator { return Duration.ofMillis(randomBetween(minDelay, maxDelay)); } + + private boolean shouldDelay(int retryCount) { + return retryCount >= 1 && retryCount <= retryBackoff.getMaxRetries(); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index dbb125a..ffcdac8 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -32,6 +32,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GroupA; import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1; import static org.apache.james.mailbox.events.EventBusTestFixture.MailboxListenerCountingSuccessfulExecution; import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS; +import static org.apache.james.mailbox.events.EventBusTestFixture.newListener; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; import static org.assertj.core.api.Assertions.assertThat; @@ -77,7 +78,8 @@ import reactor.rabbitmq.SenderOptions; class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, GroupContract.MultipleEventBusGroupContract, EventBusConcurrentTestContract.SingleEventBusConcurrentContract, EventBusConcurrentTestContract.MultiEventBusConcurrentContract, - KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract { + KeyContract.SingleEventBusKeyContract, KeyContract.MultipleEventBusKeyContract, + ErrorHandlingContract { static class RabbitMQEventExtension implements BeforeEachCallback, AfterEachCallback { static final RabbitMQExtension rabbitMQExtension = new RabbitMQExtension(); @@ -131,9 +133,10 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventSerializer = new EventSerializer(mailboxIdFactory, new TestMessageId.Factory()); routingKeyConverter = RoutingKeyConverter.forFactories(new MailboxIdRegistrationKey.Factory(mailboxIdFactory)); - eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter); - eventBus2 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter); - eventBus3 = new RabbitMQEventBus(connectionFactory, eventSerializer, routingKeyConverter); + eventBus = newEventBus(); + eventBus2 = newEventBus(); + eventBus3 = newEventBus(); + eventBus.start(); eventBus2.start(); eventBus3.start(); @@ -152,6 +155,10 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, sender.close(); } + private RabbitMQEventBus newEventBus() { + return new RabbitMQEventBus(connectionFactory, eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter); + } + @Override public EventBus eventBus() { return eventBus; @@ -181,6 +188,17 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, } + @Test + void registerGroupShouldCreateRetryExchange() throws Exception { + MailboxListener listener = newListener(); + EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA(); + eventBus.register(listener, registeredGroup); + + GroupConsumerRetry.RetryExchangeName retryExchangeName = GroupConsumerRetry.RetryExchangeName.of(registeredGroup); + assertThat(testExtension.rabbitMQExtension.managementAPI().listExchanges()) + .anyMatch(exchange -> exchange.getName().equals(retryExchangeName.asString())); + } + @Nested class PublishingTest { private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue"; http://git-wip-us.apache.org/repos/asf/james-project/blob/1f390f4f/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java index acc3a9f..9cb2d62 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/WaitDelayGeneratorTest.java @@ -30,7 +30,7 @@ class WaitDelayGeneratorTest { @Test void generateDelayShouldReturnZeroWhenZeroRetryCount() { - WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoff.defaultRetryBackoff()); + WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.DEFAULT); assertThat(generator.generateDelay(0)) .isEqualTo(Duration.ofMillis(0)); @@ -38,7 +38,7 @@ class WaitDelayGeneratorTest { @Test void generateDelayShouldReturnByRandomInRangeOfExponentialGrowthOfRetryCount() { - WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoff.builder() + WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder() .maxRetries(4) .firstBackoff(Duration.ofMillis(100)) .jitterFactor(0.5) @@ -55,4 +55,49 @@ class WaitDelayGeneratorTest { .isBetween(800L, 1200L); }); } + + @Test + void generateDelayShouldReturnZeroWhenZeroMaxRetries() { + WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder() + .maxRetries(0) + .firstBackoff(Duration.ofMillis(1000)) + .jitterFactor(0.5) + .build()); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ZERO); + softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ZERO); + softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ZERO); + }); + } + + @Test + void generateDelayShouldReturnZeroWhenZeroFirstBackOff() { + WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder() + .maxRetries(3) + .firstBackoff(Duration.ZERO) + .jitterFactor(0.5) + .build()); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ZERO); + softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ZERO); + softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ZERO); + }); + } + + @Test + void generateDelayShouldReturnFloorOfExponentialGrowthStepsWhenZeroJitterFactor() { + WaitDelayGenerator generator = WaitDelayGenerator.of(RetryBackoffConfiguration.builder() + .maxRetries(3) + .firstBackoff(Duration.ofMillis(100)) + .jitterFactor(0.0) + .build()); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(generator.generateDelay(1)).isEqualTo(Duration.ofMillis(100)); + softly.assertThat(generator.generateDelay(2)).isEqualTo(Duration.ofMillis(200)); + softly.assertThat(generator.generateDelay(3)).isEqualTo(Duration.ofMillis(400)); + }); + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org