chibenwa commented on code in PR #1671:
URL: https://github.com/apache/james-project/pull/1671#discussion_r1295394799


##########
server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisHealthCheck.scala:
##########
@@ -20,33 +20,43 @@
 package org.apache.james.rate.limiter.redis
 
 import java.time.Duration
-import java.util.concurrent.TimeUnit
 
-import eu.timepit.refined.auto._
+import io.lettuce.core.api.StatefulConnection
+import io.lettuce.core.cluster.RedisClusterClient
+import io.lettuce.core.{RedisClient, RedisURI}
 import javax.inject.Inject
 import org.apache.james.core.healthcheck.{ComponentName, HealthCheck, Result}
-import org.apache.james.rate.limiter.api.{RateLimitingKey, Rule, Rules}
 import org.reactivestreams.Publisher
+import reactor.core.publisher.Mono
 import reactor.core.scala.publisher.SMono
 
-import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
 
-sealed class RedisHealthCheckKey() extends RateLimitingKey {
-  override def asString(): String = "RedisHealthCheck"
-}
-
-class RedisHealthCheck @Inject()(redisRateLimiterFactory: 
RedisRateLimiterFactory) extends HealthCheck {
+class RedisHealthCheck @Inject()(redisConfiguration: 
RedisRateLimiterConfiguration) extends HealthCheck {
   private val redisComponent: ComponentName = new ComponentName("Redis")
-  private val redisHealthcheckKey: RedisHealthCheckKey = new 
RedisHealthCheckKey
-  private val flexibleRateLimitRule = Rules(Seq(Rule(100000L, 
Duration.ofSeconds(60))))
-  private val redisHealthcheckRateLimiter = 
redisRateLimiterFactory.withSpecification(flexibleRateLimitRule, None)
-  private val healthcheckTimeout = new FiniteDuration(3, TimeUnit.SECONDS)
+  private val healthcheckTimeout = Duration.ofSeconds(3)
 
   override def componentName(): ComponentName = redisComponent
 
   override def check(): Publisher[Result] =
-    
SMono.fromPublisher(redisHealthcheckRateLimiter.rateLimit(redisHealthcheckKey, 
1))
-      .timeout(healthcheckTimeout)
+    connectRedis()
       .`then`(SMono.just(Result.healthy(redisComponent)))
-      .onErrorResume(e => SMono.just(Result.unhealthy(redisComponent, "Can not 
connect to Redis.", e)))
+      .onErrorResume(_ => SMono.just(Result.degraded(redisComponent, "Can not 
connect to Redis.")))
+
+  private def connectRedis(): SMono[StatefulConnection[String, String]] =
+    if (redisConfiguration.isCluster) {
+      val redisUris = redisConfiguration.redisURI.value.asJava
+      redisUris.forEach(redisUri => redisUri.setTimeout(healthcheckTimeout))
+
+      SMono.fromPublisher(Mono.using(() => 
RedisClusterClient.create(redisUris),
+        (redisClusterClient: RedisClusterClient) => Mono.fromCallable(() => 
redisClusterClient.connect()),
+        (redisClusterClient: RedisClusterClient) => 
redisClusterClient.close()))
+    } else {
+      val redisUri: RedisURI = redisConfiguration.redisURI.value.last
+      redisUri.setTimeout(healthcheckTimeout)
+
+      SMono.fromPublisher(Mono.using(() => RedisClient.create(redisUri),
+        (redisClient: RedisClient) => Mono.fromCallable(() => 
redisClient.connect()),

Review Comment:
   Is connect blocking? If yes, is there a way to do in on a non blocking 
fashion? 
   If impossible please do this on boundedElastic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to