This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/postgresql by this push:
     new 35bd908641 JAMES-2586 Implement BlobReferenceSource(s) for postgres-app
35bd908641 is described below

commit 35bd90864133354ff2b0810839ee005e26292b4c
Author: hung phan <[email protected]>
AuthorDate: Thu Jan 4 17:11:37 2024 +0700

    JAMES-2586 Implement BlobReferenceSource(s) for postgres-app
---
 .../backends/postgres/PostgresConfiguration.java   | 177 ++++++++----
 .../utils/JamesPostgresConnectionFactory.java      |   1 +
 .../backends/postgres/utils/PostgresExecutor.java  |   1 +
 .../postgres/PostgresConfigurationTest.java        | 115 ++++----
 .../james/backends/postgres/PostgresExtension.java |  21 +-
 .../mail/PostgresMessageBlobReferenceSource.java   |  25 +-
 .../postgres/mail/dao/PostgresMessageDAO.java      |  11 +-
 .../PostgresMessageBlobReferenceSourceTest.java    | 100 +++++++
 .../sample-configuration/postgres.properties       |  21 +-
 .../org/apache/james/PostgresJamesServerMain.java  |   8 -
 .../modules/mailbox/PostgresMailboxModule.java     |   8 +
 .../james/modules/data/PostgresCommonModule.java   |  45 ++-
 .../modules/data/PostgresMailRepositoryModule.java |   7 +
 .../postgres/PostgresMailRepository.java           | 301 +--------------------
 .../PostgresMailRepositoryBlobReferenceSource.java |  24 +-
 ....java => PostgresMailRepositoryContentDAO.java} |  78 +++---
 .../postgres/PostgresMailRepositoryFactory.java    |   2 +-
 ...tgresMailRepositoryBlobReferenceSourceTest.java |  94 +++++++
 .../postgres/PostgresMailRepositoryTest.java       |   2 +-
 19 files changed, 554 insertions(+), 487 deletions(-)

diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java
index 7ffeb8be40..82683044ff 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java
@@ -19,31 +19,34 @@
 
 package org.apache.james.backends.postgres;
 
