This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 701790ec040011269640e13e416c5283ad649322
Author: hung phan <hp...@linagora.com>
AuthorDate: Mon Apr 8 14:16:46 2024 +0700

    JAMES-2586 Implement PoolBackedPostgresConnectionFactory
---
 backends-common/postgres/pom.xml                   |   5 +
 .../backends/postgres/PostgresTableManager.java    | 106 ++++++++++--------
 .../utils/DomainImplPostgresConnectionFactory.java |  18 ++-
 .../utils/JamesPostgresConnectionFactory.java      |   4 +
 .../utils/PoolBackedPostgresConnectionFactory.java |  92 ++++++++++++++++
 .../backends/postgres/utils/PostgresExecutor.java  | 122 ++++++++++++---------
 .../utils/SinglePostgresConnectionFactory.java     |  10 ++
 .../DomainImplPostgresConnectionFactoryTest.java   |   2 +-
 .../PoolBackedPostgresConnectionFactoryTest.java}  |  24 ++--
 9 files changed, 269 insertions(+), 114 deletions(-)

diff --git a/backends-common/postgres/pom.xml b/backends-common/postgres/pom.xml
index 6d256a2733..e969e22025 100644
--- a/backends-common/postgres/pom.xml
+++ b/backends-common/postgres/pom.xml
@@ -61,6 +61,11 @@
             <artifactId>testing-base</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.r2dbc</groupId>
+            <artifactId>r2dbc-pool</artifactId>
+            <version>1.0.1.RELEASE</version>
+        </dependency>
         <dependency>
             <groupId>jakarta.annotation</groupId>
             <artifactId>jakarta.annotation-api</artifactId>
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresTableManager.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresTableManager.java
index 5947579da1..2bff154c6c 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresTableManager.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresTableManager.java
@@ -20,6 +20,7 @@
 package org.apache.james.backends.postgres;
 
 import java.util.List;
+import java.util.Optional;
 
 import jakarta.inject.Inject;
 
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.Result;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -68,37 +70,43 @@ public class PostgresTableManager implements Startable {
     }
 
     public Mono<Void> initializePostgresExtension() {
-        return postgresExecutor.connection()
-            .flatMapMany(connection -> connection.createStatement("CREATE 
EXTENSION IF NOT EXISTS hstore")
-                .execute())
-            .flatMap(Result::getRowsUpdated)
-            .then();
+        return 
Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()),
+            connection -> Mono.just(connection)
+                .flatMapMany(pgConnection -> 
pgConnection.createStatement("CREATE EXTENSION IF NOT EXISTS hstore")
+                    .execute())
+                .flatMap(Result::getRowsUpdated)
+                .then(),
+            connection -> 
postgresExecutor.connectionFactory().closeConnection(connection));
     }
 
     public Mono<Void> initializeTables() {
-        return postgresExecutor.dslContext()
-            .flatMapMany(dsl -> listExistTables()
-                .flatMapMany(existTables -> Flux.fromIterable(module.tables())
-                    .filter(table -> !existTables.contains(table.getName()))
-                    .flatMap(table -> createAndAlterTable(table, dsl))))
-            .then();
+        return 
Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()),
+            connection -> postgresExecutor.dslContext(connection)
+                .flatMapMany(dsl -> listExistTables()
+                    .flatMapMany(existTables -> 
Flux.fromIterable(module.tables())
+                        .filter(table -> 
!existTables.contains(table.getName()))
+                        .flatMap(table -> createAndAlterTable(table, dsl, 
connection))))
+                .then(),
+            connection -> 
postgresExecutor.connectionFactory().closeConnection(connection));
     }
 
