This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 982d7ecad4 JAMES-3693 Add healthcheck test for case redis sentinel
(#2358)
982d7ecad4 is described below
commit 982d7ecad4f98df23d82716e003400e07787e850
Author: hungphan227 <[email protected]>
AuthorDate: Fri Jul 26 10:01:22 2024 +0700
JAMES-3693 Add healthcheck test for case redis sentinel (#2358)
---
.../backends/redis/RedisSentinelExtension.java | 202 +++++++++++++++++++++
.../redis/RedisSentinelHealthCheckTest.scala | 72 ++++++++
.../redis/src/test/resources/sentinel.conf | 7 +
.../limiter/RedisRateLimiterWithSentinelTest.scala | 93 ++++++++++
4 files changed, 374 insertions(+)
diff --git
a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelExtension.java
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelExtension.java
new file mode 100644
index 0000000000..43cbb6fe4f
--- /dev/null
+++
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelExtension.java
@@ -0,0 +1,202 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.redis;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.james.backends.redis.DockerRedis.DEFAULT_IMAGE_NAME;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import jakarta.inject.Singleton;
+
+import org.apache.james.GuiceModuleTestExtension;
+import org.apache.james.util.Runnables;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+
+import io.lettuce.core.ReadFrom;
+import scala.Function2;
+import scala.jdk.javaapi.OptionConverters;
+
+public class RedisSentinelExtension implements GuiceModuleTestExtension {
+ public static final int SENTINEL_PORT = 26379;
+
+ public static class RedisMasterReplicaContainerList extends
ArrayList<GenericContainer> {
+ public RedisMasterReplicaContainerList(Collection<? extends
GenericContainer> c) {
+ super(c);
+ }
+
+ public void pauseMasterNode() {
+ GenericContainer container = this.get(0);
+
container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
+ }
+
+ public void unPauseMasterNode() {
+ GenericContainer container = this.get(0);
+ if
(TRUE.equals(container.getDockerClient().inspectContainerCmd(container.getContainerId())
+ .exec()
+ .getState()
+ .getPaused())) {
+
container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
+ }
+ }
+ }
+
+ public static class RedisSentinelContainerList extends
ArrayList<GenericContainer> {
+ public RedisSentinelContainerList(Collection<? extends
GenericContainer> c) {
+ super(c);
+ }
+
+ public MasterReplicaRedisConfiguration getRedisConfiguration() {
+ return
MasterReplicaRedisConfiguration.from(ImmutableList.of(createRedisSentinelURI(this.stream().toList().subList(0,1)))
+ .toArray(String[]::new),
+ ReadFrom.MASTER,
+ OptionConverters.toScala(Optional.empty()),
+ OptionConverters.toScala(Optional.empty()));
+ }
+ }
+
+ public record RedisSentinelCluster(RedisMasterReplicaContainerList
redisMasterReplicaContainerList,
+ RedisSentinelContainerList
redisSentinelContainerList) {
+ }
+
+ public static final Function2<String, Boolean, GenericContainer>
redisContainerSupplier = (alias, isSlave) ->
+ new GenericContainer<>(DEFAULT_IMAGE_NAME)
+ .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd.withName("james-" + alias + "-test-" + UUID.randomUUID()))
+ .withCommand(Optional.of(isSlave).filter(aBoolean -> aBoolean)
+ .map(aBoolean -> "redis-server --appendonly yes --port 6379
--slaveof redis1 6379 --requirepass 1 --masterauth 1")
+ .orElse("redis-server --appendonly yes --port 6379
--requirepass 1"))
+ .withNetworkAliases(alias)
+ .waitingFor(Wait.forLogMessage(".*Ready to accept connections.*",
1)
+ .withStartupTimeout(Duration.ofMinutes(2)));
+
+ public static final Function<String, GenericContainer>
redisSentinelSupplier = (alias) ->
+ new GenericContainer<>(DEFAULT_IMAGE_NAME)
+ .withExposedPorts(SENTINEL_PORT)
+ .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd.withName("james-" + alias + "-test-" + UUID.randomUUID()))
+ .withCommand("redis-sentinel /etc/redis/sentinel.conf")
+ .withClasspathResourceMapping("sentinel.conf",
+ "/etc/redis/sentinel.conf",
+ BindMode.READ_WRITE)
+ .withNetworkAliases(alias)
+ .waitingFor(Wait.forLogMessage(".*monitor master.*", 1)
+ .withStartupTimeout(Duration.ofMinutes(2)));
+
+ static final GenericContainer redis1 =
redisContainerSupplier.apply("redis1", false);
+ static final GenericContainer redis2 =
redisContainerSupplier.apply("redis2", true);
+ static final GenericContainer redis3 =
redisContainerSupplier.apply("redis3", true);
+ static final GenericContainer sentinel1 =
redisSentinelSupplier.apply("sentinel1");
+ static final GenericContainer sentinel2 =
redisSentinelSupplier.apply("sentinel2");
+ static final GenericContainer sentinel3 =
redisSentinelSupplier.apply("sentinel3");
+
+ private RedisMasterReplicaContainerList redisMasterReplicaContainerList;
+ private RedisSentinelContainerList redisSentinelContainerList;
+ private final Network network;
+
+ public RedisSentinelExtension() {
+ this(Network.newNetwork());
+ }
+
+ public RedisSentinelExtension(Network network) {
+ this.network = network;
+ redis1.withNetwork(network);
+ redis2.withNetwork(network);
+ redis3.withNetwork(network);
+ sentinel1.withNetwork(network);
+ sentinel2.withNetwork(network);
+ sentinel3.withNetwork(network);
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext extensionContext) {
+ redis1.start();
+ redis2.start();
+ redis3.start();
+ sentinel1.start();
+ sentinel2.start();
+ sentinel3.start();
+ redisMasterReplicaContainerList = new
RedisMasterReplicaContainerList(List.of(redis1, redis2, redis3));
+ redisSentinelContainerList = new
RedisSentinelContainerList(List.of(sentinel1, sentinel2, sentinel3));
+ }
+
+ @Override
+ public void afterAll(ExtensionContext extensionContext) {
+ Runnables.runParallel(
+ sentinel1::stop,
+ sentinel2::stop,
+ sentinel3::stop,
+ redis1::stop,
+ redis2::stop,
+ redis3::stop);
+ network.close();
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext extensionContext) {
+ redisMasterReplicaContainerList.forEach(Throwing.consumer(container ->
container.execInContainer("redis-cli", "flushall")));
+ }
+
+ @Override
+ public Module getModule() {
+ return new AbstractModule() {
+ @Provides
+ @Singleton
+ public RedisConfiguration provideRedisConfiguration() {
+ return redisSentinelContainerList.getRedisConfiguration();
+ }
+ };
+ }
+
+ @Override
+ public boolean supportsParameter(ParameterContext parameterContext,
ExtensionContext extensionContext) throws ParameterResolutionException {
+ return parameterContext.getParameter().getType() ==
RedisSentinelCluster.class;
+ }
+
+ @Override
+ public Object resolveParameter(ParameterContext parameterContext,
ExtensionContext extensionContext) throws ParameterResolutionException {
+ return new RedisSentinelCluster(redisMasterReplicaContainerList,
redisSentinelContainerList);
+ }
+
+ private static String createRedisSentinelURI(List<GenericContainer>
containers) {
+ return new StringBuilder().append("redis-sentinel://1@")
+ .append(containers.stream().map(container -> container.getHost() +
":" + container.getMappedPort(SENTINEL_PORT))
+ .collect(Collectors.joining(",")))
+ .append("/0#mymaster")
+ .toString();
+ }
+}
\ No newline at end of file
diff --git
a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelHealthCheckTest.scala
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelHealthCheckTest.scala
new file mode 100644
index 0000000000..3ffb149f2c
--- /dev/null
+++
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelHealthCheckTest.scala
@@ -0,0 +1,72 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ************************************************************** */
+
+package org.apache.james.backends.redis
+
+import java.util.concurrent.TimeUnit
+
+import
org.apache.james.backends.redis.RedisSentinelExtension.RedisSentinelCluster
+import org.assertj.core.api.Assertions.assertThat
+import org.awaitility.Awaitility
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import reactor.core.scala.publisher.SMono
+
+@ExtendWith(Array(classOf[RedisSentinelExtension]))
+class RedisSentinelHealthCheckTest {
+ var redisHealthCheck: RedisHealthCheck = _
+
+ @BeforeEach
+ def setup(redis: RedisSentinelCluster): Unit = {
+ redisHealthCheck = new
RedisHealthCheck(redis.redisSentinelContainerList.getRedisConfiguration)
+ }
+
+ @AfterEach
+ def afterEach(redis: RedisSentinelCluster): Unit = {
+ redis.redisMasterReplicaContainerList.unPauseMasterNode();
+ }
+
+ @Test
+ def checkShouldReturnHealthyWhenRedisIsRunning(): Unit = {
+ val result = SMono.fromPublisher(redisHealthCheck.check()).block()
+
+ assertThat(result.isHealthy).isTrue
+ }
+
+ @Test
+ def checkShouldReturnDegradedWhenRedisIsDown(redis: RedisSentinelCluster):
Unit = {
+ redis.redisMasterReplicaContainerList.pauseMasterNode()
+
+ Awaitility.await()
+ .pollInterval(2, TimeUnit.SECONDS)
+ .atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(() =>
assertThat(SMono.fromPublisher(redisHealthCheck.check()).block().isDegraded).isTrue)
+ }
+
+ @Test
+ def checkShouldReturnHealthyWhenRedisIsRecovered(redis:
RedisSentinelCluster): Unit = {
+ redis.redisMasterReplicaContainerList.pauseMasterNode()
+ redis.redisMasterReplicaContainerList.unPauseMasterNode()
+
+ Awaitility.await()
+ .pollInterval(2, TimeUnit.SECONDS)
+ .atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(() =>
assertThat(SMono.fromPublisher(redisHealthCheck.check()).block().isHealthy).isTrue)
+ }
+}
diff --git a/backends-common/redis/src/test/resources/sentinel.conf
b/backends-common/redis/src/test/resources/sentinel.conf
new file mode 100644
index 0000000000..3730585beb
--- /dev/null
+++ b/backends-common/redis/src/test/resources/sentinel.conf
@@ -0,0 +1,7 @@
+dir /tmp
+sentinel resolve-hostnames yes
+sentinel monitor mymaster redis1 6379 2
+sentinel auth-pass mymaster 1
+sentinel down-after-milliseconds mymaster 5000
+sentinel failover-timeout mymaster 10000
+sentinel parallel-syncs mymaster 1
\ No newline at end of file
diff --git
a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithSentinelTest.scala
b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithSentinelTest.scala
new file mode 100644
index 0000000000..d80e9731d3
--- /dev/null
+++
b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithSentinelTest.scala
@@ -0,0 +1,93 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ************************************************************** */
+
+package org.apache.james.rate.limiter
+
+import java.time.Duration
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import eu.timepit.refined.auto._
+import org.apache.james.backends.redis.RedisSentinelExtension
+import
org.apache.james.backends.redis.RedisSentinelExtension.RedisSentinelCluster
+import
org.apache.james.rate.limiter.RedisRateLimiterWithMasterReplicaTopologyTest.{RULES,
SLIDING_WIDOW_PRECISION}
+import org.apache.james.rate.limiter.api._
+import org.apache.james.rate.limiter.redis.RedisRateLimiterFactory
+import org.assertj.core.api.Assertions.{assertThat, assertThatCode}
+import org.awaitility.Awaitility
+import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.api.extension.ExtendWith
+import reactor.core.scala.publisher.SMono
+
+object RedisRateLimiterWithSentinelTest {
+ val SLIDING_WIDOW_PRECISION: Option[Duration] = Some(Duration.ofSeconds(1))
+ val RULES: Rules = Rules(Seq(Rule(4L, Duration.ofSeconds(2))))
+}
+
+@ExtendWith(Array(classOf[RedisSentinelExtension]))
+class RedisRateLimiterWithSentinelTest {
+
+ @AfterEach
+ def afterEach(redisClusterContainer: RedisSentinelCluster): Unit = {
+ redisClusterContainer.redisMasterReplicaContainerList.unPauseMasterNode()
+ }
+
+ @Test
+ def rateLimitShouldBeAcceptableWhenLimitIsAcceptable(redisClusterContainer:
RedisSentinelCluster): Unit = {
+ val rateLimiterFactory: RedisRateLimiterFactory = new
RedisRateLimiterFactory(redisClusterContainer.redisSentinelContainerList.getRedisConfiguration)
+ val rateLimiter = rateLimiterFactory.withSpecification(RULES,
SLIDING_WIDOW_PRECISION)
+ val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key"
+ UUID.randomUUID().toString), 4)).block()
+ assertThat(actual).isEqualTo(AcceptableRate)
+ }
+
+ @Test
+ def rateLimitShouldWorkNormallyWhenLimitExceeded(redisClusterContainer:
RedisSentinelCluster): Unit = {
+ val rateLimiterFactory: RedisRateLimiterFactory = new
RedisRateLimiterFactory(redisClusterContainer.redisSentinelContainerList.getRedisConfiguration)
+ val rateLimiter = rateLimiterFactory.withSpecification(RULES,
SLIDING_WIDOW_PRECISION)
+ val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key"
+ UUID.randomUUID().toString), 5)).block()
+ assertThat(actual).isEqualTo(RateExceeded)
+ }
+
+ @Test
+ def rateLimitShouldWorkNormallyAfterFailoverComplete(redisClusterContainer:
RedisSentinelCluster): Unit = {
+ val rateLimiterFactory: RedisRateLimiterFactory = new
RedisRateLimiterFactory(redisClusterContainer.redisSentinelContainerList.getRedisConfiguration)
+ val rateLimiter = rateLimiterFactory.withSpecification(RULES,
SLIDING_WIDOW_PRECISION)
+
+ // Before failover, the rate limit should be working normally
+ assertThat(SMono(rateLimiter.rateLimit(TestKey("key" +
UUID.randomUUID().toString), 5)).block())
+ .isEqualTo(RateExceeded)
+
+ // Give stop redis-master node
+ redisClusterContainer.redisMasterReplicaContainerList.pauseMasterNode()
+ // Sleep for a while to let sentinel detect the failover. Here is 5 seconds
+ Thread.sleep(5000)
+
+ // After failover, the rate limit should be working normally
+ Awaitility.await()
+ .pollInterval(2, TimeUnit.SECONDS)
+ .atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(() => assertThatCode(() =>
SMono(rateLimiter.rateLimit(TestKey("key" + UUID.randomUUID().toString),
1)).block())
+ .doesNotThrowAnyException())
+
+ assertThat(SMono(rateLimiter.rateLimit(TestKey("key" +
UUID.randomUUID().toString), 10)).block())
+ .isNotNull
+ assertThat(SMono(rateLimiter.rateLimit(TestKey("key" +
UUID.randomUUID().toString), 3)).block())
+ .isEqualTo(AcceptableRate)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]