This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/postgresql by this push:
     new 4720ef72f6 JAMES-2586 Create metrics for PostgresExecutor
4720ef72f6 is described below

commit 4720ef72f6ad8571b85fc72b330c9b502e5e8b72
Author: hung phan <[email protected]>
AuthorDate: Wed Apr 10 11:57:00 2024 +0700

    JAMES-2586 Create metrics for PostgresExecutor
---
 backends-common/postgres/pom.xml                   |   4 +
 .../backends/postgres/utils/PostgresExecutor.java  | 103 ++++++++++++---------
 .../james/backends/postgres/PostgresExtension.java |   8 +-
 ...stgresAnnotationMapperRowLevelSecurityTest.java |   3 +-
 .../PostgresMailboxMapperRowLevelSecurityTest.java |   4 +-
 .../PostgresMessageMapperRowLevelSecurityTest.java |   3 +-
 ...gresSubscriptionMapperRowLevelSecurityTest.java |   3 +-
 .../james/modules/data/PostgresCommonModule.java   |   6 +-
 8 files changed, 80 insertions(+), 54 deletions(-)

diff --git a/backends-common/postgres/pom.xml b/backends-common/postgres/pom.xml
index 437c49bd53..b3477faa88 100644
--- a/backends-common/postgres/pom.xml
+++ b/backends-common/postgres/pom.xml
@@ -52,6 +52,10 @@
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-util</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>testing-base</artifactId>
diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index cb63a7e4f7..879708aad7 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -32,6 +32,7 @@ import javax.inject.Inject;
 
 import org.apache.james.backends.postgres.PostgresConfiguration;
 import org.apache.james.core.Domain;
+import org.apache.james.metrics.api.MetricFactory;
 import org.jooq.DSLContext;
 import org.jooq.DeleteResultStep;
 import org.jooq.Record;
@@ -66,16 +67,19 @@ public class PostgresExecutor {
 
         private final JamesPostgresConnectionFactory 
jamesPostgresConnectionFactory;
         private final PostgresConfiguration postgresConfiguration;
+        private final MetricFactory metricFactory;
 
         @Inject
         public Factory(JamesPostgresConnectionFactory 
jamesPostgresConnectionFactory,
-                       PostgresConfiguration postgresConfiguration) {
+                       PostgresConfiguration postgresConfiguration,
+                       MetricFactory metricFactory) {
             this.jamesPostgresConnectionFactory = 
jamesPostgresConnectionFactory;
             this.postgresConfiguration = postgresConfiguration;
+            this.metricFactory = metricFactory;
         }
 
         public PostgresExecutor create(Optional<Domain> domain) {
-            return new 
PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), 
postgresConfiguration);
+            return new 
PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), 
postgresConfiguration, metricFactory);
         }
 
         public PostgresExecutor create() {
@@ -90,11 +94,14 @@ public class PostgresExecutor {
 
     private final Mono<Connection> connection;
     private final PostgresConfiguration postgresConfiguration;
+    private final MetricFactory metricFactory;
 
     private PostgresExecutor(Mono<Connection> connection,
-                             PostgresConfiguration postgresConfiguration) {
+                             PostgresConfiguration postgresConfiguration,
+                             MetricFactory metricFactory) {
         this.connection = connection;
         this.postgresConfiguration = postgresConfiguration;
+        this.metricFactory = metricFactory;
     }
 
     public Mono<DSLContext> dslContext() {
@@ -102,44 +109,48 @@ public class PostgresExecutor {
     }
 
     public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) 
{
-        return dslContext()
-            .flatMap(queryFunction)
-            .timeout(postgresConfiguration.getJooqReactiveTimeout())
-            .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                .filter(preparedStatementConflictException()))
-            .then();
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+            dslContext()
+                .flatMap(queryFunction)
+                .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                    .filter(preparedStatementConflictException()))
+                .then()));
     }
 
     public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> 