-    private Mono<Void> createAndAlterTable(PostgresTable table, DSLContext 
dsl) {
+    private Mono<Void> createAndAlterTable(PostgresTable table, DSLContext 
dsl, Connection connection) {
         return Mono.from(table.getCreateTableStepFunction().apply(dsl))
-            .then(alterTableIfNeeded(table))
+            .then(alterTableIfNeeded(table, connection))
             .doOnSuccess(any -> LOGGER.info("Table {} created", 
table.getName()))
             .onErrorResume(exception -> handleTableCreationException(table, 
exception));
     }
 
     public Mono<List<String>> listExistTables() {
-        return postgresExecutor.dslContext()
-            .flatMapMany(d -> Flux.from(d.select(DSL.field("tablename"))
-                .from("pg_tables")
-                .where(DSL.field("schemaname")
-                    .eq(DSL.currentSchema()))))
-            .map(r -> r.get(0, String.class))
-            .collectList();
+        return 
Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()),
+            connection -> postgresExecutor.dslContext(connection)
+                .flatMapMany(d -> Flux.from(d.select(DSL.field("tablename"))
+                    .from("pg_tables")
+                    .where(DSL.field("schemaname")
+                        .eq(DSL.currentSchema()))))
+                .map(r -> r.get(0, String.class))
+                .collectList(),
+            connection -> 
postgresExecutor.connectionFactory().closeConnection(connection));
     }
 
     private Mono<Void> handleTableCreationException(PostgresTable table, 
Throwable e) {
@@ -109,15 +117,15 @@ public class PostgresTableManager implements Startable {
         return Mono.error(e);
     }
 
-    private Mono<Void> alterTableIfNeeded(PostgresTable table) {
-        return executeAdditionalAlterQueries(table)
-            .then(enableRLSIfNeeded(table));
+    private Mono<Void> alterTableIfNeeded(PostgresTable table, Connection 
connection) {
+        return executeAdditionalAlterQueries(table, connection)
+            .then(enableRLSIfNeeded(table, connection));
     }
 
-    private Mono<Void> executeAdditionalAlterQueries(PostgresTable table) {
+    private Mono<Void> executeAdditionalAlterQueries(PostgresTable table, 
Connection connection) {
         return Flux.fromIterable(table.getAdditionalAlterQueries())
-            .concatMap(alterSQLQuery -> postgresExecutor.connection()
-                .flatMapMany(connection -> 
connection.createStatement(alterSQLQuery)
+            .concatMap(alterSQLQuery -> Mono.just(connection)
+                .flatMapMany(pgConnection -> 
pgConnection.createStatement(alterSQLQuery)
                     .execute())
                 .flatMap(Result::getRowsUpdated)
                 .then()
@@ -131,16 +139,16 @@ public class PostgresTableManager implements Startable {
             .then();
     }
 
-    private Mono<Void> enableRLSIfNeeded(PostgresTable table) {
+    private Mono<Void> enableRLSIfNeeded(PostgresTable table, Connection 
connection) {
         if (rowLevelSecurityEnabled && table.supportsRowLevelSecurity()) {
-            return alterTableEnableRLS(table);
+            return alterTableEnableRLS(table, connection);
         }
         return Mono.empty();
     }
 
-    public Mono<Void> alterTableEnableRLS(PostgresTable table) {
-        return postgresExecutor.connection()
-            .flatMapMany(connection -> 
connection.createStatement(rowLevelSecurityAlterStatement(table.getName()))
+    private Mono<Void> alterTableEnableRLS(PostgresTable table, Connection 
connection) {
+        return Mono.just(connection)
+            .flatMapMany(pgConnection -> 
pgConnection.createStatement(rowLevelSecurityAlterStatement(table.getName()))
                 .execute())
             .flatMap(Result::getRowsUpdated)
             .then();
@@ -158,25 +166,29 @@ public class PostgresTableManager implements Startable {
     }
 
     public Mono<Void> truncate() {
-        return postgresExecutor.dslContext()
-            .flatMap(dsl -> Flux.fromIterable(module.tables())
-                .flatMap(table -> Mono.from(dsl.truncateTable(table.getName()))
-                    .doOnSuccess(any -> LOGGER.info("Table {} truncated", 
table.getName()))
-                    .doOnError(e -> LOGGER.error("Error while truncating table 
{}", table.getName(), e)))
-                .then());
+        return 
Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()),
+            connection -> postgresExecutor.dslContext(connection)
+                .flatMap(dsl -> Flux.fromIterable(module.tables())
+                    .flatMap(table -> 
Mono.from(dsl.truncateTable(table.getName()))
+                        .doOnSuccess(any -> LOGGER.info("Table {} truncated", 
table.getName()))
+                        .doOnError(e -> LOGGER.error("Error while truncating 
table {}", table.getName(), e)))
+                    .then()),
+            connection -> 
postgresExecutor.connectionFactory().closeConnection(connection));
     }
 
     public Mono<Void> initializeTableIndexes() {
-        return postgresExecutor.dslContext()
-            .flatMapMany(dsl -> listExistIndexes()
-                .flatMapMany(existIndexes -> 
Flux.fromIterable(module.tableIndexes())
-                    .filter(index -> !existIndexes.contains(index.getName()))
-                    .flatMap(index -> createTableIndex(index, dsl))))
-            .then();
-    }
-
-    public Mono<List<String>> listExistIndexes() {
-        return postgresExecutor.dslContext()
+        return 
Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()),
+            connection -> postgresExecutor.dslContext(connection)
+                .flatMapMany(dsl -> listExistIndexes(dsl)
+                    .flatMapMany(existIndexes -> 
Flux.fromIterable(module.tableIndexes())
+                        .filter(index -> 
!existIndexes.contains(index.getName()))
+                        .flatMap(index -> createTableIndex(index, dsl))))
+                .then(),
+            connection -> 
postgresExecutor.connectionFactory().closeConnection(connection));
+    }
+
+    private Mono<List<String>> listExistIndexes(DSLContext dslContext) {
+        return Mono.just(dslContext)
             .flatMapMany(dsl -> Flux.from(dsl.select(DSL.field("indexname"))
                 .from("pg_indexes")
                 .where(DSL.field("schemaname")
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/DomainImplPostgresConnectionFactory.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/DomainImplPostgresConnectionFactory.java
index f988b4fcb1..f69dd77569 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/DomainImplPostgresConnectionFactory.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/DomainImplPostgresConnectionFactory.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import io.r2dbc.spi.Connection;
 import io.r2dbc.spi.ConnectionFactory;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class DomainImplPostgresConnectionFactory implements 
JamesPostgresConnectionFactory {
@@ -46,11 +47,24 @@ public class DomainImplPostgresConnectionFactory implements 
JamesPostgresConnect
         this.connectionFactory = connectionFactory;
     }
 
+    @Override
     public Mono<Connection> getConnection(Optional<Domain> maybeDomain) {
         return maybeDomain.map(this::getConnectionForDomain)
             .orElse(getConnectionForDomain(DEFAULT));
     }
 
+    @Override
+    public Mono<Void> closeConnection(Connection connection) {
+        return Mono.empty();
+    }
+
+    @Override
+    public Mono<Void> close() {
+        return Flux.fromIterable(mapDomainToConnection.values())
+            .flatMap(connection -> Mono.from(connection.close()))
+            .then();
+    }
+
     private Mono<Connection> getConnectionForDomain(Domain domain) {
         return Mono.just(domain)
             .flatMap(domainValue -> Mono.fromCallable(() -> 
mapDomainToConnection.get(domainValue))
@@ -74,14 +88,14 @@ public class DomainImplPostgresConnectionFactory implements 
JamesPostgresConnect
             }).switchIfEmpty(setDomainAttributeForConnection(domain, 
newConnection));
     }
 
-    private static Mono<Connection> setDomainAttributeForConnection(Domain 
domain, Connection newConnection) {
+    private Mono<Connection> setDomainAttributeForConnection(Domain domain, 
Connection newConnection) {
         return Mono.from(newConnection.createStatement("SET " + 
DOMAIN_ATTRIBUTE + " TO '" + getDomainAttributeValue(domain) + "'") // It 
should be set value via Bind, but it doesn't work
                 .execute())
             .doOnError(e -> LOGGER.error("Error while setting domain attribute 
for domain {}", domain, e))
             .then(Mono.just(newConnection));
     }
 
-    private static String getDomainAttributeValue(Domain domain) {
+    private String getDomainAttributeValue(Domain domain) {
         if (DEFAULT.equals(domain)) {
             return DEFAULT_DOMAIN_ATTRIBUTE_VALUE;
         } else {
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
index c196f80642..7a4fede8a9 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
@@ -35,4 +35,8 @@ public interface JamesPostgresConnectionFactory {
     }
 
     Mono<Connection> getConnection(Optional<Domain> domain);
+
+    Mono<Void> closeConnection(Connection connection);
+
+    Mono<Void> close();
 }
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PoolBackedPostgresConnectionFactory.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PoolBackedPostgresConnectionFactory.java
new file mode 100644
index 0000000000..5441a1f4fe
--- /dev/null
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PoolBackedPostgresConnectionFactory.java
@@ -0,0 +1,92 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres.utils;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.core.Domain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.r2dbc.pool.ConnectionPool;
+import io.r2dbc.pool.ConnectionPoolConfiguration;
+import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.ConnectionFactory;
+import reactor.core.publisher.Mono;
+
+public class PoolBackedPostgresConnectionFactory implements 
JamesPostgresConnectionFactory {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PoolBackedPostgresConnectionFactory.class);
+    private static final Domain DEFAULT = Domain.of("default");
+    private static final String DEFAULT_DOMAIN_ATTRIBUTE_VALUE = "";
+    private static final int INITIAL_SIZE = 10;
+    private static final int MAX_SIZE = 50;
+    private static final Duration MAX_IDLE_TIME = Duration.ofMillis(5000);
+
+    private final boolean rowLevelSecurityEnabled;
+    private final ConnectionPool pool;
+
+    @Inject
+    public PoolBackedPostgresConnectionFactory(boolean 
rowLevelSecurityEnabled, ConnectionFactory connectionFactory) {
+        this.rowLevelSecurityEnabled = rowLevelSecurityEnabled;
+        final ConnectionPoolConfiguration configuration = 
ConnectionPoolConfiguration.builder(connectionFactory)
+            .maxIdleTime(MAX_IDLE_TIME)
+            .initialSize(INITIAL_SIZE)
+            .maxSize(MAX_SIZE)
+            .build();
+        pool = new ConnectionPool(configuration);
+    }
+
+    @Override
+    public Mono<Connection> getConnection(Optional<Domain> domain) {
+        if (rowLevelSecurityEnabled) {
+            return pool.create().flatMap(connection -> 
setDomainAttributeForConnection(domain.orElse(DEFAULT), connection));
+        } else {
+            return pool.create();
+        }
+    }
+
+    @Override
+    public Mono<Void> closeConnection(Connection connection) {
+        return Mono.from(connection.close());
+    }
+
+    @Override
+    public Mono<Void> close() {
+        return pool.close();
+    }
+
+    private Mono<Connection> setDomainAttributeForConnection(Domain domain, 
Connection connection) {
+        return Mono.from(connection.createStatement("SET " + DOMAIN_ATTRIBUTE 
+ " TO '" + getDomainAttributeValue(domain) + "'") // It should be set value 
via Bind, but it doesn't work
+                .execute())
+            .doOnError(e -> LOGGER.error("Error while setting domain attribute 
for domain {}", domain, e))
+            .then(Mono.just(connection));
+    }
+
+    private String getDomainAttributeValue(Domain domain) {
+        if (DEFAULT.equals(domain)) {
+            return DEFAULT_DOMAIN_ATTRIBUTE_VALUE;
+        } else {
+            return domain.asString();
+        }
+    }
+}
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index 27b81cce40..195606844b 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -79,7 +79,7 @@ public class PostgresExecutor {
         }
 
         public PostgresExecutor create(Optional<Domain> domain) {
-            return new 
PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), 
postgresConfiguration, metricFactory);
+            return new PostgresExecutor(domain, 
jamesPostgresConnectionFactory, postgresConfiguration, metricFactory);
         }
 
         public PostgresExecutor create() {
@@ -92,65 +92,81 @@ public class PostgresExecutor {
         .withRenderFormatted(true)
         .withStatementType(StatementType.PREPARED_STATEMENT);
 
-    private final Mono<Connection> connection;
+    private final Optional<Domain> domain;
+    private final JamesPostgresConnectionFactory 
jamesPostgresConnectionFactory;
     private final PostgresConfiguration postgresConfiguration;
     private final MetricFactory metricFactory;
 
-    private PostgresExecutor(Mono<Connection> connection,
+    private PostgresExecutor(Optional<Domain> domain,
+                             JamesPostgresConnectionFactory 
jamesPostgresConnectionFactory,
                              PostgresConfiguration postgresConfiguration,
                              MetricFactory metricFactory) {
-        this.connection = connection;
+        this.domain = domain;
+        this.jamesPostgresConnectionFactory = jamesPostgresConnectionFactory;
         this.postgresConfiguration = postgresConfiguration;
         this.metricFactory = metricFactory;
     }
 
     public Mono<DSLContext> dslContext() {
-        return connection.map(con -> DSL.using(con, PGSQL_DIALECT, SETTINGS));
+        return jamesPostgresConnectionFactory.getConnection(domain)
+            .map(con -> DSL.using(con, PGSQL_DIALECT, SETTINGS));
+    }
+
+    public Mono<DSLContext> dslContext(Connection connection) {
+        return Mono.fromCallable(() -> DSL.using(connection, PGSQL_DIALECT, 
SETTINGS));
     }
 
     public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) 
{
         return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
-            dslContext()
-                .flatMap(queryFunction)
-                .timeout(postgresConfiguration.getJooqReactiveTimeout())
-                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                    .filter(preparedStatementConflictException()))
-                .then()));
+            
Mono.usingWhen(jamesPostgresConnectionFactory.getConnection(domain),
+                connection -> dslContext(connection)
+                    .flatMap(queryFunction)
+                    .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                    .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                    .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                        .filter(preparedStatementConflictException()))
+                    .then(),
+                jamesPostgresConnectionFactory::closeConnection)));
     }
 
     public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> 
queryFunction) {
         return 
Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
-            dslContext()
-                .flatMapMany(queryFunction)
-                .timeout(postgresConfiguration.getJooqReactiveTimeout())
-                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-                .collectList()
-                .flatMapIterable(list -> list) // Mitigation fix for 
https://github.com/jOOQ/jOOQ/issues/16556
-                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                    .filter(preparedStatementConflictException()))));
+            
Flux.usingWhen(jamesPostgresConnectionFactory.getConnection(domain),
+                connection -> dslContext(connection)
+                    .flatMapMany(queryFunction)
+                    .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                    .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                    .collectList()
+                    .flatMapIterable(list -> list) // Mitigation fix for 
https://github.com/jOOQ/jOOQ/issues/16556
+                    .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                        .filter(preparedStatementConflictException())),
+                jamesPostgresConnectionFactory::closeConnection)));
     }
 
     public Flux<Record> executeDeleteAndReturnList(Function<DSLContext, 
DeleteResultStep<Record>> queryFunction) {
         return 
Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
-            dslContext()
-                .flatMapMany(queryFunction)
-                .timeout(postgresConfiguration.getJooqReactiveTimeout())
-                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-                .collectList()
-                .flatMapIterable(list -> list) // The convert Flux -> 
Mono<List> -> Flux to avoid a hanging issue. See: 
https://github.com/jOOQ/jOOQ/issues/16055
-                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                    .filter(preparedStatementConflictException()))));
+            
Flux.usingWhen(jamesPostgresConnectionFactory.getConnection(domain),
+                connection -> dslContext(connection)
+                    .flatMapMany(queryFunction)
+                    .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                    .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                    .collectList()
+                    .flatMapIterable(list -> list) // The convert Flux -> 
Mono<List> -> Flux to avoid a hanging issue. See: 
https://github.com/jOOQ/jOOQ/issues/16055
+                    .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                        .filter(preparedStatementConflictException())),
+                jamesPostgresConnectionFactory::closeConnection)));
     }
 
     public Mono<Record> executeRow(Function<DSLContext, Publisher<Record>> 
queryFunction) {
         return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
-            dslContext()
-                .flatMap(queryFunction.andThen(Mono::from))
-                .timeout(postgresConfiguration.getJooqReactiveTimeout())
-                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                    .filter(preparedStatementConflictException()))));
+            
Mono.usingWhen(jamesPostgresConnectionFactory.getConnection(domain),
+                connection -> dslContext(connection)
+                    .flatMap(queryFunction.andThen(Mono::from))
+                    .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                    .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                    .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                        .filter(preparedStatementConflictException())),
+                jamesPostgresConnectionFactory::closeConnection)));
     }
 
     public Mono<Optional<Record>> 
