This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/postgresql by this push:
     new 16576107cd JAMES-2586 Introduce PostgresExtension
16576107cd is described below

commit 16576107cd494a5e062cf3342b859d81d0a21289
Author: Quan Tran <hqt...@linagora.com>
AuthorDate: Wed Nov 1 17:12:49 2023 +0700

    JAMES-2586 Introduce PostgresExtension
---
 backends-common/postgres/pom.xml                   |   6 +
 .../backends/postgres/utils/PostgresExecutor.java  |   7 ++
 .../postgres/DockerPostgresSingleton.java}         |  38 +++---
 .../postgres/PostgresClusterExtension.java         |  67 -----------
 .../james/backends/postgres/PostgresExtension.java | 119 +++++++++++++++++++
 .../backends/postgres/PostgresExtensionTest.java   | 102 ++++++++++++++++
 .../postgres/PostgresTableManagerTest.java         | 130 +++++++--------------
 7 files changed, 290 insertions(+), 179 deletions(-)

diff --git a/backends-common/postgres/pom.xml b/backends-common/postgres/pom.xml
index 7587adcf5c..3cf5b72327 100644
--- a/backends-common/postgres/pom.xml
+++ b/backends-common/postgres/pom.xml
@@ -39,6 +39,12 @@
             <groupId>${james.groupId}</groupId>
             <artifactId>james-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-guice-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-util</artifactId>
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index 81a8cc8d2c..78636dc186 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -26,6 +26,8 @@ import org.jooq.SQLDialect;
 import org.jooq.conf.Settings;
 import org.jooq.impl.DSL;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.r2dbc.spi.Connection;
 import reactor.core.publisher.Mono;
 
@@ -48,4 +50,9 @@ public class PostgresExecutor {
     public Mono<Connection> connection() {
         return connection;
     }
+
+    @VisibleForTesting
+    public Mono<Void> dispose() {
+        return connection.flatMap(con -> Mono.from(con.close()));
+    }
 }
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DockerPostgresSingleton.java
similarity index 59%
copy from backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
copy to backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DockerPostgresSingleton.java
index 81a8cc8d2c..21046eb72f 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/DockerPostgresSingleton.java
@@ -17,35 +17,23 @@
  * under the License.                                           *
  ****************************************************************/
 
-package org.apache.james.backends.postgres.utils;
+package org.apache.james.backends.postgres;
 
-import javax.inject.Inject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.OutputFrame;
 
