This is an automated email from the ASF dual-hosted git repository.

hqtran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7902ba492ac92aa2819e630cb21605c94976ab21
Author: Quan Tran <hqt...@linagora.com>
AuthorDate: Mon Jun 2 16:09:01 2025 +0700

    JAMES-3693 Redis cluster mode should support readFrom
    
    And apply readFrom for rate limiting when cluster mode.
---
 .../james/backends/redis/RedisConfiguration.scala  | 24 ++++---
 .../backends/redis/RedisClusterExtension.java      |  4 +-
 .../backends/redis/RedisConfigurationTest.scala    | 14 +++-
 .../redis/RedisClusterRateLimiterFactory.java      | 78 ++++++++++++++++++++++
 .../rate/limiter/redis/RedisRateLimiter.scala      |  6 +-
 5 files changed, 112 insertions(+), 14 deletions(-)

diff --git 
a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala
 
b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala
index cc4b387b79..6f1a591658 100644
--- 
a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala
+++ 
b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala
@@ -27,7 +27,7 @@ import eu.timepit.refined.api.Refined
 import eu.timepit.refined.collection.NonEmpty
 import io.lettuce.core.{ReadFrom, RedisCredentials, RedisCredentialsProvider, 
RedisURI}
 import org.apache.commons.configuration2.Configuration
