This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit dfb5d803795083b01a472bc0db42bb081f3f0ccd Author: Quan Tran <hqt...@linagora.com> AuthorDate: Tue Dec 5 12:15:33 2023 +0700 JAMES-2586 PostgresExecutor: Retry upon PreparedStatement conflicts PreparedStatement id is unique per PG connection. Sharing a PG connection across multiple threads leads to PreparedStatement id conflicts. We can retry upon PreparedStatement id conflicts. --- .../james/backends/postgres/utils/PostgresExecutor.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java index 1fa3ccb410..b8c39ae88f 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java @@ -19,6 +19,7 @@ package org.apache.james.backends.postgres.utils; +import java.time.Duration; import java.util.Optional; import java.util.function.Function; @@ -38,10 +39,13 @@ import com.google.common.annotations.VisibleForTesting; import io.r2dbc.spi.Connection; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; public class PostgresExecutor { public static final String DEFAULT_INJECT = "default"; + public static final int MAX_RETRY_ATTEMPTS = 5; + public static final Duration MIN_BACKOFF = Duration.ofMillis(1); public static class Factory { @@ -78,22 +82,26 @@ public class PostgresExecutor { public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) { return dslContext() .flatMap(queryFunction) + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)) .then(); } public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> queryFunction) { return dslContext() - .flatMapMany(queryFunction); + .flatMapMany(queryFunction) 
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)); } public Mono<Record> executeRow(Function<DSLContext, Mono<Record>> queryFunction) { return dslContext() - .flatMap(queryFunction); + .flatMap(queryFunction) + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)); } public Mono<Integer> executeCount(Function<DSLContext, Mono<Record1<Integer>>> queryFunction) { return dslContext() .flatMap(queryFunction) + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)) .map(Record1::value1); } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org