This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/postgresql by this push:
new 9190994801 JAMES-2586 implement pg connection factory (#1774)
9190994801 is described below
commit 9190994801b67405cab2e4c0ab157a6317f4d6da
Author: hungphan227 <[email protected]>
AuthorDate: Wed Nov 8 13:51:35 2023 +0700
JAMES-2586 implement pg connection factory (#1774)
---
.../utils/JamesPostgresConnectionFactory.java | 37 ++++
.../SimpleJamesPostgresConnectionFactory.java | 83 ++++++++
.../postgres/ConnectionThreadSafetyTest.java | 219 +++++++++++++++++++++
.../JamesPostgresConnectionFactoryTest.java | 96 +++++++++
.../james/backends/postgres/PostgresExtension.java | 13 +-
.../SimpleJamesPostgresConnectionFactoryTest.java | 146 ++++++++++++++
6 files changed, 593 insertions(+), 1 deletion(-)
diff --git
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
new file mode 100644
index 0000000000..8d8391e209
--- /dev/null
+++
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres.utils;
+
+import java.util.Optional;
+
+import org.apache.james.core.Domain;
+
+import io.r2dbc.spi.Connection;
+import reactor.core.publisher.Mono;
+
+/**
+ * Supplies reactive R2DBC connections to PostgreSQL, optionally scoped to a James domain.
+ */
+public interface JamesPostgresConnectionFactory {
+    /** Name of the PostgreSQL session parameter that carries the current domain. */
+    String DOMAIN_ATTRIBUTE = "app.current_domain";
+
+    /**
+     * Convenience overload accepting a possibly {@code null} domain.
+     *
+     * @param domain the domain to get a connection for; {@code null} is treated as "no domain"
+     * @return the result of {@link #getConnection(Optional)} for the wrapped domain
+     */
+    default Mono<Connection> getConnection(Domain domain) {
+        Optional<Domain> maybeDomain = Optional.ofNullable(domain);
+        return getConnection(maybeDomain);
+    }
+
+    /**
+     * Provides a connection for the given domain.
+     *
+     * @param domain the domain the connection is requested for, or empty when no domain applies
+     * @return a {@link Mono} emitting the connection
+     */
+    Mono<Connection> getConnection(Optional<Domain> domain);
+}
diff --git
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SimpleJamesPostgresConnectionFactory.java
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SimpleJamesPostgresConnectionFactory.java
new file mode 100644
index 0000000000..edfba85ce9
--- /dev/null
+++
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/SimpleJamesPostgresConnectionFactory.java
@@ -0,0 +1,83 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres.utils;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.james.core.Domain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.ConnectionFactory;
+import reactor.core.publisher.Mono;
+
+public class SimpleJamesPostgresConnectionFactory implements
JamesPostgresConnectionFactory {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleJamesPostgresConnectionFactory.class);
+ private static final Domain DEFAULT = Domain.of("default");
+
+ private final ConnectionFactory connectionFactory;
+ private final Map<Domain, Connection> mapDomainToConnection = new
ConcurrentHashMap<>();
+
+ public SimpleJamesPostgresConnectionFactory(ConnectionFactory
connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ public Mono<Connection> getConnection(Optional<Domain> maybeDomain) {
+ return maybeDomain.map(this::getConnectionForDomain)
+ .orElse(getConnectionForDomain(DEFAULT));
+ }
+
+ private Mono<Connection> getConnectionForDomain(Domain domain) {
+ return Mono.just(domain)
+ .flatMap(domainValue -> Mono.fromCallable(() ->
mapDomainToConnection.get(domainValue))
+ .switchIfEmpty(create(domainValue)));
+ }
+
+ private Mono<Connection> create(Domain domain) {
+ return Mono.from(connectionFactory.create())
+ .doOnError(e -> LOGGER.error("Error while creating connection for
domain {}", domain, e))
+ .flatMap(newConnection -> getAndSetConnection(domain,
newConnection));
+ }
+
+ private Mono<Connection> getAndSetConnection(Domain domain, Connection
newConnection) {
+ return Mono.justOrEmpty(mapDomainToConnection.putIfAbsent(domain,
newConnection))
+ .map(postgresqlConnection -> {
+ //close redundant connection
+ Mono.from(newConnection.close())
+ .doOnError(e -> LOGGER.error("Error while closing
connection for domain {}", domain, e))
+ .subscribe();
+ return postgresqlConnection;
+ }).switchIfEmpty(setDomainAttributeForConnection(domain,
newConnection));
+ }
+
+ private static Mono<Connection> setDomainAttributeForConnection(Domain
domain, Connection newConnection) {
+ if (DEFAULT.equals(domain)) {
+ return Mono.just(newConnection);
+ } else {
+ return Mono.from(newConnection.createStatement("SET " +
DOMAIN_ATTRIBUTE + " TO '" + domain.asString() + "'") // It should be set value
via Bind, but it doesn't work
+ .execute())
+ .doOnError(e -> LOGGER.error("Error while setting domain
attribute for domain {}", domain, e))
+ .then(Mono.just(newConnection));
+ }
+ }
+}
diff --git
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/ConnectionThreadSafetyTest.java
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/ConnectionThreadSafetyTest.java
new file mode 100644
index 0000000000..20eedcee4d
--- /dev/null
+++
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/ConnectionThreadSafetyTest.java
@@ -0,0 +1,219 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import
org.apache.james.backends.postgres.utils.SimpleJamesPostgresConnectionFactory;
+import org.apache.james.core.Domain;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import io.r2dbc.postgresql.api.PostgresqlConnection;
+import io.r2dbc.postgresql.api.PostgresqlResult;
+import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.Result;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ConnectionThreadSafetyTest {
+ static final int NUMBER_OF_THREAD = 100;
+ static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS
person (\n" +
+ "\tid serial PRIMARY KEY,\n" +
+ "\tname VARCHAR ( 50 ) UNIQUE NOT NULL\n" +
+ ");";
+
+ @RegisterExtension
+ static PostgresExtension postgresExtension = new PostgresExtension();
+
+ private static PostgresqlConnection postgresqlConnection;
+ private static SimpleJamesPostgresConnectionFactory
jamesPostgresConnectionFactory;
+
+ @BeforeAll
+ static void beforeAll() {
+ jamesPostgresConnectionFactory = new
SimpleJamesPostgresConnectionFactory(postgresExtension.getConnectionFactory());
+ postgresqlConnection = (PostgresqlConnection)
postgresExtension.getConnection().block();
+ }
+
+ @BeforeEach
+ void beforeEach() {
+ postgresqlConnection.createStatement(CREATE_TABLE_STATEMENT)
+ .execute()
+ .flatMap(PostgresqlResult::getRowsUpdated)
+ .then()
+ .block();
+ }
+
+ @AfterEach
+ void afterEach() {
+ postgresqlConnection.createStatement("DROP TABLE person")
+ .execute()
+ .flatMap(PostgresqlResult::getRowsUpdated)
+ .then()
+ .block();
+ }
+
+ @Test
+ void
connectionShouldWorkWellWhenItIsUsedByMultipleThreadsAndAllQueriesAreSelect()
throws Exception {
+ createData(NUMBER_OF_THREAD);
+
+ Connection connection =
jamesPostgresConnectionFactory.getConnection(Domain.of("james")).block();
+
+ List<String> actual = new Vector<>();
+ ConcurrentTestRunner.builder()
+ .reactorOperation((threadNumber, step) -> getData(connection,
threadNumber)
+ .doOnNext(s -> actual.add(s))
+ .then())
+ .threadCount(NUMBER_OF_THREAD)
+ .operationCount(1)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ Set<String> expected = Stream.iterate(0, i -> i +
1).limit(NUMBER_OF_THREAD).map(i -> i + "|Peter" +
i).collect(ImmutableSet.toImmutableSet());
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @Test
+ void
connectionShouldWorkWellWhenItIsUsedByMultipleThreadsAndAllQueriesAreInsert()
throws Exception {
+ Connection connection =
jamesPostgresConnectionFactory.getConnection(Domain.of("james")).block();
+
+ ConcurrentTestRunner.builder()
+ .reactorOperation((threadNumber, step) -> createData(connection,
threadNumber))
+ .threadCount(NUMBER_OF_THREAD)
+ .operationCount(1)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ List<String> actual = getData(0, NUMBER_OF_THREAD);
+ Set<String> expected = Stream.iterate(0, i -> i +
1).limit(NUMBER_OF_THREAD).map(i -> i + "|Peter" +
i).collect(ImmutableSet.toImmutableSet());
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @Test
+ void
connectionShouldWorkWellWhenItIsUsedByMultipleThreadsAndInsertQueriesAreDuplicated()
throws Exception {
+ Connection connection =
jamesPostgresConnectionFactory.getConnection(Domain.of("james")).block();
+
+ AtomicInteger numberOfSuccess = new AtomicInteger(0);
+ AtomicInteger numberOfFail = new AtomicInteger(0);
+ ConcurrentTestRunner.builder()
+ .reactorOperation((threadNumber, step) -> createData(connection,
threadNumber % 10)
+ .then(Mono.fromCallable(() ->
numberOfSuccess.incrementAndGet()))
+ .then()
+ .onErrorResume(throwable -> {
+ if (throwable.getMessage().contains("duplicate key value
violates unique constraint")) {
+ numberOfFail.incrementAndGet();
+ }
+ return Mono.empty();
+ }))
+ .threadCount(100)
+ .operationCount(1)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ List<String> actual = getData(0, 100);
+ Set<String> expected = Stream.iterate(0, i -> i + 1).limit(10).map(i
-> i + "|Peter" + i).collect(ImmutableSet.toImmutableSet());
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+ assertThat(numberOfSuccess.get()).isEqualTo(10);
+ assertThat(numberOfFail.get()).isEqualTo(90);
+ }
+
+ @Test
+ void
connectionShouldWorkWellWhenItIsUsedByMultipleThreadsAndQueriesIncludeBothSelectAndInsert()
throws Exception {
+ createData(50);
+
+ Connection connection =
jamesPostgresConnectionFactory.getConnection(Optional.empty()).block();
+
+ List<String> actualSelect = new Vector<>();
+ ConcurrentTestRunner.builder()
+ .reactorOperation((threadNumber, step) -> {
+ if (threadNumber < 50) {
+ return getData(connection, threadNumber)
+ .doOnNext(s -> actualSelect.add(s))
+ .then();
+ } else {
+ return createData(connection, threadNumber);
+ }
+ })
+ .threadCount(NUMBER_OF_THREAD)
+ .operationCount(1)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ List<String> actualInsert = getData(50, 100);
+
+ Set<String> expectedSelect = Stream.iterate(0, i -> i +
1).limit(50).map(i -> i + "|Peter" + i).collect(ImmutableSet.toImmutableSet());
+ Set<String> expectedInsert = Stream.iterate(50, i -> i +
1).limit(50).map(i -> i + "|Peter" + i).collect(ImmutableSet.toImmutableSet());
+
+
assertThat(actualSelect).containsExactlyInAnyOrderElementsOf(expectedSelect);
+
assertThat(actualInsert).containsExactlyInAnyOrderElementsOf(expectedInsert);
+ }
+
+ private Flux<String> getData(Connection connection, int threadNumber) {
+ return Flux.from(connection.createStatement("SELECT id, name FROM
PERSON WHERE id = $1")
+ .bind("$1", threadNumber)
+ .execute())
+ .flatMap(result -> result.map((row, rowMetadata) -> row.get("id",
Long.class) + "|" + row.get("name", String.class)));
+ }
+
+ @NotNull
+ private Mono<Void> createData(Connection connection, int threadNumber) {
+ return Flux.from(connection.createStatement("INSERT INTO person (id,
name) VALUES ($1, $2)")
+ .bind("$1", threadNumber)
+ .bind("$2", "Peter" + threadNumber)
+ .execute())
+ .flatMap(Result::getRowsUpdated)
+ .then();
+ }
+
+ private List<String> getData(int lowerBound, int upperBound) {
+ return Flux.from(postgresqlConnection.createStatement("SELECT id, name
FROM person WHERE id >= $1 AND id < $2")
+ .bind("$1", lowerBound)
+ .bind("$2", upperBound)
+ .execute())
+ .flatMap(result -> result.map((row, rowMetadata) -> row.get("id",
Long.class) + "|" + row.get("name", String.class)))
+ .collect(ImmutableList.toImmutableList()).block();
+ }
+
+ private void createData(int upperBound) {
+ for (int i = 0; i < upperBound; i++) {
+ postgresqlConnection.createStatement("INSERT INTO person (id,
name) VALUES ($1, $2)")
+ .bind("$1", i)
+ .bind("$2", "Peter" + i)
+ .execute().flatMap(PostgresqlResult::getRowsUpdated)
+ .then()
+ .block();
+ }
+ }
+}
diff --git
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/JamesPostgresConnectionFactoryTest.java
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/JamesPostgresConnectionFactoryTest.java
new file mode 100644
index 0000000000..ab68dd611a
--- /dev/null
+++
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/JamesPostgresConnectionFactoryTest.java
@@ -0,0 +1,96 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Optional;
+
+import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory;
+import org.apache.james.core.Domain;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import io.r2dbc.spi.Connection;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Contract tests shared by every {@link JamesPostgresConnectionFactory} implementation.
+ * Subclasses provide the factory under test through {@link #jamesPostgresConnectionFactory()}.
+ */
+public abstract class JamesPostgresConnectionFactoryTest {
+
+    abstract JamesPostgresConnectionFactory jamesPostgresConnectionFactory();
+
+    @Test
+    void getConnectionShouldWork() {
+        Connection connection = jamesPostgresConnectionFactory().getConnection(Optional.empty()).block();
+
+        String actual = firstValueOf(connection, "SELECT 1");
+
+        assertThat(actual).isEqualTo("1");
+    }
+
+    @Test
+    void getConnectionWithDomainShouldWork() {
+        Connection connection = jamesPostgresConnectionFactory().getConnection(Domain.of("james")).block();
+
+        String actual = firstValueOf(connection, "SELECT 1");
+
+        assertThat(actual).isEqualTo("1");
+    }
+
+    @Test
+    void getConnectionShouldSetCurrentDomainAttribute() {
+        Domain domain = Domain.of("james");
+        Connection connection = jamesPostgresConnectionFactory().getConnection(domain).block();
+
+        String actual = getDomainAttributeValue(connection);
+
+        assertThat(actual).isEqualTo(domain.asString());
+    }
+
+    @Test
+    void getConnectionWithoutDomainShouldNotSetCurrentDomainAttribute() {
+        Connection connection = jamesPostgresConnectionFactory().getConnection(Optional.empty()).block();
+
+        // A successful "show" is mapped to "", a failing one to its error message.
+        Flux<String> shownValues = Flux.from(connection.createStatement("show " + JamesPostgresConnectionFactory.DOMAIN_ATTRIBUTE)
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)));
+        String message = shownValues
+            .collect(ImmutableList.toImmutableList())
+            .map(strings -> "")
+            .onErrorResume(throwable -> Mono.just(throwable.getMessage()))
+            .block();
+
+        String expectedMessage = "unrecognized configuration parameter \"" + JamesPostgresConnectionFactory.DOMAIN_ATTRIBUTE + "\"";
+        assertThat(message).isEqualTo(expectedMessage);
+    }
+
+    /** Reads the current value of the domain session parameter on the given connection. */
+    String getDomainAttributeValue(Connection connection) {
+        return firstValueOf(connection, "show " + JamesPostgresConnectionFactory.DOMAIN_ATTRIBUTE);
+    }
+
+    /** Executes the statement and returns the first column of its first row as a string. */
+    private static String firstValueOf(Connection connection, String sql) {
+        return Flux.from(connection.createStatement(sql)
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
+            .collect(ImmutableList.toImmutableList())
+            .block().get(0);
+    }
+
+}
diff --git
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
index 35606c4f8e..fd3d8ef1e1 100644
---
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
+++
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -28,11 +28,13 @@ import com.google.inject.Module;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.ConnectionFactory;
import reactor.core.publisher.Mono;
public class PostgresExtension implements GuiceModuleTestExtension {
private final PostgresModule postgresModule;
private PostgresExecutor postgresExecutor;
+ private PostgresqlConnectionFactory connectionFactory;
public PostgresExtension(PostgresModule postgresModule) {
this.postgresModule = postgresModule;
@@ -51,7 +53,7 @@ public class PostgresExtension implements
GuiceModuleTestExtension {
}
private void initPostgresSession() {
- PostgresqlConnectionFactory connectionFactory = new
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
+ connectionFactory = new
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host(getHost())
.port(getMappedPort())
.username(PostgresFixture.Database.DB_USER)
@@ -84,6 +86,12 @@ public class PostgresExtension implements
GuiceModuleTestExtension {
resetSchema();
}
+ /**
+  * Stops and then starts the shared {@code DockerPostgresSingleton} container, and calls
+  * {@code initPostgresSession()} again so that a new connection factory is built for it.
+  */
+ public void restartContainer() {
+ DockerPostgresSingleton.SINGLETON.stop();
+ DockerPostgresSingleton.SINGLETON.start();
+ initPostgresSession();
+ }
+
@Override
public Module getModule() {
// TODO: return PostgresConfiguration bean when doing
https://github.com/linagora/james-project/issues/4910
@@ -105,6 +113,9 @@ public class PostgresExtension implements
GuiceModuleTestExtension {
public PostgresExecutor getPostgresExecutor() {
return postgresExecutor;
}
+ /**
+  * @return the connection factory most recently created by {@code initPostgresSession()}
+  */
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
private void initTablesAndIndexes() {
PostgresTableManager postgresTableManager = new
PostgresTableManager(postgresExecutor, postgresModule);
diff --git
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/SimpleJamesPostgresConnectionFactoryTest.java
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/SimpleJamesPostgresConnectionFactoryTest.java
new file mode 100644
index 0000000000..1ebf19ba35
--- /dev/null
+++
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/SimpleJamesPostgresConnectionFactoryTest.java
@@ -0,0 +1,146 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.james.backends.postgres.utils.JamesPostgresConnectionFactory;
+import
org.apache.james.backends.postgres.utils.SimpleJamesPostgresConnectionFactory;
+import org.apache.james.core.Domain;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableList;
+
+import io.r2dbc.postgresql.api.PostgresqlConnection;
+import io.r2dbc.spi.Connection;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Runs the shared {@link JamesPostgresConnectionFactoryTest} contract against
+ * {@link SimpleJamesPostgresConnectionFactory} and adds tests for its per-domain caching.
+ *
+ * <p>The container is restarted after every test, and a fresh factory is created before
+ * every test.
+ */
+public class SimpleJamesPostgresConnectionFactoryTest extends JamesPostgresConnectionFactoryTest {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = new PostgresExtension();
+
+    // Control connection used to inspect pg_stat_activity, independent of the factory under test.
+    private PostgresqlConnection postgresqlConnection;
+    private SimpleJamesPostgresConnectionFactory jamesPostgresConnectionFactory;
+
+    @Override
+    JamesPostgresConnectionFactory jamesPostgresConnectionFactory() {
+        return jamesPostgresConnectionFactory;
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        jamesPostgresConnectionFactory = new SimpleJamesPostgresConnectionFactory(postgresExtension.getConnectionFactory());
+        postgresqlConnection = (PostgresqlConnection) postgresExtension.getConnection().block();
+    }
+
+    @AfterEach
+    void afterEach() {
+        postgresExtension.restartContainer();
+    }
+
+    @Test
+    void factoryShouldCreateCorrectNumberOfConnections() {
+        Integer previousDbActiveNumberOfConnections = getNumberOfConnections();
+
+        // create 50 connections
+        Flux.range(1, 50)
+            .flatMap(i -> jamesPostgresConnectionFactory.getConnection(Domain.of("james" + i)))
+            .then()
+            .block();
+
+        Integer dbActiveNumberOfConnections = getNumberOfConnections();
+
+        assertThat(dbActiveNumberOfConnections - previousDbActiveNumberOfConnections).isEqualTo(50);
+    }
+
+    /** Counts the sessions of the test database user currently listed in {@code pg_stat_activity}. */
+    @Nullable
+    private Integer getNumberOfConnections() {
+        return Mono.from(postgresqlConnection.createStatement("SELECT count(*) from pg_stat_activity where usename = $1;")
+                .bind("$1", PostgresFixture.Database.DB_USER)
+                .execute())
+            .flatMap(result -> Mono.from(result.map((row, rowMetadata) -> row.get(0, Integer.class))))
+            .block();
+    }
+
+    @Test
+    void factoryShouldNotCreateNewConnectionWhenDomainsAreTheSame() {
+        Domain domain = Domain.of("james");
+        Connection connectionOne = jamesPostgresConnectionFactory.getConnection(domain).block();
+        Connection connectionTwo = jamesPostgresConnectionFactory.getConnection(domain).block();
+
+        // Identity assertion: reports both instances on failure instead of "expected true".
+        assertThat(connectionOne).isSameAs(connectionTwo);
+    }
+
+    @Test
+    void factoryShouldCreateNewConnectionWhenDomainsAreDifferent() {
+        Connection connectionOne = jamesPostgresConnectionFactory.getConnection(Domain.of("james")).block();
+        Connection connectionTwo = jamesPostgresConnectionFactory.getConnection(Domain.of("lin")).block();
+
+        String domainOne = getDomainAttributeValue(connectionOne);
+
+        String domainTwo = Flux.from(connectionTwo.createStatement("show " + JamesPostgresConnectionFactory.DOMAIN_ATTRIBUTE)
+                .execute())
+            .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
+            .collect(ImmutableList.toImmutableList())
+            .block().get(0);
+
+        assertThat(connectionOne).isNotSameAs(connectionTwo);
+        assertThat(domainOne).isNotEqualTo(domainTwo);
+    }
+
+    @Test
+    void factoryShouldNotCreateNewConnectionWhenDomainsAreTheSameAndRequestsAreFromDifferentThreads() throws Exception {
+        Set<Connection> connectionSet = ConcurrentHashMap.newKeySet();
+
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> jamesPostgresConnectionFactory.getConnection(Domain.of("james"))
+                .doOnNext(connectionSet::add)
+                .then())
+            .threadCount(50)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(connectionSet).hasSize(1);
+    }
+
+    @Test
+    void factoryShouldCreateOnlyOneDefaultConnection() throws Exception {
+        Set<Connection> connectionSet = ConcurrentHashMap.newKeySet();
+
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> jamesPostgresConnectionFactory.getConnection(Optional.empty())
+                .doOnNext(connectionSet::add)
+                .then())
+            .threadCount(50)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(connectionSet).hasSize(1);
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]