This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f35b9085b88ab96fab2fed824eb4fd72071ac5d2 Author: Quan Tran <[email protected]> AuthorDate: Fri Apr 5 13:12:26 2024 +0700 JAMES-2586 Apply reactor timeout for jOOQ --- .../backends/postgres/PostgresConfiguration.java | 29 +++++++++++++++++--- .../backends/postgres/utils/PostgresExecutor.java | 31 +++++++++++++++++++--- .../james/backends/postgres/PostgresExtension.java | 11 +++++--- ...stgresAnnotationMapperRowLevelSecurityTest.java | 3 ++- .../PostgresMailboxMapperRowLevelSecurityTest.java | 3 ++- .../PostgresMessageMapperRowLevelSecurityTest.java | 3 ++- ...gresSubscriptionMapperRowLevelSecurityTest.java | 3 ++- .../sample-configuration/postgres.properties | 5 +++- .../james/modules/data/PostgresCommonModule.java | 5 ++-- 9 files changed, 76 insertions(+), 17 deletions(-) diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java index 88f91d3d23..e00c803fd3 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java @@ -19,10 +19,13 @@ package org.apache.james.backends.postgres; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.Optional; import org.apache.commons.configuration2.Configuration; +import org.apache.james.util.DurationParser; import com.google.common.base.Preconditions; @@ -44,6 +47,8 @@ public class PostgresConfiguration { public static final String RLS_ENABLED = "row.level.security.enabled"; public static final String SSL_MODE = "ssl.mode"; public static final String SSL_MODE_DEFAULT_VALUE = "allow"; + public static final String JOOQ_REACTIVE_TIMEOUT = "jooq.reactive.timeout"; + public static final Duration JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE = Duration.ofSeconds(10); public static class Credential { private final String username; @@ -75,6 +80,7 @@ public class PostgresConfiguration { private Optional<String> nonRLSPassword = Optional.empty(); private Optional<Boolean> rowLevelSecurityEnabled = Optional.empty(); private Optional<String> sslMode = Optional.empty(); + private Optional<Duration> jooqReactiveTimeout = Optional.empty(); public Builder databaseName(String databaseName) { this.databaseName = Optional.of(databaseName); @@ -176,6 +182,11 @@ public class PostgresConfiguration { return this; } + public Builder jooqReactiveTimeout(Optional<Duration> jooqReactiveTimeout) { + this.jooqReactiveTimeout = jooqReactiveTimeout; + return this; + } + public PostgresConfiguration build() { Preconditions.checkArgument(username.isPresent() && !username.get().isBlank(), "You need to specify username"); Preconditions.checkArgument(password.isPresent() && !password.get().isBlank(), "You need to specify password"); @@ -192,7 +203,8 @@ public class PostgresConfiguration { new Credential(username.get(), password.get()), new Credential(nonRLSUser.orElse(username.get()), nonRLSPassword.orElse(password.get())), rowLevelSecurityEnabled.orElse(false), - SSLMode.fromValue(sslMode.orElse(SSL_MODE_DEFAULT_VALUE))); + SSLMode.fromValue(sslMode.orElse(SSL_MODE_DEFAULT_VALUE)), + jooqReactiveTimeout.orElse(JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE)); } } @@ -212,6 +224,8 @@ public class PostgresConfiguration { .nonRLSPassword(Optional.ofNullable(propertiesConfiguration.getString(NON_RLS_PASSWORD))) .rowLevelSecurityEnabled(propertiesConfiguration.getBoolean(RLS_ENABLED, false)) .sslMode(Optional.ofNullable(propertiesConfiguration.getString(SSL_MODE))) + .jooqReactiveTimeout(Optional.ofNullable(propertiesConfiguration.getString(JOOQ_REACTIVE_TIMEOUT)) + .map(value -> DurationParser.parse(value, ChronoUnit.SECONDS))) .build(); } @@ -223,10 +237,11 @@ public class PostgresConfiguration { private final Credential nonRLSCredential; private final boolean rowLevelSecurityEnabled; private final SSLMode sslMode; + private final Duration jooqReactiveTimeout; private PostgresConfiguration(String host, int port, String databaseName, String databaseSchema, Credential credential, Credential nonRLSCredential, boolean rowLevelSecurityEnabled, - SSLMode sslMode) { + SSLMode sslMode, Duration jooqReactiveTimeout) { this.host = host; this.port = port; this.databaseName = databaseName; @@ -235,6 +250,7 @@ public class PostgresConfiguration { this.nonRLSCredential = nonRLSCredential; this.rowLevelSecurityEnabled = rowLevelSecurityEnabled; this.sslMode = sslMode; + this.jooqReactiveTimeout = jooqReactiveTimeout; } public String getHost() { @@ -269,9 +285,13 @@ public class PostgresConfiguration { return sslMode; } + public Duration getJooqReactiveTimeout() { + return jooqReactiveTimeout; + } + @Override public final int hashCode() { - return Objects.hash(host, port, databaseName, databaseSchema, credential, nonRLSCredential, rowLevelSecurityEnabled, sslMode); + return Objects.hash(host, port, databaseName, databaseSchema, credential, nonRLSCredential, rowLevelSecurityEnabled, sslMode, jooqReactiveTimeout); } @Override @@ -286,7 +306,8 @@ public class PostgresConfiguration { && Objects.equals(this.nonRLSCredential, that.nonRLSCredential) && Objects.equals(this.databaseName, that.databaseName) && Objects.equals(this.databaseSchema, that.databaseSchema) - && Objects.equals(this.sslMode, that.sslMode); + && Objects.equals(this.sslMode, that.sslMode) + && Objects.equals(this.jooqReactiveTimeout, that.jooqReactiveTimeout); } return false; } diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java index 4bfb730ab0..ed192af428 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java @@ -24,11 +24,13 @@ import static org.jooq.impl.DSL.field; import java.time.Duration; import java.util.Optional; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Predicate; import javax.inject.Inject; +import org.apache.james.backends.postgres.PostgresConfiguration; import org.apache.james.core.Domain; import org.jooq.DSLContext; import org.jooq.DeleteResultStep; @@ -40,6 +42,8 @@ import org.jooq.conf.Settings; import org.jooq.conf.StatementType; import org.jooq.impl.DSL; import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -55,18 +59,23 @@ public class PostgresExecutor { public static final String NON_RLS_INJECT = "non_rls"; public static final int MAX_RETRY_ATTEMPTS = 5; public static final Duration MIN_BACKOFF = Duration.ofMillis(1); + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresExecutor.class); + private static final String JOOQ_TIMEOUT_ERROR_LOG = "Time out executing Postgres query. May need to check either jOOQ reactive issue or Postgres DB performance."; public static class Factory { private final JamesPostgresConnectionFactory jamesPostgresConnectionFactory; + private final PostgresConfiguration postgresConfiguration; @Inject - public Factory(JamesPostgresConnectionFactory jamesPostgresConnectionFactory) { + public Factory(JamesPostgresConnectionFactory jamesPostgresConnectionFactory, + PostgresConfiguration postgresConfiguration) { this.jamesPostgresConnectionFactory = jamesPostgresConnectionFactory; + this.postgresConfiguration = postgresConfiguration; } public PostgresExecutor create(Optional<Domain> domain) { - return new PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain)); + return new PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), postgresConfiguration); } public PostgresExecutor create() { @@ -78,10 +87,14 @@ public class PostgresExecutor { private static final Settings SETTINGS = new Settings() .withRenderFormatted(true) .withStatementType(StatementType.PREPARED_STATEMENT); + private final Mono<Connection> connection; + private final PostgresConfiguration postgresConfiguration; - private PostgresExecutor(Mono<Connection> connection) { + private PostgresExecutor(Mono<Connection> connection, + PostgresConfiguration postgresConfiguration) { this.connection = connection; + this.postgresConfiguration = postgresConfiguration; } public Mono<DSLContext> dslContext() { @@ -91,6 +104,8 @@ public class PostgresExecutor { public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) { return dslContext() .flatMap(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) .filter(preparedStatementConflictException())) .then(); @@ -99,6 +114,8 @@ public class PostgresExecutor { public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> queryFunction) { return dslContext() .flatMapMany(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) .filter(preparedStatementConflictException())); } @@ -106,6 +123,8 @@ public class PostgresExecutor { public Flux<Record> executeDeleteAndReturnList(Function<DSLContext, DeleteResultStep<Record>> queryFunction) { return dslContext() .flatMapMany(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) .collectList() .flatMapIterable(list -> list) // The convert Flux -> Mono<List> -> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055 .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) @@ -115,6 +134,8 @@ public class PostgresExecutor { public Mono<Record> executeRow(Function<DSLContext, Publisher<Record>> queryFunction) { return dslContext() .flatMap(queryFunction.andThen(Mono::from)) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) .filter(preparedStatementConflictException())); } @@ -128,6 +149,8 @@ public class PostgresExecutor { public Mono<Integer> executeCount(Function<DSLContext, Mono<Record1<Integer>>> queryFunction) { return dslContext() .flatMap(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) .filter(preparedStatementConflictException())) .map(Record1::value1); @@ -141,6 +164,8 @@ public class PostgresExecutor { public Mono<Integer> executeReturnAffectedRowsCount(Function<DSLContext, Mono<Integer>> queryFunction) { return dslContext() .flatMap(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) .filter(preparedStatementConflictException())); } diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java index 1f6f0a200c..88c21244ce 100644 --- a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java +++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java @@ -139,11 +139,12 @@ public class PostgresExtension implements GuiceModuleTestExtension { .build()); if (rlsEnabled) { - executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(connectionFactory)); + executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(connectionFactory), postgresConfiguration); } else { executorFactory = new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connectionFactory.create() .cache() - .cast(Connection.class).block())); + .cast(Connection.class).block()), + postgresConfiguration); } postgresExecutor = executorFactory.create(); @@ -153,7 +154,7 @@ public class PostgresExtension implements GuiceModuleTestExtension { .password(postgresConfiguration.getNonRLSCredential().getPassword()) .build()) .flatMap(configuration -> new PostgresqlConnectionFactory(configuration).create().cache()) - .map(connection -> new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connection)).create()) + .map(connection -> new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connection), postgresConfiguration).create()) .block(); } else { nonRLSPostgresExecutor = postgresExecutor; @@ -225,6 +226,10 @@ public class PostgresExtension implements GuiceModuleTestExtension { return executorFactory; } + public PostgresConfiguration getPostgresConfiguration() { + return postgresConfiguration; + } + private void initTablesAndIndexes() { postgresTableManager.initializeTables().block(); postgresTableManager.initializeTableIndexes().block(); diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java index 826201eea7..c6dced4ffe 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java @@ -72,7 +72,8 @@ public class PostgresAnnotationMapperRowLevelSecurityTest { @BeforeEach public void setUp() { BlobId.Factory blobIdFactory = new HashBlobId.Factory(); - postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory())), + postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()), + postgresExtension.getPostgresConfiguration()), new UpdatableTickingClock(Instant.now()), new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory), blobIdFactory); diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java index bdf719dfe2..4680c58d36 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java @@ -44,7 +44,8 @@ public class PostgresMailboxMapperRowLevelSecurityTest { @BeforeEach public void setUp() { - PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory())); + PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()), + postgresExtension.getPostgresConfiguration()); mailboxMapperFactory = session -> new PostgresMailboxMapper(new PostgresMailboxDAO(executorFactory.create(session.getUser().getDomainPart()))); } diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java index 43601491e0..6b472e615e 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java @@ -79,7 +79,8 @@ public class PostgresMessageMapperRowLevelSecurityTest { @BeforeEach public void setUp() { BlobId.Factory blobIdFactory = new HashBlobId.Factory(); - postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory())), + postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()), + postgresExtension.getPostgresConfiguration()), new UpdatableTickingClock(Instant.now()), new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory), blobIdFactory); diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java index acd0bb2cef..daf676b664 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java @@ -41,7 +41,8 @@ public class PostgresSubscriptionMapperRowLevelSecurityTest { @BeforeEach public void setUp() { - PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory())); + PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()), + postgresExtension.getPostgresConfiguration()); subscriptionMapperFactory = session -> new PostgresSubscriptionMapper(new PostgresSubscriptionDAO(executorFactory.create(session.getUser().getDomainPart()))); } diff --git a/server/apps/postgres-app/sample-configuration/postgres.properties b/server/apps/postgres-app/sample-configuration/postgres.properties index 36512aa757..8710adb814 100644 --- a/server/apps/postgres-app/sample-configuration/postgres.properties +++ b/server/apps/postgres-app/sample-configuration/postgres.properties @@ -27,4 +27,7 @@ row.level.security.enabled=false # String. Optional, defaults to allow. SSLMode required to connect to the Postgresql db server. # Check https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION for a list of supported SSLModes. -ssl.mode=allow \ No newline at end of file +ssl.mode=allow + +## Duration. Optional, defaults to 10 second. jOOQ reactive timeout when executing Postgres query. This setting prevent jooq reactive bug from causing hanging issue. +#jooq.reactive.timeout=10second \ No newline at end of file diff --git a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java index bc03e224ee..a65fa05b36 100644 --- a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java +++ b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java @@ -140,8 +140,9 @@ public class PostgresCommonModule extends AbstractModule { @Provides @Named(PostgresExecutor.NON_RLS_INJECT) @Singleton - PostgresExecutor.Factory postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT) JamesPostgresConnectionFactory singlePostgresConnectionFactory) { - return new PostgresExecutor.Factory(singlePostgresConnectionFactory); + PostgresExecutor.Factory postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT) JamesPostgresConnectionFactory singlePostgresConnectionFactory, + PostgresConfiguration postgresConfiguration) { + return new PostgresExecutor.Factory(singlePostgresConnectionFactory, postgresConfiguration); } @Provides --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
