This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/postgresql by this push:
new 4720ef72f6 JAMES-2586 Create metrics for PostgresExecutor
4720ef72f6 is described below
commit 4720ef72f6ad8571b85fc72b330c9b502e5e8b72
Author: hung phan <[email protected]>
AuthorDate: Wed Apr 10 11:57:00 2024 +0700
JAMES-2586 Create metrics for PostgresExecutor
---
backends-common/postgres/pom.xml | 4 +
.../backends/postgres/utils/PostgresExecutor.java | 103 ++++++++++++---------
.../james/backends/postgres/PostgresExtension.java | 8 +-
...stgresAnnotationMapperRowLevelSecurityTest.java | 3 +-
.../PostgresMailboxMapperRowLevelSecurityTest.java | 4 +-
.../PostgresMessageMapperRowLevelSecurityTest.java | 3 +-
...gresSubscriptionMapperRowLevelSecurityTest.java | 3 +-
.../james/modules/data/PostgresCommonModule.java | 6 +-
8 files changed, 80 insertions(+), 54 deletions(-)
diff --git a/backends-common/postgres/pom.xml b/backends-common/postgres/pom.xml
index 437c49bd53..b3477faa88 100644
--- a/backends-common/postgres/pom.xml
+++ b/backends-common/postgres/pom.xml
@@ -52,6 +52,10 @@
<groupId>${james.groupId}</groupId>
<artifactId>james-server-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>metrics-api</artifactId>
+ </dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>testing-base</artifactId>
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index cb63a7e4f7..879708aad7 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -32,6 +32,7 @@ import javax.inject.Inject;
import org.apache.james.backends.postgres.PostgresConfiguration;
import org.apache.james.core.Domain;
+import org.apache.james.metrics.api.MetricFactory;
import org.jooq.DSLContext;
import org.jooq.DeleteResultStep;
import org.jooq.Record;
@@ -66,16 +67,19 @@ public class PostgresExecutor {
private final JamesPostgresConnectionFactory
jamesPostgresConnectionFactory;
private final PostgresConfiguration postgresConfiguration;
+ private final MetricFactory metricFactory;
@Inject
public Factory(JamesPostgresConnectionFactory
jamesPostgresConnectionFactory,
- PostgresConfiguration postgresConfiguration) {
+ PostgresConfiguration postgresConfiguration,
+ MetricFactory metricFactory) {
this.jamesPostgresConnectionFactory =
jamesPostgresConnectionFactory;
this.postgresConfiguration = postgresConfiguration;
+ this.metricFactory = metricFactory;
}
public PostgresExecutor create(Optional<Domain> domain) {
- return new
PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain),
postgresConfiguration);
+ return new
PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain),
postgresConfiguration, metricFactory);
}
public PostgresExecutor create() {
@@ -90,11 +94,14 @@ public class PostgresExecutor {
private final Mono<Connection> connection;
private final PostgresConfiguration postgresConfiguration;
+ private final MetricFactory metricFactory;
private PostgresExecutor(Mono<Connection> connection,
- PostgresConfiguration postgresConfiguration) {
+ PostgresConfiguration postgresConfiguration,
+ MetricFactory metricFactory) {
this.connection = connection;
this.postgresConfiguration = postgresConfiguration;
+ this.metricFactory = metricFactory;
}
public Mono<DSLContext> dslContext() {
@@ -102,44 +109,48 @@ public class PostgresExecutor {
}
public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction)
{
- return dslContext()
- .flatMap(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()))
- .then();
+ return
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMap(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))
+ .then()));
}
public Flux<Record> executeRows(Function<DSLContext, Flux<Record>>
queryFunction) {
- return dslContext()
- .flatMapMany(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .collectList()
- .flatMapIterable(list -> list) // Mitigation fix for
https://github.com/jOOQ/jOOQ/issues/16556
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()));
+ return
Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMapMany(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .collectList()
+ .flatMapIterable(list -> list) // Mitigation fix for
https://github.com/jOOQ/jOOQ/issues/16556
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))));
}
public Flux<Record> executeDeleteAndReturnList(Function<DSLContext,
DeleteResultStep<Record>> queryFunction) {
- return dslContext()
- .flatMapMany(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .collectList()
- .flatMapIterable(list -> list) // The convert Flux -> Mono<List>
-> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()));
+ return
Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMapMany(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .collectList()
+ .flatMapIterable(list -> list) // The convert Flux ->
Mono<List> -> Flux to avoid a hanging issue. See:
https://github.com/jOOQ/jOOQ/issues/16055
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))));
}
public Mono<Record> executeRow(Function<DSLContext, Publisher<Record>>
queryFunction) {
- return dslContext()
- .flatMap(queryFunction.andThen(Mono::from))
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()));
+ return
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMap(queryFunction.andThen(Mono::from))
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))));
}
public Mono<Optional<Record>>
executeSingleRowOptional(Function<DSLContext, Publisher<Record>> queryFunction)
{
@@ -149,13 +160,14 @@ public class PostgresExecutor {
}
public Mono<Integer> executeCount(Function<DSLContext,
Mono<Record1<Integer>>> queryFunction) {
- return dslContext()
- .flatMap(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()))
- .map(Record1::value1);
+ return
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMap(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))
+ .map(Record1::value1)));
}
public Mono<Boolean> executeExists(Function<DSLContext,
SelectConditionStep<?>> queryFunction) {
@@ -164,12 +176,13 @@ public class PostgresExecutor {
}
public Mono<Integer> executeReturnAffectedRowsCount(Function<DSLContext,
Mono<Integer>> queryFunction) {
- return dslContext()
- .flatMap(queryFunction)
- .timeout(postgresConfiguration.getJooqReactiveTimeout())
- .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
- .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
- .filter(preparedStatementConflictException()));
+ return
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+ dslContext()
+ .flatMap(queryFunction)
+ .timeout(postgresConfiguration.getJooqReactiveTimeout())
+ .doOnError(TimeoutException.class, e ->
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+ .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+ .filter(preparedStatementConflictException()))));
}
public Mono<Connection> connection() {
diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
index 88c21244ce..35a4e9b33b 100644
--- a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
+++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -30,6 +30,7 @@ import org.apache.james.GuiceModuleTestExtension;
import
org.apache.james.backends.postgres.utils.DomainImplPostgresConnectionFactory;
import org.apache.james.backends.postgres.utils.PostgresExecutor;
import
org.apache.james.backends.postgres.utils.SinglePostgresConnectionFactory;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -139,12 +140,13 @@ public class PostgresExtension implements
GuiceModuleTestExtension {
.build());
if (rlsEnabled) {
- executorFactory = new PostgresExecutor.Factory(new
DomainImplPostgresConnectionFactory(connectionFactory), postgresConfiguration);
+ executorFactory = new PostgresExecutor.Factory(new
DomainImplPostgresConnectionFactory(connectionFactory), postgresConfiguration,
new RecordingMetricFactory());
} else {
executorFactory = new PostgresExecutor.Factory(new
SinglePostgresConnectionFactory(connectionFactory.create()
.cache()
.cast(Connection.class).block()),
- postgresConfiguration);
+ postgresConfiguration,
+ new RecordingMetricFactory());
}
postgresExecutor = executorFactory.create();
@@ -154,7 +156,7 @@ public class PostgresExtension implements
GuiceModuleTestExtension {
.password(postgresConfiguration.getNonRLSCredential().getPassword())
.build())
.flatMap(configuration -> new
PostgresqlConnectionFactory(configuration).create().cache())
- .map(connection -> new PostgresExecutor.Factory(new
SinglePostgresConnectionFactory(connection), postgresConfiguration).create())
+ .map(connection -> new PostgresExecutor.Factory(new
SinglePostgresConnectionFactory(connection), postgresConfiguration, new
RecordingMetricFactory()).create())
.block();
} else {
nonRLSPostgresExecutor = postgresExecutor;
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
index c6dced4ffe..f23a8c031f 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
@@ -42,6 +42,7 @@ import
org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule;
import org.apache.james.mailbox.postgres.PostgresMailboxSessionMapperFactory;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapper;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
import org.apache.james.utils.UpdatableTickingClock;
import org.junit.jupiter.api.BeforeEach;
@@ -73,7 +74,7 @@ public class PostgresAnnotationMapperRowLevelSecurityTest {
public void setUp() {
BlobId.Factory blobIdFactory = new HashBlobId.Factory();
postgresMailboxSessionMapperFactory = new
PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new
DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
- postgresExtension.getPostgresConfiguration()),
+ postgresExtension.getPostgresConfiguration(), new
RecordingMetricFactory()),
new UpdatableTickingClock(Instant.now()),
new DeDuplicationBlobStore(new MemoryBlobStoreDAO(),
BucketName.DEFAULT, blobIdFactory),
blobIdFactory);
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
index 4680c58d36..cfc7dcc1d1 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
@@ -31,6 +31,7 @@ import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.UidValidity;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapperFactory;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -45,7 +46,8 @@ public class PostgresMailboxMapperRowLevelSecurityTest {
@BeforeEach
public void setUp() {
PostgresExecutor.Factory executorFactory = new
PostgresExecutor.Factory(new
DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
- postgresExtension.getPostgresConfiguration());
+ postgresExtension.getPostgresConfiguration(),
+ new RecordingMetricFactory());
mailboxMapperFactory = session -> new PostgresMailboxMapper(new
PostgresMailboxDAO(executorFactory.create(session.getUser().getDomainPart())));
}
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
index 6b472e615e..55743bafb6 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
import org.apache.james.utils.UpdatableTickingClock;
import org.junit.jupiter.api.BeforeEach;
@@ -80,7 +81,7 @@ public class PostgresMessageMapperRowLevelSecurityTest {
public void setUp() {
BlobId.Factory blobIdFactory = new HashBlobId.Factory();
postgresMailboxSessionMapperFactory = new
PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new
DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
- postgresExtension.getPostgresConfiguration()),
+ postgresExtension.getPostgresConfiguration(), new
RecordingMetricFactory()),
new UpdatableTickingClock(Instant.now()),
new DeDuplicationBlobStore(new MemoryBlobStoreDAO(),
BucketName.DEFAULT, blobIdFactory),
blobIdFactory);
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
index daf676b664..a28bd34087 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
@@ -29,6 +29,7 @@ import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MailboxSessionUtil;
import org.apache.james.mailbox.store.user.SubscriptionMapperFactory;
import org.apache.james.mailbox.store.user.model.Subscription;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -42,7 +43,7 @@ public class PostgresSubscriptionMapperRowLevelSecurityTest {
@BeforeEach
public void setUp() {
PostgresExecutor.Factory executorFactory = new
PostgresExecutor.Factory(new
DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
- postgresExtension.getPostgresConfiguration());
+ postgresExtension.getPostgresConfiguration(), new
RecordingMetricFactory());
subscriptionMapperFactory = session -> new
PostgresSubscriptionMapper(new
PostgresSubscriptionDAO(executorFactory.create(session.getUser().getDomainPart())));
}
diff --git a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
index a65fa05b36..a2e882f1e7 100644
--- a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
+++ b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
@@ -34,6 +34,7 @@ import
org.apache.james.backends.postgres.utils.PostgresExecutor;
import org.apache.james.backends.postgres.utils.PostgresHealthCheck;
import
org.apache.james.backends.postgres.utils.SinglePostgresConnectionFactory;
import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.james.utils.PropertiesProvider;
@@ -141,8 +142,9 @@ public class PostgresCommonModule extends AbstractModule {
@Named(PostgresExecutor.NON_RLS_INJECT)
@Singleton
PostgresExecutor.Factory
postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT)
JamesPostgresConnectionFactory singlePostgresConnectionFactory,
-
PostgresConfiguration postgresConfiguration) {
- return new PostgresExecutor.Factory(singlePostgresConnectionFactory,
postgresConfiguration);
+
PostgresConfiguration postgresConfiguration,
+
MetricFactory metricFactory) {
+ return new PostgresExecutor.Factory(singlePostgresConnectionFactory,
postgresConfiguration, metricFactory);
}
@Provides
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]