This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/postgresql by this push:
     new 9190994801 JAMES-2586 implement pg connection factory (#1774)
9190994801 is described below

commit 9190994801b67405cab2e4c0ab157a6317f4d6da
Author: hungphan227 <[email protected]>
AuthorDate: Wed Nov 8 13:51:35 2023 +0700

    JAMES-2586 implement pg connection factory (#1774)
---
 .../utils/JamesPostgresConnectionFactory.java      |  37 ++++
 .../SimpleJamesPostgresConnectionFactory.java      |  83 ++++++++
 .../postgres/ConnectionThreadSafetyTest.java       | 219 +++++++++++++++++++++
 .../JamesPostgresConnectionFactoryTest.java        |  96 +++++++++
 .../james/backends/postgres/PostgresExtension.java |  13 +-
 .../SimpleJamesPostgresConnectionFactoryTest.java  | 146 ++++++++++++++
 6 files changed, 593 insertions(+), 1 deletion(-)

diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
new file mode 100644
index 0000000000..8d8391e209
--- /dev/null
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres.utils;
+
+import java.util.Optional;
+
+import org.apache.james.core.Domain;
+
+import io.r2dbc.spi.Connection;
+import reactor.core.publisher.Mono;
+
+public interface JamesPostgresConnectionFactory {
+    String DOMAIN_ATTRIBUTE = "app.current_domain";
+
+    default Mono<Connection> getConnection(Domain domain) {
+        return getConnection(Optional.ofNullable(domain));
+    }
+
+    Mono<Connection> getConnection(Optional<Domain> domain);
+}
diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SimpleJamesPostgresConnectionFactory.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SimpleJamesPostgresConnectionFactory.java
new file mode 100644
index 0000000000..edfba85ce9
--- /dev/null
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SimpleJamesPostgresConnectionFactory.java
@@ -0,0 +1,83 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.james.core.Domain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.ConnectionFactory;
+import reactor.core.publisher.Mono;
+
+public class SimpleJamesPostgresConnectionFactory implements 
JamesPostgresConnectionFactory {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleJamesPostgresConnectionFactory.class);
+    private static final Domain DEFAULT = Domain.of("default");
+
+    private final ConnectionFactory connectionFactory;
+    private final Map<Domain, Connection> mapDomainToConnection = new 
ConcurrentHashMap<>();
+
+    public SimpleJamesPostgresConnectionFactory(ConnectionFactory 
connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public Mono<Connection> getConnection(Optional<Domain> maybeDomain) {
+        return maybeDomain.map(this::getConnectionForDomain)
+            .orElse(getConnectionForDomain(DEFAULT));
+    }
+
+    private Mono<Connection> getConnectionForDomain(Domain domain) {
+        return Mono.just(domain)
+            .flatMap(domainValue -> Mono.fromCallable(() -> 
mapDomainToConnection.get(domainValue))
+                .switchIfEmpty(create(domainValue)));
+    }
+
+    private Mono<Connection> create(Domain domain) {
+        return Mono.from(connectionFactory.create())
+            .doOnError(e -> LOGGER.error("Error while creating connection for 
domain {}", domain, e))
+            .flatMap(newConnection -> getAndSetConnection(domain, 
newConnection));
+    }
+
+    private Mono<Connection> getAndSetConnection(Domain domain, Connection 
newConnection) {
+        return Mono.justOrEmpty(mapDomainToConnection.putIfAbsent(domain, 
newConnection))
+            .map(postgresqlConnection -> {
+                //close redundant connection
+                Mono.from(newConnection.close())
+                    .doOnError(e -> LOGGER.error("Error while closing 
connection for domain {}", domain, e))
+                    .subscribe();
+                return postgresqlConnection;
+            }).switchIfEmpty(setDomainAttributeForConnection(domain, 
newConnection));
+    }
+
+    private static Mono<Connection> setDomainAttributeForConnection(Domain 
domain, Connection newConnection) {
+        if (DEFAULT.equals(domain)) {
+            return Mono.just(newConnection);
+        } else {
+            return Mono.from(newConnection.createStatement("SET " + 
DOMAIN_ATTRIBUTE + " TO '" + domain.asString() + "'") // It should be set value 
via Bind, but it doesn't work
+                .execute())
+                .doOnError(e -> LOGGER.error("Error while setting domain 
attribute for domain {}", domain, e))
+                .then(Mono.just(newConnection));
+        }
+    }
+}
diff --git 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/ConnectionThreadSafetyTest.java
 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/ConnectionThreadSafetyTest.java
new file mode 100644
index 0000000000..20eedcee4d
--- /dev/null
+++ 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/ConnectionThreadSafetyTest.java
@@ -0,0 +1,219 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import 
org.apache.james.backends.postgres.utils.SimpleJamesPostgresConnectionFactory;
+import org.apache.james.core.Domain;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import io.r2dbc.postgresql.api.PostgresqlConnection;
+import io.r2dbc.postgresql.api.PostgresqlResult;
+import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.Result;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ConnectionThreadSafetyTest {
+    static final int NUMBER_OF_THREAD = 100;
+    static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS 
person (\n" +
+        "\tid serial PRIMARY KEY,\n" +
+        "\tname VARCHAR ( 50 ) UNIQUE NOT NULL\n" +
+        ");";
+
+    @RegisterExtension
+    static PostgresExtension postgresExtension = new PostgresExtension();
+
+    private static PostgresqlConnection postgresqlConnection;
+    private static SimpleJamesPostgresConnectionFactory 
jamesPostgresConnectionFactory;
+
+    @BeforeAll
+    static void beforeAll() {
+        jamesPostgresConnectionFactory = new 
SimpleJamesPostgresConnectionFactory(postgresExtension.getConnectionFactory());
+        postgresqlConnection = (PostgresqlConnection) 
postgresExtension.getConnection().block();
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        postgresqlConnection.createStatement(CREATE_TABLE_STATEMENT)
+            .execute()
+            .flatMap(PostgresqlResult::getRowsUpdated)
+            .then()
+            .block();
+    }
+
+    @AfterEach
+    void afterEach() {
+        postgresqlConnection.createStatement("DROP TABLE person")
+            .execute()
+            .flatMap(PostgresqlResult::getRowsUpdated)
+            .then()
+            .block();
+    }
+
+    @Test
+    void 
connectionShouldWorkWellWhenItIsUsedByMultipleThreadsAndAllQueriesAreSelect() 
throws Exception {
+        createData(NUMBER_OF_THREAD);
+
+        Connection connection = 
jamesPostgresConnectionFactory.getConnection(Domain.of("james")).block();
+
+        List<String> actual = new Vector<>();
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> getData(connection, 
threadNumber)
+                .doOnNext(s -> actual.add(s))
+                .then())
+            .threadCount(NUMBER_OF_THREAD)
+            .operationCount(1)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        Set<String> expected = Stream.iterate(0, i -> i + 
1).limit(NUMBER_OF_THREAD).map(i -> i + "|Peter" + 
i).collect(ImmutableSet.toImmutableSet());
+
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    @Test
+    void 
connectionShouldWorkWellWhenItIsUsedByMultipleThreadsAndAllQueriesAreInsert() 
throws Exception {
+        Connection connection = 
jamesPostgresConnectionFactory.getConnection(Domain.of("james")).block();
+
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> createData(connection, 
threadNumber))
+            .threadCount(NUMBER_OF_THREAD)
+            .operationCount(1)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        List<String> actual = getData(0, NUMBER_OF_THREAD);
+        Set<String> expected = Stream.iterate(0, i -> i + 
1).limit(NUMBER_OF_THREAD).map(i -> i + "|Peter" + 
i).collect(ImmutableSet.toImmutableSet());
+
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    @Test
+    void 
connectionShouldWorkWellWhenItIsUsedByMultipleThreadsAndInsertQueriesAreDuplicated()
 throws Exception {
+        Connection connection = 
jamesPostgresConnectionFactory.getConnection(Domain.of("james")).block();
+
+        AtomicInteger numberOfSuccess = new AtomicInteger(0);
+        AtomicInteger numberOfFail = new AtomicInteger(0);
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> createData(connection, 
threadNumber % 10)
+                .then(Mono.fromCallable(() -> 
numberOfSuccess.incrementAndGet()))
+                .then()
+                .onErrorResume(throwable -> {
+                    if (throwable.getMessage().contains("duplicate key value 
violates unique constraint")) {
+                        numberOfFail.incrementAndGet();
+                    }
+                    return Mono.empty();
+                }))
+            .threadCount(100)
+            .operationCount(1)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        List<String> actual = getData(0, 100);
+        Set<String> expected = Stream.iterate(0, i -> i + 1).limit(10).map(i 
-> i + "|Peter" + i).collect(ImmutableSet.toImmutableSet());
+
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+        assertThat(numberOfSuccess.get()).isEqualTo(10);
+        assertThat(numberOfFail.get()).isEqualTo(90);
+    }
+
+    @Test
+    void 
connectionShouldWorkWellWhenItIsUsedByMultipleThreadsAndQueriesIncludeBothSelectAndInsert()
 throws Exception {
+        createData(50);
+
+        Connection connection = 
jamesPostgresConnectionFactory.getConnection(Optional.empty()).block();
+
+        List<String> actualSelect = new Vector<>();
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> {
+                if (threadNumber < 50) {
+                    return getData(connection, threadNumber)
+                        .doOnNext(s -> actualSelect.add(s))
+                        .then();
+                } else {
+                    return createData(connection, threadNumber);
+                }
+            })
+            .threadCount(NUMBER_OF_THREAD)
+            .operationCount(1)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        List<String> actualInsert = getData(50, 100);
+
+        Set<String> expectedSelect = Stream.iterate(0, i -> i + 
1).limit(50).map(i -> i + "|Peter" + i).collect(ImmutableSet.toImmutableSet());
+        Set<String> expectedInsert = Stream.iterate(50, i -> i + 
1).limit(50).map(i -> i + "|Peter" + i).collect(ImmutableSet.toImmutableSet());
+
+        
assertThat(actualSelect).containsExactlyInAnyOrderElementsOf(expectedSelect);
+        
assertThat(actualInsert).containsExactlyInAnyOrderElementsOf(expectedInsert);
+    }
+
+    private Flux<String> getData(Connection connection, int threadNumber) {
+        return Flux.from(connection.createStatement("SELECT id, name FROM 
PERSON WHERE id = $1")
+                .bind("$1", threadNumber)
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get("id", 
Long.class) + "|" + row.get("name", String.class)));
+    }
+
+    @NotNull
+    private Mono<Void> createData(Connection connection, int threadNumber) {
+        return Flux.from(connection.createStatement("INSERT INTO person (id, 
name) VALUES ($1, $2)")
+                .bind("$1", threadNumber)
+                .bind("$2", "Peter" + threadNumber)
+                .execute())
+            .flatMap(Result::getRowsUpdated)
+            .then();
+    }
+
+    private List<String> getData(int lowerBound, int upperBound) {
+        return Flux.from(postgresqlConnection.createStatement("SELECT id, name 
FROM person WHERE id >= $1 AND id < $2")
+                .bind("$1", lowerBound)
+                .bind("$2", upperBound)
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get("id", 
Long.class) + "|" + row.get("name", String.class)))
+            .collect(ImmutableList.toImmutableList()).block();
+    }
+
+    private void createData(int upperBound) {
+        for (int i = 0; i < upperBound; i++) {
+            postgresqlConnection.createStatement("INSERT INTO person (id, 
name) VALUES ($1, $2)")
+                .bind("$1", i)
+                .bind("$2", "Peter" + i)
+                .execute().flatMap(PostgresqlResult::getRowsUpdated)
+                .then()
+                .block();
+        }
+    }
+}
diff --git 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/JamesPostgresConnectionFactoryTest.java
 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/JamesPostgresConnectionFactoryTest.java
new file mode 100644
index 0000000000..ab68dd611a
--- /dev/null
+++ 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/JamesPostgresConnectionFactoryTest.java
@@ -0,0 +1,96 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Optional;
+
+import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory;
+import org.apache.james.core.Domain;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import io.r2dbc.spi.Connection;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public abstract class JamesPostgresConnectionFactoryTest {
+
+    abstract JamesPostgresConnectionFactory jamesPostgresConnectionFactory();
+
+    @Test
+    void getConnectionShouldWork() {
+        Connection connection = 
jamesPostgresConnectionFactory().getConnection(Optional.empty()).block();
+        String actual = Flux.from(connection.createStatement("SELECT 1")
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, 
String.class)))
+            .collect(ImmutableList.toImmutableList())
+            .block().get(0);
+
+        assertThat(actual).isEqualTo("1");
+    }
+
+    @Test
+    void getConnectionWithDomainShouldWork() {
+        Connection connection = 
jamesPostgresConnectionFactory().getConnection(Domain.of("james")).block();
+        String actual = Flux.from(connection.createStatement("SELECT 1")
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, 
String.class)))
+            .collect(ImmutableList.toImmutableList())
+            .block().get(0);
+
+        assertThat(actual).isEqualTo("1");
+    }
+
+    @Test
+    void getConnectionShouldSetCurrentDomainAttribute() {
+        Domain domain = Domain.of("james");
+        Connection connection = 
jamesPostgresConnectionFactory().getConnection(domain).block();
+        String actual = getDomainAttributeValue(connection);
+
+        assertThat(actual).isEqualTo(domain.asString());
+    }
+
+    @Test
+    void getConnectionWithoutDomainShouldNotSetCurrentDomainAttribute() {
+        Connection connection = 
jamesPostgresConnectionFactory().getConnection(Optional.empty()).block();
+
+        String message = Flux.from(connection.createStatement("show " + 
JamesPostgresConnectionFactory.DOMAIN_ATTRIBUTE)
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, 
String.class)))
+            .collect(ImmutableList.toImmutableList())
+            .map(strings -> "")
+            .onErrorResume(throwable -> Mono.just(throwable.getMessage()))
+            .block();
+
+        assertThat(message).isEqualTo("unrecognized configuration parameter 
\"" + JamesPostgresConnectionFactory.DOMAIN_ATTRIBUTE + "\"");
+    }
+
+    String getDomainAttributeValue(Connection connection) {
+        return Flux.from(connection.createStatement("show " + 
JamesPostgresConnectionFactory.DOMAIN_ATTRIBUTE)
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, 
String.class)))
+            .collect(ImmutableList.toImmutableList())
+            .block().get(0);
+    }
+
+}
diff --git 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
index 35606c4f8e..fd3d8ef1e1 100644
--- 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
+++ 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -28,11 +28,13 @@ import com.google.inject.Module;
 import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
 import io.r2dbc.postgresql.PostgresqlConnectionFactory;
 import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.ConnectionFactory;
 import reactor.core.publisher.Mono;
 
 public class PostgresExtension implements GuiceModuleTestExtension {
     private final PostgresModule postgresModule;
     private PostgresExecutor postgresExecutor;
+    private PostgresqlConnectionFactory connectionFactory;
 
     public PostgresExtension(PostgresModule postgresModule) {
         this.postgresModule = postgresModule;
@@ -51,7 +53,7 @@ public class PostgresExtension implements 
GuiceModuleTestExtension {
     }
 
     private void initPostgresSession() {
-        PostgresqlConnectionFactory connectionFactory = new 
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
+         connectionFactory = new 
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
             .host(getHost())
             .port(getMappedPort())
             .username(PostgresFixture.Database.DB_USER)
@@ -84,6 +86,12 @@ public class PostgresExtension implements 
GuiceModuleTestExtension {
         resetSchema();
     }
 
+    public void restartContainer() {
+        DockerPostgresSingleton.SINGLETON.stop();
+        DockerPostgresSingleton.SINGLETON.start();
+        initPostgresSession();
+    }
+
     @Override
     public Module getModule() {
         // TODO: return PostgresConfiguration bean when doing 
https://github.com/linagora/james-project/issues/4910
@@ -105,6 +113,9 @@ public class PostgresExtension implements 
GuiceModuleTestExtension {
     public PostgresExecutor getPostgresExecutor() {
         return postgresExecutor;
     }
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
 
     private void initTablesAndIndexes() {
         PostgresTableManager postgresTableManager = new 
PostgresTableManager(postgresExecutor, postgresModule);
diff --git 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/SimpleJamesPostgresConnectionFactoryTest.java
 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/SimpleJamesPostgresConnectionFactoryTest.java
new file mode 100644
index 0000000000..1ebf19ba35
--- /dev/null
+++ 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/SimpleJamesPostgresConnectionFactoryTest.java
@@ -0,0 +1,146 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory;
+import 
org.apache.james.backends.postgres.utils.SimpleJamesPostgresConnectionFactory;
+import org.apache.james.core.Domain;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableList;
+
+import io.r2dbc.postgresql.api.PostgresqlConnection;
+import io.r2dbc.spi.Connection;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class SimpleJamesPostgresConnectionFactoryTest extends 
JamesPostgresConnectionFactoryTest {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = new PostgresExtension();
+
+    private PostgresqlConnection postgresqlConnection;
+    private SimpleJamesPostgresConnectionFactory 
jamesPostgresConnectionFactory;
+
+    JamesPostgresConnectionFactory jamesPostgresConnectionFactory() {
+        return jamesPostgresConnectionFactory;
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        jamesPostgresConnectionFactory = new 
SimpleJamesPostgresConnectionFactory(postgresExtension.getConnectionFactory());
+        postgresqlConnection = (PostgresqlConnection) 
postgresExtension.getConnection().block();
+    }
+
+    @AfterEach
+    void afterEach() {
+        postgresExtension.restartContainer();
+    }
+
+    @Test
+    void factoryShouldCreateCorrectNumberOfConnections() {
+        Integer previousDbActiveNumberOfConnections = getNumberOfConnections();
+
+        // create 50 connections
+        Flux.range(1, 50)
+            .flatMap(i -> 
jamesPostgresConnectionFactory.getConnection(Domain.of("james" + i)))
+            .last()
+            .block();
+
+        Integer dbActiveNumberOfConnections = getNumberOfConnections();
+
+        assertThat(dbActiveNumberOfConnections - 
previousDbActiveNumberOfConnections).isEqualTo(50);
+    }
+
+    @Nullable
+    private Integer getNumberOfConnections() {
+        return Mono.from(postgresqlConnection.createStatement("SELECT count(*) 
from pg_stat_activity where usename = $1;")
+            .bind("$1", PostgresFixture.Database.DB_USER)
+            .execute()).flatMap(result -> Mono.from(result.map((row, 
rowMetadata) -> row.get(0, Integer.class)))).block();
+    }
+
+    @Test
+    void factoryShouldNotCreateNewConnectionWhenDomainsAreTheSame() {
+        Domain domain = Domain.of("james");
+        Connection connectionOne = 
jamesPostgresConnectionFactory.getConnection(domain).block();
+        Connection connectionTwo = 
jamesPostgresConnectionFactory.getConnection(domain).block();
+
+        assertThat(connectionOne == connectionTwo).isTrue();
+    }
+
+    @Test
+    void factoryShouldCreateNewConnectionWhenDomainsAreDifferent() {
+        Connection connectionOne = 
jamesPostgresConnectionFactory.getConnection(Domain.of("james")).block();
+        Connection connectionTwo = 
jamesPostgresConnectionFactory.getConnection(Domain.of("lin")).block();
+
+        String domainOne = getDomainAttributeValue(connectionOne);
+
+        String domainTwo = Flux.from(connectionTwo.createStatement("show " + 
JamesPostgresConnectionFactory.DOMAIN_ATTRIBUTE)
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, 
String.class)))
+            .collect(ImmutableList.toImmutableList())
+            .block().get(0);
+
+        assertThat(connectionOne).isNotEqualTo(connectionTwo);
+        assertThat(domainOne).isNotEqualTo(domainTwo);
+    }
+
+    @Test
+    void 
factoryShouldNotCreateNewConnectionWhenDomainsAreTheSameAndRequestsAreFromDifferentThreads()
 throws Exception {
+        Set<Connection> connectionSet = ConcurrentHashMap.newKeySet();
+
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> 
jamesPostgresConnectionFactory.getConnection(Domain.of("james"))
+                .doOnNext(connectionSet::add)
+                .then())
+            .threadCount(50)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(connectionSet).hasSize(1);
+    }
+
+    @Test
+    void factoryShouldCreateOnlyOneDefaultConnection() throws Exception {
+        Set<Connection> connectionSet = ConcurrentHashMap.newKeySet();
+
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> 
jamesPostgresConnectionFactory.getConnection(Optional.empty())
+                .doOnNext(connectionSet::add)
+                .then())
+            .threadCount(50)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(connectionSet).hasSize(1);
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to