This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a3c4781a50c3a50d237f0a0b9f42a1e3cb8907e9 Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Mon Mar 31 16:32:57 2025 +0700 [JAMES-3693] Refactor RedisClientFactory to return an AbstractRedisClient --- .../james/backends/redis/RedisClientFactory.scala | 53 +++++---- .../james/backends/redis/RedisHealthCheck.scala | 121 +++++---------------- .../redis/RedisClusterHealthCheckTest.scala | 2 +- .../redis/RedisMasterReplicaHealthCheckTest.scala | 2 +- .../redis/RedisSentinelHealthCheckTest.scala | 2 +- .../redis/RedisStandaloneHealthCheckTest.scala | 2 +- .../RedisTLSMasterReplicaHealthCheckTest.java | 3 +- .../redis/RedisTLSSentinelHealthCheckTest.java | 3 +- .../redis/RedisTLSStandaloneHealthCheckTest.scala | 2 +- .../james/rate/limiter/RedisRateLimiterTest.scala | 2 +- .../limiter/RedisRateLimiterWithSentinelTest.scala | 21 ++-- .../limiter/TopologyRedisRateLimiterTest.scala | 2 +- 12 files changed, 77 insertions(+), 138 deletions(-) diff --git a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisClientFactory.scala b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisClientFactory.scala index 8f975c843e..6343193602 100644 --- a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisClientFactory.scala +++ b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisClientFactory.scala @@ -21,9 +21,9 @@ package org.apache.james.backends.redis import java.time.Duration -import io.lettuce.core.{ClientOptions, RedisClient, SslOptions} import io.lettuce.core.cluster.{ClusterClientOptions, RedisClusterClient} import io.lettuce.core.resource.ClientResources +import io.lettuce.core.{AbstractRedisClient, ClientOptions, RedisClient, SslOptions} import jakarta.inject.{Inject, Singleton} import org.apache.james.filesystem.api.FileSystem import org.apache.james.util.concurrent.NamedThreadFactory @@ -31,52 +31,59 @@ import org.apache.james.util.concurrent.NamedThreadFactory import 
scala.jdk.CollectionConverters._ class RedisClientFactory @Singleton() @Inject() -(fileSystem: FileSystem) { - def createStandaloneClient(redisConfiguration: StandaloneRedisConfiguration): RedisClient = - createStandaloneClient(redisConfiguration, Option.empty) +(fileSystem: FileSystem, redisConfiguration: RedisConfiguration) { + def createRawRedisClient(): AbstractRedisClient = redisConfiguration match { + case standaloneRedisConfiguration: StandaloneRedisConfiguration => createStandaloneClient(standaloneRedisConfiguration) + case masterReplicaRedisConfiguration: MasterReplicaRedisConfiguration => createMasterReplicaClient(masterReplicaRedisConfiguration) + case clusterRedisConfiguration: ClusterRedisConfiguration => createClusterClient(clusterRedisConfiguration) + case sentinelRedisConfiguration: SentinelRedisConfiguration => createSentinelClient(sentinelRedisConfiguration) + } + + def createStandaloneClient(standaloneRedisConfiguration: StandaloneRedisConfiguration): RedisClient = + createStandaloneClient(standaloneRedisConfiguration, Option.empty) - def createStandaloneClient(redisConfiguration: StandaloneRedisConfiguration, timeout: Duration): RedisClient = - createStandaloneClient(redisConfiguration, Option.apply(timeout)) + def createStandaloneClient(standaloneRedisConfiguration: StandaloneRedisConfiguration, timeout: Duration): RedisClient = + createStandaloneClient(standaloneRedisConfiguration, Option.apply(timeout)) - def createStandaloneClient(redisConfiguration: StandaloneRedisConfiguration, maybeTimeout: Option[Duration]): RedisClient = { - maybeTimeout.foreach(timeout => redisConfiguration.redisURI.setTimeout(timeout)) - val redisClient = RedisClient.create(redisConfiguration.redisURI) - redisClient.setOptions(createClientOptions(redisConfiguration.useSSL, redisConfiguration.mayBeSSLConfiguration)) + def createStandaloneClient(standaloneRedisConfiguration: StandaloneRedisConfiguration, maybeTimeout: Option[Duration]): RedisClient = { + 
maybeTimeout.foreach(timeout => standaloneRedisConfiguration.redisURI.setTimeout(timeout)) + val redisClient = RedisClient.create(standaloneRedisConfiguration.redisURI) + redisClient.setOptions(createClientOptions(standaloneRedisConfiguration.useSSL, standaloneRedisConfiguration.mayBeSSLConfiguration)) redisClient } - def createClusterClient(redisConfiguration: ClusterRedisConfiguration): RedisClusterClient = - createClusterClient(redisConfiguration, Option.empty) + def createClusterClient(clusterRedisConfiguration: ClusterRedisConfiguration): RedisClusterClient = + createClusterClient(clusterRedisConfiguration, Option.empty) - def createClusterClient(redisConfiguration: ClusterRedisConfiguration, timeout: Duration): RedisClusterClient = - createClusterClient(redisConfiguration, Option.apply(timeout)) + def createClusterClient(clusterRedisConfiguration: ClusterRedisConfiguration, timeout: Duration): RedisClusterClient = + createClusterClient(clusterRedisConfiguration, Option.apply(timeout)) - def createClusterClient(redisConfiguration: ClusterRedisConfiguration, maybeTimeout: Option[Duration]): RedisClusterClient = { + def createClusterClient(clusterRedisConfiguration: ClusterRedisConfiguration, maybeTimeout: Option[Duration]): RedisClusterClient = { val resourceBuilder: ClientResources.Builder = ClientResources.builder() .threadFactoryProvider(poolName => NamedThreadFactory.withName(s"redis-driver-$poolName")) - redisConfiguration.ioThreads.foreach(value => resourceBuilder.ioThreadPoolSize(value)) - redisConfiguration.workerThreads.foreach(value => resourceBuilder.computationThreadPoolSize(value)) + clusterRedisConfiguration.ioThreads.foreach(value => resourceBuilder.ioThreadPoolSize(value)) + clusterRedisConfiguration.workerThreads.foreach(value => resourceBuilder.computationThreadPoolSize(value)) val redisClient = RedisClusterClient.create(resourceBuilder.build(), - redisConfiguration.redisURI.value + clusterRedisConfiguration.redisURI.value .map(rURI => { 
maybeTimeout.foreach(timeout => rURI.setTimeout(timeout)) rURI }).asJava) redisClient.setOptions(ClusterClientOptions.builder( - createClientOptions(redisConfiguration.useSSL, redisConfiguration.mayBeSSLConfiguration)) + createClientOptions(clusterRedisConfiguration.useSSL, clusterRedisConfiguration.mayBeSSLConfiguration)) .build()) redisClient } - def createMasterReplicaClient(redisConfiguration: MasterReplicaRedisConfiguration): RedisClient = { + def createMasterReplicaClient(masterReplicaRedisConfiguration: MasterReplicaRedisConfiguration): RedisClient = { val redisClient = RedisClient.create - redisClient.setOptions(createClientOptions(redisConfiguration.useSSL, redisConfiguration.mayBeSSLConfiguration)) + redisClient.setOptions(createClientOptions(masterReplicaRedisConfiguration.useSSL, masterReplicaRedisConfiguration.mayBeSSLConfiguration)) redisClient } - def createSentinelClient(redisConfiguration: SentinelRedisConfiguration): RedisClient = { + def createSentinelClient(sentinelRedisConfiguration: SentinelRedisConfiguration): RedisClient = { val redisClient = RedisClient.create - redisClient.setOptions(createClientOptions(redisConfiguration.useSSL, redisConfiguration.mayBeSSLConfiguration)) + redisClient.setOptions(createClientOptions(sentinelRedisConfiguration.useSSL, sentinelRedisConfiguration.mayBeSSLConfiguration)) redisClient } diff --git a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala index 94ce2345b4..30154f154e 100644 --- a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala +++ b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala @@ -21,15 +21,16 @@ package org.apache.james.backends.redis import java.time.Duration -import io.lettuce.core.RedisClient import io.lettuce.core.api.reactive.RedisReactiveCommands import 
io.lettuce.core.cluster.RedisClusterClient import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands import io.lettuce.core.codec.StringCodec +import io.lettuce.core.masterreplica.MasterReplica +import io.lettuce.core.{AbstractRedisClient, RedisClient} import jakarta.annotation.PreDestroy import jakarta.inject.Inject import org.apache.commons.lang3.StringUtils -import org.apache.james.backends.redis.RedisHealthCheck.redisComponent +import org.apache.james.backends.redis.RedisHealthCheck.{healthcheckTimeout, redisComponent} import org.apache.james.core.healthcheck.{ComponentName, HealthCheck, Result} import org.reactivestreams.Publisher import reactor.core.publisher.Mono @@ -41,17 +42,15 @@ import scala.jdk.DurationConverters._ object RedisHealthCheck { val redisComponent: ComponentName = new ComponentName("Redis") + val healthcheckTimeout: Duration = Duration.ofSeconds(3) } -class RedisHealthCheck @Inject()(redisConfiguration: RedisConfiguration, redisClientFactory: RedisClientFactory) extends HealthCheck { +class RedisHealthCheck @Inject()(redisClientFactory: RedisClientFactory, redisConfiguration: RedisConfiguration) extends HealthCheck { - private val healthcheckTimeout: Duration = Duration.ofSeconds(3) - private val healthcheckPerform: RedisHealthcheckPerform = redisConfiguration match { - case standaloneConfiguration: StandaloneRedisConfiguration => new RedisStandaloneHealthCheckPerform(standaloneConfiguration, redisClientFactory, healthcheckTimeout) - case clusterConfiguration: ClusterRedisConfiguration => new RedisClusterHealthCheckPerform(clusterConfiguration, redisClientFactory, healthcheckTimeout) - case masterReplicaConfiguration: MasterReplicaRedisConfiguration => new RedisMasterReplicaHealthCheckPerform(masterReplicaConfiguration, redisClientFactory, healthcheckTimeout) - case sentinelRedisConfiguration: SentinelRedisConfiguration => - new RedisSentinelHealthCheckPerform(sentinelRedisConfiguration, redisClientFactory, 
healthcheckTimeout) + private val rawRedisClient: AbstractRedisClient = redisClientFactory.createRawRedisClient() + private val healthcheckPerform: RedisHealthcheckPerform = rawRedisClient match { + case redisClusterClient: RedisClusterClient => new RedisClusterHealthCheckPerform(redisClusterClient) + case redisClient: RedisClient => new RedisRegularHealthCheckPerform(redisClient, redisConfiguration) case _ => throw new NotImplementedError() } @@ -62,25 +61,31 @@ class RedisHealthCheck @Inject()(redisConfiguration: RedisConfiguration, redisCl .onErrorResume(_ => SMono.just(Result.degraded(redisComponent, "Can not connect to Redis."))) @PreDestroy - def close(): Unit = healthcheckPerform.close() + def close(): Unit = + Mono.fromCompletionStage(rawRedisClient.shutdownAsync()) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() } sealed trait RedisHealthcheckPerform { def check(): SMono[Result] - - def close(): Unit } -class RedisStandaloneHealthCheckPerform(val redisConfiguration: StandaloneRedisConfiguration, - val redisClientFactory: RedisClientFactory, - val healthcheckTimeout: Duration) extends RedisHealthcheckPerform { +class RedisRegularHealthCheckPerform(val redisClient: RedisClient, redisConfiguration: RedisConfiguration) extends RedisHealthcheckPerform { private val PING_SUCCESS_RESPONSE = "PONG" - private val redisClient: RedisClient = - redisClientFactory.createStandaloneClient(redisConfiguration, healthcheckTimeout) - - private val redisCommand: RedisReactiveCommands[String, String] = redisClient.connect().reactive() + private val redisCommand: RedisReactiveCommands[String, String] = redisConfiguration match { + case _: StandaloneRedisConfiguration => redisClient.connect().reactive() + case masterReplicaRedisConfiguration: MasterReplicaRedisConfiguration => MasterReplica.connect(redisClient, + StringCodec.UTF8, + masterReplicaRedisConfiguration.redisURI.value.asJava) + .reactive() + case sentinelRedisConfiguration: SentinelRedisConfiguration => 
MasterReplica.connect(redisClient, + StringCodec.UTF8, + sentinelRedisConfiguration.redisURI) + .reactive() + } override def check(): SMono[Result] = SMono(redisCommand.ping()) @@ -88,20 +93,11 @@ class RedisStandaloneHealthCheckPerform(val redisConfiguration: StandaloneRedisC .filter(_ == PING_SUCCESS_RESPONSE) .map(_ => Result.healthy(redisComponent)) .switchIfEmpty(SMono.just(Result.degraded(redisComponent, "Can not PING to Redis."))) - - override def close(): Unit = - Mono.fromCompletionStage(redisClient.shutdownAsync()) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() - } -class RedisClusterHealthCheckPerform(val redisConfiguration: ClusterRedisConfiguration, - val redisClientFactory: RedisClientFactory, - val healthcheckTimeout: Duration) extends RedisHealthcheckPerform { +class RedisClusterHealthCheckPerform(val redisClusterClient: RedisClusterClient) extends RedisHealthcheckPerform { private val CLUSTER_STATUS_OK: String = "ok" - private val redisClusterClient: RedisClusterClient = redisClientFactory.createClusterClient(redisConfiguration, healthcheckTimeout) private val redisCommand: RedisAdvancedClusterReactiveCommands[String, String] = { redisClusterClient.getPartitions @@ -116,69 +112,4 @@ class RedisClusterHealthCheckPerform(val redisConfiguration: ClusterRedisConfigu case CLUSTER_STATUS_OK => Result.healthy(redisComponent) case unExpectedState => Result.degraded(redisComponent, "Redis cluster state: " + unExpectedState) } - - override def close(): Unit = Mono.fromCompletionStage(redisClusterClient.shutdownAsync()) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() -} - -class RedisMasterReplicaHealthCheckPerform(val redisConfiguration: MasterReplicaRedisConfiguration, - val redisClientFactory: RedisClientFactory, - val healthcheckTimeout: Duration) extends RedisHealthcheckPerform { - - private val PING_SUCCESS_RESPONSE = "PONG" - - private val redisClient: RedisClient = redisClientFactory.createMasterReplicaClient(redisConfiguration) 
- - private val redisCommand: RedisReactiveCommands[String, String] = io.lettuce.core.masterreplica.MasterReplica.connect(redisClient, - StringCodec.UTF8, - redisConfiguration.redisURI.value - .map(rURI => { - rURI.setTimeout(healthcheckTimeout) - rURI - }).asJava) - .reactive() - - override def check(): SMono[Result] = - SMono(redisCommand.ping()) - .timeout(healthcheckTimeout.toScala) - .filter(_ == PING_SUCCESS_RESPONSE) - .map(_ => Result.healthy(redisComponent)) - .switchIfEmpty(SMono.just(Result.degraded(redisComponent, "Can not PING to Redis."))) - - override def close(): Unit = - Mono.fromCompletionStage(redisClient.shutdownAsync()) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() - -} - -class RedisSentinelHealthCheckPerform(val redisConfiguration: SentinelRedisConfiguration, - val redisClientFactory: RedisClientFactory, - val healthcheckTimeout: Duration) extends RedisHealthcheckPerform { - - private val PING_SUCCESS_RESPONSE = "PONG" - - private val redisClient: RedisClient = redisClientFactory.createSentinelClient(redisConfiguration) - - private val redisCommand: RedisReactiveCommands[String, String] = { - redisConfiguration.redisURI.setTimeout(healthcheckTimeout) - io.lettuce.core.masterreplica.MasterReplica.connect(redisClient, - StringCodec.UTF8, - redisConfiguration.redisURI) - .reactive() - } - - override def check(): SMono[Result] = - SMono(redisCommand.ping()) - .timeout(healthcheckTimeout.toScala) - .filter(_ == PING_SUCCESS_RESPONSE) - .map(_ => Result.healthy(redisComponent)) - .switchIfEmpty(SMono.just(Result.degraded(redisComponent, "Can not PING to Redis."))) - - override def close(): Unit = - Mono.fromCompletionStage(redisClient.shutdownAsync()) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() - } diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterHealthCheckTest.scala index 
89d83f22f3..4e5d3c493b 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterHealthCheckTest.scala +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterHealthCheckTest.scala @@ -35,7 +35,7 @@ class RedisClusterHealthCheckTest { @BeforeEach def setup(redis: RedisClusterContainer): Unit = { - redisHealthCheck = new RedisHealthCheck(redis.getRedisConfiguration, new RedisClientFactory(FileSystemImpl.forTesting())) + redisHealthCheck = new RedisHealthCheck(new RedisClientFactory(FileSystemImpl.forTesting(), redis.getRedisConfiguration), redis.getRedisConfiguration) } @AfterEach diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala index b34f80baff..db101b7f01 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala @@ -31,7 +31,7 @@ class RedisMasterReplicaHealthCheckTest extends RedisHealthCheckTest { @BeforeEach def setup(redis: RedisMasterReplicaContainer): Unit = { - redisHealthCheck = new RedisHealthCheck(redis.getRedisConfiguration, new RedisClientFactory(FileSystemImpl.forTesting())) + redisHealthCheck = new RedisHealthCheck(new RedisClientFactory(FileSystemImpl.forTesting(), redis.getRedisConfiguration), redis.getRedisConfiguration) redisMasterReplicaContainer = redis } diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelHealthCheckTest.scala index ffa632788e..9f1b03d028 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelHealthCheckTest.scala +++ 
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisSentinelHealthCheckTest.scala @@ -31,7 +31,7 @@ class RedisSentinelHealthCheckTest extends RedisHealthCheckTest { @BeforeEach def setup(redis: RedisSentinelCluster): Unit = { - redisHealthCheck = new RedisHealthCheck(redis.redisSentinelContainerList.getRedisConfiguration, new RedisClientFactory(FileSystemImpl.forTesting())) + redisHealthCheck = new RedisHealthCheck(new RedisClientFactory(FileSystemImpl.forTesting(), redis.redisSentinelContainerList.getRedisConfiguration), redis.redisSentinelContainerList.getRedisConfiguration) redisSentinelCluster = redis } diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala index f231c5c75b..ff3e2639ed 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala @@ -36,7 +36,7 @@ class RedisStandaloneHealthCheckTest { def setup(redis: DockerRedis): Unit = { val redisConfiguration: StandaloneRedisConfiguration = StandaloneRedisConfiguration.from(redis.redisURI().toString) - redisHealthCheck = new RedisHealthCheck(redisConfiguration, new RedisClientFactory(FileSystemImpl.forTesting())) + redisHealthCheck = new RedisHealthCheck(new RedisClientFactory(FileSystemImpl.forTesting(), redisConfiguration), redisConfiguration) } @AfterEach diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSMasterReplicaHealthCheckTest.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSMasterReplicaHealthCheckTest.java index fac899cbde..0af0192977 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSMasterReplicaHealthCheckTest.java +++ 
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSMasterReplicaHealthCheckTest.java @@ -34,7 +34,8 @@ class RedisTLSMasterReplicaHealthCheckTest extends RedisHealthCheckTest { @BeforeEach void setup() { - redisHealthCheck = new RedisHealthCheck(redisMasterReplicaExtension.getRedisMasterReplicaContainer().getRedisConfiguration(), new RedisClientFactory(FileSystemImpl.forTesting())); + MasterReplicaRedisConfiguration redisConfiguration = redisMasterReplicaExtension.getRedisMasterReplicaContainer().getRedisConfiguration(); + redisHealthCheck = new RedisHealthCheck(new RedisClientFactory(FileSystemImpl.forTesting(), redisConfiguration), redisConfiguration); redisMasterReplicaContainer = redisMasterReplicaExtension.getRedisMasterReplicaContainer(); } diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSSentinelHealthCheckTest.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSSentinelHealthCheckTest.java index 0fda7e1ab0..9eff3cd19a 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSSentinelHealthCheckTest.java +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSSentinelHealthCheckTest.java @@ -34,7 +34,8 @@ class RedisTLSSentinelHealthCheckTest extends RedisHealthCheckTest { @BeforeEach public void setup() { - redisHealthCheck = new RedisHealthCheck(redisSentinelExtension.getRedisSentinelCluster().redisSentinelContainerList().getRedisConfiguration(), new RedisClientFactory(FileSystemImpl.forTesting())); + SentinelRedisConfiguration redisConfiguration = redisSentinelExtension.getRedisSentinelCluster().redisSentinelContainerList().getRedisConfiguration(); + redisHealthCheck = new RedisHealthCheck(new RedisClientFactory(FileSystemImpl.forTesting(), redisConfiguration), redisConfiguration); redisSentinelCluster = redisSentinelExtension.getRedisSentinelCluster(); } diff --git 
a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSStandaloneHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSStandaloneHealthCheckTest.scala index f17be8a6c8..0bccdada1c 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSStandaloneHealthCheckTest.scala +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisTLSStandaloneHealthCheckTest.scala @@ -31,7 +31,7 @@ class RedisTLSStandaloneHealthCheckTest extends RedisHealthCheckTest { @BeforeEach def setup(redisContainer: RedisContainer): Unit = { - redisHealthCheck = new RedisHealthCheck(redisContainer.getConfiguration, new RedisClientFactory(FileSystemImpl.forTesting())) + redisHealthCheck = new RedisHealthCheck(new RedisClientFactory(FileSystemImpl.forTesting(), redisContainer.getConfiguration), redisContainer.getConfiguration) this.redisContainer = redisContainer; } diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala index ecc83514f8..c1a4b4b026 100644 --- a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala +++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala @@ -38,7 +38,7 @@ class RedisRateLimiterTest extends RateLimiterContract { redisRateLimiterConfiguration = StandaloneRedisConfiguration.from(redis.redisURI().toString) } - override def testee(): RateLimiterFactory = new RedisRateLimiterFactory(redisRateLimiterConfiguration, new RedisClientFactory(FileSystemImpl.forTesting())) + override def testee(): RateLimiterFactory = new RedisRateLimiterFactory(redisRateLimiterConfiguration, new RedisClientFactory(FileSystemImpl.forTesting(), redisRateLimiterConfiguration)) override def sleep(duration: 
Duration): Unit = Thread.sleep(duration.toMillis) } diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithSentinelTest.scala b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithSentinelTest.scala index ce88c5c547..0af6345cf1 100644 --- a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithSentinelTest.scala +++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithSentinelTest.scala @@ -33,7 +33,7 @@ import org.apache.james.server.core.filesystem.FileSystemImpl import org.assertj.core.api.Assertions.{assertThat, assertThatCode} import org.awaitility.Awaitility import org.junit.jupiter.api.extension.ExtendWith -import org.junit.jupiter.api.{AfterEach, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import reactor.core.scala.publisher.SMono object RedisRateLimiterWithSentinelTest { @@ -43,6 +43,15 @@ object RedisRateLimiterWithSentinelTest { @ExtendWith(Array(classOf[RedisSentinelExtension])) class RedisRateLimiterWithSentinelTest { + var rateLimiter: RateLimiter = _ + + @BeforeEach + def setUp(redisClusterContainer: RedisSentinelCluster): Unit = { + val redisConfiguration = redisClusterContainer.redisSentinelContainerList.getRedisConfiguration + val rateLimiterFactory: RedisRateLimiterFactory = new RedisRateLimiterFactory(redisConfiguration, + new RedisClientFactory(FileSystemImpl.forTesting(), redisConfiguration)) + rateLimiter = rateLimiterFactory.withSpecification(RULES, SLIDING_WIDOW_PRECISION) + } @AfterEach def afterEach(redisClusterContainer: RedisSentinelCluster): Unit = { @@ -52,28 +61,18 @@ class RedisRateLimiterWithSentinelTest { @Test def rateLimitShouldBeAcceptableWhenLimitIsAcceptable(redisClusterContainer: RedisSentinelCluster): Unit = { - val rateLimiterFactory: RedisRateLimiterFactory = new 
RedisRateLimiterFactory(redisClusterContainer.redisSentinelContainerList.getRedisConfiguration, - new RedisClientFactory(FileSystemImpl.forTesting())) - val rateLimiter = rateLimiterFactory.withSpecification(RULES, SLIDING_WIDOW_PRECISION) val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key" + UUID.randomUUID().toString), 4)).block() assertThat(actual).isEqualTo(AcceptableRate) } @Test def rateLimitShouldWorkNormallyWhenLimitExceeded(redisClusterContainer: RedisSentinelCluster): Unit = { - val rateLimiterFactory: RedisRateLimiterFactory = new RedisRateLimiterFactory(redisClusterContainer.redisSentinelContainerList.getRedisConfiguration, - new RedisClientFactory(FileSystemImpl.forTesting())) - val rateLimiter = rateLimiterFactory.withSpecification(RULES, SLIDING_WIDOW_PRECISION) val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key" + UUID.randomUUID().toString), 5)).block() assertThat(actual).isEqualTo(RateExceeded) } @Test def rateLimitShouldWorkNormallyAfterFailoverComplete(redisClusterContainer: RedisSentinelCluster): Unit = { - val rateLimiterFactory: RedisRateLimiterFactory = new RedisRateLimiterFactory(redisClusterContainer.redisSentinelContainerList.getRedisConfiguration, - new RedisClientFactory(FileSystemImpl.forTesting())) - val rateLimiter = rateLimiterFactory.withSpecification(RULES, SLIDING_WIDOW_PRECISION) - // Before failover, the rate limit should be working normally assertThat(SMono(rateLimiter.rateLimit(TestKey("key" + UUID.randomUUID().toString), 5)).block()) .isEqualTo(RateExceeded) diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/TopologyRedisRateLimiterTest.scala b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/TopologyRedisRateLimiterTest.scala index 22ed53ee17..d4e4b67dd9 100644 --- a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/TopologyRedisRateLimiterTest.scala +++ 
b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/TopologyRedisRateLimiterTest.scala @@ -41,7 +41,7 @@ trait TopologyRedisRateLimiterTest { @Test def rateLimitShouldWorkNormally(): Unit = { - val rateLimiterFactory: RedisRateLimiterFactory = new RedisRateLimiterFactory(getRedisConfiguration(), new RedisClientFactory(FileSystemImpl.forTesting())) + val rateLimiterFactory: RedisRateLimiterFactory = new RedisRateLimiterFactory(getRedisConfiguration(), new RedisClientFactory(FileSystemImpl.forTesting(), getRedisConfiguration())) val rateLimiter = rateLimiterFactory.withSpecification(RULES, SLIDING_WIDOW_PRECISION) val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key1"), 4)).block() assertThat(actual).isEqualTo(AcceptableRate) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org