This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit dfb5d803795083b01a472bc0db42bb081f3f0ccd
Author: Quan Tran <hqt...@linagora.com>
AuthorDate: Tue Dec 5 12:15:33 2023 +0700

    JAMES-2586 PostgresExecutor: Retry upon PreparedStatement conflicts
    
    PreparedStatement id is unique per PG connection.
    We share a PG connection across multi threads leads to PreparedStatement id 
conflicts.
    We can retry upon PreparedStatement id conflicts.
---
 .../james/backends/postgres/utils/PostgresExecutor.java      | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index 1fa3ccb410..b8c39ae88f 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.backends.postgres.utils;
 
+import java.time.Duration;
 import java.util.Optional;
 import java.util.function.Function;
 
@@ -38,10 +39,13 @@ import com.google.common.annotations.VisibleForTesting;
 import io.r2dbc.spi.Connection;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 public class PostgresExecutor {
 
     public static final String DEFAULT_INJECT = "default";
+    public static final int MAX_RETRY_ATTEMPTS = 5;
+    public static final Duration MIN_BACKOFF = Duration.ofMillis(1);
 
     public static class Factory {
 
@@ -78,22 +82,26 @@ public class PostgresExecutor {
     public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) 
{
         return dslContext()
             .flatMap(queryFunction)
+            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF))
             .then();
     }
 
     public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> 
queryFunction) {
         return dslContext()
-            .flatMapMany(queryFunction);
+            .flatMapMany(queryFunction)
+            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF));
     }
 
     public Mono<Record> executeRow(Function<DSLContext, Mono<Record>> 
queryFunction) {
         return dslContext()
-            .flatMap(queryFunction);
+            .flatMap(queryFunction)
+            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF));
     }
 
     public Mono<Integer> executeCount(Function<DSLContext, 
Mono<Record1<Integer>>> queryFunction) {
         return dslContext()
             .flatMap(queryFunction)
+            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF))
             .map(Record1::value1);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to