-import org.jooq.DSLContext;
-import org.jooq.SQLDialect;
-import org.jooq.conf.Settings;
-import org.jooq.impl.DSL;
-
-import io.r2dbc.spi.Connection;
-import reactor.core.publisher.Mono;
-
-public class PostgresExecutor {
-
-    private static final SQLDialect PGSQL_DIALECT = SQLDialect.POSTGRES;
-    private static final Settings SETTINGS = new Settings()
-        .withRenderFormatted(true);
-    private final Mono<Connection> connection;
-
-    @Inject
-    public PostgresExecutor(Mono<Connection> connection) {
-        this.connection = connection;
+public class DockerPostgresSingleton {
+    private static void displayDockerLog(OutputFrame outputFrame) {
+        LOGGER.info(outputFrame.getUtf8String().trim());
     }
 
-    public Mono<DSLContext> dslContext() {
-        return connection.map(con -> DSL.using(con, PGSQL_DIALECT, SETTINGS));
-    }
+    private static final Logger LOGGER = LoggerFactory.getLogger(DockerPostgresSingleton.class);
+    public static final PostgreSQLContainer SINGLETON = PostgresFixture.PG_CONTAINER.get()
+        .withLogConsumer(DockerPostgresSingleton::displayDockerLog);
 
-    public Mono<Connection> connection() {
-        return connection;
+    static {
+        SINGLETON.start();
     }
 }
diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresClusterExtension.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresClusterExtension.java
deleted file mode 100644
index bd2be62669..0000000000
--- a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresClusterExtension.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backends.postgres;
-
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.PostgreSQLContainer;
-
-public class PostgresClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback, ParameterResolver {
-
-    //  TODO
-    private GenericContainer<?> container = new PostgreSQLContainer("postgres:11.1");
-
-    @Override
-    public void afterAll(ExtensionContext extensionContext) throws Exception {
-
-    }
-
-    @Override
-    public void afterEach(ExtensionContext extensionContext) throws Exception {
-
-    }
-
-    @Override
-    public void beforeAll(ExtensionContext extensionContext) throws Exception {
-
-    }
-
-    @Override
-    public void beforeEach(ExtensionContext extensionContext) throws Exception {
-
-    }
-
-    @Override
-    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return false;
-    }
-
-    @Override
-    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return null;
-    }
-}
diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
new file mode 100644
index 0000000000..0e4ab28fa3
--- /dev/null
+++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -0,0 +1,119 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import org.apache.james.GuiceModuleTestExtension;
+import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import com.google.inject.Module;
+
+import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
+import io.r2dbc.postgresql.PostgresqlConnectionFactory;
+import io.r2dbc.spi.Connection;
+import reactor.core.publisher.Mono;
+
+public class PostgresExtension implements GuiceModuleTestExtension {
+    private final PostgresModule postgresModule;
+    private PostgresExecutor postgresExecutor;
+
+    public PostgresExtension(PostgresModule postgresModule) {
+        this.postgresModule = postgresModule;
+    }
+
+    public PostgresExtension() {
+        this(PostgresModule.EMPTY_MODULE);
+    }
+
+    @Override
+    public void beforeAll(ExtensionContext extensionContext) {
+        if (!DockerPostgresSingleton.SINGLETON.isRunning()) {
+            DockerPostgresSingleton.SINGLETON.start();
+        }
+        initPostgresSession();
+    }
+
+    private void initPostgresSession() {
+        PostgresqlConnectionFactory connectionFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
+            .host(getHost())
+            .port(getMappedPort())
+            .username(PostgresFixture.Database.DB_USER)
+            .password(PostgresFixture.Database.DB_PASSWORD)
+            .database(PostgresFixture.Database.DB_NAME)
+            .schema(PostgresFixture.Database.SCHEMA)
+            .build());
+
+        postgresExecutor = new PostgresExecutor(connectionFactory.create()
+            .cache()
+            .cast(Connection.class));
+    }
+
+    @Override
+    public void afterAll(ExtensionContext extensionContext) {
+        disposePostgresSession();
+    }
+
+    private void disposePostgresSession() {
+        postgresExecutor.dispose().block();
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext extensionContext) {
+        initTablesAndIndexes();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext extensionContext) {
+        resetSchema();
+    }
+
+    @Override
+    public Module getModule() {
+        // TODO: return PostgresConfiguration bean when doing https://github.com/linagora/james-project/issues/4910
+        return GuiceModuleTestExtension.super.getModule();
+    }
+
+    public String getHost() {
+        return DockerPostgresSingleton.SINGLETON.getHost();
+    }
+
+    public Integer getMappedPort() {
+        return DockerPostgresSingleton.SINGLETON.getMappedPort(PostgresFixture.PORT);
+    }
+
+    public Mono<Connection> getConnection() {
+        return postgresExecutor.connection();
+    }
+
+    private void initTablesAndIndexes() {
+        PostgresTableManager postgresTableManager = new PostgresTableManager(postgresExecutor, postgresModule);
+        postgresTableManager.initializeTables().block();
+        postgresTableManager.initializeTableIndexes().block();
+    }
+
+    private void resetSchema() {
+        getConnection()
+            .flatMapMany(connection -> Mono.from(connection.createStatement("DROP SCHEMA " + PostgresFixture.Database.SCHEMA + " CASCADE").execute())
+                .then(Mono.from(connection.createStatement("CREATE SCHEMA " + PostgresFixture.Database.SCHEMA).execute()))
+                .flatMap(result -> Mono.from(result.getRowsUpdated())))
+            .collectList()
+            .block();
+    }
+}
diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtensionTest.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtensionTest.java
new file mode 100644
index 0000000000..f1593fef21
--- /dev/null
+++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtensionTest.java
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class PostgresExtensionTest {
+    static PostgresTable TABLE_1 = PostgresTable.name("table1")
+        .createTableStep((dslContext, tableName) -> dslContext.createTable(tableName)
+            .column("column1", SQLDataType.UUID.notNull())
+            .column("column2", SQLDataType.INTEGER)
+            .column("column3", SQLDataType.VARCHAR(255).notNull()))
+        .noRLS();
+
+    static PostgresIndex INDEX_1 = PostgresIndex.name("index1")
+        .createIndexStep((dslContext, indexName) -> dslContext.createIndex(indexName)
+            .on(DSL.table("table1"), DSL.field("column1").asc()));
+
+    static PostgresTable TABLE_2 = PostgresTable.name("table2")
+        .createTableStep((dslContext, tableName) -> dslContext.createTable(tableName)
+            .column("column1", SQLDataType.INTEGER))
+        .noRLS();
+
+    static PostgresIndex INDEX_2 = PostgresIndex.name("index2")
+        .createIndexStep((dslContext, indexName) -> dslContext.createIndex(indexName)
+            .on(DSL.table("table2"), DSL.field("column1").desc()));
+
+    static PostgresModule POSTGRES_MODULE = PostgresModule.builder()
+        .addTable(TABLE_1, TABLE_2)
+        .addIndex(INDEX_1, INDEX_2)
+        .build();
+
+    @RegisterExtension
+    static PostgresExtension postgresExtension = new PostgresExtension(POSTGRES_MODULE);
+
+    @Test
+    void postgresExtensionShouldProvisionTablesAndIndexes() {
+        assertThat(getColumnNameAndDataType("table1"))
+            .containsExactlyInAnyOrder(
+                Pair.of("column1", "uuid"),
+                Pair.of("column2", "integer"),
+                Pair.of("column3", "character varying"));
+
+        assertThat(getColumnNameAndDataType("table2"))
+            .containsExactlyInAnyOrder(Pair.of("column1", "integer"));
+
+        assertThat(listIndexToTableMappings())
+            .contains(
+                Pair.of("index1", "table1"),
+                Pair.of("index2", "table2"));
+    }
+
+    private List<Pair<String, String>> getColumnNameAndDataType(String tableName) {
+        return postgresExtension.getConnection()
+                .flatMapMany(connection -> Flux.from(Mono.from(connection.createStatement("SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_name = $1;")
+                        .bind("$1", tableName)
+                        .execute())
+                    .flatMapMany(result -> result.map((row, rowMetadata) ->
+                        Pair.of(row.get("column_name", String.class), row.get("data_type", String.class))))))
+            .collectList()
+            .block();
+    }
+
+    private List<Pair<String, String>> listIndexToTableMappings() {
+        return postgresExtension.getConnection()
+            .flatMapMany(connection -> Mono.from(connection.createStatement("SELECT indexname, tablename FROM pg_indexes;")
+                    .execute())
+                .flatMapMany(result ->
+                    result.map((row, rowMetadata) ->
+                        Pair.of(row.get("indexname", String.class), row.get("tablename", String.class)))))
+            .collectList()
+            .block();
+    }
+}
diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresTableManagerTest.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresTableManagerTest.java
index 62eae38316..007ff246a5 100644
--- a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresTableManagerTest.java
+++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresTableManagerTest.java
@@ -30,54 +30,19 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
 import org.jooq.impl.DSL;
 import org.jooq.impl.SQLDataType;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-