queryFunction) {
-        return dslContext()
-            .flatMapMany(queryFunction)
-            .timeout(postgresConfiguration.getJooqReactiveTimeout())
-            .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-            .collectList()
-            .flatMapIterable(list -> list) // Mitigation fix for 
https://github.com/jOOQ/jOOQ/issues/16556
-            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                .filter(preparedStatementConflictException()));
+        return 
Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+            dslContext()
+                .flatMapMany(queryFunction)
+                .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                .collectList()
+                .flatMapIterable(list -> list) // Mitigation fix for 
https://github.com/jOOQ/jOOQ/issues/16556
+                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                    .filter(preparedStatementConflictException()))));
     }
 
     public Flux<Record> executeDeleteAndReturnList(Function<DSLContext, 
DeleteResultStep<Record>> queryFunction) {
-        return dslContext()
-            .flatMapMany(queryFunction)
-            .timeout(postgresConfiguration.getJooqReactiveTimeout())
-            .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-            .collectList()
-            .flatMapIterable(list -> list) // The convert Flux -> Mono<List> 
-> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055
-            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                .filter(preparedStatementConflictException()));
+        return 
Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+            dslContext()
+                .flatMapMany(queryFunction)
+                .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                .collectList()
+                .flatMapIterable(list -> list) // The convert Flux -> 
Mono<List> -> Flux to avoid a hanging issue. See: 
https://github.com/jOOQ/jOOQ/issues/16055
+                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                    .filter(preparedStatementConflictException()))));
     }
 
     public Mono<Record> executeRow(Function<DSLContext, Publisher<Record>> 
queryFunction) {
-        return dslContext()
-            .flatMap(queryFunction.andThen(Mono::from))
-            .timeout(postgresConfiguration.getJooqReactiveTimeout())
-            .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                .filter(preparedStatementConflictException()));
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+            dslContext()
+                .flatMap(queryFunction.andThen(Mono::from))
+                .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                    .filter(preparedStatementConflictException()))));
     }
 
     public Mono<Optional<Record>> 
executeSingleRowOptional(Function<DSLContext, Publisher<Record>> queryFunction) 
{
@@ -149,13 +160,14 @@ public class PostgresExecutor {
     }
 
     public Mono<Integer> executeCount(Function<DSLContext, 
Mono<Record1<Integer>>> queryFunction) {
-        return dslContext()
-            .flatMap(queryFunction)
-            .timeout(postgresConfiguration.getJooqReactiveTimeout())
-            .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                .filter(preparedStatementConflictException()))
-            .map(Record1::value1);
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+            dslContext()
+                .flatMap(queryFunction)
+                .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                    .filter(preparedStatementConflictException()))
+                .map(Record1::value1)));
     }
 
     public Mono<Boolean> executeExists(Function<DSLContext, 
SelectConditionStep<?>> queryFunction) {
@@ -164,12 +176,13 @@ public class PostgresExecutor {
     }
 
     public Mono<Integer> executeReturnAffectedRowsCount(Function<DSLContext, 
Mono<Integer>> queryFunction) {
-        return dslContext()
-            .flatMap(queryFunction)
-            .timeout(postgresConfiguration.getJooqReactiveTimeout())
-            .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-            .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                .filter(preparedStatementConflictException()));
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
+            dslContext()
+                .flatMap(queryFunction)
+                .timeout(postgresConfiguration.getJooqReactiveTimeout())
+                .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
+                    .filter(preparedStatementConflictException()))));
     }
 
     public Mono<Connection> connection() {
diff --git 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
index 88c21244ce..35a4e9b33b 100644
--- 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
+++ 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -30,6 +30,7 @@ import org.apache.james.GuiceModuleTestExtension;
 import 
org.apache.james.backends.postgres.utils.DomainImplPostgresConnectionFactory;
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
 import 
org.apache.james.backends.postgres.utils.SinglePostgresConnectionFactory;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.testcontainers.containers.PostgreSQLContainer;
 
@@ -139,12 +140,13 @@ public class PostgresExtension implements 
GuiceModuleTestExtension {
             .build());
 
         if (rlsEnabled) {
-            executorFactory = new PostgresExecutor.Factory(new 
DomainImplPostgresConnectionFactory(connectionFactory), postgresConfiguration);
+            executorFactory = new PostgresExecutor.Factory(new 
DomainImplPostgresConnectionFactory(connectionFactory), postgresConfiguration, 
new RecordingMetricFactory());
         } else {
             executorFactory = new PostgresExecutor.Factory(new 
SinglePostgresConnectionFactory(connectionFactory.create()
                 .cache()
                 .cast(Connection.class).block()),
-                postgresConfiguration);
+                postgresConfiguration,
+                new RecordingMetricFactory());
         }
 
         postgresExecutor = executorFactory.create();
