This is an automated email from the ASF dual-hosted git repository. hqtran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7902ba492ac92aa2819e630cb21605c94976ab21 Author: Quan Tran <hqt...@linagora.com> AuthorDate: Mon Jun 2 16:09:01 2025 +0700 JAMES-3693 Redis cluster mode should support readFrom And apply readFrom for rate limiting when cluster mode. --- .../james/backends/redis/RedisConfiguration.scala | 24 ++++--- .../backends/redis/RedisClusterExtension.java | 4 +- .../backends/redis/RedisConfigurationTest.scala | 14 +++- .../redis/RedisClusterRateLimiterFactory.java | 78 ++++++++++++++++++++++ .../rate/limiter/redis/RedisRateLimiter.scala | 6 +- 5 files changed, 112 insertions(+), 14 deletions(-) diff --git a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala index cc4b387b79..6f1a591658 100644 --- a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala +++ b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala @@ -27,7 +27,7 @@ import eu.timepit.refined.api.Refined import eu.timepit.refined.collection.NonEmpty import io.lettuce.core.{ReadFrom, RedisCredentials, RedisCredentialsProvider, RedisURI} import org.apache.commons.configuration2.Configuration -import org.apache.james.backends.redis.RedisConfiguration.{CLUSTER_TOPOLOGY, KEY_STORE_FILE_PATH_DEFAULT_VALUE, KEY_STORE_PASSWORD_DEFAULT_VALUE, MASTER_REPLICA_TOPOLOGY, REDIS_IGNORE_CERTIFICATE_CHECK, REDIS_IGNORE_CERTIFICATE_CHECK_DEFAULT_VALUE, REDIS_KEY_STORE_FILE_PATH_PROPERTY_NAME, REDIS_KEY_STORE_PASSWORD_PROPERTY_NAME, REDIS_READ_FROM_DEFAULT_VALUE, REDIS_READ_FROM_PROPERTY_NAME, REDIS_SENTINEL_PASSWORD, REDIS_USE_SSL, REDIS_USE_SSL_DEFAULT_VALUE, SENTINEL_TOPOLOGY, STANDALONE_T [...] 
+import org.apache.james.backends.redis.RedisConfiguration.{CLUSTER_TOPOLOGY, KEY_STORE_FILE_PATH_DEFAULT_VALUE, KEY_STORE_PASSWORD_DEFAULT_VALUE, MASTER_REPLICA_TOPOLOGY, REDIS_IGNORE_CERTIFICATE_CHECK, REDIS_IGNORE_CERTIFICATE_CHECK_DEFAULT_VALUE, REDIS_KEY_STORE_FILE_PATH_PROPERTY_NAME, REDIS_KEY_STORE_PASSWORD_PROPERTY_NAME, REDIS_SENTINEL_PASSWORD, REDIS_USE_SSL, REDIS_USE_SSL_DEFAULT_VALUE, SENTINEL_TOPOLOGY, STANDALONE_TOPOLOGY} import org.apache.james.backends.redis.RedisUris.{REDIS_URL_PROPERTY_NAME, RedisUris} import org.apache.james.filesystem.api.FileSystem import org.slf4j.{Logger, LoggerFactory} @@ -56,6 +56,8 @@ object RedisConfiguration { def redisWorkerThreadsFrom(config: Configuration): Option[Int] = Option(config.getInteger("redis.workerThreads", null)).map(Integer2int) + def redisReadFrom(config: Configuration): ReadFrom = Option(config.getString(REDIS_READ_FROM_PROPERTY_NAME, null)).map(ReadFrom.valueOf).getOrElse(REDIS_READ_FROM_DEFAULT_VALUE) + def from(config: Configuration): RedisConfiguration = { val redisConfiguration: RedisConfiguration = config.getString("redis.topology", STANDALONE_TOPOLOGY) match { case STANDALONE_TOPOLOGY => StandaloneRedisConfiguration.from(config) @@ -139,7 +141,7 @@ object MasterReplicaRedisConfiguration { from(redisUris, config.getBoolean(REDIS_USE_SSL, REDIS_USE_SSL_DEFAULT_VALUE), SSLConfiguration.from(config), - Option(config.getString("redis.readFrom", null)).map(ReadFrom.valueOf).getOrElse(ReadFrom.MASTER), + RedisConfiguration.redisReadFrom(config), RedisConfiguration.redisIoThreadsFrom(config), RedisConfiguration.redisWorkerThreadsFrom(config)) } @@ -185,29 +187,33 @@ object ClusterRedisConfiguration { config.getBoolean(REDIS_USE_SSL, REDIS_USE_SSL_DEFAULT_VALUE), SSLConfiguration.from(config), RedisConfiguration.redisIoThreadsFrom(config), - RedisConfiguration.redisWorkerThreadsFrom(config)) + RedisConfiguration.redisWorkerThreadsFrom(config), + RedisConfiguration.redisReadFrom(config)) def 
from(redisUris: Array[String], ioThreads: Option[Int], - workerThreads: Option[Int]): ClusterRedisConfiguration = - from(redisUris, useSSL = false, Option.empty, ioThreads, workerThreads) + workerThreads: Option[Int], + readFrom: ReadFrom): ClusterRedisConfiguration = + from(redisUris, useSSL = false, Option.empty, ioThreads, workerThreads, readFrom) def from(redisUris: Array[String], useSSL: Boolean, mayBeSSLConfiguration: Option[SSLConfiguration], ioThreads: Option[Int] = None, - workerThreads: Option[Int] = None): ClusterRedisConfiguration = { + workerThreads: Option[Int] = None, + readFrom: ReadFrom): ClusterRedisConfiguration = { Preconditions.checkArgument(redisUris != null && redisUris.length > 0) - ClusterRedisConfiguration(RedisUris.from(redisUris), useSSL, mayBeSSLConfiguration, ioThreads, workerThreads) + ClusterRedisConfiguration(RedisUris.from(redisUris), useSSL, mayBeSSLConfiguration, ioThreads, workerThreads, readFrom) } } -case class ClusterRedisConfiguration(redisURI: RedisUris, useSSL: Boolean, mayBeSSLConfiguration: Option[SSLConfiguration], ioThreads: Option[Int], workerThreads: Option[Int]) extends RedisConfiguration { +case class ClusterRedisConfiguration(redisURI: RedisUris, useSSL: Boolean, mayBeSSLConfiguration: Option[SSLConfiguration], ioThreads: Option[Int], workerThreads: Option[Int], readFrom: ReadFrom) extends RedisConfiguration { override def asString: String = MoreObjects.toStringHelper(this) .add("topology", CLUSTER_TOPOLOGY) .add("redisURI", redisURI.value.map(_.toString).mkString(";")) .add("redis.ioThreads", ioThreads) .add("redis.workerThreads", workerThreads) + .add("readFrom", readFrom) .toString } @@ -216,7 +222,7 @@ object SentinelRedisConfiguration { config.getStringArray(REDIS_URL_PROPERTY_NAME).mkString(","), config.getBoolean(REDIS_USE_SSL, REDIS_USE_SSL_DEFAULT_VALUE), SSLConfiguration.from(config), - Option(config.getString(REDIS_READ_FROM_PROPERTY_NAME, 
null)).map(ReadFrom.valueOf).getOrElse(REDIS_READ_FROM_DEFAULT_VALUE), + RedisConfiguration.redisReadFrom(config), Option(config.getString(REDIS_SENTINEL_PASSWORD, null)), RedisConfiguration.redisIoThreadsFrom(config), RedisConfiguration.redisWorkerThreadsFrom(config)) diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java index eed1e3c3bb..347eca3c39 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java @@ -52,6 +52,7 @@ import com.google.inject.AbstractModule; import com.google.inject.Module; import com.google.inject.Provides; +import io.lettuce.core.ReadFrom; import scala.jdk.javaapi.OptionConverters; public class RedisClusterExtension implements GuiceModuleTestExtension { @@ -67,7 +68,8 @@ public class RedisClusterExtension implements GuiceModuleTestExtension { .map(URI::toString) .toArray(String[]::new), OptionConverters.toScala(Optional.empty()), - OptionConverters.toScala(Optional.empty())); + OptionConverters.toScala(Optional.empty()), + ReadFrom.MASTER); } public void pauseOne() { diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala index 117391ea0e..1cc8547f54 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala @@ -46,9 +46,21 @@ class RedisConfigurationTest extends AnyFlatSpec with Matchers { config.addProperty("redis.topology", "cluster") config.addProperty("redis.ioThreads", 16) config.addProperty("redis.workerThreads", 32) + 
config.addProperty("redis.readFrom", "any") + + val redisConfig = RedisConfiguration.from(config) + redisConfig shouldEqual ClusterRedisConfiguration(RedisUris.liftOrThrow(List(RedisURI.create("redis://localhost:6379"), RedisURI.create("redis://localhost:6380"))), useSSL = false, mayBeSSLConfiguration = None, Some(16), Some(32), ReadFrom.ANY) + } + + it should "use default values for missing config values when cluster mode" in { + val config = new PropertiesConfiguration() + config.setListDelimiterHandler(new DefaultListDelimiterHandler(',')) + config.addProperty("redisURL", "redis://localhost:6379,redis://localhost:6380") + config.addProperty("redis.topology", "cluster") val redisConfig = RedisConfiguration.from(config) - redisConfig shouldEqual ClusterRedisConfiguration(RedisUris.liftOrThrow(List(RedisURI.create("redis://localhost:6379"), RedisURI.create("redis://localhost:6380"))), useSSL = false, mayBeSSLConfiguration = None, Some(16), Some(32)) + redisConfig shouldEqual ClusterRedisConfiguration(RedisUris.liftOrThrow(List(RedisURI.create("redis://localhost:6379"), RedisURI.create("redis://localhost:6380"))), useSSL = false, mayBeSSLConfiguration = None, + ioThreads = None, workerThreads = None, ReadFrom.MASTER) } it should "use default values for missing config values" in { diff --git a/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisClusterRateLimiterFactory.java b/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisClusterRateLimiterFactory.java new file mode 100644 index 0000000000..2207c89ce0 --- /dev/null +++ b/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisClusterRateLimiterFactory.java @@ -0,0 +1,78 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.rate.limiter.redis; + +import static java.util.Objects.requireNonNull; + +import java.util.Set; + +import es.moki.ratelimitj.core.limiter.request.AbstractRequestRateLimiterFactory; +import es.moki.ratelimitj.core.limiter.request.ReactiveRequestRateLimiter; +import es.moki.ratelimitj.core.limiter.request.RequestLimitRule; +import es.moki.ratelimitj.core.limiter.request.RequestRateLimiter; +import es.moki.ratelimitj.redis.request.RedisSlidingWindowRequestRateLimiter; +import io.lettuce.core.ReadFrom; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; + +/** + * Copy of {@link es.moki.ratelimitj.redis.request.RedisClusterRateLimiterFactory}, with readFrom support. 
+ */ +public class RedisClusterRateLimiterFactory extends AbstractRequestRateLimiterFactory<RedisSlidingWindowRequestRateLimiter> { + + private final RedisClusterClient client; + private StatefulRedisClusterConnection<String, String> connection; + private final ReadFrom readFrom; + + public RedisClusterRateLimiterFactory(RedisClusterClient client, ReadFrom readFrom) { + this.client = requireNonNull(client); + this.readFrom = readFrom; + } + + @Override + public RequestRateLimiter getInstance(Set<RequestLimitRule> rules) { + return lookupInstance(rules); + } + + @Override + public ReactiveRequestRateLimiter getInstanceReactive(Set<RequestLimitRule> rules) { + return lookupInstance(rules); + } + + @Override + protected RedisSlidingWindowRequestRateLimiter create(Set<RequestLimitRule> rules) { + getConnection().reactive(); + return new RedisSlidingWindowRequestRateLimiter(getConnection().reactive(), getConnection().reactive(), rules); + } + + @Override + public void close() { + client.shutdown(); + } + + private StatefulRedisClusterConnection<String, String> getConnection() { + // going to ignore race conditions at the cost of having multiple connections + if (connection == null) { + connection = client.connect(); + connection.setReadFrom(readFrom); + } + return connection; + } +} \ No newline at end of file diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala index fb9c64fea5..af1e0360d7 100644 --- a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala +++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala @@ -24,7 +24,7 @@ import java.time.Duration import com.google.common.collect.ImmutableList import com.google.inject.{AbstractModule, Provides, Scopes} import 
es.moki.ratelimitj.core.limiter.request.{AbstractRequestRateLimiterFactory, ReactiveRequestRateLimiter, RequestLimitRule} -import es.moki.ratelimitj.redis.request.{RedisClusterRateLimiterFactory, RedisSlidingWindowRequestRateLimiter, RedisRateLimiterFactory => RedisSingleInstanceRateLimitjFactory} +import es.moki.ratelimitj.redis.request.{RedisSlidingWindowRequestRateLimiter, RedisRateLimiterFactory => RedisSingleInstanceRateLimitjFactory} import io.lettuce.core.cluster.RedisClusterClient import io.lettuce.core.{AbstractRedisClient, RedisClient} import jakarta.inject.Inject @@ -55,8 +55,8 @@ class RedisRateLimiterFactory @Inject()(redisConfiguration: RedisConfiguration, private val rateLimitjFactory: AbstractRequestRateLimiterFactory[RedisSlidingWindowRequestRateLimiter] = redisConfiguration match { case _: StandaloneRedisConfiguration => new RedisSingleInstanceRateLimitjFactory(rawRedisClient.asInstanceOf[RedisClient]) - case _: ClusterRedisConfiguration => - new RedisClusterRateLimiterFactory(rawRedisClient.asInstanceOf[RedisClusterClient]) + case clusterRedisConfiguration: ClusterRedisConfiguration => + new RedisClusterRateLimiterFactory(rawRedisClient.asInstanceOf[RedisClusterClient], clusterRedisConfiguration.readFrom) case masterReplicaRedisConfiguration: MasterReplicaRedisConfiguration => new RedisMasterReplicaRateLimiterFactory( rawRedisClient.asInstanceOf[RedisClient], --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org For additional commands, e-mail: notifications-help@james.apache.org