-import org.apache.james.backends.redis.RedisConfiguration.{CLUSTER_TOPOLOGY, 
KEY_STORE_FILE_PATH_DEFAULT_VALUE, KEY_STORE_PASSWORD_DEFAULT_VALUE, 
MASTER_REPLICA_TOPOLOGY, REDIS_IGNORE_CERTIFICATE_CHECK, 
REDIS_IGNORE_CERTIFICATE_CHECK_DEFAULT_VALUE, 
REDIS_KEY_STORE_FILE_PATH_PROPERTY_NAME, 
REDIS_KEY_STORE_PASSWORD_PROPERTY_NAME, REDIS_READ_FROM_DEFAULT_VALUE, 
REDIS_READ_FROM_PROPERTY_NAME, REDIS_SENTINEL_PASSWORD, REDIS_USE_SSL, 
REDIS_USE_SSL_DEFAULT_VALUE, SENTINEL_TOPOLOGY, STANDALONE_T [...]
+import org.apache.james.backends.redis.RedisConfiguration.{CLUSTER_TOPOLOGY, 
KEY_STORE_FILE_PATH_DEFAULT_VALUE, KEY_STORE_PASSWORD_DEFAULT_VALUE, 
MASTER_REPLICA_TOPOLOGY, REDIS_IGNORE_CERTIFICATE_CHECK, 
REDIS_IGNORE_CERTIFICATE_CHECK_DEFAULT_VALUE, 
REDIS_KEY_STORE_FILE_PATH_PROPERTY_NAME, 
REDIS_KEY_STORE_PASSWORD_PROPERTY_NAME, REDIS_SENTINEL_PASSWORD, REDIS_USE_SSL, 
REDIS_USE_SSL_DEFAULT_VALUE, SENTINEL_TOPOLOGY, STANDALONE_TOPOLOGY}
 import org.apache.james.backends.redis.RedisUris.{REDIS_URL_PROPERTY_NAME, 
RedisUris}
 import org.apache.james.filesystem.api.FileSystem
 import org.slf4j.{Logger, LoggerFactory}
@@ -56,6 +56,8 @@ object RedisConfiguration {
 
   def redisWorkerThreadsFrom(config: Configuration): Option[Int] = 
Option(config.getInteger("redis.workerThreads", null)).map(Integer2int)
 
+  def redisReadFrom(config: Configuration): ReadFrom = 
Option(config.getString(REDIS_READ_FROM_PROPERTY_NAME, 
null)).map(ReadFrom.valueOf).getOrElse(REDIS_READ_FROM_DEFAULT_VALUE)
+
   def from(config: Configuration): RedisConfiguration = {
     val redisConfiguration: RedisConfiguration = 
config.getString("redis.topology", STANDALONE_TOPOLOGY) match {
       case STANDALONE_TOPOLOGY => StandaloneRedisConfiguration.from(config)
@@ -139,7 +141,7 @@ object MasterReplicaRedisConfiguration {
     from(redisUris,
       config.getBoolean(REDIS_USE_SSL, REDIS_USE_SSL_DEFAULT_VALUE),
       SSLConfiguration.from(config),
-      Option(config.getString("redis.readFrom", 
null)).map(ReadFrom.valueOf).getOrElse(ReadFrom.MASTER),
+      RedisConfiguration.redisReadFrom(config),
       RedisConfiguration.redisIoThreadsFrom(config),
       RedisConfiguration.redisWorkerThreadsFrom(config))
   }
@@ -185,29 +187,33 @@ object ClusterRedisConfiguration {
       config.getBoolean(REDIS_USE_SSL, REDIS_USE_SSL_DEFAULT_VALUE),
       SSLConfiguration.from(config),
       RedisConfiguration.redisIoThreadsFrom(config),
-      RedisConfiguration.redisWorkerThreadsFrom(config))
+      RedisConfiguration.redisWorkerThreadsFrom(config),
+      RedisConfiguration.redisReadFrom(config))
 
   def from(redisUris: Array[String],
            ioThreads: Option[Int],
-           workerThreads: Option[Int]): ClusterRedisConfiguration =
-    from(redisUris, useSSL = false, Option.empty, ioThreads, workerThreads)
+           workerThreads: Option[Int],
+           readFrom: ReadFrom): ClusterRedisConfiguration =
+    from(redisUris, useSSL = false, Option.empty, ioThreads, workerThreads, 
readFrom)
 
   def from(redisUris: Array[String],
            useSSL: Boolean,
            mayBeSSLConfiguration: Option[SSLConfiguration],
            ioThreads: Option[Int] = None,
-           workerThreads: Option[Int] = None): ClusterRedisConfiguration = {
+           workerThreads: Option[Int] = None,
+           readFrom: ReadFrom): ClusterRedisConfiguration = {
     Preconditions.checkArgument(redisUris != null && redisUris.length > 0)
-    ClusterRedisConfiguration(RedisUris.from(redisUris), useSSL, 
mayBeSSLConfiguration, ioThreads, workerThreads)
+    ClusterRedisConfiguration(RedisUris.from(redisUris), useSSL, 
mayBeSSLConfiguration, ioThreads, workerThreads, readFrom)
   }
 }
 
-case class ClusterRedisConfiguration(redisURI: RedisUris, useSSL: Boolean, 
mayBeSSLConfiguration: Option[SSLConfiguration], ioThreads: Option[Int], 
workerThreads: Option[Int]) extends RedisConfiguration {
+case class ClusterRedisConfiguration(redisURI: RedisUris, useSSL: Boolean, 
mayBeSSLConfiguration: Option[SSLConfiguration], ioThreads: Option[Int], 
workerThreads: Option[Int], readFrom: ReadFrom) extends RedisConfiguration {
   override def asString: String = MoreObjects.toStringHelper(this)
     .add("topology", CLUSTER_TOPOLOGY)
     .add("redisURI", redisURI.value.map(_.toString).mkString(";"))
     .add("redis.ioThreads", ioThreads)
     .add("redis.workerThreads", workerThreads)
+    .add("readFrom", readFrom)
     .toString
 }
 
@@ -216,7 +222,7 @@ object SentinelRedisConfiguration {
     config.getStringArray(REDIS_URL_PROPERTY_NAME).mkString(","),
     config.getBoolean(REDIS_USE_SSL, REDIS_USE_SSL_DEFAULT_VALUE),
     SSLConfiguration.from(config),
-    Option(config.getString(REDIS_READ_FROM_PROPERTY_NAME, 
null)).map(ReadFrom.valueOf).getOrElse(REDIS_READ_FROM_DEFAULT_VALUE),
+    RedisConfiguration.redisReadFrom(config),
     Option(config.getString(REDIS_SENTINEL_PASSWORD, null)),
     RedisConfiguration.redisIoThreadsFrom(config),
     RedisConfiguration.redisWorkerThreadsFrom(config))
diff --git 
a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java
 
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java
index eed1e3c3bb..347eca3c39 100644
--- 
a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java
+++ 
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java
@@ -52,6 +52,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 
+import io.lettuce.core.ReadFrom;
 import scala.jdk.javaapi.OptionConverters;
 
 public class RedisClusterExtension implements GuiceModuleTestExtension {
@@ -67,7 +68,8 @@ public class RedisClusterExtension implements 
GuiceModuleTestExtension {
                     .map(URI::toString)
                     .toArray(String[]::new),
                 OptionConverters.toScala(Optional.empty()),
-                OptionConverters.toScala(Optional.empty()));
+                OptionConverters.toScala(Optional.empty()),
+                ReadFrom.MASTER);
         }
 
         public void pauseOne() {
diff --git 
a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala
 
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala
index 117391ea0e..1cc8547f54 100644
--- 
a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala
+++ 
b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala
@@ -46,9 +46,21 @@ class RedisConfigurationTest extends AnyFlatSpec with 
Matchers {
     config.addProperty("redis.topology", "cluster")
     config.addProperty("redis.ioThreads", 16)
     config.addProperty("redis.workerThreads", 32)
+    config.addProperty("redis.readFrom", "any")
+
+    val redisConfig = RedisConfiguration.from(config)
+    redisConfig shouldEqual 
ClusterRedisConfiguration(RedisUris.liftOrThrow(List(RedisURI.create("redis://localhost:6379"),
 RedisURI.create("redis://localhost:6380"))), useSSL = false, 
mayBeSSLConfiguration = None, Some(16), Some(32), ReadFrom.ANY)
+  }
+
+  it should "use default values for missing config values when cluster mode" 
in {
+    val config = new PropertiesConfiguration()
+    config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
+    config.addProperty("redisURL", 
"redis://localhost:6379,redis://localhost:6380")
+    config.addProperty("redis.topology", "cluster")
 
     val redisConfig = RedisConfiguration.from(config)
-    redisConfig shouldEqual 
ClusterRedisConfiguration(RedisUris.liftOrThrow(List(RedisURI.create("redis://localhost:6379"),
 RedisURI.create("redis://localhost:6380"))), useSSL = false, 
mayBeSSLConfiguration = None, Some(16), Some(32))
+    redisConfig shouldEqual 
ClusterRedisConfiguration(RedisUris.liftOrThrow(List(RedisURI.create("redis://localhost:6379"),
 RedisURI.create("redis://localhost:6380"))), useSSL = false, 
mayBeSSLConfiguration = None,
+      ioThreads = None, workerThreads = None, ReadFrom.MASTER)
   }
 
   it should "use default values for missing config values" in {
diff --git 
a/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisClusterRateLimiterFactory.java
 
b/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisClusterRateLimiterFactory.java
new file mode 100644
index 0000000000..2207c89ce0
--- /dev/null
+++ 
b/server/mailet/rate-limiter-redis/src/main/java/org/apache/james/rate/limiter/redis/RedisClusterRateLimiterFactory.java
@@ -0,0 +1,78 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rate.limiter.redis;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Set;
+
+import 
es.moki.ratelimitj.core.limiter.request.AbstractRequestRateLimiterFactory;
+import es.moki.ratelimitj.core.limiter.request.ReactiveRequestRateLimiter;
+import es.moki.ratelimitj.core.limiter.request.RequestLimitRule;
+import es.moki.ratelimitj.core.limiter.request.RequestRateLimiter;
+import es.moki.ratelimitj.redis.request.RedisSlidingWindowRequestRateLimiter;
+import io.lettuce.core.ReadFrom;
+import io.lettuce.core.cluster.RedisClusterClient;
+import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+
+/**
+ * Copy of {@link 
es.moki.ratelimitj.redis.request.RedisClusterRateLimiterFactory}, with readFrom 
support.
+ */
+public class RedisClusterRateLimiterFactory extends 
AbstractRequestRateLimiterFactory<RedisSlidingWindowRequestRateLimiter> {
+
+    private final RedisClusterClient client;
+    private StatefulRedisClusterConnection<String, String> connection;
+    private final ReadFrom readFrom;
+
+    public RedisClusterRateLimiterFactory(RedisClusterClient client, ReadFrom 
readFrom) {
+        this.client = requireNonNull(client);
+        this.readFrom = readFrom;
+    }
+
+    @Override
+    public RequestRateLimiter getInstance(Set<RequestLimitRule> rules) {
+        return lookupInstance(rules);
+    }
+
+    @Override
+    public ReactiveRequestRateLimiter 
getInstanceReactive(Set<RequestLimitRule> rules) {
+        return lookupInstance(rules);
+    }
+
+    @Override
+    protected RedisSlidingWindowRequestRateLimiter 
create(Set<RequestLimitRule> rules) {
+        getConnection().reactive();
+        return new 
RedisSlidingWindowRequestRateLimiter(getConnection().reactive(), 
getConnection().reactive(), rules);
+    }
+
+    @Override
+    public void close() {
+        client.shutdown();
+    }
+
+    private StatefulRedisClusterConnection<String, String> getConnection() {
+        // going to ignore race conditions at the cost of having multiple 
connections
+        if (connection == null) {
+            connection = client.connect();
+            connection.setReadFrom(readFrom);
+        }
+        return connection;
+    }
+}
\ No newline at end of file
diff --git 
a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
 
b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
index fb9c64fea5..af1e0360d7 100644
--- 
a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
+++ 
b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
@@ -24,7 +24,7 @@ import java.time.Duration
 import com.google.common.collect.ImmutableList
 import com.google.inject.{AbstractModule, Provides, Scopes}
 import 
es.moki.ratelimitj.core.limiter.request.{AbstractRequestRateLimiterFactory, 
ReactiveRequestRateLimiter, RequestLimitRule}
-import es.moki.ratelimitj.redis.request.{RedisClusterRateLimiterFactory, 
RedisSlidingWindowRequestRateLimiter, RedisRateLimiterFactory => 
RedisSingleInstanceRateLimitjFactory}
+import es.moki.ratelimitj.redis.request.{RedisSlidingWindowRequestRateLimiter, 
RedisRateLimiterFactory => RedisSingleInstanceRateLimitjFactory}
 import io.lettuce.core.cluster.RedisClusterClient
 import io.lettuce.core.{AbstractRedisClient, RedisClient}
 import jakarta.inject.Inject
@@ -55,8 +55,8 @@ class RedisRateLimiterFactory @Inject()(redisConfiguration: 
RedisConfiguration,
   private val rateLimitjFactory: 
AbstractRequestRateLimiterFactory[RedisSlidingWindowRequestRateLimiter] = 
redisConfiguration match {
     case _: StandaloneRedisConfiguration => new 
RedisSingleInstanceRateLimitjFactory(rawRedisClient.asInstanceOf[RedisClient])
 
-    case _: ClusterRedisConfiguration =>
-      new 
RedisClusterRateLimiterFactory(rawRedisClient.asInstanceOf[RedisClusterClient])
+    case clusterRedisConfiguration: ClusterRedisConfiguration =>
+      new 
RedisClusterRateLimiterFactory(rawRedisClient.asInstanceOf[RedisClusterClient], 
clusterRedisConfiguration.readFrom)
 
     case masterReplicaRedisConfiguration: MasterReplicaRedisConfiguration => 
new RedisMasterReplicaRateLimiterFactory(
       rawRedisClient.asInstanceOf[RedisClient],


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to