-import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
-import io.r2dbc.postgresql.PostgresqlConnectionFactory;
-import io.r2dbc.postgresql.api.PostgresqlResult;
-import io.r2dbc.spi.Connection;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-@Testcontainers
-public class PostgresTableManagerTest {
-
-    @Container
-    private static final GenericContainer<?> pgContainer = PostgresFixture.PG_CONTAINER.get();
-
-    private PostgresqlConnectionFactory connectionFactory;
-
-    @BeforeEach
-    void beforeAll() {
-        connectionFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
-            .host(pgContainer.getHost())
-            .port(pgContainer.getMappedPort(PostgresFixture.PORT))
-            .username(PostgresFixture.Database.DB_USER)
-            .password(PostgresFixture.Database.DB_PASSWORD)
-            .database(PostgresFixture.Database.DB_NAME)
-            .schema(PostgresFixture.Database.SCHEMA)
-            .build());
-    }
+class PostgresTableManagerTest {
 
-    @AfterEach
-    void afterEach() {
-        // clean data
-        Flux.usingWhen(connectionFactory.create(),
-                connection -> Mono.from(connection.createStatement("DROP SCHEMA " + PostgresFixture.Database.SCHEMA + " CASCADE").execute())
-                    .then(Mono.from(connection.createStatement("CREATE SCHEMA " + PostgresFixture.Database.SCHEMA).execute()))
-                    .flatMap(PostgresqlResult::getRowsUpdated),
-                Connection::close)
-            .collectList()
-            .block();
-    }
+    @RegisterExtension
+    static PostgresExtension postgresExtension = new PostgresExtension();
 
-    Function<PostgresModule, PostgresTableManager> tableManagerFactory = module -> new PostgresTableManager(new PostgresExecutor(connectionFactory.create()
-        .map(c -> c)), module);
+    Function<PostgresModule, PostgresTableManager> tableManagerFactory =
+        module -> new PostgresTableManager(new PostgresExecutor(postgresExtension.getConnection()), module);
 
     @Test
     void initializeTableShouldSuccessWhenModuleHasSingleTable() {
@@ -198,7 +163,7 @@ public class PostgresTableManagerTest {
 
         testee.initializeTableIndexes().block();
 
-        List<Pair<String, String>> listIndexes = listIndexes();
+        List<Pair<String, String>> listIndexes = listIndexToTableMappings();
 
         assertThat(listIndexes)
             .contains(Pair.of(indexName, tableName));
@@ -236,7 +201,7 @@ public class PostgresTableManagerTest {
 
         testee.initializeTableIndexes().block();
 
-        List<Pair<String, String>> listIndexes = listIndexes();
+        List<Pair<String, String>> listIndexes = listIndexToTableMappings();
 
         assertThat(listIndexes)
             .contains(Pair.of(indexName1, tableName), Pair.of(indexName2, tableName));
@@ -286,24 +251,21 @@ public class PostgresTableManagerTest {
             .block();
 
         // insert data
-        Flux.usingWhen(connectionFactory.create(),
-                connection -> Flux.range(0, 10)
-                    .flatMap(i -> Mono.from(connection.createStatement("INSERT INTO " + tableName1 + " (column1) VALUES ($1);")
-                            .bind("$1", i)
-                            .execute())
-                        .flatMap(PostgresqlResult::getRowsUpdated))
-                    .last(),
-                Connection::close)
+        postgresExtension.getConnection()
+            .flatMapMany(connection -> Flux.range(0, 10)
+                .flatMap(i -> Mono.from(connection.createStatement("INSERT INTO " + tableName1 + " (column1) VALUES ($1);")
+                    .bind("$1", i)
+                    .execute())
+                    .flatMap(result -> Mono.from(result.getRowsUpdated())))
+                .last())
             .collectList()
             .block();
 
-
-        Supplier<Long> getTotalRecordInDB = () -> Flux.usingWhen(connectionFactory.create(),
-                connection -> Mono.from(connection.createStatement("select count(*) FROM " + tableName1)
-                        .execute())
-                    .flatMapMany(result ->
-                        result.map((row, rowMetadata) -> row.get("count", Long.class))),
-                Connection::close)
+        Supplier<Long> getTotalRecordInDB = () -> postgresExtension.getConnection()
+            .flatMapMany(connection -> Mono.from(connection.createStatement("select count(*) FROM " + tableName1)
+                    .execute())
+                .flatMapMany(result ->
+                    result.map((row, rowMetadata) -> row.get("count", Long.class))))
             .last()
             .block();
 
@@ -339,16 +301,15 @@ public class PostgresTableManagerTest {
                 Pair.of("clm2", "character varying"),
                 Pair.of("domain", "character varying"));
 
-        List<Pair<String, Boolean>> pgClassCheckResult = Flux.usingWhen(connectionFactory.create(),
-                connection -> Mono.from(connection.createStatement("select relname, relrowsecurity " +
-                            "from pg_class " +
-                            "where oid = 'tbn1'::regclass;;")
-                        .execute())
-                    .flatMapMany(result ->
-                        result.map((row, rowMetadata) ->
-                            Pair.of(row.get("relname", String.class),
-                                row.get("relrowsecurity", Boolean.class)))),
-                Connection::close)
+        List<Pair<String, Boolean>> pgClassCheckResult = postgresExtension.getConnection()
+            .flatMapMany(connection -> Mono.from(connection.createStatement("select relname, relrowsecurity " +
+                    "from pg_class " +
+                    "where oid = 'tbn1'::regclass;;")
+                .execute())
+                .flatMapMany(result ->
+                    result.map((row, rowMetadata) ->
+                        Pair.of(row.get("relname", String.class),
+                            row.get("relrowsecurity", Boolean.class)))))
             .collectList()
             .block();
 
@@ -357,30 +318,25 @@ public class PostgresTableManagerTest {
                 Pair.of("tbn1", true));
     }
 
-
     private List<Pair<String, String>> getColumnNameAndDataType(String tableName) {
-        return Flux.usingWhen(connectionFactory.create(),
-                connection -> Mono.from(connection.createStatement("SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_name = $1;")
-                        .bind("$1", tableName)
-                        .execute())
-                    .flatMapMany(result ->
-                        result.map((row, rowMetadata) ->
-                            Pair.of(row.get("column_name", String.class),
-                                row.get("data_type", String.class)))),
-                Connection::close)
+        return postgresExtension.getConnection()
+            .flatMapMany(connection -> Flux.from(Mono.from(connection.createStatement("SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_name = $1;")
+                    .bind("$1", tableName)
+                    .execute())
+                .flatMapMany(result -> result.map((row, rowMetadata) ->
+                    Pair.of(row.get("column_name", String.class), row.get("data_type", String.class))))))
             .collectList()
             .block();
     }
 
     // return list<pair<indexName, tableName>>
-    private List<Pair<String, String>> listIndexes() {
-        return Flux.usingWhen(connectionFactory.create(),
-                connection -> Mono.from(connection.createStatement("SELECT indexname, tablename FROM pg_indexes;")
-                        .execute())
-                    .flatMapMany(result ->
-                        result.map((row, rowMetadata) ->
-                            Pair.of(row.get("indexname", String.class), row.get("tablename", String.class)))),
-                Connection::close)
+    private List<Pair<String, String>> listIndexToTableMappings() {
+        return postgresExtension.getConnection()
+            .flatMapMany(connection -> Mono.from(connection.createStatement("SELECT indexname, tablename FROM pg_indexes;")
+                    .execute())
+                .flatMapMany(result ->
+                    result.map((row, rowMetadata) ->
+                        Pair.of(row.get("indexname", String.class), row.get("tablename", String.class)))))
             .collectList()
             .block();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to