-import java.net.URI;
-import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
 import org.apache.commons.configuration2.Configuration;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
 
 public class PostgresConfiguration {
-    public static final String URL = "url";
     public static final String DATABASE_NAME = "database.name";
     public static final String DATABASE_NAME_DEFAULT_VALUE = "postgres";
     public static final String DATABASE_SCHEMA = "database.schema";
     public static final String DATABASE_SCHEMA_DEFAULT_VALUE = "public";
+    public static final String HOST = "database.host";
+    public static final String HOST_DEFAULT_VALUE = "localhost";
+    public static final String PORT = "database.port";
+    public static final int PORT_DEFAULT_VALUE = 5432;
+    public static final String USERNAME = "database.username";
+    public static final String PASSWORD = "database.password";
+    public static final String NON_RLS_USERNAME = "database.non-rls.username";
+    public static final String NON_RLS_PASSWORD = "database.non-rls.password";
     public static final String RLS_ENABLED = "row.level.security.enabled";
 
     public static class Credential {
         private final String username;
         private final String password;
 
-        Credential(String username, String password) {
+
+        public Credential(String username, String password) {
             this.username = username;
             this.password = password;
         }
@@ -58,16 +61,16 @@ public class PostgresConfiguration {
     }
 
     public static class Builder {
-        private Optional<String> url = Optional.empty();
         private Optional<String> databaseName = Optional.empty();
         private Optional<String> databaseSchema = Optional.empty();
+        private Optional<String> host = Optional.empty();
+        private Optional<Integer> port = Optional.empty();
+        private Optional<String> username = Optional.empty();
+        private Optional<String> password = Optional.empty();
+        private Optional<String> nonRLSUser = Optional.empty();
+        private Optional<String> nonRLSPassword = Optional.empty();
         private Optional<Boolean> rowLevelSecurityEnabled = Optional.empty();
 
-        public Builder url(String url) {
-            this.url = Optional.of(url);
-            return this;
-        }
-
         public Builder databaseName(String databaseName) {
             this.databaseName = Optional.of(databaseName);
             return this;
@@ -88,6 +91,66 @@ public class PostgresConfiguration {
             return this;
         }
 
+        public Builder host(String host) {
+            this.host = Optional.of(host);
+            return this;
+        }
+
+        public Builder host(Optional<String> host) {
+            this.host = host;
+            return this;
+        }
+
+        public Builder port(Integer port) {
+            this.port = Optional.of(port);
+            return this;
+        }
+
+        public Builder port(Optional<Integer> port) {
+            this.port = port;
+            return this;
+        }
+
+        public Builder username(String username) {
+            this.username = Optional.of(username);
+            return this;
+        }
+
+        public Builder username(Optional<String> username) {
+            this.username = username;
+            return this;
+        }
+
+        public Builder password(String password) {
+            this.password = Optional.of(password);
+            return this;
+        }
+
+        public Builder password(Optional<String> password) {
+            this.password = password;
+            return this;
+        }
+
+        public Builder nonRLSUser(String nonRLSUser) {
+            this.nonRLSUser = Optional.of(nonRLSUser);
+            return this;
+        }
+
+        public Builder nonRLSUser(Optional<String> nonRLSUser) {
+            this.nonRLSUser = nonRLSUser;
+            return this;
+        }
+
+        public Builder nonRLSPassword(String nonRLSPassword) {
+            this.nonRLSPassword = Optional.of(nonRLSPassword);
+            return this;
+        }
+
+        public Builder nonRLSPassword(Optional<String> nonRLSPassword) {
+            this.nonRLSPassword = nonRLSPassword;
+            return this;
+        }
+
         public Builder rowLevelSecurityEnabled(boolean rlsEnabled) {
             this.rowLevelSecurityEnabled = Optional.of(rlsEnabled);
             return this;
@@ -99,36 +162,22 @@ public class PostgresConfiguration {
         }
 
         public PostgresConfiguration build() {
-            Preconditions.checkArgument(url.isPresent() && 
!url.get().isBlank(), "You need to specify Postgres URI");
-            URI postgresURI = asURI(url.get());
+            Preconditions.checkArgument(username.isPresent() && 
!username.get().isBlank(), "You need to specify username");
+            Preconditions.checkArgument(password.isPresent() && 
!password.get().isBlank(), "You need to specify password");
+
+            if (rowLevelSecurityEnabled.isPresent() && 
rowLevelSecurityEnabled.get()) {
+                Preconditions.checkArgument(nonRLSUser.isPresent() && 
!nonRLSUser.get().isBlank(), "You need to specify nonRLSUser");
+                Preconditions.checkArgument(nonRLSPassword.isPresent() && 
!nonRLSPassword.get().isBlank(), "You need to specify nonRLSPassword");
+            }
 
-            return new PostgresConfiguration(postgresURI,
-                parseCredential(postgresURI),
+            return new PostgresConfiguration(host.orElse(HOST_DEFAULT_VALUE),
+                port.orElse(PORT_DEFAULT_VALUE),
                 databaseName.orElse(DATABASE_NAME_DEFAULT_VALUE),
                 databaseSchema.orElse(DATABASE_SCHEMA_DEFAULT_VALUE),
+                new Credential(username.get(), password.get()),
+                new Credential(nonRLSUser.orElse(username.get()), 
nonRLSPassword.orElse(password.get())),
                 rowLevelSecurityEnabled.orElse(false));
         }
-
-        private Credential parseCredential(URI postgresURI) {
-            Preconditions.checkArgument(postgresURI.getUserInfo() != null, 
"Postgres URI need to contains user credential");
-            
Preconditions.checkArgument(postgresURI.getUserInfo().contains(":"), "User info 
needs a password part");
-
-            List<String> parts = Splitter.on(':')
-                .splitToList(postgresURI.getUserInfo());
-            ImmutableList<String> passwordParts = parts.stream()
-                .skip(1)
-                .collect(ImmutableList.toImmutableList());
-
-            return new Credential(parts.get(0), 
Joiner.on(':').join(passwordParts));
-        }
-
-        private URI asURI(String uri) {
-            try {
-                return URI.create(uri);
-            } catch (Exception e) {
-                throw new IllegalArgumentException("You need to specify a 
valid Postgres URI", e);
-            }
-        }
     }
 
     public static Builder builder() {
@@ -137,33 +186,43 @@ public class PostgresConfiguration {
 
     public static PostgresConfiguration from(Configuration 
propertiesConfiguration) {
         return builder()
-            .url(propertiesConfiguration.getString(URL, null))
             
.databaseName(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_NAME)))
             
.databaseSchema(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_SCHEMA)))
+            .host(Optional.ofNullable(propertiesConfiguration.getString(HOST)))
+            .port(propertiesConfiguration.getInt(PORT, PORT_DEFAULT_VALUE))
+            
.username(Optional.ofNullable(propertiesConfiguration.getString(USERNAME)))
+            
.password(Optional.ofNullable(propertiesConfiguration.getString(PASSWORD)))
+            
.nonRLSUser(Optional.ofNullable(propertiesConfiguration.getString(NON_RLS_USERNAME)))
+            
.nonRLSPassword(Optional.ofNullable(propertiesConfiguration.getString(NON_RLS_PASSWORD)))
             
.rowLevelSecurityEnabled(propertiesConfiguration.getBoolean(RLS_ENABLED, false))
             .build();
     }
 
-    private final URI uri;
-    private final Credential credential;
+    private final String host;
+    private final int port;
     private final String databaseName;
     private final String databaseSchema;
+    private final Credential credential;
+    private final Credential nonRLSCredential;
     private final boolean rowLevelSecurityEnabled;
 
-    private PostgresConfiguration(URI uri, Credential credential, String 
databaseName, String databaseSchema, boolean rowLevelSecurityEnabled) {
-        this.uri = uri;
-        this.credential = credential;
+    private PostgresConfiguration(String host, int port, String databaseName, 
String databaseSchema,
+                                  Credential credential, Credential 
nonRLSCredential, boolean rowLevelSecurityEnabled) {
+        this.host = host;
+        this.port = port;
         this.databaseName = databaseName;
         this.databaseSchema = databaseSchema;
+        this.credential = credential;
+        this.nonRLSCredential = nonRLSCredential;
         this.rowLevelSecurityEnabled = rowLevelSecurityEnabled;
     }
 
-    public URI getUri() {
-        return uri;
+    public String getHost() {
+        return host;
     }
 
-    public Credential getCredential() {
-        return credential;
+    public int getPort() {
+        return port;
     }
 
     public String getDatabaseName() {
@@ -174,26 +233,36 @@ public class PostgresConfiguration {
         return databaseSchema;
     }
 
+    public Credential getCredential() {
+        return credential;
+    }
+
+    public Credential getNonRLSCredential() {
+        return nonRLSCredential;
+    }
+
     public boolean rowLevelSecurityEnabled() {
         return rowLevelSecurityEnabled;
     }
 
+    @Override
+    public final int hashCode() {
+        return Objects.hash(host, port, databaseName, databaseSchema, 
credential, nonRLSCredential, rowLevelSecurityEnabled);
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof PostgresConfiguration) {
             PostgresConfiguration that = (PostgresConfiguration) o;
 
             return Objects.equals(this.rowLevelSecurityEnabled, 
that.rowLevelSecurityEnabled)
-                && Objects.equals(this.uri, that.uri)
+                && Objects.equals(this.host, that.host)
+                && Objects.equals(this.port, that.port)
                 && Objects.equals(this.credential, that.credential)
+                && Objects.equals(this.nonRLSCredential, that.nonRLSCredential)
                 && Objects.equals(this.databaseName, that.databaseName)
                 && Objects.equals(this.databaseSchema, that.databaseSchema);
         }
         return false;
     }
-
-    @Override
-    public final int hashCode() {
-        return Objects.hash(uri, credential, databaseName, databaseSchema, 
rowLevelSecurityEnabled);
-    }
 }
diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
index 8d8391e209..c196f80642 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
@@ -28,6 +28,7 @@ import reactor.core.publisher.Mono;
 
 public interface JamesPostgresConnectionFactory {
     String DOMAIN_ATTRIBUTE = "app.current_domain";
+    String NON_RLS_INJECT = "non_rls";
 
     default Mono<Connection> getConnection(Domain domain) {
         return getConnection(Optional.ofNullable(domain));
diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index 67f6c2067b..268e14a08a 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -47,6 +47,7 @@ import reactor.util.retry.Retry;
 public class PostgresExecutor {
 
     public static final String DEFAULT_INJECT = "default";
+    public static final String NON_RLS_INJECT = "non_rls";
     public static final int MAX_RETRY_ATTEMPTS = 5;
     public static final Duration MIN_BACKOFF = Duration.ofMillis(1);
 
diff --git 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresConfigurationTest.java
 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresConfigurationTest.java
index 248eb0dd66..b47f66abe4 100644
--- 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresConfigurationTest.java
+++ 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresConfigurationTest.java
@@ -22,89 +22,98 @@ package org.apache.james.backends.postgres;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.Test;
 
 class PostgresConfigurationTest {
 
     @Test
-    void shouldThrowWhenMissingPostgresURI() {
+    void shouldReturnCorrespondingProperties() {
+        PostgresConfiguration configuration = PostgresConfiguration.builder()
+            .host("1.1.1.1")
+            .port(1111)
+            .databaseName("db")
+            .databaseSchema("sc")
+            .username("james")
+            .password("1")
+            .nonRLSUser("nonrlsjames")
+            .nonRLSPassword("2")
+            .rowLevelSecurityEnabled()
+            .build();
+
+        assertThat(configuration.getHost()).isEqualTo("1.1.1.1");
+        assertThat(configuration.getPort()).isEqualTo(1111);
+        assertThat(configuration.getDatabaseName()).isEqualTo("db");
+        assertThat(configuration.getDatabaseSchema()).isEqualTo("sc");
+        
assertThat(configuration.getCredential().getUsername()).isEqualTo("james");
+        assertThat(configuration.getCredential().getPassword()).isEqualTo("1");
+        
assertThat(configuration.getNonRLSCredential().getUsername()).isEqualTo("nonrlsjames");
+        
assertThat(configuration.getNonRLSCredential().getPassword()).isEqualTo("2");
+        assertThat(configuration.rowLevelSecurityEnabled()).isEqualTo(true);
+    }
+
+    @Test
+    void shouldUseDefaultValues() {
+        PostgresConfiguration configuration = PostgresConfiguration.builder()
+            .username("james")
+            .password("1")
+            .build();
+
+        
assertThat(configuration.getHost()).isEqualTo(PostgresConfiguration.HOST_DEFAULT_VALUE);
+        
assertThat(configuration.getPort()).isEqualTo(PostgresConfiguration.PORT_DEFAULT_VALUE);
+        
assertThat(configuration.getDatabaseName()).isEqualTo(PostgresConfiguration.DATABASE_NAME_DEFAULT_VALUE);
+        
assertThat(configuration.getDatabaseSchema()).isEqualTo(PostgresConfiguration.DATABASE_SCHEMA_DEFAULT_VALUE);
+        
assertThat(configuration.getNonRLSCredential().getUsername()).isEqualTo("james");
+        
assertThat(configuration.getNonRLSCredential().getPassword()).isEqualTo("1");
+        assertThat(configuration.rowLevelSecurityEnabled()).isEqualTo(false);
+    }
+
+    @Test
+    void shouldThrowWhenMissingUsername() {
         assertThatThrownBy(() -> PostgresConfiguration.builder()
             .build())
             .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("You need to specify Postgres URI");
+            .hasMessage("You need to specify username");
     }
 
     @Test
-    void shouldThrowWhenInvalidURI() {
+    void shouldThrowWhenMissingPassword() {
         assertThatThrownBy(() -> PostgresConfiguration.builder()
-            .url(":invalid")
+            .username("james")
             .build())
             .isInstanceOf(IllegalArgumentException.class)
-            .hasMessage("You need to specify a valid Postgres URI");
+            .hasMessage("You need to specify password");
     }
 
     @Test
-    void shouldThrowWhenURIMissingCredential() {
+    void shouldThrowWhenMissingNonRLSUserAndRLSIsEnabled() {
         assertThatThrownBy(() -> PostgresConfiguration.builder()
-            .url("postgresql://localhost:5432")
+            .username("james")
+            .password("1")
+            .rowLevelSecurityEnabled()
             .build())
             .isInstanceOf(IllegalArgumentException.class)
-            .hasMessage("Postgres URI need to contains user credential");
+            .hasMessage("You need to specify nonRLSUser");
     }
 
     @Test
-    void shouldParseValidURI() {
-        PostgresConfiguration configuration = PostgresConfiguration.builder()
-            .url("postgresql://username:password@postgreshost:5672")
-            .build();
-
-        assertThat(configuration.getUri().getHost()).isEqualTo("postgreshost");
-        assertThat(configuration.getUri().getPort()).isEqualTo(5672);
-        
assertThat(configuration.getCredential().getUsername()).isEqualTo("username");
-        
assertThat(configuration.getCredential().getPassword()).isEqualTo("password");
+    void shouldThrowWhenMissingNonRLSPasswordAndRLSIsEnabled() {
+        assertThatThrownBy(() -> PostgresConfiguration.builder()
+            .username("james")
+            .password("1")
+            .nonRLSUser("nonrlsjames")
+            .rowLevelSecurityEnabled()
+            .build())
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage("You need to specify nonRLSPassword");
     }
 
     @Test
     void rowLevelSecurityShouldBeDisabledByDefault() {
         PostgresConfiguration configuration = PostgresConfiguration.builder()
-            .url("postgresql://username:password@postgreshost:5672")
+            .username("james")
+            .password("1")
             .build();
 
         assertThat(configuration.rowLevelSecurityEnabled()).isFalse();
     }
-
-    @Test
-    void databaseNameShouldFallbackToDefaultWhenNotSet() {
-        PostgresConfiguration configuration = PostgresConfiguration.builder()
-            .url("postgresql://username:password@postgreshost:5672")
-            .build();
-
-        assertThat(configuration.getDatabaseName()).isEqualTo("postgres");
-    }
-
-    @Test
-    void databaseSchemaShouldFallbackToDefaultWhenNotSet() {
-        PostgresConfiguration configuration = PostgresConfiguration.builder()
-            .url("postgresql://username:password@postgreshost:5672")
-            .build();
-
-        assertThat(configuration.getDatabaseSchema()).isEqualTo("public");
-    }
-
-    @Test
-    void shouldReturnCorrespondingProperties() {
-        PostgresConfiguration configuration = PostgresConfiguration.builder()
-            .url("postgresql://username:password@postgreshost:5672")
-            .rowLevelSecurityEnabled()
-            .databaseName("databaseName")
-            .databaseSchema("databaseSchema")
-            .build();
-
-        SoftAssertions.assertSoftly(softly -> {
-            
softly.assertThat(configuration.rowLevelSecurityEnabled()).isEqualTo(true);
-            
softly.assertThat(configuration.getDatabaseName()).isEqualTo("databaseName");
-            
softly.assertThat(configuration.getDatabaseSchema()).isEqualTo("databaseSchema");
-        });
-    }
 }
diff --git 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
index 672a770d6e..2a2c6b9a33 100644
--- 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
+++ 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -25,9 +25,9 @@ import static 
org.apache.james.backends.postgres.PostgresFixture.Database.ROW_LE
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
-import org.apache.http.client.utils.URIBuilder;
 import org.apache.james.GuiceModuleTestExtension;
 import 
org.apache.james.backends.postgres.utils.DomainImplPostgresConnectionFactory;
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
@@ -114,23 +114,22 @@ public class PostgresExtension implements 
GuiceModuleTestExtension {
         PG_CONTAINER.execInContainer("psql", "-U", selectedDatabase.dbUser(), 
selectedDatabase.dbName(), "-c", String.format("CREATE EXTENSION IF NOT EXISTS 
hstore SCHEMA %s;", selectedDatabase.schema()));
     }
 
-    private void initPostgresSession() throws URISyntaxException {
+    private void initPostgresSession() {
         postgresConfiguration = PostgresConfiguration.builder()
-            .url(new URIBuilder()
-                .setScheme("postgresql")
-                .setHost(getHost())
-                .setPort(getMappedPort())
-                .setUserInfo(selectedDatabase.dbUser(), 
selectedDatabase.dbPassword())
-                .build()
-                .toString())
             .databaseName(selectedDatabase.dbName())
             .databaseSchema(selectedDatabase.schema())
+            .host(getHost())
+            .port(getMappedPort())
+            .username(selectedDatabase.dbUser())
+            .password(selectedDatabase.dbPassword())
+            .nonRLSUser(DEFAULT_DATABASE.dbUser())
+            .nonRLSPassword(DEFAULT_DATABASE.dbPassword())
             .rowLevelSecurityEnabled(rlsEnabled)
             .build();
 
         connectionFactory = new 
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
-            .host(postgresConfiguration.getUri().getHost())
-            .port(postgresConfiguration.getUri().getPort())
+            .host(postgresConfiguration.getHost())
+            .port(postgresConfiguration.getPort())
             .username(postgresConfiguration.getCredential().getUsername())
             .password(postgresConfiguration.getCredential().getPassword())
             .database(postgresConfiguration.getDatabaseName())
diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSource.java
similarity index 64%
copy from 
backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
copy to 
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSource.java
index 8d8391e209..d4136a081e 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSource.java
@@ -17,21 +17,26 @@
  * under the License.                                           *
  ****************************************************************/
 
-package org.apache.james.backends.postgres.utils;
+package org.apache.james.mailbox.postgres.mail;
 
-import java.util.Optional;
+import javax.inject.Inject;
 
-import org.apache.james.core.Domain;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
 
-import io.r2dbc.spi.Connection;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
 
-public interface JamesPostgresConnectionFactory {
-    String DOMAIN_ATTRIBUTE = "app.current_domain";
+public class PostgresMessageBlobReferenceSource implements BlobReferenceSource 
{
+    private PostgresMessageDAO postgresMessageDAO;
 
-    default Mono<Connection> getConnection(Domain domain) {
-        return getConnection(Optional.ofNullable(domain));
+    @Inject
+    public PostgresMessageBlobReferenceSource(PostgresMessageDAO 
postgresMessageDAO) {
+        this.postgresMessageDAO = postgresMessageDAO;
     }
 
-    Mono<Connection> getConnection(Optional<Domain> domain);
+    @Override
+    public Flux<BlobId> listReferencedBlobs() {
+        return postgresMessageDAO.listBlobs();
+    }
 }
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
index c68b0e3792..d4aca8b5a9 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
@@ -46,6 +46,7 @@ import java.time.LocalDateTime;
 import java.util.Optional;
 
 import javax.inject.Inject;
+import javax.inject.Named;
 import javax.inject.Singleton;
 
 import org.apache.commons.io.IOUtils;
@@ -60,6 +61,7 @@ import 
org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.jooq.Record;
 import org.jooq.postgres.extensions.types.Hstore;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -85,7 +87,8 @@ public class PostgresMessageDAO {
     private final PostgresExecutor postgresExecutor;
     private final BlobId.Factory blobIdFactory;
 
-    public PostgresMessageDAO(PostgresExecutor postgresExecutor, 
BlobId.Factory blobIdFactory) {
+    @Inject
+    public PostgresMessageDAO(@Named(PostgresExecutor.NON_RLS_INJECT) 
PostgresExecutor postgresExecutor, BlobId.Factory blobIdFactory) {
         this.postgresExecutor = postgresExecutor;
         this.blobIdFactory = blobIdFactory;
     }
@@ -144,4 +147,10 @@ public class PostgresMessageDAO {
             .map(record -> blobIdFactory.from(record.get(BODY_BLOB_ID)));
     }
 
+    public Flux<BlobId> listBlobs() {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(BODY_BLOB_ID)
+            .from(TABLE_NAME)))
+            .map(record -> blobIdFactory.from(record.get(BODY_BLOB_ID)));
+    }
+
 }
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSourceTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSourceTest.java
new file mode 100644
index 0000000000..37b5a91117
--- /dev/null
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageBlobReferenceSourceTest.java
@@ -0,0 +1,100 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.UUID;
+
+import javax.mail.Flags;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.model.ByteContent;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule;
+import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.PostgresMessageId;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class PostgresMessageBlobReferenceSourceTest {
+    private static final int BODY_START = 16;
+    private static final PostgresMailboxId MAILBOX_ID = 
PostgresMailboxId.generate();
+    private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
+    private static final String CONTENT_2 = "Subject: Test3 \n\nBody23\n.\n";
+    private static final MessageUid MESSAGE_UID = MessageUid.of(1);
+
+    @RegisterExtension
+    static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE);
+
+    PostgresMessageBlobReferenceSource blobReferenceSource;
+    PostgresMessageDAO postgresMessageDAO;
+
+    @BeforeEach
+    void beforeEach() {
+        postgresMessageDAO = new 
PostgresMessageDAO(postgresExtension.getPostgresExecutor(), new 
HashBlobId.Factory());
+        blobReferenceSource = new 
PostgresMessageBlobReferenceSource(postgresMessageDAO);
+    }
+
+    @Test
+    void blobReferencesShouldBeEmptyByDefault() {
+        
assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    void blobReferencesShouldReturnAllBlobs() {
+        MessageId messageId1 = PostgresMessageId.Factory.of(UUID.randomUUID());
+        SimpleMailboxMessage message = createMessage(messageId1, 
ThreadId.fromBaseMessageId(messageId1),  CONTENT, BODY_START, new 
PropertyBuilder());
+        MessageId messageId2 = PostgresMessageId.Factory.of(UUID.randomUUID());
+        MailboxMessage message2 = createMessage(messageId2, 
ThreadId.fromBaseMessageId(messageId2),  CONTENT_2, BODY_START, new 
PropertyBuilder());
+        postgresMessageDAO.insert(message, "1") .block();
+        postgresMessageDAO.insert(message2, "2") .block();
+
+        
assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
+            .hasSize(2);
+    }
+
+    private SimpleMailboxMessage createMessage(MessageId messageId, ThreadId 
threadId, String content, int bodyStart, PropertyBuilder propertyBuilder) {
+        return SimpleMailboxMessage.builder()
+            .messageId(messageId)
+            .threadId(threadId)
+            .mailboxId(MAILBOX_ID)
+            .uid(MESSAGE_UID)
+            .internalDate(new Date())
+            .bodyStartOctet(bodyStart)
+            .size(content.length())
+            .content(new ByteContent(content.getBytes(StandardCharsets.UTF_8)))
+            .flags(new Flags())
+            .properties(propertyBuilder)
+            .build();
+    }
+
+}
diff --git a/server/apps/postgres-app/sample-configuration/postgres.properties 
b/server/apps/postgres-app/sample-configuration/postgres.properties
index 0bfe376f4d..b93071532e 100644
--- a/server/apps/postgres-app/sample-configuration/postgres.properties
+++ b/server/apps/postgres-app/sample-configuration/postgres.properties
@@ -1,11 +1,26 @@
-# String. Required. PostgreSQL URI in the format 
postgresql://username:password@host:port
-url=postgresql://james:secret1@postgres:5432
-
 # String. Optional, default to 'postgres'. Database name.
 database.name=james
 
 # String. Optional, default to 'public'. Database schema.
 database.schema=public
 
+# String. Optional, default to 'localhost'. Database host.
+database.host=postgres
+
+# Integer. Optional, default to 5432. Database port.
+database.port=5432
+
+# String. Required. Database username.
+database.username=james
+
+# String. Required. Database password of the user.
+database.password=secret1
+
+# String. Required when row.level.security.enabled is true. Username of the 
database user that is permitted to bypass row level security (RLS).
+database.non-rls.username=nonrlsjames
+
+# String. Required when row.level.security.enabled is true. Password of the 
database user that is permitted to bypass row level security (RLS).
+database.non-rls.password=secret1
+
 # Boolean. Optional, default to false. Whether to enable row level security.
 row.level.security.enabled=true
diff --git 
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
 
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
index 5da7524604..76f6244de2 100644
--- 
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
+++ 
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
@@ -21,7 +21,6 @@ package org.apache.james;
 
 import java.util.List;
 
-import org.apache.james.blob.api.BlobReferenceSource;
 import org.apache.james.data.UsersRepositoryModuleChooser;
 import org.apache.james.modules.BlobExportMechanismModule;
 import org.apache.james.modules.MailboxModule;
@@ -63,12 +62,10 @@ import org.apache.james.modules.server.TaskManagerModule;
 import 
org.apache.james.modules.server.WebAdminReIndexingTaskSerializationModule;
 import org.apache.james.modules.server.WebAdminServerModule;
 import org.apache.james.modules.vault.DeletedMessageVaultRoutesModule;
-import org.apache.james.server.blob.deduplication.StorageStrategy;
 import org.apache.james.vault.VaultConfiguration;
 
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Module;
-import com.google.inject.multibindings.Multibinder;
 import com.google.inject.util.Modules;
 
 public class PostgresJamesServerMain implements JamesServerMain {
@@ -145,11 +142,6 @@ public class PostgresJamesServerMain implements 
JamesServerMain {
             
.addAll(BlobStoreModulesChooser.chooseModules(configuration.blobStoreConfiguration()))
             .add(new BlobStoreCacheModulesChooser.CacheDisabledModule());
 
-        // should remove this after 
https://github.com/linagora/james-project/issues/4998
-        if 
(configuration.blobStoreConfiguration().storageStrategy().equals(StorageStrategy.DEDUPLICATION))
 {
-            builder.add(binder -> Multibinder.newSetBinder(binder, 
BlobReferenceSource.class));
-        }
-
         return builder.build();
     }
 
diff --git 
a/server/container/guice/mailbox-postgres/src/main/java/org/apache/james/modules/mailbox/PostgresMailboxModule.java
 
b/server/container/guice/mailbox-postgres/src/main/java/org/apache/james/modules/mailbox/PostgresMailboxModule.java
index 97f4716a4a..b1a955f6ac 100644
--- 
a/server/container/guice/mailbox-postgres/src/main/java/org/apache/james/modules/mailbox/PostgresMailboxModule.java
+++ 
b/server/container/guice/mailbox-postgres/src/main/java/org/apache/james/modules/mailbox/PostgresMailboxModule.java
@@ -29,6 +29,7 @@ import 
org.apache.james.adapter.mailbox.QuotaUsernameChangeTaskStep;
 import org.apache.james.adapter.mailbox.UserRepositoryAuthenticator;
 import org.apache.james.adapter.mailbox.UserRepositoryAuthorizator;
 import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.blob.api.BlobReferenceSource;
 import org.apache.james.events.EventListener;
 import org.apache.james.mailbox.AttachmentContentLoader;
 import org.apache.james.mailbox.Authenticator;
@@ -49,6 +50,8 @@ import org.apache.james.mailbox.postgres.PostgresMailboxId;
 import org.apache.james.mailbox.postgres.PostgresMailboxSessionMapperFactory;
 import org.apache.james.mailbox.postgres.PostgresMessageId;
 import org.apache.james.mailbox.postgres.mail.PostgresMailboxManager;
+import 
org.apache.james.mailbox.postgres.mail.PostgresMessageBlobReferenceSource;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
 import org.apache.james.mailbox.store.MailboxManagerConfiguration;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.mailbox.store.NoMailboxPathLocker;
@@ -117,6 +120,8 @@ public class PostgresMailboxModule extends AbstractModule {
 
         bind(ReIndexer.class).to(ReIndexerImpl.class);
 
+        bind(PostgresMessageDAO.class).in(Scopes.SINGLETON);
+
         Multibinder.newSetBinder(binder(), 
MailboxManagerDefinition.class).addBinding().to(PostgresMailboxManagerDefinition.class);
 
         Multibinder.newSetBinder(binder(), 
EventListener.GroupEventListener.class)
@@ -141,6 +146,9 @@ public class PostgresMailboxModule extends AbstractModule {
 
         Multibinder<DeleteUserDataTaskStep> deleteUserDataTaskStepMultibinder 
= Multibinder.newSetBinder(binder(), DeleteUserDataTaskStep.class);
         
deleteUserDataTaskStepMultibinder.addBinding().to(MailboxUserDeletionTaskStep.class);
+
+        Multibinder<BlobReferenceSource> blobReferenceSourceMultibinder = 
Multibinder.newSetBinder(binder(), BlobReferenceSource.class);
+        
blobReferenceSourceMultibinder.addBinding().to(PostgresMessageBlobReferenceSource.class);
     }
 
     @Singleton
diff --git 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
index e5f849cebb..5a2950e484 100644
--- 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
+++ 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresCommonModule.java
@@ -73,13 +73,24 @@ public class PostgresCommonModule extends AbstractModule {
 
     @Provides
     @Singleton
-    JamesPostgresConnectionFactory 
provideJamesPostgresConnectionFactory(PostgresConfiguration 
postgresConfiguration, ConnectionFactory connectionFactory) {
+    JamesPostgresConnectionFactory 
provideJamesPostgresConnectionFactory(PostgresConfiguration 
postgresConfiguration,
+                                                                         
ConnectionFactory connectionFactory,
+                                                                         
@Named(JamesPostgresConnectionFactory.NON_RLS_INJECT) 
JamesPostgresConnectionFactory singlePostgresConnectionFactory) {
         if (postgresConfiguration.rowLevelSecurityEnabled()) {
             LOGGER.info("PostgreSQL row level security enabled");
             LOGGER.info("Implementation for PostgreSQL connection factory: 
{}", DomainImplPostgresConnectionFactory.class.getName());
             return new DomainImplPostgresConnectionFactory(connectionFactory);
         }
         LOGGER.info("Implementation for PostgreSQL connection factory: {}", 
SinglePostgresConnectionFactory.class.getName());
+        return singlePostgresConnectionFactory;
+    }
+
+    @Provides
+    @Named(JamesPostgresConnectionFactory.NON_RLS_INJECT)
+    @Singleton
+    JamesPostgresConnectionFactory 
provideJamesPostgresConnectionFactoryWithRLSBypass(PostgresConfiguration 
postgresConfiguration,
+                                                                               
       @Named(JamesPostgresConnectionFactory.NON_RLS_INJECT) ConnectionFactory 
connectionFactory) {
+        LOGGER.info("Implementation for PostgreSQL connection factory: {}", 
SinglePostgresConnectionFactory.class.getName());
         return new 
SinglePostgresConnectionFactory(Mono.from(connectionFactory.create()).block());
     }
 
@@ -87,8 +98,8 @@ public class PostgresCommonModule extends AbstractModule {
     @Singleton
     ConnectionFactory postgresqlConnectionFactory(PostgresConfiguration 
postgresConfiguration) {
         return new 
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
-            .host(postgresConfiguration.getUri().getHost())
-            .port(postgresConfiguration.getUri().getPort())
+            .host(postgresConfiguration.getHost())
+            .port(postgresConfiguration.getPort())
             .username(postgresConfiguration.getCredential().getUsername())
             .password(postgresConfiguration.getCredential().getPassword())
             .database(postgresConfiguration.getDatabaseName())
@@ -96,6 +107,20 @@ public class PostgresCommonModule extends AbstractModule {
             .build());
     }
 
+    @Provides
+    @Named(JamesPostgresConnectionFactory.NON_RLS_INJECT)
+    @Singleton
+    ConnectionFactory 
postgresqlConnectionFactoryRLSBypass(PostgresConfiguration 
postgresConfiguration) {
+        return new 
PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
+            .host(postgresConfiguration.getHost())
+            .port(postgresConfiguration.getPort())
+            
.username(postgresConfiguration.getNonRLSCredential().getUsername())
+            
.password(postgresConfiguration.getNonRLSCredential().getPassword())
+            .database(postgresConfiguration.getDatabaseName())
+            .schema(postgresConfiguration.getDatabaseSchema())
+            .build());
+    }
+
     @Provides
     @Singleton
     PostgresModule composePostgresDataDefinitions(Set<PostgresModule> modules) 
{
@@ -110,6 +135,13 @@ public class PostgresCommonModule extends AbstractModule {
         return new PostgresTableManager(postgresExecutor, postgresModule, 
postgresConfiguration);
     }
 
+    @Provides
+    @Named(PostgresExecutor.NON_RLS_INJECT)
+    @Singleton
+    PostgresExecutor.Factory 
postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT) 
JamesPostgresConnectionFactory singlePostgresConnectionFactory) {
+        return new PostgresExecutor.Factory(singlePostgresConnectionFactory);
+    }
+
     @Provides
     @Named(DEFAULT_INJECT)
     @Singleton
@@ -117,6 +149,13 @@ public class PostgresCommonModule extends AbstractModule {
         return factory.create();
     }
 
+    @Provides
+    @Named(PostgresExecutor.NON_RLS_INJECT)
+    @Singleton
+    PostgresExecutor 
postgresExecutorWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT) 
PostgresExecutor.Factory factory) {
+        return factory.create();
+    }
+
     @Provides
     @Singleton
     PostgresExecutor postgresExecutor(@Named(DEFAULT_INJECT) PostgresExecutor 
postgresExecutor) {
diff --git 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresMailRepositoryModule.java
 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresMailRepositoryModule.java
index f0bbbfa3ae..550fb7c8cf 100644
--- 
a/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresMailRepositoryModule.java
+++ 
b/server/container/guice/postgres-common/src/main/java/org/apache/james/modules/data/PostgresMailRepositoryModule.java
@@ -21,11 +21,14 @@ package org.apache.james.modules.data;
 
 import org.apache.commons.configuration2.BaseHierarchicalConfiguration;
 import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.blob.api.BlobReferenceSource;
 import org.apache.james.mailrepository.api.MailRepositoryFactory;
 import org.apache.james.mailrepository.api.MailRepositoryUrlStore;
 import org.apache.james.mailrepository.api.Protocol;
 import org.apache.james.mailrepository.memory.MailRepositoryStoreConfiguration;
 import org.apache.james.mailrepository.postgres.PostgresMailRepository;
+import 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryBlobReferenceSource;
+import 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryContentDAO;
 import org.apache.james.mailrepository.postgres.PostgresMailRepositoryFactory;
 import org.apache.james.mailrepository.postgres.PostgresMailRepositoryUrlStore;
 
@@ -37,6 +40,7 @@ import com.google.inject.multibindings.Multibinder;
 public class PostgresMailRepositoryModule extends AbstractModule {
     @Override
     protected void configure() {
+        bind(PostgresMailRepositoryContentDAO.class).in(Scopes.SINGLETON);
         bind(PostgresMailRepositoryUrlStore.class).in(Scopes.SINGLETON);
 
         
bind(MailRepositoryUrlStore.class).to(PostgresMailRepositoryUrlStore.class);
@@ -51,5 +55,8 @@ public class PostgresMailRepositoryModule extends 
AbstractModule {
             .addBinding().to(PostgresMailRepositoryFactory.class);
         Multibinder.newSetBinder(binder(), PostgresModule.class)
             
.addBinding().toInstance(org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.MODULE);
+
+        Multibinder<BlobReferenceSource> blobReferenceSourceMultibinder = 
Multibinder.newSetBinder(binder(), BlobReferenceSource.class);
+        
blobReferenceSourceMultibinder.addBinding().to(PostgresMailRepositoryBlobReferenceSource.class);
     }
 }
diff --git 
a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
 
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
index 241fb21536..1f9da8f4c7 100644
--- 
a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
+++ 
b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
@@ -19,344 +19,67 @@
 
 package org.apache.james.mailrepository.postgres;
 
-import static 
org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME;
-import static 
org.apache.james.backends.postgres.PostgresCommons.LOCAL_DATE_TIME_DATE_FUNCTION;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.ATTRIBUTES;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.BODY_BLOB_ID;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.ERROR;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.HEADER_BLOB_ID;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.KEY;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.LAST_UPDATED;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.PER_RECIPIENT_SPECIFIC_HEADERS;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.RECIPIENTS;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.REMOTE_ADDRESS;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.REMOTE_HOST;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.SENDER;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.STATE;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.TABLE_NAME;
-import static 
org.apache.james.mailrepository.postgres.PostgresMailRepositoryModule.PostgresMailRepositoryContentTable.URL;
-import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
-
-import java.time.LocalDateTime;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
-import javax.mail.internet.MimeMessage;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.postgres.utils.PostgresExecutor;
-import org.apache.james.backends.postgres.utils.PostgresUtils;
-import org.apache.james.blob.api.BlobId;
-import org.apache.james.blob.api.Store;
-import org.apache.james.blob.mail.MimeMessagePartsId;
-import org.apache.james.blob.mail.MimeMessageStore;
-import org.apache.james.core.MailAddress;
-import org.apache.james.core.MaybeSender;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
-import org.apache.james.server.core.MailImpl;
-import org.apache.james.server.core.MimeMessageWrapper;
-import org.apache.james.util.AuditTrail;
-import org.apache.mailet.Attribute;
-import org.apache.mailet.AttributeName;
-import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
-import org.apache.mailet.PerRecipientHeaders;
-import org.jooq.Record;
-import org.jooq.postgres.extensions.types.Hstore;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimap;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class PostgresMailRepository implements MailRepository {
-    private static final String HEADERS_SEPARATOR = ";  ";
-
-    private final PostgresExecutor postgresExecutor;
     private final MailRepositoryUrl url;
-    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
-    private final BlobId.Factory blobIdFactory;
+    private final PostgresMailRepositoryContentDAO 
postgresMailRepositoryContentDAO;
 
     @Inject
-    public PostgresMailRepository(PostgresExecutor postgresExecutor,
-                                  MailRepositoryUrl url,
-                                  MimeMessageStore.Factory 
mimeMessageStoreFactory,
-                                  BlobId.Factory blobIdFactory) {
-        this.postgresExecutor = postgresExecutor;
+    public PostgresMailRepository(MailRepositoryUrl url,
+                                  PostgresMailRepositoryContentDAO 
postgresMailRepositoryContentDAO) {
         this.url = url;
-        this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
-        this.blobIdFactory = blobIdFactory;
+        this.postgresMailRepositoryContentDAO = 
postgresMailRepositoryContentDAO;
     }
 
     @Override
     public long size() throws MessagingException {
-        return sizeReactive().block();
+        return postgresMailRepositoryContentDAO.size(url);
     }
 
     @Override
     public Mono<Long> sizeReactive() {
-        return postgresExecutor.executeCount(context -> 
Mono.from(context.selectCount()
-                .from(TABLE_NAME)
-                .where(URL.eq(url.asString()))))
-            .map(Integer::longValue);
+        return postgresMailRepositoryContentDAO.sizeReactive(url);
     }
 
     @Override
     public MailKey store(Mail mail) throws MessagingException {
-        MailKey mailKey = MailKey.forMail(mail);
-
-        return storeMailBlob(mail)
-            .flatMap(mimeMessagePartsId -> storeMailMetadata(mail, mailKey, 
mimeMessagePartsId)
-                .doOnSuccess(auditTrailStoredMail(mail))
-                
.onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e -> 
Mono.from(mimeMessageStore.delete(mimeMessagePartsId))
-                    .thenReturn(mailKey)))
-            .block();
-    }
-
-    private Mono<MimeMessagePartsId> storeMailBlob(Mail mail) throws 
MessagingException {
-        return mimeMessageStore.save(mail.getMessage());
-    }
-
-    private Mono<MailKey> storeMailMetadata(Mail mail, MailKey mailKey, 
MimeMessagePartsId mimeMessagePartsId) {
-        return postgresExecutor.executeVoid(context -> 
Mono.from(context.insertInto(TABLE_NAME)
-                .set(URL, url.asString())
-                .set(KEY, mailKey.asString())
-                .set(HEADER_BLOB_ID, 
mimeMessagePartsId.getHeaderBlobId().asString())
-                .set(BODY_BLOB_ID, 
mimeMessagePartsId.getBodyBlobId().asString())
-                .set(STATE, mail.getState())
-                .set(ERROR, mail.getErrorMessage())
-                .set(SENDER, mail.getMaybeSender().asString())
-                .set(RECIPIENTS, asStringArray(mail.getRecipients()))
-                .set(REMOTE_ADDRESS, mail.getRemoteAddr())
-                .set(REMOTE_HOST, mail.getRemoteHost())
-                .set(LAST_UPDATED, 
DATE_TO_LOCAL_DATE_TIME.apply(mail.getLastUpdated()))
-                .set(ATTRIBUTES, asHstore(mail.attributes()))
-                .set(PER_RECIPIENT_SPECIFIC_HEADERS, 
asHstore(mail.getPerRecipientSpecificHeaders().getHeadersByRecipient()))
-                .onConflict(URL, KEY)
-                .doUpdate()
-                .set(HEADER_BLOB_ID, 
mimeMessagePartsId.getHeaderBlobId().asString())
-                .set(BODY_BLOB_ID, 
mimeMessagePartsId.getBodyBlobId().asString())
-                .set(STATE, mail.getState())
-                .set(ERROR, mail.getErrorMessage())
-                .set(SENDER, mail.getMaybeSender().asString())
-                .set(RECIPIENTS, asStringArray(mail.getRecipients()))
-                .set(REMOTE_ADDRESS, mail.getRemoteAddr())
-                .set(REMOTE_HOST, mail.getRemoteHost())
-                .set(LAST_UPDATED, 
DATE_TO_LOCAL_DATE_TIME.apply(mail.getLastUpdated()))
-                .set(ATTRIBUTES, asHstore(mail.attributes()))
-                .set(PER_RECIPIENT_SPECIFIC_HEADERS, 
asHstore(mail.getPerRecipientSpecificHeaders().getHeadersByRecipient()))
-            ))
-            .thenReturn(mailKey);
-    }
-
-    private Consumer<MailKey> auditTrailStoredMail(Mail mail) {
-        return Throwing.consumer(any -> AuditTrail.entry()
-            .protocol("mailrepository")
-            .action("store")
-            .parameters(Throwing.supplier(() -> ImmutableMap.of("mailId", 
mail.getName(),
-                "mimeMessageId", Optional.ofNullable(mail.getMessage())
-                    .map(Throwing.function(MimeMessage::getMessageID))
-                    .orElse(""),
-                "sender", mail.getMaybeSender().asString(),
-                "recipients", StringUtils.join(mail.getRecipients()))))
-            .log("PostgresMailRepository stored mail."));
-    }
-
-    private String[] asStringArray(Collection<MailAddress> mailAddresses) {
-        return mailAddresses.stream()
-            .map(MailAddress::asString)
-            .toArray(String[]::new);
-    }
-
-    private Hstore asHstore(Multimap<MailAddress, PerRecipientHeaders.Header> 
multimap) {
-        return Hstore.hstore(multimap
-            .asMap()
-            .entrySet()
-            .stream()
-            .map(recipientToHeaders -> 
Pair.of(recipientToHeaders.getKey().asString(),
-                asString(recipientToHeaders.getValue())))
-            .collect(ImmutableMap.toImmutableMap(Pair::getLeft, 
Pair::getRight)));
-    }
-
-    private String asString(Collection<PerRecipientHeaders.Header> headers) {
-        return StringUtils.join(headers.stream()
-            .map(PerRecipientHeaders.Header::asString)
-            .collect(ImmutableList.toImmutableList()), HEADERS_SEPARATOR);
-    }
-
-    private Hstore asHstore(Stream<Attribute> attributes) {
-        return Hstore.hstore(attributes
-            .flatMap(attribute -> attribute.getValue()
-                .toJson()
-                .map(JsonNode::toString)
-                .map(value -> Pair.of(attribute.getName().asString(), 
value)).stream())
-            .collect(ImmutableMap.toImmutableMap(Pair::getLeft, 
Pair::getRight)));
+        return postgresMailRepositoryContentDAO.store(mail, url);
     }
 
     @Override
     public Iterator<MailKey> list() throws MessagingException {
-        return listMailKeys()
-            .toStream()
-            .iterator();
-    }
-
-    private Flux<MailKey> listMailKeys() {
-        return postgresExecutor.executeRows(context -> 
Flux.from(context.select(KEY)
-                .from(TABLE_NAME)
-                .where(URL.eq(url.asString()))))
-            .map(record -> new MailKey(record.get(KEY)));
+        return postgresMailRepositoryContentDAO.list(url);
     }
 
     @Override
     public Mail retrieve(MailKey key) {
-        return postgresExecutor.executeRow(context -> 
Mono.from(context.select()
-                .from(TABLE_NAME)
-                .where(URL.eq(url.asString()))
-                .and(KEY.eq(key.asString()))))
-            .flatMap(this::toMail)
-            .blockOptional()
-            .orElse(null);
-    }
-
-    private Mono<Mail> toMail(Record record) {
-        return mimeMessageStore.read(toMimeMessagePartsId(record))
-            .map(Throwing.function(mimeMessage -> toMail(record, 
mimeMessage)));
-    }
-
-    private Mail toMail(Record record, MimeMessage mimeMessage) throws 
MessagingException {
-        List<MailAddress> recipients = Arrays.stream(record.get(RECIPIENTS))
-            .map(Throwing.function(MailAddress::new))
-            .collect(ImmutableList.toImmutableList());
-
-        PerRecipientHeaders perRecipientHeaders = 
getPerRecipientHeaders(record);
-
-        List<Attribute> attributes = Hstore.hstore(record.get(ATTRIBUTES, 
LinkedHashMap.class))
-            .data()
-            .entrySet()
-            .stream()
-            .map(Throwing.function(entry -> new 
Attribute(AttributeName.of(entry.getKey()),
-                AttributeValue.fromJsonString(entry.getValue()))))
-            .collect(ImmutableList.toImmutableList());
-
-        MailImpl mail = MailImpl.builder()
-            .name(record.get(KEY))
-            .sender(MaybeSender.getMailSender(record.get(SENDER)))
-            .addRecipients(recipients)
-            
.lastUpdated(LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(LAST_UPDATED, 
LocalDateTime.class)))
-            .errorMessage(record.get(ERROR))
-            .remoteHost(record.get(REMOTE_HOST))
-            .remoteAddr(record.get(REMOTE_ADDRESS))
-            .state(record.get(STATE))
-            .addAllHeadersForRecipients(perRecipientHeaders)
-            .addAttributes(attributes)
-            .build();
-
-        if (mimeMessage instanceof MimeMessageWrapper) {
-            mail.setMessageNoCopy((MimeMessageWrapper) mimeMessage);
-        } else {
-            mail.setMessage(mimeMessage);
-        }
-
-        return mail;
-    }
-
-    private PerRecipientHeaders getPerRecipientHeaders(Record record) {
-        PerRecipientHeaders perRecipientHeaders = new PerRecipientHeaders();
-
-        Hstore.hstore(record.get(PER_RECIPIENT_SPECIFIC_HEADERS, 
LinkedHashMap.class))
-            .data()
-            .entrySet()
-            .stream()
-            .flatMap(this::recipientToHeaderStream)
-            .forEach(recipientToHeaderPair -> 
perRecipientHeaders.addHeaderForRecipient(
-                recipientToHeaderPair.getRight(),
-                recipientToHeaderPair.getLeft()));
-
-        return perRecipientHeaders;
-    }
-
-    private Stream<Pair<MailAddress, PerRecipientHeaders.Header>> 
recipientToHeaderStream(Map.Entry<String, String> recipientToHeadersString) {
-        List<String> headers = Splitter.on(HEADERS_SEPARATOR)
-            .splitToList(recipientToHeadersString.getValue());
-
-        return headers
-            .stream()
-            .map(headerAsString -> Pair.of(
-                    asMailAddress(recipientToHeadersString.getKey()),
-                PerRecipientHeaders.Header.fromString(headerAsString)));
-    }
-
-    private MailAddress asMailAddress(String mailAddress) {
-        return Throwing.supplier(() -> new MailAddress(mailAddress))
-            .get();
-    }
-
-    private MimeMessagePartsId toMimeMessagePartsId(Record record) {
-        return MimeMessagePartsId.builder()
-            .headerBlobId(blobIdFactory.from(record.get(HEADER_BLOB_ID)))
-            .bodyBlobId(blobIdFactory.from(record.get(BODY_BLOB_ID)))
-            .build();
+        return postgresMailRepositoryContentDAO.retrieve(key, url);
     }
 
     @Override
     public void remove(MailKey key) {
-        removeReactive(key).block();
-    }
-
-    private Mono<Void> removeReactive(MailKey key) {
-        return getMimeMessagePartsId(key)
-            .flatMap(mimeMessagePartsId -> deleteMailMetadata(key)
-                .then(deleteMailBlob(mimeMessagePartsId)));
-    }
-
-    private Mono<MimeMessagePartsId> getMimeMessagePartsId(MailKey key) {
-        return postgresExecutor.executeRow(context -> 
Mono.from(context.select(HEADER_BLOB_ID, BODY_BLOB_ID)
-                .from(TABLE_NAME)
-                .where(URL.eq(url.asString()))
-                .and(KEY.eq(key.asString()))))
-            .map(this::toMimeMessagePartsId);
-    }
-
-    private Mono<Void> deleteMailMetadata(MailKey key) {
-        return postgresExecutor.executeVoid(context -> 
Mono.from(context.deleteFrom(TABLE_NAME)
-            .where(URL.eq(url.asString()))
-            .and(KEY.eq(key.asString()))));
-    }
-
-    private Mono<Void> deleteMailBlob(MimeMessagePartsId mimeMessagePartsId) {
-        return Mono.from(mimeMessageStore.delete(mimeMessagePartsId));
+        postgresMailRepositoryContentDAO.remove(key, url);
     }
 
     @Override
     public void remove(Collection<MailKey> keys) {
-        Flux.fromIterable(keys)
-            .concatMap(this::removeReactive)
-            .then()
-            .block();
+        postgresMailRepositoryContentDAO.remove(keys, url);
     }
 
     @Override
     public void removeAll() {
-        listMailKeys()
-            .flatMap(this::removeReactive, DEFAULT_CONCURRENCY)
-            .then()
-            .block();
+        postgresMailRepositoryContentDAO.removeAll(url);
     }
 }
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSource.java
similarity index 62%
copy from backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
copy to server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSource.java
index 8d8391e209..bd5a39f8f3 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/JamesPostgresConnectionFactory.java
+++ b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSource.java
@@ -17,21 +17,25 @@
  * under the License.                                           *
  ****************************************************************/
 
-package org.apache.james.backends.postgres.utils;
+package org.apache.james.mailrepository.postgres;
 
-import java.util.Optional;
+import javax.inject.Inject;
 
-import org.apache.james.core.Domain;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobReferenceSource;
 
-import io.r2dbc.spi.Connection;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
 
-public interface JamesPostgresConnectionFactory {
-    String DOMAIN_ATTRIBUTE = "app.current_domain";
+public class PostgresMailRepositoryBlobReferenceSource implements BlobReferenceSource {
+    private final PostgresMailRepositoryContentDAO postgresMailRepositoryContentDAO;
 
-    default Mono<Connection> getConnection(Domain domain) {
-        return getConnection(Optional.ofNullable(domain));
+    @Inject
+    public PostgresMailRepositoryBlobReferenceSource(PostgresMailRepositoryContentDAO postgresMailRepositoryContentDAO) {
+        this.postgresMailRepositoryContentDAO = postgresMailRepositoryContentDAO;
     }
 
-    Mono<Connection> getConnection(Optional<Domain> domain);
+    @Override
+    public Flux<BlobId> listReferencedBlobs() {
+        return postgresMailRepositoryContentDAO.listBlobs();
+    }
 }
diff --git a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryContentDAO.java
similarity index 87%
copy from server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
copy to server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryContentDAO.java
index 241fb21536..2a52d4cb60 100644
--- a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepository.java
+++ b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryContentDAO.java
@@ -63,7 +63,6 @@ import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.MaybeSender;
 import org.apache.james.mailrepository.api.MailKey;
-import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
 import org.apache.james.server.core.MailImpl;
 import org.apache.james.server.core.MimeMessageWrapper;
@@ -86,44 +85,38 @@ import com.google.common.collect.Multimap;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-public class PostgresMailRepository implements MailRepository {
+public class PostgresMailRepositoryContentDAO {
     private static final String HEADERS_SEPARATOR = ";  ";
 
     private final PostgresExecutor postgresExecutor;
-    private final MailRepositoryUrl url;
     private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
     private final BlobId.Factory blobIdFactory;
 
     @Inject
-    public PostgresMailRepository(PostgresExecutor postgresExecutor,
-                                  MailRepositoryUrl url,
-                                  MimeMessageStore.Factory 
mimeMessageStoreFactory,
-                                  BlobId.Factory blobIdFactory) {
+    public PostgresMailRepositoryContentDAO(PostgresExecutor postgresExecutor,
+                                            MimeMessageStore.Factory 
mimeMessageStoreFactory,
+                                            BlobId.Factory blobIdFactory) {
         this.postgresExecutor = postgresExecutor;
-        this.url = url;
         this.mimeMessageStore = mimeMessageStoreFactory.mimeMessageStore();
         this.blobIdFactory = blobIdFactory;
     }
 
-    @Override
-    public long size() throws MessagingException {
-        return sizeReactive().block();
+    public long size(MailRepositoryUrl url) throws MessagingException {
+        return sizeReactive(url).block();
     }
 
-    @Override
-    public Mono<Long> sizeReactive() {
+    public Mono<Long> sizeReactive(MailRepositoryUrl url) {
         return postgresExecutor.executeCount(context -> 
Mono.from(context.selectCount()
                 .from(TABLE_NAME)
                 .where(URL.eq(url.asString()))))
             .map(Integer::longValue);
     }
 
-    @Override
-    public MailKey store(Mail mail) throws MessagingException {
+    public MailKey store(Mail mail, MailRepositoryUrl url) throws 
MessagingException {
         MailKey mailKey = MailKey.forMail(mail);
 
         return storeMailBlob(mail)
-            .flatMap(mimeMessagePartsId -> storeMailMetadata(mail, mailKey, 
mimeMessagePartsId)
+            .flatMap(mimeMessagePartsId -> storeMailMetadata(mail, mailKey, 
mimeMessagePartsId, url)
                 .doOnSuccess(auditTrailStoredMail(mail))
                 
.onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e -> 
Mono.from(mimeMessageStore.delete(mimeMessagePartsId))
                     .thenReturn(mailKey)))
@@ -134,7 +127,7 @@ public class PostgresMailRepository implements 
MailRepository {
         return mimeMessageStore.save(mail.getMessage());
     }
 
-    private Mono<MailKey> storeMailMetadata(Mail mail, MailKey mailKey, 
MimeMessagePartsId mimeMessagePartsId) {
+    private Mono<MailKey> storeMailMetadata(Mail mail, MailKey mailKey, 
MimeMessagePartsId mimeMessagePartsId, MailRepositoryUrl url) {
         return postgresExecutor.executeVoid(context -> 
Mono.from(context.insertInto(TABLE_NAME)
                 .set(URL, url.asString())
                 .set(KEY, mailKey.asString())
@@ -210,22 +203,20 @@ public class PostgresMailRepository implements 
MailRepository {
             .collect(ImmutableMap.toImmutableMap(Pair::getLeft, 
Pair::getRight)));
     }
 
-    @Override
-    public Iterator<MailKey> list() throws MessagingException {
-        return listMailKeys()
+    public Iterator<MailKey> list(MailRepositoryUrl url) throws 
MessagingException {
+        return listMailKeys(url)
             .toStream()
             .iterator();
     }
 
-    private Flux<MailKey> listMailKeys() {
+    private Flux<MailKey> listMailKeys(MailRepositoryUrl url) {
         return postgresExecutor.executeRows(context -> 
Flux.from(context.select(KEY)
                 .from(TABLE_NAME)
                 .where(URL.eq(url.asString()))))
             .map(record -> new MailKey(record.get(KEY)));
     }
 
-    @Override
-    public Mail retrieve(MailKey key) {
+    public Mail retrieve(MailKey key, MailRepositoryUrl url) {
         return postgresExecutor.executeRow(context -> 
Mono.from(context.select()
                 .from(TABLE_NAME)
                 .where(URL.eq(url.asString()))
@@ -247,8 +238,7 @@ public class PostgresMailRepository implements 
MailRepository {
 
         PerRecipientHeaders perRecipientHeaders = 
getPerRecipientHeaders(record);
 
-        List<Attribute> attributes = Hstore.hstore(record.get(ATTRIBUTES, 
LinkedHashMap.class))
-            .data()
+        List<Attribute> attributes = ((LinkedHashMap<String, String>) 
record.get(ATTRIBUTES, LinkedHashMap.class))
             .entrySet()
             .stream()
             .map(Throwing.function(entry -> new 
Attribute(AttributeName.of(entry.getKey()),
@@ -280,8 +270,7 @@ public class PostgresMailRepository implements 
MailRepository {
     private PerRecipientHeaders getPerRecipientHeaders(Record record) {
         PerRecipientHeaders perRecipientHeaders = new PerRecipientHeaders();
 
-        Hstore.hstore(record.get(PER_RECIPIENT_SPECIFIC_HEADERS, 
LinkedHashMap.class))
-            .data()
+        ((LinkedHashMap<String, String>) 
record.get(PER_RECIPIENT_SPECIFIC_HEADERS, LinkedHashMap.class))
             .entrySet()
             .stream()
             .flatMap(this::recipientToHeaderStream)
@@ -299,7 +288,7 @@ public class PostgresMailRepository implements 
MailRepository {
         return headers
             .stream()
             .map(headerAsString -> Pair.of(
-                    asMailAddress(recipientToHeadersString.getKey()),
+                asMailAddress(recipientToHeadersString.getKey()),
                 PerRecipientHeaders.Header.fromString(headerAsString)));
     }
 
@@ -315,18 +304,17 @@ public class PostgresMailRepository implements 
MailRepository {
             .build();
     }
 
-    @Override
-    public void remove(MailKey key) {
-        removeReactive(key).block();
+    public void remove(MailKey key, MailRepositoryUrl url) {
+        removeReactive(key, url).block();
     }
 
-    private Mono<Void> removeReactive(MailKey key) {
-        return getMimeMessagePartsId(key)
-            .flatMap(mimeMessagePartsId -> deleteMailMetadata(key)
+    private Mono<Void> removeReactive(MailKey key, MailRepositoryUrl url) {
+        return getMimeMessagePartsId(key, url)
+            .flatMap(mimeMessagePartsId -> deleteMailMetadata(key, url)
                 .then(deleteMailBlob(mimeMessagePartsId)));
     }
 
-    private Mono<MimeMessagePartsId> getMimeMessagePartsId(MailKey key) {
+    private Mono<MimeMessagePartsId> getMimeMessagePartsId(MailKey key, 
MailRepositoryUrl url) {
         return postgresExecutor.executeRow(context -> 
Mono.from(context.select(HEADER_BLOB_ID, BODY_BLOB_ID)
                 .from(TABLE_NAME)
                 .where(URL.eq(url.asString()))
@@ -334,7 +322,7 @@ public class PostgresMailRepository implements 
MailRepository {
             .map(this::toMimeMessagePartsId);
     }
 
-    private Mono<Void> deleteMailMetadata(MailKey key) {
+    private Mono<Void> deleteMailMetadata(MailKey key, MailRepositoryUrl url) {
         return postgresExecutor.executeVoid(context -> 
Mono.from(context.deleteFrom(TABLE_NAME)
             .where(URL.eq(url.asString()))
             .and(KEY.eq(key.asString()))));
@@ -344,19 +332,23 @@ public class PostgresMailRepository implements 
MailRepository {
         return Mono.from(mimeMessageStore.delete(mimeMessagePartsId));
     }
 
-    @Override
-    public void remove(Collection<MailKey> keys) {
+    public void remove(Collection<MailKey> keys, MailRepositoryUrl url) {
         Flux.fromIterable(keys)
-            .concatMap(this::removeReactive)
+            .concatMap(mailKey -> removeReactive(mailKey, url))
             .then()
             .block();
     }
 
-    @Override
-    public void removeAll() {
-        listMailKeys()
-            .flatMap(this::removeReactive, DEFAULT_CONCURRENCY)
+    public void removeAll(MailRepositoryUrl url) {
+        listMailKeys(url)
+            .flatMap(mailKey -> removeReactive(mailKey, url), 
DEFAULT_CONCURRENCY)
             .then()
             .block();
     }
+
+    public Flux<BlobId> listBlobs() {
+        return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(HEADER_BLOB_ID, BODY_BLOB_ID)
+                .from(TABLE_NAME)))
+            .flatMapIterable(record -> ImmutableList.of(blobIdFactory.from(record.get(HEADER_BLOB_ID)), blobIdFactory.from(record.get(BODY_BLOB_ID))));
+    }
 }
diff --git a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryFactory.java b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryFactory.java
index d947775d9b..5b85e7b043 100644
--- a/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryFactory.java
+++ b/server/data/data-postgres/src/main/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryFactory.java
@@ -47,6 +47,6 @@ public class PostgresMailRepositoryFactory implements MailRepositoryFactory {
 
     @Override
     public MailRepository create(MailRepositoryUrl url) {
-        return new PostgresMailRepository(executor, url, mimeMessageStoreFactory, blobIdFactory);
+        return new PostgresMailRepository(url, new PostgresMailRepositoryContentDAO(executor, mimeMessageStoreFactory, blobIdFactory));
     }
 }
diff --git a/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSourceTest.java b/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSourceTest.java
new file mode 100644
index 0000000000..93b6fa513a
--- /dev/null
+++ b/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryBlobReferenceSourceTest.java
@@ -0,0 +1,94 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailrepository.postgres;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import javax.mail.MessagingException;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.mail.MimeMessageStore;
+import org.apache.james.blob.memory.MemoryBlobStoreFactory;
+import org.apache.james.core.builder.MimeMessageBuilder;
+import org.apache.james.mailrepository.api.MailKey;
+import org.apache.james.mailrepository.api.MailRepositoryPath;
+import org.apache.james.mailrepository.api.MailRepositoryUrl;
+import org.apache.james.mailrepository.api.Protocol;
+import org.apache.james.server.core.MailImpl;
+import org.apache.mailet.Attribute;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class PostgresMailRepositoryBlobReferenceSourceTest {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresModule.aggregateModules(PostgresMailRepositoryModule.MODULE));
+
+    private static final MailRepositoryUrl URL = MailRepositoryUrl.fromPathAndProtocol(new Protocol("postgres"), MailRepositoryPath.from("testrepo"));
+
+    PostgresMailRepositoryContentDAO postgresMailRepositoryContentDAO;
+    PostgresMailRepositoryBlobReferenceSource postgresMailRepositoryBlobReferenceSource;
+
+    @BeforeEach
+    void beforeEach() {
+        BlobId.Factory factory = new HashBlobId.Factory();
+        BlobStore blobStore = MemoryBlobStoreFactory.builder()
+            .blobIdFactory(factory)
+            .defaultBucketName()
+            .passthrough();
+        postgresMailRepositoryContentDAO = new PostgresMailRepositoryContentDAO(postgresExtension.getPostgresExecutor(), MimeMessageStore.factory(blobStore), factory);
+        postgresMailRepositoryBlobReferenceSource = new PostgresMailRepositoryBlobReferenceSource(postgresMailRepositoryContentDAO);
+    }
+
+    @Test
+    void blobReferencesShouldBeEmptyByDefault() {
+        assertThat(postgresMailRepositoryBlobReferenceSource.listReferencedBlobs().collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    void blobReferencesShouldReturnAllBlobs() throws Exception {
+        postgresMailRepositoryContentDAO.store(createMail(new MailKey("mail1")), URL);
+        postgresMailRepositoryContentDAO.store(createMail(new MailKey("mail2")), URL);
+
+        assertThat(postgresMailRepositoryBlobReferenceSource.listReferencedBlobs().collectList().block())
+            .hasSize(4);
+    }
+
+    private MailImpl createMail(MailKey key) throws MessagingException {
+        return MailImpl.builder()
+            .name(key.asString())
+            .sender("sender@localhost")
+            .addRecipient("[email protected]")
+            .addRecipient("[email protected]")
+            .addAttribute(Attribute.convertToAttribute("testAttribute", "testValue"))
+            .mimeMessage(MimeMessageBuilder
+                .mimeMessageBuilder()
+                .setSubject("test")
+                .setText("original body")
+                .build())
+            .build();
+    }
+
+}
diff --git a/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryTest.java b/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryTest.java
index 71ba41f5de..35a17357d9 100644
--- a/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryTest.java
+++ b/server/data/data-postgres/src/test/java/org/apache/james/mailrepository/postgres/PostgresMailRepositoryTest.java
@@ -58,6 +58,6 @@ public class PostgresMailRepositoryTest implements MailRepositoryContract {
             .blobIdFactory(BLOB_ID_FACTORY)
             .defaultBucketName()
             .passthrough();
-        return new PostgresMailRepository(postgresExtension.getPostgresExecutor(), url, MimeMessageStore.factory(blobStore), BLOB_ID_FACTORY);
+        return new PostgresMailRepository(url, new PostgresMailRepositoryContentDAO(postgresExtension.getPostgresExecutor(), MimeMessageStore.factory(blobStore), BLOB_ID_FACTORY));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to