This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 701790ec040011269640e13e416c5283ad649322 Author: hung phan <hp...@linagora.com> AuthorDate: Mon Apr 8 14:16:46 2024 +0700 JAMES-2586 Implement PoolBackedPostgresConnectionFactory --- backends-common/postgres/pom.xml | 5 + .../backends/postgres/PostgresTableManager.java | 106 ++++++++++-------- .../utils/DomainImplPostgresConnectionFactory.java | 18 ++- .../utils/JamesPostgresConnectionFactory.java | 4 + .../utils/PoolBackedPostgresConnectionFactory.java | 92 ++++++++++++++++ .../backends/postgres/utils/PostgresExecutor.java | 122 ++++++++++++--------- .../utils/SinglePostgresConnectionFactory.java | 10 ++ .../DomainImplPostgresConnectionFactoryTest.java | 2 +- .../PoolBackedPostgresConnectionFactoryTest.java} | 24 ++-- 9 files changed, 269 insertions(+), 114 deletions(-) diff --git a/backends-common/postgres/pom.xml b/backends-common/postgres/pom.xml index 6d256a2733..e969e22025 100644 --- a/backends-common/postgres/pom.xml +++ b/backends-common/postgres/pom.xml @@ -61,6 +61,11 @@ <artifactId>testing-base</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>io.r2dbc</groupId> + <artifactId>r2dbc-pool</artifactId> + <version>1.0.1.RELEASE</version> + </dependency> <dependency> <groupId>jakarta.annotation</groupId> <artifactId>jakarta.annotation-api</artifactId> diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresTableManager.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresTableManager.java index 5947579da1..2bff154c6c 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresTableManager.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresTableManager.java @@ -20,6 +20,7 @@ package org.apache.james.backends.postgres; import java.util.List; +import java.util.Optional; import jakarta.inject.Inject; @@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import io.r2dbc.spi.Connection; import io.r2dbc.spi.Result; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -68,37 +70,43 @@ public class PostgresTableManager implements Startable { } public Mono<Void> initializePostgresExtension() { - return postgresExecutor.connection() - .flatMapMany(connection -> connection.createStatement("CREATE EXTENSION IF NOT EXISTS hstore") - .execute()) - .flatMap(Result::getRowsUpdated) - .then(); + return Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()), + connection -> Mono.just(connection) + .flatMapMany(pgConnection -> pgConnection.createStatement("CREATE EXTENSION IF NOT EXISTS hstore") + .execute()) + .flatMap(Result::getRowsUpdated) + .then(), + connection -> postgresExecutor.connectionFactory().closeConnection(connection)); } public Mono<Void> initializeTables() { - return postgresExecutor.dslContext() - .flatMapMany(dsl -> listExistTables() - .flatMapMany(existTables -> Flux.fromIterable(module.tables()) - .filter(table -> !existTables.contains(table.getName())) - .flatMap(table -> createAndAlterTable(table, dsl)))) - .then(); + return Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()), + connection -> postgresExecutor.dslContext(connection) + .flatMapMany(dsl -> listExistTables() + .flatMapMany(existTables -> Flux.fromIterable(module.tables()) + .filter(table -> !existTables.contains(table.getName())) + .flatMap(table -> createAndAlterTable(table, dsl, connection)))) + .then(), + connection -> postgresExecutor.connectionFactory().closeConnection(connection)); } - private Mono<Void> createAndAlterTable(PostgresTable table, DSLContext dsl) { + private Mono<Void> createAndAlterTable(PostgresTable table, DSLContext dsl, Connection connection) { return Mono.from(table.getCreateTableStepFunction().apply(dsl)) - .then(alterTableIfNeeded(table)) + .then(alterTableIfNeeded(table, connection)) .doOnSuccess(any -> LOGGER.info("Table {} created", table.getName())) .onErrorResume(exception -> handleTableCreationException(table, exception)); } public Mono<List<String>> listExistTables() { - return postgresExecutor.dslContext() - .flatMapMany(d -> Flux.from(d.select(DSL.field("tablename")) - .from("pg_tables") - .where(DSL.field("schemaname") - .eq(DSL.currentSchema())))) - .map(r -> r.get(0, String.class)) - .collectList(); + return Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()), + connection -> postgresExecutor.dslContext(connection) + .flatMapMany(d -> Flux.from(d.select(DSL.field("tablename")) + .from("pg_tables") + .where(DSL.field("schemaname") + .eq(DSL.currentSchema())))) + .map(r -> r.get(0, String.class)) + .collectList(), + connection -> postgresExecutor.connectionFactory().closeConnection(connection)); } private Mono<Void> handleTableCreationException(PostgresTable table, Throwable e) { @@ -109,15 +117,15 @@ public class PostgresTableManager implements Startable { return Mono.error(e); } - private Mono<Void> alterTableIfNeeded(PostgresTable table) { - return executeAdditionalAlterQueries(table) - .then(enableRLSIfNeeded(table)); + private Mono<Void> alterTableIfNeeded(PostgresTable table, Connection connection) { + return executeAdditionalAlterQueries(table, connection) + .then(enableRLSIfNeeded(table, connection)); } - private Mono<Void> executeAdditionalAlterQueries(PostgresTable table) { + private Mono<Void> executeAdditionalAlterQueries(PostgresTable table, Connection connection) { return Flux.fromIterable(table.getAdditionalAlterQueries()) - .concatMap(alterSQLQuery -> postgresExecutor.connection() - .flatMapMany(connection -> connection.createStatement(alterSQLQuery) + .concatMap(alterSQLQuery -> Mono.just(connection) + .flatMapMany(pgConnection -> pgConnection.createStatement(alterSQLQuery) .execute()) .flatMap(Result::getRowsUpdated) .then() @@ -131,16 +139,16 @@ public class PostgresTableManager implements Startable { .then(); } - private Mono<Void> enableRLSIfNeeded(PostgresTable table) { + private Mono<Void> enableRLSIfNeeded(PostgresTable table, Connection connection) { if (rowLevelSecurityEnabled && table.supportsRowLevelSecurity()) { - return alterTableEnableRLS(table); + return alterTableEnableRLS(table, connection); } return Mono.empty(); } - public Mono<Void> alterTableEnableRLS(PostgresTable table) { - return postgresExecutor.connection() - .flatMapMany(connection -> connection.createStatement(rowLevelSecurityAlterStatement(table.getName())) + private Mono<Void> alterTableEnableRLS(PostgresTable table, Connection connection) { + return Mono.just(connection) + .flatMapMany(pgConnection -> pgConnection.createStatement(rowLevelSecurityAlterStatement(table.getName())) .execute()) .flatMap(Result::getRowsUpdated) .then(); @@ -158,25 +166,29 @@ public class PostgresTableManager implements Startable { } public Mono<Void> truncate() { - return postgresExecutor.dslContext() - .flatMap(dsl -> Flux.fromIterable(module.tables()) - .flatMap(table -> Mono.from(dsl.truncateTable(table.getName())) - .doOnSuccess(any -> LOGGER.info("Table {} truncated", table.getName())) - .doOnError(e -> LOGGER.error("Error while truncating table {}", table.getName(), e))) - .then()); + return Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()), + connection -> postgresExecutor.dslContext(connection) + .flatMap(dsl -> Flux.fromIterable(module.tables()) + .flatMap(table -> Mono.from(dsl.truncateTable(table.getName())) + .doOnSuccess(any -> LOGGER.info("Table {} truncated", table.getName())) + .doOnError(e -> LOGGER.error("Error while truncating table {}", table.getName(), e))) + .then()), + connection -> postgresExecutor.connectionFactory().closeConnection(connection)); } public Mono<Void> initializeTableIndexes() { - return postgresExecutor.dslContext() - .flatMapMany(dsl -> listExistIndexes() - .flatMapMany(existIndexes -> Flux.fromIterable(module.tableIndexes()) - .filter(index -> !existIndexes.contains(index.getName())) - .flatMap(index -> createTableIndex(index, dsl)))) - .then(); - } - - public Mono<List<String>> listExistIndexes() { - return postgresExecutor.dslContext() + return Mono.usingWhen(postgresExecutor.connectionFactory().getConnection(Optional.empty()), + connection -> postgresExecutor.dslContext(connection) + .flatMapMany(dsl -> listExistIndexes(dsl) + .flatMapMany(existIndexes -> Flux.fromIterable(module.tableIndexes()) + .filter(index -> !existIndexes.contains(index.getName())) + .flatMap(index -> createTableIndex(index, dsl)))) + .then(), + connection -> postgresExecutor.connectionFactory().closeConnection(connection)); + } + + private Mono<List<String>> listExistIndexes(DSLContext dslContext) { + return Mono.just(dslContext) .flatMapMany(dsl -> Flux.from(dsl.select(DSL.field("indexname")) .from("pg_indexes") .where(DSL.field("schemaname") diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/DomainImplPostgresConnectionFactory.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/DomainImplPostgresConnectionFactory.java index f988b4fcb1..f69dd77569 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/DomainImplPostgresConnectionFactory.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/DomainImplPostgresConnectionFactory.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import io.r2dbc.spi.Connection; import io.r2dbc.spi.ConnectionFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class DomainImplPostgresConnectionFactory implements JamesPostgresConnectionFactory { @@ -46,11 +47,24 @@ public class DomainImplPostgresConnectionFactory implements JamesPostgresConnect this.connectionFactory = connectionFactory; } + @Override public Mono<Connection> getConnection(Optional<Domain> maybeDomain) { return maybeDomain.map(this::getConnectionForDomain) .orElse(getConnectionForDomain(DEFAULT)); } + @Override + public Mono<Void> closeConnection(Connection connection) { + return Mono.empty(); + } + + @Override + public Mono<Void> close() { + return Flux.fromIterable(mapDomainToConnection.values()) + .flatMap(connection -> Mono.from(connection.close())) + .then(); + } + private Mono<Connection> getConnectionForDomain(Domain domain) { return Mono.just(domain) .flatMap(domainValue -> Mono.fromCallable(() -> mapDomainToConnection.get(domainValue)) @@ -74,14 +88,14 @@ public class DomainImplPostgresConnectionFactory implements JamesPostgresConnect }).switchIfEmpty(setDomainAttributeForConnection(domain, newConnection)); } - private static Mono<Connection> setDomainAttributeForConnection(Domain domain, Connection newConnection) { + private Mono<Connection> setDomainAttributeForConnection(Domain domain, Connection newConnection) { return Mono.from(newConnection.createStatement("SET " + DOMAIN_ATTRIBUTE + " TO '" + getDomainAttributeValue(domain) + "'") // It should be set value via Bind, but it doesn't work .execute()) .doOnError(e -> LOGGER.error("Error while setting domain attribute for domain {}", domain, e)) .then(Mono.just(newConnection)); } - private static String getDomainAttributeValue(Domain domain) { + private String getDomainAttributeValue(Domain domain) { if (DEFAULT.equals(domain)) { return DEFAULT_DOMAIN_ATTRIBUTE_VALUE; } else { diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java index c196f80642..7a4fede8a9 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java @@ -35,4 +35,8 @@ public interface JamesPostgresConnectionFactory { } Mono<Connection> getConnection(Optional<Domain> domain); + + Mono<Void> closeConnection(Connection connection); + + Mono<Void> close(); } diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PoolBackedPostgresConnectionFactory.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PoolBackedPostgresConnectionFactory.java new file mode 100644 index 0000000000..5441a1f4fe --- /dev/null +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PoolBackedPostgresConnectionFactory.java @@ -0,0 +1,92 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.postgres.utils; + +import java.time.Duration; +import java.util.Optional; + +import javax.inject.Inject; + +import org.apache.james.core.Domain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.r2dbc.pool.ConnectionPool; +import io.r2dbc.pool.ConnectionPoolConfiguration; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import reactor.core.publisher.Mono; + +public class PoolBackedPostgresConnectionFactory implements JamesPostgresConnectionFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(PoolBackedPostgresConnectionFactory.class); + private static final Domain DEFAULT = Domain.of("default"); + private static final String DEFAULT_DOMAIN_ATTRIBUTE_VALUE = ""; + private static final int INITIAL_SIZE = 10; + private static final int MAX_SIZE = 50; + private static final Duration MAX_IDLE_TIME = Duration.ofMillis(5000); + + private final boolean rowLevelSecurityEnabled; + private final ConnectionPool pool; + + @Inject + public PoolBackedPostgresConnectionFactory(boolean rowLevelSecurityEnabled, ConnectionFactory connectionFactory) { + this.rowLevelSecurityEnabled = rowLevelSecurityEnabled; + final ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory) + .maxIdleTime(MAX_IDLE_TIME) + .initialSize(INITIAL_SIZE) + .maxSize(MAX_SIZE) + .build(); + pool = new ConnectionPool(configuration); + } + + @Override + public Mono<Connection> getConnection(Optional<Domain> domain) { + if (rowLevelSecurityEnabled) { + return pool.create().flatMap(connection -> setDomainAttributeForConnection(domain.orElse(DEFAULT), connection)); + } else { + return pool.create(); + } + } + + @Override + public Mono<Void> closeConnection(Connection connection) { + return Mono.from(connection.close()); + } + + @Override + public Mono<Void> close() { + return pool.close(); + } + + private Mono<Connection> setDomainAttributeForConnection(Domain domain, Connection connection) { + return Mono.from(connection.createStatement("SET " + DOMAIN_ATTRIBUTE + " TO '" + getDomainAttributeValue(domain) + "'") // It should be set value via Bind, but it doesn't work + .execute()) + .doOnError(e -> LOGGER.error("Error while setting domain attribute for domain {}", domain, e)) + .then(Mono.just(connection)); + } + + private String getDomainAttributeValue(Domain domain) { + if (DEFAULT.equals(domain)) { + return DEFAULT_DOMAIN_ATTRIBUTE_VALUE; + } else { + return domain.asString(); + } + } +} diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java index 27b81cce40..195606844b 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java @@ -79,7 +79,7 @@ public class PostgresExecutor { } public PostgresExecutor create(Optional<Domain> domain) { - return new PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), postgresConfiguration, metricFactory); + return new PostgresExecutor(domain, jamesPostgresConnectionFactory, postgresConfiguration, metricFactory); } public PostgresExecutor create() { @@ -92,65 +92,81 @@ public class PostgresExecutor { .withRenderFormatted(true) .withStatementType(StatementType.PREPARED_STATEMENT); - private final Mono<Connection> connection; + private final Optional<Domain> domain; + private final JamesPostgresConnectionFactory jamesPostgresConnectionFactory; private final PostgresConfiguration postgresConfiguration; private final MetricFactory metricFactory; - private PostgresExecutor(Mono<Connection> connection, + private PostgresExecutor(Optional<Domain> domain, + JamesPostgresConnectionFactory jamesPostgresConnectionFactory, PostgresConfiguration postgresConfiguration, MetricFactory metricFactory) { - this.connection = connection; + this.domain = domain; + this.jamesPostgresConnectionFactory = jamesPostgresConnectionFactory; this.postgresConfiguration = postgresConfiguration; this.metricFactory = metricFactory; } public Mono<DSLContext> dslContext() { - return connection.map(con -> DSL.using(con, PGSQL_DIALECT, SETTINGS)); + return jamesPostgresConnectionFactory.getConnection(domain) + .map(con -> DSL.using(con, PGSQL_DIALECT, SETTINGS)); + } + + public Mono<DSLContext> dslContext(Connection connection) { + return Mono.fromCallable(() -> DSL.using(connection, PGSQL_DIALECT, SETTINGS)); } public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) { return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution", - dslContext() - .flatMap(queryFunction) - .timeout(postgresConfiguration.getJooqReactiveTimeout()) - .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) - .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) - .filter(preparedStatementConflictException())) - .then())); + Mono.usingWhen(jamesPostgresConnectionFactory.getConnection(domain), + connection -> dslContext(connection) + .flatMap(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) + .filter(preparedStatementConflictException())) + .then(), + jamesPostgresConnectionFactory::closeConnection))); } public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> queryFunction) { return Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution", - dslContext() - .flatMapMany(queryFunction) - .timeout(postgresConfiguration.getJooqReactiveTimeout()) - .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) - .collectList() - .flatMapIterable(list -> list) // Mitigation fix for https://github.com/jOOQ/jOOQ/issues/16556 - .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) - .filter(preparedStatementConflictException())))); + Flux.usingWhen(jamesPostgresConnectionFactory.getConnection(domain), + connection -> dslContext(connection) + .flatMapMany(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) + .collectList() + .flatMapIterable(list -> list) // Mitigation fix for https://github.com/jOOQ/jOOQ/issues/16556 + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) + .filter(preparedStatementConflictException())), + jamesPostgresConnectionFactory::closeConnection))); } public Flux<Record> executeDeleteAndReturnList(Function<DSLContext, DeleteResultStep<Record>> queryFunction) { return Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution", - dslContext() - .flatMapMany(queryFunction) - .timeout(postgresConfiguration.getJooqReactiveTimeout()) - .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) - .collectList() - .flatMapIterable(list -> list) // The convert Flux -> Mono<List> -> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055 - .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) - .filter(preparedStatementConflictException())))); + Flux.usingWhen(jamesPostgresConnectionFactory.getConnection(domain), + connection -> dslContext(connection) + .flatMapMany(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) + .collectList() + .flatMapIterable(list -> list) // The convert Flux -> Mono<List> -> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055 + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) + .filter(preparedStatementConflictException())), + jamesPostgresConnectionFactory::closeConnection))); } public Mono<Record> executeRow(Function<DSLContext, Publisher<Record>> queryFunction) { return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution", - dslContext() - .flatMap(queryFunction.andThen(Mono::from)) - .timeout(postgresConfiguration.getJooqReactiveTimeout()) - .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) - .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) - .filter(preparedStatementConflictException())))); + Mono.usingWhen(jamesPostgresConnectionFactory.getConnection(domain), + connection -> dslContext(connection) + .flatMap(queryFunction.andThen(Mono::from)) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) + .filter(preparedStatementConflictException())), + jamesPostgresConnectionFactory::closeConnection))); } public Mono<Optional<Record>> executeSingleRowOptional(Function<DSLContext, Publisher<Record>> queryFunction) { @@ -161,13 +177,15 @@ public class PostgresExecutor { public Mono<Integer> executeCount(Function<DSLContext, Mono<Record1<Integer>>> queryFunction) { return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution", - dslContext() - .flatMap(queryFunction) - .timeout(postgresConfiguration.getJooqReactiveTimeout()) - .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) - .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) - .filter(preparedStatementConflictException())) - .map(Record1::value1))); + Mono.usingWhen(jamesPostgresConnectionFactory.getConnection(domain), + connection -> dslContext(connection) + .flatMap(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) + .filter(preparedStatementConflictException())) + .map(Record1::value1), + jamesPostgresConnectionFactory::closeConnection))); } public Mono<Boolean> executeExists(Function<DSLContext, SelectConditionStep<?>> queryFunction) { @@ -177,21 +195,27 @@ public class PostgresExecutor { public Mono<Integer> executeReturnAffectedRowsCount(Function<DSLContext, Mono<Integer>> queryFunction) { return Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution", - dslContext() - .flatMap(queryFunction) - .timeout(postgresConfiguration.getJooqReactiveTimeout()) - .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) - .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) - .filter(preparedStatementConflictException())))); + Mono.usingWhen(jamesPostgresConnectionFactory.getConnection(domain), + connection -> dslContext(connection) + .flatMap(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) + .filter(preparedStatementConflictException())), + jamesPostgresConnectionFactory::closeConnection))); + } + + public JamesPostgresConnectionFactory connectionFactory() { + return jamesPostgresConnectionFactory; } public Mono<Connection> connection() { - return connection; + return jamesPostgresConnectionFactory.getConnection(domain); } @VisibleForTesting public Mono<Void> dispose() { - return connection.flatMap(con -> Mono.from(con.close())); + return jamesPostgresConnectionFactory.close(); } private Predicate<Throwable> preparedStatementConflictException() { diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java index 58f1dc72f8..3972a27dbd 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java @@ -37,4 +37,14 @@ public class SinglePostgresConnectionFactory implements JamesPostgresConnectionF public Mono<Connection> getConnection(Optional<Domain> domain) { return Mono.just(connection); } + + @Override + public Mono<Void> closeConnection(Connection connection) { + return Mono.empty(); + } + + @Override + public Mono<Void> close() { + return Mono.from(connection.close()); + } } diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DomainImplPostgresConnectionFactoryTest.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DomainImplPostgresConnectionFactoryTest.java index dc4b320953..ec79ba3d23 100644 --- a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DomainImplPostgresConnectionFactoryTest.java +++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DomainImplPostgresConnectionFactoryTest.java @@ -28,8 +28,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory; import org.apache.james.backends.postgres.utils.DomainImplPostgresConnectionFactory; +import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory; import org.apache.james.core.Domain; import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.jetbrains.annotations.Nullable; diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PoolBackedPostgresConnectionFactoryTest.java similarity index 65% copy from backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java copy to backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PoolBackedPostgresConnectionFactoryTest.java index 58f1dc72f8..31bd7afc46 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SinglePostgresConnectionFactory.java +++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PoolBackedPostgresConnectionFactoryTest.java @@ -17,24 +17,18 @@ * under the License. * ****************************************************************/ -package org.apache.james.backends.postgres.utils; +package org.apache.james.backends.postgres; -import java.util.Optional; +import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory; +import org.apache.james.backends.postgres.utils.PoolBackedPostgresConnectionFactory; +import org.junit.jupiter.api.extension.RegisterExtension; -import org.apache.james.core.Domain; - -import io.r2dbc.spi.Connection; -import reactor.core.publisher.Mono; - -public class SinglePostgresConnectionFactory implements JamesPostgresConnectionFactory { - private final Connection connection; - - public SinglePostgresConnectionFactory(Connection connection) { - this.connection = connection; - } +public class PoolBackedPostgresConnectionFactoryTest extends JamesPostgresConnectionFactoryTest { + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.empty(); @Override - public Mono<Connection> getConnection(Optional<Domain> domain) { - return Mono.just(connection); + JamesPostgresConnectionFactory jamesPostgresConnectionFactory() { + return new PoolBackedPostgresConnectionFactory(true, postgresExtension.getConnectionFactory()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org