executeSingleRowOptional(Function<DSLContext, Publisher<Record>> queryFunction) 
{
@@ -161,13 +177,15 @@ public class PostgresExecutor {
 
     public Mono<Integer> executeCount(Function<DSLContext, 
Mono<Record1<Integer>>> queryFunction) {
         return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
-            dslContext()
-                .flatMap(queryFunction)
-                .timeout(postgresConfiguration.getJooqReactiveTimeout())
-                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                    .filter(preparedStatementConflictException()))
-                .map(Record1::value1)));
+            
Mono.usingWhen(jamesPostgresConnectionFactory.getConnection(domain),
+                connection -> dslContext(connection)
+                    .flatMap(queryFunction)
+                    .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                    .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                    .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                        .filter(preparedStatementConflictException()))
+                    .map(Record1::value1),
+                jamesPostgresConnectionFactory::closeConnection)));
     }
 
     public Mono<Boolean> executeExists(Function<DSLContext, 
SelectConditionStep<?>> queryFunction) {
@@ -177,21 +195,27 @@ public class PostgresExecutor {
 
     public Mono<Integer> executeReturnAffectedRowsCount(Function<DSLContext, 
Mono<Integer>> queryFunction) {
         return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
-            dslContext()
-                .flatMap(queryFunction)
-                .timeout(postgresConfiguration.getJooqReactiveTimeout())
-                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                    .filter(preparedStatementConflictException()))));
+            
Mono.usingWhen(jamesPostgresConnectionFactory.getConnection(domain),
+                connection -> dslContext(connection)
+                    .flatMap(queryFunction)
+                    .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                    .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                    .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                        .filter(preparedStatementConflictException())),
+                jamesPostgresConnectionFactory::closeConnection)));
+    }
+
+    public JamesPostgresConnectionFactory connectionFactory() {
+        return jamesPostgresConnectionFactory;
     }
 
     public Mono<Connection> connection() {
-        return connection;
+        return jamesPostgresConnectionFactory.getConnection(domain);
     }
 
     @VisibleForTesting
     public Mono<Void> dispose() {
-        return connection.flatMap(con -> Mono.from(con.close()));
+        return jamesPostgresConnectionFactory.close();
     }
 
     private Predicate<Throwable> preparedStatementConflictException() {
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java
index 58f1dc72f8..3972a27dbd 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java
@@ -37,4 +37,14 @@ public class SinglePostgresConnectionFactory implements 
JamesPostgresConnectionF
     public Mono<Connection> getConnection(Optional<Domain> domain) {
         return Mono.just(connection);
     }
+
+    @Override
+    public Mono<Void> closeConnection(Connection connection) {
+        return Mono.empty();
+    }
+
+    @Override
+    public Mono<Void> close() {
+        return Mono.from(connection.close());
+    }
 }
diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DomainImplPostgresConnectionFactoryTest.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DomainImplPostgresConnectionFactoryTest.java
index dc4b320953..ec79ba3d23 100644
--- a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DomainImplPostgresConnectionFactoryTest.java
+++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DomainImplPostgresConnectionFactoryTest.java
@@ -28,8 +28,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory;
 import 
org.apache.james.backends.postgres.utils.DomainImplPostgresConnectionFactory;
+import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory;
 import org.apache.james.core.Domain;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.jetbrains.annotations.Nullable;
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PoolBackedPostgresConnectionFactoryTest.java
similarity index 65%
copy from backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java
copy to backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PoolBackedPostgresConnectionFactoryTest.java
index 58f1dc72f8..31bd7afc46 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java
+++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PoolBackedPostgresConnectionFactoryTest.java
@@ -17,24 +17,18 @@
  * under the License.                                           *
  ****************************************************************/
 
-package org.apache.james.backends.postgres.utils;
+package org.apache.james.backends.postgres;
 
-import java.util.Optional;
+import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory;
+import 
org.apache.james.backends.postgres.utils.PoolBackedPostgresConnectionFactory;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-import org.apache.james.core.Domain;
-
-import io.r2dbc.spi.Connection;
-import reactor.core.publisher.Mono;
-
-public class SinglePostgresConnectionFactory implements 
JamesPostgresConnectionFactory {
-    private final Connection connection;
-
-    public SinglePostgresConnectionFactory(Connection connection) {
-        this.connection = connection;
-    }
+public class PoolBackedPostgresConnectionFactoryTest extends 
JamesPostgresConnectionFactoryTest {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = PostgresExtension.empty();
 
     @Override
-    public Mono<Connection> getConnection(Optional<Domain> domain) {
-        return Mono.just(connection);
+    JamesPostgresConnectionFactory jamesPostgresConnectionFactory() {
+        return new PoolBackedPostgresConnectionFactory(true, 
postgresExtension.getConnectionFactory());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to