@@ -154,7 +156,7 @@ public class PostgresExtension implements 
GuiceModuleTestExtension {
                     
.password(postgresConfiguration.getNonRLSCredential().getPassword())
                     .build())
                 .flatMap(configuration -> new 
PostgresqlConnectionFactory(configuration).create().cache())
-                .map(connection -> new PostgresExecutor.Factory(new 
SinglePostgresConnectionFactory(connection), postgresConfiguration).create())
+                .map(connection -> new PostgresExecutor.Factory(new 
SinglePostgresConnectionFactory(connection), postgresConfiguration, new 
RecordingMetricFactory()).create())
                 .block();
         } else {
             nonRLSPostgresExecutor = postgresExecutor;
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
index c6dced4ffe..f23a8c031f 100644
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresAnnotationMapperRowLevelSecurityTest.java
@@ -42,6 +42,7 @@ import 
org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule;
 import org.apache.james.mailbox.postgres.PostgresMailboxSessionMapperFactory;
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.junit.jupiter.api.BeforeEach;
@@ -73,7 +74,7 @@ public class PostgresAnnotationMapperRowLevelSecurityTest {
     public void setUp() {
         BlobId.Factory blobIdFactory = new HashBlobId.Factory();
         postgresMailboxSessionMapperFactory = new 
PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new 
DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
-            postgresExtension.getPostgresConfiguration()),
+            postgresExtension.getPostgresConfiguration(), new 
RecordingMetricFactory()),
             new UpdatableTickingClock(Instant.now()),
             new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), 
BucketName.DEFAULT, blobIdFactory),
             blobIdFactory);
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
index 4680c58d36..cfc7dcc1d1 100644
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperRowLevelSecurityTest.java
@@ -31,6 +31,7 @@ import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
 import org.apache.james.mailbox.store.mail.MailboxMapperFactory;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -45,7 +46,8 @@ public class PostgresMailboxMapperRowLevelSecurityTest {
     @BeforeEach
     public void setUp() {
         PostgresExecutor.Factory executorFactory = new 
PostgresExecutor.Factory(new 
DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
-            postgresExtension.getPostgresConfiguration());
+            postgresExtension.getPostgresConfiguration(),
+            new RecordingMetricFactory());
         mailboxMapperFactory = session -> new PostgresMailboxMapper(new 
PostgresMailboxDAO(executorFactory.create(session.getUser().getDomainPart())));
     }
 
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
index 6b472e615e..55743bafb6 100644
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperRowLevelSecurityTest.java
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.junit.jupiter.api.BeforeEach;
@@ -80,7 +81,7 @@ public class PostgresMessageMapperRowLevelSecurityTest {
     public void setUp() {
         BlobId.Factory blobIdFactory = new HashBlobId.Factory();
         postgresMailboxSessionMapperFactory = new 
PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new 
DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
-            postgresExtension.getPostgresConfiguration()),
+            postgresExtension.getPostgresConfiguration(), new 
RecordingMetricFactory()),
             new UpdatableTickingClock(Instant.now()),
             new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), 
BucketName.DEFAULT, blobIdFactory),
             blobIdFactory);
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
index daf676b664..a28bd34087 100644
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/user/PostgresSubscriptionMapperRowLevelSecurityTest.java
@@ -29,6 +29,7 @@ import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MailboxSessionUtil;
 import org.apache.james.mailbox.store.user.SubscriptionMapperFactory;
 import org.apache.james.mailbox.store.user.model.Subscription;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -42,7 +43,7 @@ public class PostgresSubscriptionMapperRowLevelSecurityTest {
     @BeforeEach
     public void setUp() {
         PostgresExecutor.Factory executorFactory = new 
PostgresExecutor.Factory(new 
DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
-            postgresExtension.getPostgresConfiguration());
+            postgresExtension.getPostgresConfiguration(), new 
RecordingMetricFactory());
         subscriptionMapperFactory = session -> new 
PostgresSubscriptionMapper(new 
PostgresSubscriptionDAO(executorFactory.create(session.getUser().getDomainPart())));
     }
 
diff --git 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
index a65fa05b36..a2e882f1e7 100644
--- 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
+++ 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
@@ -34,6 +34,7 @@ import 
org.apache.james.backends.postgres.utils.PostgresExecutor;
 import org.apache.james.backends.postgres.utils.PostgresHealthCheck;
 import 
org.apache.james.backends.postgres.utils.SinglePostgresConnectionFactory;
 import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.utils.InitializationOperation;
 import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.utils.PropertiesProvider;
@@ -141,8 +142,9 @@ public class PostgresCommonModule extends AbstractModule {
     @Named(PostgresExecutor.NON_RLS_INJECT)
     @Singleton
     PostgresExecutor.Factory 
postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT) 
JamesPostgresConnectionFactory singlePostgresConnectionFactory,
-                                                                  
PostgresConfiguration postgresConfiguration) {
-        return new PostgresExecutor.Factory(singlePostgresConnectionFactory, 
postgresConfiguration);
+                                                                  
PostgresConfiguration postgresConfiguration,
+                                                                  
MetricFactory metricFactory) {
+        return new PostgresExecutor.Factory(singlePostgresConnectionFactory, 
postgresConfiguration, metricFactory);
     }
 
     @Provides


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to