This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/postgresql by this push:
     new 691fdaa94e JAMES-2586 Implement PostgresMailboxMessageDAO (#1812)
691fdaa94e is described below

commit 691fdaa94e2efbbcff603f3d92f083aa59ef69b4
Author: vttran <vtt...@linagora.com>
AuthorDate: Wed Nov 29 17:47:32 2023 +0700

    JAMES-2586 Implement PostgresMailboxMessageDAO (#1812)
---
 .../james/backends/postgres/PostgresCommons.java   |  70 ++++
 .../backends/postgres/utils/PostgresExecutor.java  |   7 +
 mailbox/postgres/pom.xml                           |  14 +
 .../postgres/PostgresMailboxAggregateModule.java   |   4 +-
 .../james/mailbox/postgres/PostgresMessageId.java  |  88 ++++
 .../postgres/mail/PostgresMessageMapper.java       | 448 +++++++++++++++++++++
 .../postgres/mail/PostgresMessageModule.java       | 164 ++++++++
 .../postgres/mail/dao/PostgresMailboxDAO.java      |  11 +
 .../mail/dao/PostgresMailboxMessageDAO.java        | 378 +++++++++++++++++
 .../mail/dao/PostgresMailboxMessageDAOUtils.java   | 186 +++++++++
 .../postgres/mail/dao/PostgresMessageDAO.java      |  91 +++++
 .../postgres/mail/JpaMessageMapperTest.java        | 156 -------
 .../postgres/mail/PostgresMapperProvider.java      | 135 +++++++
 ...oveTest.java => PostgresMessageMapperTest.java} |  28 +-
 ...eMoveTest.java => PostgresMessageMoveTest.java} |  21 +-
 .../store/mail/model/MessageMapperTest.java        |   2 +-
 16 files changed, 1620 insertions(+), 183 deletions(-)

diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java
new file mode 100644
index 0000000000..ae4b8ebf5e
--- /dev/null
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java
@@ -0,0 +1,70 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Date;
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.jooq.DataType;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Table;
+import org.jooq.impl.DSL;
+import org.jooq.impl.DefaultDataType;
+import org.jooq.impl.SQLDataType;
+import org.jooq.postgres.extensions.bindings.HstoreBinding;
+import org.jooq.postgres.extensions.types.Hstore;
+
+/**
+ * Shared jOOQ helpers for the PostgreSQL backend: column data types, a table-qualified
+ * field factory, and null-tolerant conversions between {@link Date} and {@link LocalDateTime}.
+ */
+public class PostgresCommons {
+
+    /** jOOQ {@link DataType} constants for the PostgreSQL column types named in the comments below. */
+    public interface DataTypes {
+
+        // hstore
+        DataType<Hstore> HSTORE = DefaultDataType.getDefaultDataType("hstore").asConvertedDataType(new HstoreBinding());
+
+        // timestamp(6)
+        DataType<LocalDateTime> TIMESTAMP = SQLDataType.LOCALDATETIME(6);
+
+        // text[]
+        DataType<String[]> STRING_ARRAY = SQLDataType.CLOB.getArrayDataType();
+    }
+
+    /** Builds a field reference from a table and one of its fields. */
+    public interface SimpleTableField {
+        Field<Object> of(Table<Record> table, Field<?> field);
+    }
+
+    /** Renders {@code <table name>.<field name>} as a plain jOOQ field. */
+    public static final SimpleTableField TABLE_FIELD = (table, field) -> {
+        String qualifiedName = table.getName() + "." + field.getName();
+        return DSL.field(qualifiedName);
+    };
+
+    /** Converts a {@link Date} to a {@link LocalDateTime} expressed in UTC; {@code null} maps to {@code null}. */
+    public static final Function<Date, LocalDateTime> DATE_TO_LOCAL_DATE_TIME = date ->
+        date == null
+            ? null
+            : LocalDateTime.ofInstant(date.toInstant(), ZoneOffset.UTC);
+
+    /** Converts a {@link LocalDateTime}, read as UTC, back to a {@link Date}; {@code null} maps to {@code null}. */
+    public static final Function<LocalDateTime, Date> LOCAL_DATE_TIME_DATE_FUNCTION = localDateTime ->
+        localDateTime == null
+            ? null
+            : Date.from(localDateTime.toInstant(ZoneOffset.UTC));
+
+    /** Wraps an array-typed field in an {@code unnest(...)} call typed with the array's component type. */
+    public static final Function<Field<?>, Field<?>> UNNEST_FIELD = field -> {
+        Class<?> elementType = field.getType().getComponentType();
+        return DSL.function("unnest", elementType, field);
+    };
+
+    // NOTE(review): presumably the maximum number of values sent in a single IN clause - confirm against callers.
+    public static final int IN_CLAUSE_MAX_SIZE = 32;
+
+}
diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index 7a6485108f..05a3556ad0 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -28,6 +28,7 @@ import javax.inject.Inject;
 import org.apache.james.core.Domain;
 import org.jooq.DSLContext;
 import org.jooq.Record;
+import org.jooq.Record1;
 import org.jooq.SQLDialect;
 import org.jooq.conf.Settings;
 import org.jooq.conf.StatementType;
@@ -96,6 +97,12 @@ public class PostgresExecutor {
             .flatMap(queryFunction);
     }
 
+    /**
+     * Applies {@code queryFunction} to the DSL context and unwraps the single integer
+     * column of the record it yields.
+     *
+     * @param queryFunction query producing a one-column integer record
+     * @return the value of that column
+     */
+    public Mono<Integer> executeCount(Function<DSLContext, Mono<Record1<Integer>>> queryFunction) {
+        Mono<Record1<Integer>> countRecord = dslContext().flatMap(queryFunction);
+        return countRecord.map(Record1::value1);
+    }
+
     public Mono<Connection> connection() {
         return connection;
     }
diff --git a/mailbox/postgres/pom.xml b/mailbox/postgres/pom.xml
index 690389fab5..1c63841273 100644
--- a/mailbox/postgres/pom.xml
+++ b/mailbox/postgres/pom.xml
@@ -83,6 +83,20 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-storage-strategy</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>event-bus-api</artifactId>
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxAggregateModule.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxAggregateModule.java
index db208dd975..9ec68fd6fd 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxAggregateModule.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxAggregateModule.java
@@ -21,11 +21,13 @@ package org.apache.james.mailbox.postgres;
 
 import org.apache.james.backends.postgres.PostgresModule;
 import org.apache.james.mailbox.postgres.mail.PostgresMailboxModule;
+import org.apache.james.mailbox.postgres.mail.PostgresMessageModule;
 import org.apache.james.mailbox.postgres.user.PostgresSubscriptionModule;
 
 public interface PostgresMailboxAggregateModule {
 
     PostgresModule MODULE = PostgresModule.aggregateModules(
         PostgresMailboxModule.MODULE,
-        PostgresSubscriptionModule.MODULE);
+        PostgresSubscriptionModule.MODULE,
+        PostgresMessageModule.MODULE);
 }
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageId.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageId.java
new file mode 100644
index 0000000000..c4012f1999
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageId.java
@@ -0,0 +1,88 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.james.mailbox.model.MessageId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * {@link MessageId} implementation backed by a {@link UUID}.
+ */
+public class PostgresMessageId implements MessageId {
+
+    private final UUID uuid;
+
+    private PostgresMessageId(UUID uuid) {
+        this.uuid = uuid;
+    }
+
+    /** Creates identifiers from random UUIDs, existing UUIDs, or their string form. */
+    public static class Factory implements MessageId.Factory {
+
+        /** Wraps an existing UUID. */
+        public static PostgresMessageId of(UUID uuid) {
+            return new PostgresMessageId(uuid);
+        }
+
+        /** Creates an identifier from a freshly generated random UUID. */
+        @Override
+        public PostgresMessageId generate() {
+            UUID random = UUID.randomUUID();
+            return of(random);
+        }
+
+        /** Parses the textual UUID form produced by {@link PostgresMessageId#serialize()}. */
+        @Override
+        public PostgresMessageId fromString(String serialized) {
+            UUID parsed = UUID.fromString(serialized);
+            return of(parsed);
+        }
+    }
+
+    /** Returns the underlying UUID. */
+    public UUID asUuid() {
+        return uuid;
+    }
+
+    /** Returns the UUID's string form. */
+    @Override
+    public String serialize() {
+        return uuid.toString();
+    }
+
+    @Override
+    public boolean isSerializable() {
+        return true;
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (!(o instanceof PostgresMessageId)) {
+            return false;
+        }
+        PostgresMessageId that = (PostgresMessageId) o;
+        return Objects.equals(this.uuid, that.uuid);
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(uuid);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("uuid", uuid)
+            .toString();
+    }
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
new file mode 100644
index 0000000000..620d48df7c
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
@@ -0,0 +1,448 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Clock;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import javax.mail.Flags;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.mailbox.ApplicableFlagBuilder;
+import org.apache.james.mailbox.FlagsBuilder;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.Content;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxCounters;
+import org.apache.james.mailbox.model.MessageMetaData;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
+import org.apache.james.mailbox.store.FlagsUpdateCalculator;
+import org.apache.james.mailbox.store.MailboxReactorUtils;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.streams.Limit;
+
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * {@link MessageMapper} that keeps message rows and mailbox/message rows in PostgreSQL
+ * (through {@link PostgresMessageDAO} and {@link PostgresMailboxMessageDAO}) and the full
+ * message content in a {@link BlobStore}.
+ *
+ * <p>Blocking methods delegate to their reactive counterpart and block on the result.
+ */
+public class PostgresMessageMapper implements MessageMapper {
+
+    /**
+     * Exposes a message's full content as a {@link ByteSource} whose size is the size
+     * recorded in the message metadata.
+     */
+    private static final Function<MailboxMessage, ByteSource> MESSAGE_FULL_CONTENT_LOADER = (mailboxMessage) -> new ByteSource() {
+        @Override
+        public InputStream openStream() throws IOException {
+            // ByteSource#openStream declares IOException, so let it propagate as-is
+            // rather than hiding it inside a bare RuntimeException.
+            return mailboxMessage.getFullContent();
+        }
+
+        @Override
+        public long size() {
+            return mailboxMessage.metaData().getSize();
+        }
+    };
+
+    private final PostgresMessageDAO messageDAO;
+    private final PostgresMailboxMessageDAO mailboxMessageDAO;
+    private final PostgresMailboxDAO mailboxDAO;
+    private final PostgresModSeqProvider modSeqProvider;
+    private final PostgresUidProvider uidProvider;
+    private final BlobStore blobStore;
+    private final Clock clock;
+    private final BlobId.Factory blobIdFactory;
+
+    /**
+     * Builds the mapper; the three DAOs are created here on top of {@code postgresExecutor}.
+     */
+    public PostgresMessageMapper(PostgresExecutor postgresExecutor,
+                                 PostgresModSeqProvider modSeqProvider,
+                                 PostgresUidProvider uidProvider,
+                                 BlobStore blobStore,
+                                 Clock clock,
+                                 BlobId.Factory blobIdFactory) {
+        this.messageDAO = new PostgresMessageDAO(postgresExecutor);
+        this.mailboxMessageDAO = new PostgresMailboxMessageDAO(postgresExecutor);
+        this.mailboxDAO = new PostgresMailboxDAO(postgresExecutor);
+        this.modSeqProvider = modSeqProvider;
+        this.uidProvider = uidProvider;
+        this.blobStore = blobStore;
+        this.clock = clock;
+        this.blobIdFactory = blobIdFactory;
+    }
+
+    /** Single place for the mailbox-id downcast required by the DAOs. */
+    private static PostgresMailboxId postgresMailboxId(Mailbox mailbox) {
+        return (PostgresMailboxId) mailbox.getMailboxId();
+    }
+
+    @Override
+    public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange set, FetchType type, int limit) {
+        return findInMailboxReactive(mailbox, set, type, limit)
+            .toIterable()
+            .iterator();
+    }
+
+    @Override
+    public Flux<ComposedMessageIdWithMetaData> listMessagesMetadata(Mailbox mailbox, MessageRange set) {
+        return mailboxMessageDAO.findMessagesMetadata(postgresMailboxId(mailbox), set);
+    }
+
+    /**
+     * Finds the messages of {@code mailbox} matching {@code messageRange}. The blob store is
+     * only read for {@code FULL} fetches; the other fetch types build the message straight
+     * from the DAO result.
+     */
+    @Override
+    public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType fetchType, int limitAsInt) {
+        return Mono.just(messageRange)
+            .flatMapMany(range -> {
+                Limit limit = Limit.from(limitAsInt);
+                PostgresMailboxId mailboxId = postgresMailboxId(mailbox);
+                switch (range.getType()) {
+                    case ALL:
+                        return mailboxMessageDAO.findMessagesByMailboxId(mailboxId, limit);
+                    case FROM:
+                        return mailboxMessageDAO.findMessagesByMailboxIdAndAfterUID(mailboxId, range.getUidFrom(), limit);
+                    case ONE:
+                        return mailboxMessageDAO.findMessageByMailboxIdAndUid(mailboxId, range.getUidFrom())
+                            .flux();
+                    case RANGE:
+                        return mailboxMessageDAO.findMessagesByMailboxIdAndBetweenUIDs(mailboxId, range.getUidFrom(), range.getUidTo(), limit);
+                    default:
+                        throw new RuntimeException("Unknown MessageRange type " + range.getType());
+                }
+            }).flatMap(messageBuilderAndBlobId -> {
+                SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndBlobId.getLeft();
+                String blobIdAsString = messageBuilderAndBlobId.getRight();
+                switch (fetchType) {
+                    case METADATA:
+                    case ATTACHMENTS_METADATA:
+                    case HEADERS:
+                        return Mono.just(messageBuilder.build());
+                    case FULL:
+                        return retrieveFullContent(blobIdAsString)
+                            .map(content -> messageBuilder.content(content).build());
+                    default:
+                        return Flux.error(new RuntimeException("Unknown FetchType " + fetchType));
+                }
+            });
+    }
+
+    /** Reads the whole blob into memory and exposes it as a {@link Content}. */
+    private Mono<Content> retrieveFullContent(String blobIdString) {
+        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(blobIdString), SIZE_BASED))
+            .map(contentAsBytes -> new Content() {
+                @Override
+                public InputStream getInputStream() {
+                    return new ByteArrayInputStream(contentAsBytes);
+                }
+
+                @Override
+                public long size() {
+                    return contentAsBytes.length;
+                }
+            });
+    }
+
+    @Override
+    public List<MessageUid> retrieveMessagesMarkedForDeletion(Mailbox mailbox, MessageRange messageRange) {
+        return retrieveMessagesMarkedForDeletionReactive(mailbox, messageRange)
+            .collectList()
+            .block();
+    }
+
+    /** Lists the uids the DAO reports as deleted within {@code messageRange}. */
+    @Override
+    public Flux<MessageUid> retrieveMessagesMarkedForDeletionReactive(Mailbox mailbox, MessageRange messageRange) {
+        return Mono.just(messageRange)
+            .flatMapMany(range -> {
+                PostgresMailboxId mailboxId = postgresMailboxId(mailbox);
+                switch (range.getType()) {
+                    case ALL:
+                        return mailboxMessageDAO.findDeletedMessagesByMailboxId(mailboxId);
+                    case FROM:
+                        return mailboxMessageDAO.findDeletedMessagesByMailboxIdAndAfterUID(mailboxId, range.getUidFrom());
+                    case ONE:
+                        return mailboxMessageDAO.findDeletedMessageByMailboxIdAndUid(mailboxId, range.getUidFrom())
+                            .flux();
+                    case RANGE:
+                        return mailboxMessageDAO.findDeletedMessagesByMailboxIdAndBetweenUIDs(mailboxId, range.getUidFrom(), range.getUidTo());
+                    default:
+                        throw new RuntimeException("Unknown MessageRange type " + range.getType());
+                }
+            });
+    }
+
+    @Override
+    public long countMessagesInMailbox(Mailbox mailbox) {
+        return mailboxMessageDAO.countTotalMessagesByMailboxId(postgresMailboxId(mailbox))
+            .block();
+    }
+
+    @Override
+    public MailboxCounters getMailboxCounters(Mailbox mailbox) {
+        return getMailboxCountersReactive(mailbox).block();
+    }
+
+    /** Runs the total count, then the unseen count, and combines both into the counters. */
+    @Override
+    public Mono<MailboxCounters> getMailboxCountersReactive(Mailbox mailbox) {
+        PostgresMailboxId mailboxId = postgresMailboxId(mailbox);
+        return mailboxMessageDAO.countTotalMessagesByMailboxId(mailboxId)
+            .flatMap(totalMessage -> mailboxMessageDAO.countUnseenMessagesByMailboxId(mailboxId)
+                .map(unseenMessage -> MailboxCounters.builder()
+                    .mailboxId(mailbox.getMailboxId())
+                    .count(totalMessage)
+                    .unseen(unseenMessage)
+                    .build()));
+    }
+
+    @Override
+    public void delete(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+        deleteMessages(mailbox, List.of(message.getUid()));
+    }
+
+    @Override
+    public Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> uids) {
+        return deleteMessagesReactive(mailbox, uids).block();
+    }
+
+    /**
+     * Loads the metadata of the targeted messages first, then deletes the mailbox/message
+     * rows, and finally returns the previously loaded metadata keyed by uid.
+     */
+    @Override
+    public Mono<Map<MessageUid, MessageMetaData>> deleteMessagesReactive(Mailbox mailbox, List<MessageUid> uids) {
+        PostgresMailboxId mailboxId = postgresMailboxId(mailbox);
+        return mailboxMessageDAO.findMessagesByMailboxIdAndUIDs(mailboxId, uids)
+            .map(SimpleMailboxMessage.Builder::build)
+            .collectMap(MailboxMessage::getUid, MailboxMessage::metaData)
+            .flatMap(map -> mailboxMessageDAO.deleteByMailboxIdAndMessageUids(mailboxId, uids)
+                .then(Mono.just(map)));
+    }
+
+    /** Returns the first unseen uid, or {@code null} when the DAO emits nothing. */
+    @Override
+    public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) {
+        return mailboxMessageDAO.findFirstUnseenMessageUid(postgresMailboxId(mailbox)).block();
+    }
+
+    @Override
+    public Mono<Optional<MessageUid>> findFirstUnseenMessageUidReactive(Mailbox mailbox) {
+        return mailboxMessageDAO.findFirstUnseenMessageUid(postgresMailboxId(mailbox))
+            .map(Optional::of)
+            .switchIfEmpty(Mono.just(Optional.empty()));
+    }
+
+    @Override
+    public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) {
+        return findRecentMessageUidsInMailboxReactive(mailbox).block();
+    }
+
+    @Override
+    public Mono<List<MessageUid>> findRecentMessageUidsInMailboxReactive(Mailbox mailbox) {
+        return mailboxMessageDAO.findAllRecentMessageUid(postgresMailboxId(mailbox))
+            .collectList();
+    }
+
+    @Override
+    public MessageMetaData add(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+        return addReactive(mailbox, message).block();
+    }
+
+    /**
+     * Stamps the save date, assigns a new uid/modseq, stores the full content as a blob,
+     * inserts the message row and then the mailbox/message row, and returns the metadata.
+     */
+    @Override
+    public Mono<MessageMetaData> addReactive(Mailbox mailbox, MailboxMessage message) {
+        return Mono.fromCallable(() -> {
+                message.setSaveDate(Date.from(clock.instant()));
+                return message;
+            })
+            .flatMap(this::setNewUidAndModSeq)
+            .then(saveFullContent(message)
+                .flatMap(blobId -> messageDAO.insert(message, blobId.asString())))
+            // Deferred so the DAO is only invoked once the previous steps have completed.
+            .then(Mono.defer(() -> mailboxMessageDAO.insert(message)))
+            .then(Mono.fromCallable(message::metaData));
+    }
+
+    /** Saves the full message content in the default bucket and yields the resulting blob id. */
+    private Mono<BlobId> saveFullContent(MailboxMessage message) {
+        return Mono.fromCallable(() -> MESSAGE_FULL_CONTENT_LOADER.apply(message))
+            .flatMap(bodyByteSource -> Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST)));
+    }
+
+    @Override
+    public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange range) {
+        return updateFlagsPublisher(mailbox, flagsUpdateCalculator, range)
+            .toIterable()
+            .iterator();
+    }
+
+    @Override
+    public Mono<List<UpdatedFlags>> updateFlagsReactive(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange range) {
+        return updateFlagsPublisher(mailbox, flagsUpdateCalculator, range)
+            .collectList();
+    }
+
+    /**
+     * Requests a new modseq for every message in the range and applies the flag update to it.
+     */
+    private Flux<UpdatedFlags> updateFlagsPublisher(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange range) {
+        // NOTE(review): a modseq is requested even for messages whose flags end up unchanged - confirm this is intended.
+        return mailboxMessageDAO.findMessagesMetadata(postgresMailboxId(mailbox), range)
+            .flatMap(currentMetaData -> modSeqProvider.nextModSeqReactive(mailbox.getMailboxId())
+                .flatMap(newModSeq -> updateFlags(currentMetaData, flagsUpdateCalculator, newModSeq)));
+    }
+
+    /**
+     * Computes the new flags of one message. When they equal the old flags nothing is
+     * written and the current modseq is reported; otherwise the update is persisted with
+     * {@code newModSeq}.
+     */
+    private Mono<UpdatedFlags> updateFlags(ComposedMessageIdWithMetaData currentMetaData,
+                                           FlagsUpdateCalculator flagsUpdateCalculator,
+                                           ModSeq newModSeq) {
+        Flags oldFlags = currentMetaData.getFlags();
+        Flags newFlags = flagsUpdateCalculator.buildNewFlags(oldFlags);
+
+        ComposedMessageId composedMessageId = currentMetaData.getComposedMessageId();
+
+        return Mono.just(UpdatedFlags.builder()
+                .messageId(composedMessageId.getMessageId())
+                .oldFlags(oldFlags)
+                .newFlags(newFlags)
+                .uid(composedMessageId.getUid()))
+            .flatMap(builder -> {
+                if (oldFlags.equals(newFlags)) {
+                    return Mono.just(builder.modSeq(currentMetaData.getModSeq())
+                        .build());
+                }
+                return Mono.fromCallable(() -> builder.modSeq(newModSeq).build())
+                    .flatMap(updatedFlags -> mailboxMessageDAO.updateFlag((PostgresMailboxId) composedMessageId.getMailboxId(), composedMessageId.getUid(), updatedFlags)
+                        .thenReturn(updatedFlags));
+            });
+    }
+
+    @Override
+    public List<UpdatedFlags> resetRecent(Mailbox mailbox) {
+        return resetRecentReactive(mailbox).block();
+    }
+
+    @Override
+    public Mono<List<UpdatedFlags>> resetRecentReactive(Mailbox mailbox) {
+        PostgresMailboxId mailboxId = postgresMailboxId(mailbox);
+        return mailboxMessageDAO.findAllRecentMessageMetadata(mailboxId)
+            .collectList()
+            .flatMapMany(mailboxMessageList -> resetRecentFlag(mailboxId, mailboxMessageList))
+            .collectList();
+    }
+
+    /**
+     * Resets the recent flag of the given messages under one new modseq; the old flags of
+     * each result are looked up by uid in the supplied metadata.
+     */
+    private Flux<UpdatedFlags> resetRecentFlag(PostgresMailboxId mailboxId, List<ComposedMessageIdWithMetaData> messageIdWithMetaDataList) {
+        return Flux.fromIterable(messageIdWithMetaDataList)
+            .collectMap(m -> m.getComposedMessageId().getUid(), Function.identity())
+            .flatMapMany(uidMapping -> modSeqProvider.nextModSeqReactive(mailboxId)
+                .flatMapMany(newModSeq -> mailboxMessageDAO.resetRecentFlag(mailboxId, List.copyOf(uidMapping.keySet()), newModSeq))
+                .map(newMetaData -> UpdatedFlags.builder()
+                    .messageId(newMetaData.getMessageId())
+                    .modSeq(newMetaData.getModSeq())
+                    .oldFlags(uidMapping.get(newMetaData.getUid()).getFlags())
+                    .newFlags(newMetaData.getFlags())
+                    .uid(newMetaData.getUid())
+                    .build()));
+    }
+
+    @Override
+    public MessageMetaData copy(Mailbox mailbox, MailboxMessage original) throws MailboxException {
+        return copyReactive(mailbox, original).block();
+    }
+
+    /**
+     * Assigns the next uid/modseq of the message's mailbox to the message, falling back to
+     * {@code MessageUid.MIN_VALUE} / {@code ModSeq.first()} when the DAO emits nothing.
+     */
+    private Mono<Void> setNewUidAndModSeq(MailboxMessage mailboxMessage) {
+        return mailboxDAO.incrementAndGetLastUidAndModSeq(mailboxMessage.getMailboxId())
+            .defaultIfEmpty(Pair.of(MessageUid.MIN_VALUE, ModSeq.first()))
+            .map(pair -> {
+                mailboxMessage.setUid(pair.getLeft());
+                mailboxMessage.setModSeq(pair.getRight());
+                return pair;
+            }).then();
+    }
+
+    /**
+     * Copies {@code original} into {@code mailbox}: the copy keeps the original flags plus
+     * RECENT, gets a fresh save date and uid/modseq, and only a mailbox/message row is inserted.
+     */
+    @Override
+    public Mono<MessageMetaData> copyReactive(Mailbox mailbox, MailboxMessage original) {
+        return Mono.fromCallable(() -> {
+                MailboxMessage copiedMessage = original.copy(mailbox);
+                copiedMessage.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flags.Flag.RECENT).build());
+                copiedMessage.setSaveDate(Date.from(clock.instant()));
+                return copiedMessage;
+            })
+            .flatMap(copiedMessage -> setNewUidAndModSeq(copiedMessage)
+                .then(Mono.defer(() -> mailboxMessageDAO.insert(copiedMessage))
+                    .thenReturn(copiedMessage))
+                .map(MailboxMessage::metaData));
+    }
+
+    @Override
+    public MessageMetaData move(Mailbox mailbox, MailboxMessage original) {
+        return moveReactive(mailbox, original).block();
+    }
+
+    @Override
+    public List<MessageMetaData> move(Mailbox mailbox, List<MailboxMessage> original) throws MailboxException {
+        return MailboxReactorUtils.block(moveReactive(mailbox, original));
+    }
+
+    /** Copies the message into {@code mailbox}, then deletes the original's mailbox/message row. */
+    @Override
+    public Mono<MessageMetaData> moveReactive(Mailbox mailbox, MailboxMessage original) {
+        return copyReactive(mailbox, original)
+            .flatMap(copiedResult -> mailboxMessageDAO.deleteByMailboxIdAndMessageUid((PostgresMailboxId) original.getMailboxId(), original.getUid())
+                .thenReturn(copiedResult));
+    }
+
+    @Override
+    public Optional<MessageUid> getLastUid(Mailbox mailbox) {
+        return uidProvider.lastUid(mailbox);
+    }
+
+    @Override
+    public Mono<Optional<MessageUid>> getLastUidReactive(Mailbox mailbox) {
+        return uidProvider.lastUidReactive(mailbox);
+    }
+
+    @Override
+    public ModSeq getHighestModSeq(Mailbox mailbox) {
+        return modSeqProvider.highestModSeq(mailbox);
+    }
+
+    @Override
+    public Mono<ModSeq> getHighestModSeqReactive(Mailbox mailbox) {
+        return modSeqProvider.highestModSeqReactive(mailbox);
+    }
+
+    @Override
+    public Flags getApplicableFlag(Mailbox mailbox) {
+        return getApplicableFlagReactive(mailbox).block();
+    }
+
+    @Override
+    public Mono<Flags> getApplicableFlagReactive(Mailbox mailbox) {
+        return mailboxMessageDAO.listDistinctUserFlags(postgresMailboxId(mailbox))
+            .map(flags -> ApplicableFlagBuilder.builder().add(flags).build());
+    }
+
+    @Override
+    public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
+        return mailboxMessageDAO.listAllMessageUid(postgresMailboxId(mailbox));
+    }
+
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageModule.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageModule.java
new file mode 100644
index 0000000000..dd3cde8727
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageModule.java
@@ -0,0 +1,164 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import static org.jooq.impl.DSL.foreignKey;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+import org.apache.james.backends.postgres.PostgresCommons.DataTypes;
+import org.apache.james.backends.postgres.PostgresIndex;
+import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.backends.postgres.PostgresTable;
+import 
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Table;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.jooq.postgres.extensions.types.Hstore;
+
+public interface PostgresMessageModule {
+
+    // Column definitions reused by both nested table descriptions
+    // (MessageTable and MessageToMailboxTable).
+    Field<UUID> MESSAGE_ID = DSL.field("message_id", 
SQLDataType.UUID.notNull());
+    Field<LocalDateTime> INTERNAL_DATE = DSL.field("internal_date", 
DataTypes.TIMESTAMP);
+    Field<Long> SIZE = DSL.field("size", SQLDataType.BIGINT.notNull());
+
+    /**
+     * jOOQ description of the {@code message} table: one row per message,
+     * keyed by {@code message_id}, holding the blob reference, the raw header
+     * content and the MIME/content properties.
+     */
+    interface MessageTable {
+        Table<Record> TABLE_NAME = DSL.table("message");
+        Field<UUID> MESSAGE_ID = PostgresMessageModule.MESSAGE_ID;
+        Field<String> BLOB_ID = DSL.field("blob_id", 
SQLDataType.VARCHAR(200).notNull());
+        Field<String> MIME_TYPE = DSL.field("mime_type", 
SQLDataType.VARCHAR(200));
+        Field<String> MIME_SUBTYPE = DSL.field("mime_subtype", 
SQLDataType.VARCHAR(200));
+        Field<LocalDateTime> INTERNAL_DATE = 
PostgresMessageModule.INTERNAL_DATE;
+        Field<Long> SIZE = PostgresMessageModule.SIZE;
+        Field<Integer> BODY_START_OCTET = DSL.field("body_start_octet", 
SQLDataType.INTEGER.notNull());
+        Field<byte[]> HEADER_CONTENT = DSL.field("header_content", 
SQLDataType.BLOB.notNull());
+        Field<Integer> TEXTUAL_LINE_COUNT = DSL.field("textual_line_count", 
SQLDataType.INTEGER);
+
+        // Content-* properties; all of these columns are nullable.
+        Field<String> CONTENT_DESCRIPTION = DSL.field("content_description", 
SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_LOCATION = DSL.field("content_location", 
SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_TRANSFER_ENCODING = 
DSL.field("content_transfer_encoding", SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_DISPOSITION_TYPE = 
DSL.field("content_disposition_type", SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_ID = DSL.field("content_id", 
SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_MD5 = DSL.field("content_md5", 
SQLDataType.VARCHAR(200));
+        Field<String[]> CONTENT_LANGUAGE = DSL.field("content_language", 
DataTypes.STRING_ARRAY);
+        Field<Hstore> CONTENT_TYPE_PARAMETERS = 
DSL.field("content_type_parameters", DataTypes.HSTORE);
+        Field<Hstore> CONTENT_DISPOSITION_PARAMETERS = 
DSL.field("content_disposition_parameters", DataTypes.HSTORE);
+
+        // CREATE TABLE IF NOT EXISTS statement, with message_id as primary key.
+        PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName())
+            .createTableStep(((dsl, tableName) -> 
dsl.createTableIfNotExists(tableName)
+                .column(MESSAGE_ID)
+                .column(BLOB_ID)
+                .column(MIME_TYPE)
+                .column(MIME_SUBTYPE)
+                .column(INTERNAL_DATE)
+                .column(SIZE)
+                .column(BODY_START_OCTET)
+                .column(HEADER_CONTENT)
+                .column(TEXTUAL_LINE_COUNT)
+                .column(CONTENT_DESCRIPTION)
+                .column(CONTENT_LOCATION)
+                .column(CONTENT_TRANSFER_ENCODING)
+                .column(CONTENT_DISPOSITION_TYPE)
+                .column(CONTENT_ID)
+                .column(CONTENT_MD5)
+                .column(CONTENT_LANGUAGE)
+                .column(CONTENT_TYPE_PARAMETERS)
+                .column(CONTENT_DISPOSITION_PARAMETERS)
+                .constraint(DSL.primaryKey(MESSAGE_ID))
+                .comment("Holds the metadata of a mail")))
+            .supportsRowLevelSecurity();
+    }
+
+    /**
+     * jOOQ description of the {@code message_mailbox} table: one row per
+     * (mailbox, uid) pair, carrying the mod-seq, the flags and a reference to
+     * the {@code message} row, plus the indexes created on it.
+     */
+    interface MessageToMailboxTable {
+        Table<Record> TABLE_NAME = DSL.table("message_mailbox");
+        Field<UUID> MAILBOX_ID = DSL.field("mailbox_id", 
SQLDataType.UUID.notNull());
+        Field<Long> MESSAGE_UID = DSL.field("message_uid", 
SQLDataType.BIGINT.notNull());
+        Field<Long> MOD_SEQ = DSL.field("mod_seq", 
SQLDataType.BIGINT.notNull());
+        Field<UUID> MESSAGE_ID = PostgresMessageModule.MESSAGE_ID;
+        Field<UUID> THREAD_ID = DSL.field("thread_id", SQLDataType.UUID);
+        Field<LocalDateTime> INTERNAL_DATE = 
PostgresMessageModule.INTERNAL_DATE;
+        Field<Long> SIZE = PostgresMessageModule.SIZE;
+        // Only is_deleted declares a column default; the other flag columns are NOT NULL without one.
+        Field<Boolean> IS_DELETED = DSL.field("is_deleted", 
SQLDataType.BOOLEAN.nullable(false)
+            .defaultValue(DSL.field("false", SQLDataType.BOOLEAN)));
+        Field<Boolean> IS_ANSWERED = DSL.field("is_answered", 
SQLDataType.BOOLEAN.nullable(false));
+        Field<Boolean> IS_DRAFT = DSL.field("is_draft", 
SQLDataType.BOOLEAN.nullable(false));
+        Field<Boolean> IS_FLAGGED = DSL.field("is_flagged", 
SQLDataType.BOOLEAN.nullable(false));
+        Field<Boolean> IS_RECENT = DSL.field("is_recent", 
SQLDataType.BOOLEAN.nullable(false));
+        Field<Boolean> IS_SEEN = DSL.field("is_seen", 
SQLDataType.BOOLEAN.nullable(false));
+        Field<String[]> USER_FLAGS = DSL.field("user_flags", 
DataTypes.STRING_ARRAY);
+        Field<LocalDateTime> SAVE_DATE = DSL.field("save_date", 
DataTypes.TIMESTAMP);
+
+
+        // CREATE TABLE IF NOT EXISTS statement: primary key (mailbox_id, message_uid),
+        // foreign keys to the mailbox table and to the message table.
+        PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName())
+            .createTableStep(((dsl, tableName) -> 
dsl.createTableIfNotExists(tableName)
+                .column(MAILBOX_ID)
+                .column(MESSAGE_UID)
+                .column(MOD_SEQ)
+                .column(MESSAGE_ID)
+                .column(THREAD_ID)
+                .column(INTERNAL_DATE)
+                .column(SIZE)
+                .column(IS_DELETED)
+                .column(IS_ANSWERED)
+                .column(IS_DRAFT)
+                .column(IS_FLAGGED)
+                .column(IS_RECENT)
+                .column(IS_SEEN)
+                .column(USER_FLAGS)
+                .column(SAVE_DATE)
+                .constraints(DSL.primaryKey(MAILBOX_ID, MESSAGE_UID),
+                    
foreignKey(MAILBOX_ID).references(PostgresMailboxTable.TABLE_NAME, 
PostgresMailboxTable.MAILBOX_ID),
+                    foreignKey(MESSAGE_ID).references(MessageTable.TABLE_NAME, 
MessageTable.MESSAGE_ID))
+                .comment("Holds mailbox and flags for each message")))
+            .supportsRowLevelSecurity();
+
+        // Index on message_id alone.
+        PostgresIndex MESSAGE_ID_INDEX = 
PostgresIndex.name("message_mailbox_message_id_index")
+            .createIndexStep((dsl, indexName) -> 
dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MESSAGE_ID));
+
+        // NOTE(review): covers the same columns as the primary key declared above;
+        // presumably redundant with the primary-key index - confirm before keeping.
+        PostgresIndex MAILBOX_ID_MESSAGE_UID_INDEX = 
PostgresIndex.name("mailbox_id_mail_uid_index")
+            .createIndexStep((dsl, indexName) -> 
dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MAILBOX_ID, MESSAGE_UID.asc()));
+        // Per-flag indexes: (mailbox_id, <flag>, message_uid ascending).
+        PostgresIndex MAILBOX_ID_IS_SEEN_MESSAGE_UID_INDEX = 
PostgresIndex.name("mailbox_id_is_seen_mail_uid_index")
+            .createIndexStep((dsl, indexName) -> 
dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MAILBOX_ID, IS_SEEN, MESSAGE_UID.asc()));
+        PostgresIndex MAILBOX_ID_IS_RECENT_MESSAGE_UID_INDEX = 
PostgresIndex.name("mailbox_id_is_recent_mail_uid_index")
+            .createIndexStep((dsl, indexName) -> 
dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MAILBOX_ID, IS_RECENT, MESSAGE_UID.asc()));
+        PostgresIndex MAILBOX_ID_IS_DELETE_MESSAGE_UID_INDEX = 
PostgresIndex.name("mailbox_id_is_delete_mail_uid_index")
+            .createIndexStep((dsl, indexName) -> 
dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MAILBOX_ID, IS_DELETED, MESSAGE_UID.asc()));
+
+    }
+
+    // Aggregates both tables and the five message_mailbox indexes into one module.
+    // The message table is registered before message_mailbox, which declares a foreign key to it.
+    PostgresModule MODULE = PostgresModule.builder()
+        .addTable(MessageTable.TABLE)
+        .addTable(MessageToMailboxTable.TABLE)
+        .addIndex(MessageToMailboxTable.MESSAGE_ID_INDEX)
+        .addIndex(MessageToMailboxTable.MAILBOX_ID_MESSAGE_UID_INDEX)
+        .addIndex(MessageToMailboxTable.MAILBOX_ID_IS_SEEN_MESSAGE_UID_INDEX)
+        .addIndex(MessageToMailboxTable.MAILBOX_ID_IS_RECENT_MESSAGE_UID_INDEX)
+        .addIndex(MessageToMailboxTable.MAILBOX_ID_IS_DELETE_MESSAGE_UID_INDEX)
+        .build();
+
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
index f820a2ce07..22ccdf9e04 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
@@ -39,6 +39,7 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MessageUid;
@@ -238,4 +239,14 @@ public class PostgresMailboxDAO {
             .map(record -> record.get(MAILBOX_HIGHEST_MODSEQ))
             .map(ModSeq::of);
     }
+
+    /**
+     * Increments both the last-uid and the highest-modseq counters of the given
+     * mailbox by one in a single UPDATE ... RETURNING statement and returns the
+     * new values. A NULL counter is treated as 0 before the increment.
+     *
+     * @param mailboxId mailbox whose counters are bumped
+     * @return the pair (new last uid, new highest mod-seq)
+     */
+    public Mono<Pair<MessageUid, ModSeq>> 
incrementAndGetLastUidAndModSeq(MailboxId mailboxId) {
+        int increment = 1;
+        return postgresExecutor.executeRow(dsl -> 
Mono.from(dsl.update(TABLE_NAME)
+                .set(MAILBOX_LAST_UID, coalesce(MAILBOX_LAST_UID, 
0L).add(increment))
+                .set(MAILBOX_HIGHEST_MODSEQ, coalesce(MAILBOX_HIGHEST_MODSEQ, 
0L).add(increment))
+                .where(MAILBOX_ID.eq(getMailboxId(mailboxId).asUuid()))
+                .returning(MAILBOX_LAST_UID, MAILBOX_HIGHEST_MODSEQ)))
+            .map(record -> 
Pair.of(MessageUid.of(record.get(MAILBOX_LAST_UID)), 
ModSeq.of(record.get(MAILBOX_HIGHEST_MODSEQ))));
+    }
 }
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
new file mode 100644
index 0000000000..c55132b59f
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
@@ -0,0 +1,378 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail.dao;
+
+
+import static 
org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME;
+import static 
org.apache.james.backends.postgres.PostgresCommons.IN_CLAUSE_MAX_SIZE;
+import static org.apache.james.backends.postgres.PostgresCommons.TABLE_FIELD;
+import static org.apache.james.backends.postgres.PostgresCommons.UNNEST_FIELD;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BLOB_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.INTERNAL_DATE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.SIZE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_ANSWERED;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_DELETED;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_DRAFT;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_FLAGGED;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_RECENT;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_SEEN;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MAILBOX_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MESSAGE_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MESSAGE_UID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MOD_SEQ;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.SAVE_DATE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.TABLE_NAME;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.THREAD_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.USER_FLAGS;
+import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.BOOLEAN_FLAGS_MAPPING;
+import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.MESSAGE_METADATA_FIELDS_REQUIRE;
+import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION;
+import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION;
+import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_METADATA_FUNCTION;
+import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_UID_FUNCTION;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import javax.mail.Flags;
+import javax.mail.Flags.Flag;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageMetaData;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.PostgresMessageId;
+import 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.streams.Limit;
+import org.jooq.Condition;
+import org.jooq.DSLContext;
+import org.jooq.Record;
+import org.jooq.Record1;
+import org.jooq.SelectFinalStep;
+import org.jooq.SelectSeekStep1;
+import org.jooq.SortField;
+import org.jooq.TableOnConditionStep;
+import org.jooq.UpdateConditionStep;
+import org.jooq.UpdateSetStep;
+import org.jooq.impl.DSL;
+
+import com.google.common.collect.Iterables;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class PostgresMailboxMessageDAO {
+
+    // message_mailbox INNER JOIN message ON message_mailbox.message_id = message.message_id;
+    // used by the queries that need columns from both tables.
+    private static final TableOnConditionStep<Record> 
MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP = 
TABLE_NAME.join(MessageTable.TABLE_NAME)
+        .on(TABLE_FIELD.of(TABLE_NAME, 
MESSAGE_ID).eq(TABLE_FIELD.of(MessageTable.TABLE_NAME, 
MessageTable.MESSAGE_ID)));
+
+    // Default ordering of the listing queries: ascending message uid.
+    public static final SortField<Long> DEFAULT_SORT_ORDER_BY = 
MESSAGE_UID.asc();
+
+    /**
+     * Builds "SELECT message_uid FROM message_mailbox WHERE mailbox_id = ? AND extraCondition
+     * ORDER BY message_uid ASC", with a LIMIT clause appended only when {@code limit} carries a value.
+     */
+    private static SelectFinalStep<Record1<Long>> selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, Condition extraCondition, Limit limit, DSLContext dslContext) {
+        SelectSeekStep1<Record1<Long>, Long> orderedUids = dslContext.select(MESSAGE_UID)
+            .from(TABLE_NAME)
+            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+            .and(extraCondition)
+            .orderBy(DEFAULT_SORT_ORDER_BY);
+        // the explicit type witness widens the limited step to the common return type
+        return limit.getLimit()
+            .<SelectFinalStep<Record1<Long>>>map(maxRows -> orderedUids.limit(maxRows))
+            .orElse(orderedUids);
+    }
+
+    private final PostgresExecutor postgresExecutor;
+
+    public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) {
+        this.postgresExecutor = postgresExecutor;
+    }
+
+    public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId 
mailboxId) {
+        return postgresExecutor.executeRow(dslContext -> 
Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
+                IS_SEEN.eq(false), Limit.limit(1), dslContext)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+    }
+
+    public Flux<MessageUid> findAllRecentMessageUid(PostgresMailboxId 
mailboxId) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
+                IS_RECENT.eq(true), Limit.unlimited(), dslContext)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+    }
+
+    public Flux<MessageUid> listAllMessageUid(PostgresMailboxId mailboxId) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
+                DSL.noCondition(), Limit.unlimited(), dslContext)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+    }
+
+    /**
+     * Deletes the {@code message_mailbox} row matching the given mailbox id and uid.
+     * The DELETE uses a RETURNING clause, and the returned record is mapped to {@link MessageMetaData}.
+     */
+    public Mono<MessageMetaData> 
deleteByMailboxIdAndMessageUid(PostgresMailboxId mailboxId, MessageUid 
messageUid) {
+        return postgresExecutor.executeRow(dslContext -> 
Mono.from(dslContext.deleteFrom(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(MESSAGE_UID.eq(messageUid.asLong()))
+                .returning(MESSAGE_METADATA_FIELDS_REQUIRE)))
+            .map(RECORD_TO_MESSAGE_METADATA_FUNCTION);
+    }
+
+    /**
+     * Deletes the rows of {@code mailboxId} whose uid is contained in {@code uids} and emits the
+     * metadata of each deleted row. When the list is larger than {@code IN_CLAUSE_MAX_SIZE} it is
+     * split into batches, one DELETE statement per batch.
+     */
+    public Flux<MessageMetaData> deleteByMailboxIdAndMessageUids(PostgresMailboxId mailboxId, List<MessageUid> uids) {
+        Function<List<MessageUid>, Flux<MessageMetaData>> deleteBatch = batch -> postgresExecutor.executeRows(dsl -> Flux.from(dsl.deleteFrom(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(MESSAGE_UID.in(batch.stream().map(MessageUid::asLong).toArray(Long[]::new)))
+                .returning(MESSAGE_METADATA_FIELDS_REQUIRE)))
+            .map(RECORD_TO_MESSAGE_METADATA_FUNCTION);
+
+        if (uids.size() > IN_CLAUSE_MAX_SIZE) {
+            return Flux.fromIterable(Iterables.partition(uids, IN_CLAUSE_MAX_SIZE))
+                .flatMap(deleteBatch);
+        }
+        return deleteBatch.apply(uids);
+    }
+
+    /** Counts the rows of {@code mailboxId} whose {@code is_seen} column is false. */
+    public Mono<Integer> countUnseenMessagesByMailboxId(PostgresMailboxId 
mailboxId) {
+        return postgresExecutor.executeCount(dslContext -> 
Mono.from(dslContext.selectCount()
+            .from(TABLE_NAME)
+            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+            .and(IS_SEEN.eq(false))));
+    }
+
+    /** Counts all {@code message_mailbox} rows belonging to {@code mailboxId}. */
+    public Mono<Integer> countTotalMessagesByMailboxId(PostgresMailboxId 
mailboxId) {
+        return postgresExecutor.executeCount(dslContext -> 
Mono.from(dslContext.selectCount()
+            .from(TABLE_NAME)
+            .where(MAILBOX_ID.eq(mailboxId.asUuid()))));
+    }
+
+    /**
+     * Reads the messages of {@code mailboxId} (message_mailbox joined with message) in ascending
+     * uid order, optionally capped by {@code limit}.
+     *
+     * @return pairs of (message builder, blob id of the message)
+     */
+    public Flux<Pair<SimpleMailboxMessage.Builder, String>> findMessagesByMailboxId(PostgresMailboxId mailboxId, Limit limit) {
+        Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit = dslContext -> dslContext.select()
+            .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+            .orderBy(DEFAULT_SORT_ORDER_BY);
+
+        // orElseGet: the unlimited query is only built when no limit was requested
+        return postgresExecutor.executeRows(dslContext -> limit.getLimit()
+                .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> step.limit(limitValue)).apply(dslContext)))
+                .orElseGet(() -> Flux.from(queryWithoutLimit.apply(dslContext))))
+            .map(record -> Pair.of(RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION.apply(record), record.get(BLOB_ID)));
+    }
+
+    /**
+     * Reads the messages of {@code mailboxId} whose uid lies in the inclusive range
+     * [{@code from}, {@code to}], in ascending uid order, optionally capped by {@code limit}.
+     *
+     * @return pairs of (message builder, blob id of the message)
+     */
+    public Flux<Pair<SimpleMailboxMessage.Builder, String>> findMessagesByMailboxIdAndBetweenUIDs(PostgresMailboxId mailboxId, MessageUid from, MessageUid to, Limit limit) {
+        Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit = dslContext -> dslContext.select()
+            .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+            .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
+            .and(MESSAGE_UID.lessOrEqual(to.asLong()))
+            .orderBy(DEFAULT_SORT_ORDER_BY);
+
+        // orElseGet: the unlimited query is only built when no limit was requested
+        return postgresExecutor.executeRows(dslContext -> limit.getLimit()
+                .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> step.limit(limitValue)).apply(dslContext)))
+                .orElseGet(() -> Flux.from(queryWithoutLimit.apply(dslContext))))
+            .map(record -> Pair.of(RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION.apply(record), record.get(BLOB_ID)));
+    }
+
+    /**
+     * Reads the single message identified by (mailbox id, uid) from the
+     * message_mailbox / message join.
+     *
+     * @return a pair of (message builder, blob id of the message)
+     */
+    public Mono<Pair<SimpleMailboxMessage.Builder, String>> 
findMessageByMailboxIdAndUid(PostgresMailboxId mailboxId, MessageUid uid) {
+        return postgresExecutor.executeRow(dslContext -> 
Mono.from(dslContext.select()
+                .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(MESSAGE_UID.eq(uid.asLong()))))
+            .map(record -> 
Pair.of(RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION.apply(record), 
record.get(BLOB_ID)));
+    }
+
+    /**
+     * Reads the messages of {@code mailboxId} whose uid is greater than or equal to {@code from}
+     * (the bound is inclusive), in ascending uid order, optionally capped by {@code limit}.
+     *
+     * @return pairs of (message builder, blob id of the message)
+     */
+    public Flux<Pair<SimpleMailboxMessage.Builder, String>> findMessagesByMailboxIdAndAfterUID(PostgresMailboxId mailboxId, MessageUid from, Limit limit) {
+        Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit = dslContext -> dslContext.select()
+            .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+            .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
+            .orderBy(DEFAULT_SORT_ORDER_BY);
+
+        // orElseGet: the unlimited query is only built when no limit was requested
+        return postgresExecutor.executeRows(dslContext -> limit.getLimit()
+                .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> step.limit(limitValue)).apply(dslContext)))
+                .orElseGet(() -> Flux.from(queryWithoutLimit.apply(dslContext))))
+            .map(record -> Pair.of(RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION.apply(record), record.get(BLOB_ID)));
+    }
+
+    /**
+     * Reads the messages of {@code mailboxId} whose uid is contained in {@code uids}.
+     * Lists larger than {@code IN_CLAUSE_MAX_SIZE} are split into batches; each batch is
+     * returned in ascending uid order and batches are emitted one after the other, in the
+     * order of the input list.
+     */
+    public Flux<SimpleMailboxMessage.Builder> findMessagesByMailboxIdAndUIDs(PostgresMailboxId mailboxId, List<MessageUid> uids) {
+        Function<List<MessageUid>, Flux<SimpleMailboxMessage.Builder>> queryPublisherFunction = uidsToFetch -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select()
+                .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(MESSAGE_UID.in(uidsToFetch.stream().map(MessageUid::asLong).toArray(Long[]::new)))
+                .orderBy(DEFAULT_SORT_ORDER_BY)))
+            .map(RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION);
+
+        if (uids.size() <= IN_CLAUSE_MAX_SIZE) {
+            return queryPublisherFunction.apply(uids);
+        } else {
+            // concatMap rather than flatMap: flatMap may interleave rows of different batches,
+            // which would defeat the ORDER BY applied inside each batch
+            return Flux.fromIterable(Iterables.partition(uids, IN_CLAUSE_MAX_SIZE))
+                .concatMap(queryPublisherFunction);
+        }
+    }
+
+    /**
+     * Lists, in ascending order, the uids of {@code mailboxId} whose {@code is_deleted} column is true.
+     * Every column used here lives in message_mailbox, so the query reads that table alone;
+     * message_id is NOT NULL with a foreign key to message, so joining message filtered nothing.
+     */
+    public Flux<MessageUid> findDeletedMessagesByMailboxId(PostgresMailboxId mailboxId) {
+        return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(IS_DELETED.eq(true))
+                .orderBy(DEFAULT_SORT_ORDER_BY)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+    }
+
+    /**
+     * Lists, in ascending order, the uids of {@code mailboxId} that are flagged deleted and lie in
+     * the inclusive range [{@code from}, {@code to}].
+     * Every column used here lives in message_mailbox, so the query reads that table alone;
+     * message_id is NOT NULL with a foreign key to message, so joining message filtered nothing.
+     */
+    public Flux<MessageUid> findDeletedMessagesByMailboxIdAndBetweenUIDs(PostgresMailboxId mailboxId, MessageUid from, MessageUid to) {
+        return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(IS_DELETED.eq(true))
+                .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
+                .and(MESSAGE_UID.lessOrEqual(to.asLong()))
+                .orderBy(DEFAULT_SORT_ORDER_BY)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+    }
+
+    /**
+     * Lists, in ascending order, the uids of {@code mailboxId} that are flagged deleted and are
+     * greater than or equal to {@code from} (the bound is inclusive).
+     * Every column used here lives in message_mailbox, so the query reads that table alone;
+     * message_id is NOT NULL with a foreign key to message, so joining message filtered nothing.
+     */
+    public Flux<MessageUid> findDeletedMessagesByMailboxIdAndAfterUID(PostgresMailboxId mailboxId, MessageUid from) {
+        return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(IS_DELETED.eq(true))
+                .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
+                .orderBy(DEFAULT_SORT_ORDER_BY)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+    }
+
+    /**
+     * Looks up the row (mailbox id, uid) and emits its uid only if it is flagged deleted.
+     * Every column used here lives in message_mailbox, so the query reads that table alone;
+     * message_id is NOT NULL with a foreign key to message, so joining message filtered nothing.
+     */
+    public Mono<MessageUid> findDeletedMessageByMailboxIdAndUid(PostgresMailboxId mailboxId, MessageUid uid) {
+        return postgresExecutor.executeRow(dslContext -> Mono.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(IS_DELETED.eq(true))
+                .and(MESSAGE_UID.eq(uid.asLong()))))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+    }
+
+    /**
+     * Reads, from message_mailbox only, the rows of {@code mailboxId} whose uid lies between
+     * {@code range.getUidFrom()} and {@code range.getUidTo()} (both inclusive), ascending by uid.
+     */
+    public Flux<ComposedMessageIdWithMetaData> 
findMessagesMetadata(PostgresMailboxId mailboxId, MessageRange range) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
+                .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
+                .orderBy(DEFAULT_SORT_ORDER_BY)))
+            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
+    }
+
+    /**
+     * Reads, from message_mailbox only, the rows of {@code mailboxId} whose uid is contained in
+     * {@code messageUids}. Lists larger than {@code IN_CLAUSE_MAX_SIZE} are split into batches;
+     * each batch is returned in ascending uid order and batches are emitted one after the other,
+     * in the order of the input list.
+     */
+    public Flux<ComposedMessageIdWithMetaData> findMessagesMetadata(PostgresMailboxId mailboxId, List<MessageUid> messageUids) {
+        Function<List<MessageUid>, Flux<ComposedMessageIdWithMetaData>> queryPublisherFunction = uidsToFetch -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select()
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(MESSAGE_UID.in(uidsToFetch.stream().map(MessageUid::asLong).toArray(Long[]::new)))
+                .orderBy(DEFAULT_SORT_ORDER_BY)))
+            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
+
+        if (messageUids.size() <= IN_CLAUSE_MAX_SIZE) {
+            return queryPublisherFunction.apply(messageUids);
+        } else {
+            // concatMap rather than flatMap: flatMap may interleave rows of different batches,
+            // which would defeat the ORDER BY applied inside each batch
+            return Flux.fromIterable(Iterables.partition(messageUids, IN_CLAUSE_MAX_SIZE))
+                .concatMap(queryPublisherFunction);
+        }
+    }
+
+    /** Reads, ascending by uid, the message_mailbox rows of {@code mailboxId} whose {@code is_recent} column is true. */
+    public Flux<ComposedMessageIdWithMetaData> 
findAllRecentMessageMetadata(PostgresMailboxId mailboxId) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(IS_RECENT.eq(true))
+                .orderBy(DEFAULT_SORT_ORDER_BY)))
+            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
+    }
+
+    /**
+     * Executes the UPDATE built by {@code buildUpdateFlagStatement} for the row (mailbox id, uid).
+     *
+     * NOTE(review): the statement built here does not touch the mod_seq column -
+     * presumably the caller persists the new mod-seq separately; confirm.
+     */
+    public Mono<Void> updateFlag(PostgresMailboxId mailboxId, MessageUid uid, 
UpdatedFlags updatedFlags) {
+        return postgresExecutor.executeVoid(dslContext ->
+            Mono.from(buildUpdateFlagStatement(dslContext, updatedFlags, 
mailboxId, uid)));
+    }
+
+    /**
+     * Collects the distinct user flags stored in the {@code user_flags} arrays of all rows of
+     * {@code mailboxId} into a single {@link Flags} instance (empty when no flag is found).
+     */
+    public Mono<Flags> listDistinctUserFlags(PostgresMailboxId mailboxId) {
+        Flux<String> distinctUserFlags = postgresExecutor.executeRows(dsl -> Flux.from(dsl.selectDistinct(UNNEST_FIELD.apply(USER_FLAGS))
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))))
+            .map(row -> row.get(0, String.class));
+        // fold every flag name straight into one Flags container
+        return distinctUserFlags.collect(Flags::new, (flags, userFlag) -> flags.add(userFlag));
+    }
+
+    /**
+     * Builds the UPDATE statement applying {@code updatedFlags} to the row (mailbox id, uid):
+     * one SET clause per boolean flag column that changed, plus an unconditional SET of the
+     * user flags array.
+     */
+    private UpdateConditionStep<Record> buildUpdateFlagStatement(DSLContext dslContext, UpdatedFlags updatedFlags,
+                                                                 PostgresMailboxId mailboxId, MessageUid uid) {
+        // holder needed because the statement is re-assigned from inside the lambda below
+        AtomicReference<UpdateSetStep<Record>> statement = new AtomicReference<>(dslContext.update(TABLE_NAME));
+
+        BOOLEAN_FLAGS_MAPPING.forEach((column, flag) -> {
+            if (!updatedFlags.isChanged(flag)) {
+                return;
+            }
+            // RECENT takes its value from the new flag set, the other flags from isModifiedToSet
+            boolean newValue = flag.equals(Flag.RECENT)
+                ? updatedFlags.getNewFlags().contains(Flag.RECENT)
+                : updatedFlags.isModifiedToSet(flag);
+            statement.set(statement.get().set(column, newValue));
+        });
+
+        return statement.get()
+            .set(USER_FLAGS, updatedFlags.getNewFlags().getUserFlags())
+            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+            .and(MESSAGE_UID.eq(uid.asLong()));
+    }
+
+    /**
+     * Clears the recent flag and stores {@code newModSeq} on the rows of {@code mailboxId} whose
+     * uid is in {@code uids}, skipping rows that already carry {@code newModSeq}. Emits the
+     * metadata of every updated row. Lists larger than {@code IN_CLAUSE_MAX_SIZE} are split into
+     * batches, one UPDATE statement per batch.
+     */
+    public Flux<MessageMetaData> resetRecentFlag(PostgresMailboxId mailboxId, List<MessageUid> uids, ModSeq newModSeq) {
+        Function<List<MessageUid>, Flux<MessageMetaData>> resetBatch = batch -> postgresExecutor.executeRows(dsl -> Flux.from(dsl.update(TABLE_NAME)
+                .set(IS_RECENT, false)
+                .set(MOD_SEQ, newModSeq.asLong())
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(MESSAGE_UID.in(batch.stream().map(MessageUid::asLong).toArray(Long[]::new)))
+                .and(MOD_SEQ.notEqual(newModSeq.asLong()))
+                .returning(MESSAGE_METADATA_FIELDS_REQUIRE)))
+            .map(RECORD_TO_MESSAGE_METADATA_FUNCTION);
+
+        if (uids.size() > IN_CLAUSE_MAX_SIZE) {
+            return Flux.fromIterable(Iterables.partition(uids, IN_CLAUSE_MAX_SIZE))
+                .flatMap(resetBatch);
+        }
+        return resetBatch.apply(uids);
+    }
+
+    /**
+     * Inserts one {@code message_mailbox} row for {@code mailboxMessage}: ids, uid, mod-seq,
+     * thread id, dates, size, the six system flags and the user flags.
+     * The mailbox id is cast to {@code PostgresMailboxId}; the message id and the thread's base
+     * message id are cast to {@code PostgresMessageId}.
+     */
+    public Mono<Void> insert(MailboxMessage mailboxMessage) {
+        return postgresExecutor.executeVoid(dslContext -> 
Mono.from(dslContext.insertInto(TABLE_NAME)
+            .set(MAILBOX_ID, ((PostgresMailboxId) 
mailboxMessage.getMailboxId()).asUuid())
+            .set(MESSAGE_UID, mailboxMessage.getUid().asLong())
+            .set(MOD_SEQ, mailboxMessage.getModSeq().asLong())
+            .set(MESSAGE_ID, ((PostgresMessageId) 
mailboxMessage.getMessageId()).asUuid())
+            .set(THREAD_ID, ((PostgresMessageId) 
mailboxMessage.getThreadId().getBaseMessageId()).asUuid())
+            .set(INTERNAL_DATE, 
DATE_TO_LOCAL_DATE_TIME.apply(mailboxMessage.getInternalDate()))
+            .set(SIZE, mailboxMessage.getFullContentOctets())
+            .set(IS_DELETED, mailboxMessage.isDeleted())
+            .set(IS_ANSWERED, mailboxMessage.isAnswered())
+            .set(IS_DRAFT, mailboxMessage.isDraft())
+            .set(IS_FLAGGED, mailboxMessage.isFlagged())
+            .set(IS_RECENT, mailboxMessage.isRecent())
+            .set(IS_SEEN, mailboxMessage.isSeen())
+            .set(USER_FLAGS, mailboxMessage.createFlags().getUserFlags())
+            // save_date is stored as NULL when the message carries no save date
+            .set(SAVE_DATE, 
mailboxMessage.getSaveDate().map(DATE_TO_LOCAL_DATE_TIME).orElse(null))));
+    }
+
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOUtils.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOUtils.java
new file mode 100644
index 0000000000..1d832e20c5
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOUtils.java
@@ -0,0 +1,186 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail.dao;
+
+import static 
org.apache.james.backends.postgres.PostgresCommons.LOCAL_DATE_TIME_DATE_FUNCTION;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_START_OCTET;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_DISPOSITION_PARAMETERS;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_LANGUAGE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_LOCATION;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_MD5;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_TRANSFER_ENCODING;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_TYPE_PARAMETERS;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.INTERNAL_DATE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.SIZE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_ANSWERED;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_DELETED;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_DRAFT;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_FLAGGED;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_RECENT;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_SEEN;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MAILBOX_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MESSAGE_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MESSAGE_UID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MOD_SEQ;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.SAVE_DATE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.THREAD_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.USER_FLAGS;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import javax.mail.Flags;
+
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.Content;
+import org.apache.james.mailbox.model.MessageMetaData;
+import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.PostgresMessageId;
+import org.apache.james.mailbox.postgres.mail.PostgresMessageModule;
+import org.apache.james.mailbox.store.mail.model.impl.Properties;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.jooq.Field;
+import org.jooq.Record;
+
+/**
+ * Package-private holder of the jOOQ {@link Record} mapping functions and field lists
+ * declared below. As interface members, all of them are implicitly
+ * {@code public static final}.
+ */
+interface PostgresMailboxMessageDAOUtils {
+    /** Pairs each boolean flag column with the {@link Flags.Flag} it represents. */
+    Map<Field<Boolean>, Flags.Flag> BOOLEAN_FLAGS_MAPPING = Map.of(
+        IS_ANSWERED, Flags.Flag.ANSWERED,
+        IS_DELETED, Flags.Flag.DELETED,
+        IS_DRAFT, Flags.Flag.DRAFT,
+        IS_FLAGGED, Flags.Flag.FLAGGED,
+        IS_RECENT, Flags.Flag.RECENT,
+        IS_SEEN, Flags.Flag.SEEN);
+
+    /** Reads the {@code MESSAGE_UID} column of a record as a {@link MessageUid}. */
+    Function<Record, MessageUid> RECORD_TO_MESSAGE_UID_FUNCTION = record -> MessageUid.of(record.get(MESSAGE_UID));
+
+    /**
+     * Builds the {@link Flags} of a record: one system flag per boolean column of
+     * {@link #BOOLEAN_FLAGS_MAPPING} whose value is true, followed by every entry of
+     * {@code USER_FLAGS} when that column is not null.
+     */
+    Function<Record, Flags> RECORD_TO_FLAGS_FUNCTION = record -> {
+        Flags flags = new Flags();
+        BOOLEAN_FLAGS_MAPPING.forEach((flagColumn, flagMapped) -> {
+            // NOTE(review): unboxes the column value; presumably the flag columns are never null - confirm.
+            if (record.get(flagColumn)) {
+                flags.add(flagMapped);
+            }
+        });
+
+        Optional.ofNullable(record.get(USER_FLAGS)).stream()
+            .flatMap(Arrays::stream)
+            .forEach(flags::add);
+        return flags;
+    };
+
+    /**
+     * Resolves the {@link ThreadId} of a record from {@code THREAD_ID}, falling back to a thread
+     * id derived from {@code MESSAGE_ID} when no thread id is stored.
+     */
+    Function<Record, ThreadId> RECORD_TO_THREAD_ID_FUNCTION = record -> Optional.ofNullable(record.get(THREAD_ID))
+        .map(threadIdAsUuid -> ThreadId.fromBaseMessageId(PostgresMessageId.Factory.of(threadIdAsUuid)))
+        // orElseGet defers the fallback: MESSAGE_ID is only read when THREAD_ID is null.
+        .orElseGet(() -> ThreadId.fromBaseMessageId(PostgresMessageId.Factory.of(record.get(MESSAGE_ID))));
+
+    /** Columns read by {@link #RECORD_TO_MESSAGE_METADATA_FUNCTION}. */
+    Field<?>[] MESSAGE_METADATA_FIELDS_REQUIRE = new Field[] {
+        MESSAGE_UID,
+        MOD_SEQ,
+        SIZE,
+        INTERNAL_DATE,
+        SAVE_DATE,
+        MESSAGE_ID,
+        THREAD_ID,
+        IS_ANSWERED,
+        IS_DELETED,
+        IS_DRAFT,
+        IS_FLAGGED,
+        IS_RECENT,
+        IS_SEEN,
+        USER_FLAGS
+    };
+
+    /**
+     * Maps a record to {@link MessageMetaData}: uid, mod-seq, flags, size, internal date,
+     * optional save date (empty when {@code SAVE_DATE} is null), message id and thread id.
+     */
+    Function<Record, MessageMetaData> RECORD_TO_MESSAGE_METADATA_FUNCTION = record ->
+        new MessageMetaData(MessageUid.of(record.get(MESSAGE_UID)),
+            ModSeq.of(record.get(MOD_SEQ)),
+            RECORD_TO_FLAGS_FUNCTION.apply(record),
+            record.get(SIZE),
+            LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(INTERNAL_DATE)),
+            Optional.ofNullable(record.get(SAVE_DATE)).map(LOCAL_DATE_TIME_DATE_FUNCTION),
+            PostgresMessageId.Factory.of(record.get(MESSAGE_ID)),
+            RECORD_TO_THREAD_ID_FUNCTION.apply(record));
+
+    /**
+     * Maps a record to {@link ComposedMessageIdWithMetaData}: the (mailbox id, message id, uid)
+     * triple together with thread id, flags and mod-seq.
+     */
+    Function<Record, ComposedMessageIdWithMetaData> RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION = record -> ComposedMessageIdWithMetaData
+        .builder()
+        .composedMessageId(new ComposedMessageId(PostgresMailboxId.of(record.get(MAILBOX_ID)),
+            PostgresMessageId.Factory.of(record.get(MESSAGE_ID)),
+            MessageUid.of(record.get(MESSAGE_UID))))
+        .threadId(RECORD_TO_THREAD_ID_FUNCTION.apply(record))
+        .flags(RECORD_TO_FLAGS_FUNCTION.apply(record))
+        .modSeq(ModSeq.of(record.get(MOD_SEQ)))
+        .build();
+
+    /**
+     * Builds message {@link Properties} from the MIME type/subtype, textual line count and
+     * content-* columns of a record.
+     */
+    Function<Record, Properties> RECORD_TO_PROPERTIES_FUNCTION = record -> {
+        PropertyBuilder property = new PropertyBuilder();
+
+        property.setMediaType(record.get(PostgresMessageModule.MessageTable.MIME_TYPE));
+        property.setSubType(record.get(PostgresMessageModule.MessageTable.MIME_SUBTYPE));
+        // The stored line count is widened to Long; a null column stays null.
+        property.setTextualLineCount(Optional.ofNullable(record.get(PostgresMessageModule.MessageTable.TEXTUAL_LINE_COUNT))
+            .map(Long::valueOf)
+            .orElse(null));
+
+        property.setContentID(record.get(CONTENT_ID));
+        property.setContentMD5(record.get(CONTENT_MD5));
+        property.setContentTransferEncoding(record.get(CONTENT_TRANSFER_ENCODING));
+        property.setContentLocation(record.get(CONTENT_LOCATION));
+        property.setContentLanguage(Optional.ofNullable(record.get(CONTENT_LANGUAGE)).map(List::of).orElse(null));
+        property.setContentDispositionParameters(record.get(CONTENT_DISPOSITION_PARAMETERS, LinkedHashMap.class));
+        property.setContentTypeParameters(record.get(CONTENT_TYPE_PARAMETERS, LinkedHashMap.class));
+        return property.build();
+    };
+
+    /**
+     * Wraps a byte array as {@link Content}: every {@code getInputStream()} call returns a new
+     * stream over the array and {@code size()} is the array length.
+     */
+    Function<byte[], Content> BYTE_TO_CONTENT_FUNCTION = contentAsBytes -> new Content() {
+        @Override
+        public InputStream getInputStream() {
+            return new ByteArrayInputStream(contentAsBytes);
+        }
+
+        @Override
+        public long size() {
+            return contentAsBytes.length;
+        }
+    };
+
+    /**
+     * Starts a {@link SimpleMailboxMessage.Builder} from a record, using {@code HEADER_CONTENT}
+     * as the content set on the builder.
+     */
+    Function<Record, SimpleMailboxMessage.Builder> RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION = record -> SimpleMailboxMessage.builder()
+        .messageId(PostgresMessageId.Factory.of(record.get(MESSAGE_ID)))
+        .mailboxId(PostgresMailboxId.of(record.get(MAILBOX_ID)))
+        .uid(MessageUid.of(record.get(MESSAGE_UID)))
+        .threadId(RECORD_TO_THREAD_ID_FUNCTION.apply(record))
+        .internalDate(LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(PostgresMessageModule.MessageTable.INTERNAL_DATE, LocalDateTime.class)))
+        // NOTE(review): SAVE_DATE is guarded with Optional.ofNullable in the metadata mapping above but not here;
+        // this relies on LOCAL_DATE_TIME_DATE_FUNCTION accepting null - confirm.
+        .saveDate(LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(SAVE_DATE, LocalDateTime.class)))
+        .flags(RECORD_TO_FLAGS_FUNCTION.apply(record))
+        .size(record.get(PostgresMessageModule.MessageTable.SIZE))
+        .bodyStartOctet(record.get(BODY_START_OCTET))
+        .content(BYTE_TO_CONTENT_FUNCTION.apply(record.get(HEADER_CONTENT)))
+        .properties(RECORD_TO_PROPERTIES_FUNCTION.apply(record));
+
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
new file mode 100644
index 0000000000..5437307788
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail.dao;
+
+import static 
org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BLOB_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_START_OCTET;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_DESCRIPTION;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_DISPOSITION_PARAMETERS;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_DISPOSITION_TYPE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_LANGUAGE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_LOCATION;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_MD5;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_TRANSFER_ENCODING;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_TYPE_PARAMETERS;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.INTERNAL_DATE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.MESSAGE_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.MIME_SUBTYPE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.MIME_TYPE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.SIZE;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.TABLE_NAME;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.TEXTUAL_LINE_COUNT;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.mailbox.postgres.PostgresMessageId;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.jooq.postgres.extensions.types.Hstore;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Inserts and deletes rows of {@code TABLE_NAME} (the message table) through a
+ * {@link PostgresExecutor}.
+ */
+public class PostgresMessageDAO {
+    /** Textual line count stored when the message reports none. */
+    public static final long DEFAULT_LONG_VALUE = 0L;
+    private final PostgresExecutor postgresExecutor;
+
+    /**
+     * @param postgresExecutor executor used to run every statement of this DAO
+     */
+    public PostgresMessageDAO(PostgresExecutor postgresExecutor) {
+        this.postgresExecutor = postgresExecutor;
+    }
+
+    /**
+     * Reads the header bytes of the message, then inserts one row holding the message id,
+     * blob id, MIME information, dates, sizes, content-* properties and the header bytes.
+     *
+     * @param message message to persist; its id is cast to {@link PostgresMessageId}
+     * @param blobId  value stored in the {@code BLOB_ID} column
+     * @return the {@link Mono} returned by {@code postgresExecutor.executeVoid}; a failure while
+     *         reading the header content is propagated through it
+     */
+    public Mono<Void> insert(MailboxMessage message, String blobId) {
+        return Mono.fromCallable(() -> {
+                // IOUtils.toByteArray does not close its input, so close the header stream explicitly.
+                try (java.io.InputStream headerContent = message.getHeaderContent()) {
+                    return IOUtils.toByteArray(headerContent, message.getHeaderOctets());
+                }
+            })
+            // The stream read is blocking, hence the bounded elastic scheduler.
+            .subscribeOn(Schedulers.boundedElastic())
+            .flatMap(headerContentAsByte -> postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.insertInto(TABLE_NAME)
+                .set(MESSAGE_ID, ((PostgresMessageId) message.getMessageId()).asUuid())
+                .set(BLOB_ID, blobId)
+                .set(MIME_TYPE, message.getMediaType())
+                .set(MIME_SUBTYPE, message.getSubType())
+                .set(INTERNAL_DATE, DATE_TO_LOCAL_DATE_TIME.apply(message.getInternalDate()))
+                .set(SIZE, message.getFullContentOctets())
+                // The body starts where the full content minus the body length ends.
+                .set(BODY_START_OCTET, (int) (message.getFullContentOctets() - message.getBodyOctets()))
+                .set(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE).intValue())
+                .set(CONTENT_DESCRIPTION, message.getProperties().getContentDescription())
+                .set(CONTENT_DISPOSITION_TYPE, message.getProperties().getContentDispositionType())
+                .set(CONTENT_ID, message.getProperties().getContentID())
+                .set(CONTENT_MD5, message.getProperties().getContentMD5())
+                .set(CONTENT_LANGUAGE, message.getProperties().getContentLanguage().toArray(new String[0]))
+                .set(CONTENT_LOCATION, message.getProperties().getContentLocation())
+                .set(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding())
+                .set(CONTENT_TYPE_PARAMETERS, Hstore.hstore(message.getProperties().getContentTypeParameters()))
+                .set(CONTENT_DISPOSITION_PARAMETERS, Hstore.hstore(message.getProperties().getContentDispositionParameters()))
+                .set(HEADER_CONTENT, headerContentAsByte))));
+    }
+
+    /**
+     * Deletes the rows whose {@code MESSAGE_ID} equals the given id.
+     *
+     * @param messageId identifier of the message to delete
+     * @return the {@link Mono} returned by {@code postgresExecutor.executeVoid} for the statement
+     */
+    public Mono<Void> deleteByMessageId(PostgresMessageId messageId) {
+        return postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.deleteFrom(TABLE_NAME)
+            .where(MESSAGE_ID.eq(messageId.asUuid()))));
+    }
+
+}
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMapperTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMapperTest.java
deleted file mode 100644
index 5041b74302..0000000000
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMapperTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.postgres.mail;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Optional;
-
-import javax.mail.Flags;
-
-import org.apache.james.backends.jpa.JpaTestCluster;
-import org.apache.james.mailbox.FlagsBuilder;
-import org.apache.james.mailbox.MessageManager;
-import org.apache.james.mailbox.ModSeq;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.postgres.JPAMailboxFixture;
-import org.apache.james.mailbox.model.UpdatedFlags;
-import org.apache.james.mailbox.store.FlagsUpdateCalculator;
-import org.apache.james.mailbox.store.mail.model.MapperProvider;
-import org.apache.james.mailbox.store.mail.model.MessageMapperTest;
-import org.apache.james.utils.UpdatableTickingClock;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Test;
-
-class JpaMessageMapperTest extends MessageMapperTest {
-
-    static final JpaTestCluster JPA_TEST_CLUSTER = 
JpaTestCluster.create(JPAMailboxFixture.MAILBOX_PERSISTANCE_CLASSES);
-    
-    @Override
-    protected MapperProvider createMapperProvider() {
-        return new JPAMapperProvider(JPA_TEST_CLUSTER);
-    }
-
-    @Override
-    protected UpdatableTickingClock updatableTickingClock() {
-        return null;
-    }
-
-    @AfterEach
-    void cleanUp() {
-        JPA_TEST_CLUSTER.clear(JPAMailboxFixture.MAILBOX_TABLE_NAMES);
-    }
-
-    @Test
-    @Override
-    public void 
flagsAdditionShouldReturnAnUpdatedFlagHighlightingTheAddition() throws 
MailboxException {
-        saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new 
FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), 
MessageManager.FlagsUpdateMode.REPLACE));
-        ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox);
-
-        // JPA does not support MessageId
-        assertThat(messageMapper.updateFlags(benwaInboxMailbox, 
message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), 
MessageManager.FlagsUpdateMode.ADD)))
-            .contains(UpdatedFlags.builder()
-                .uid(message1.getUid())
-                .modSeq(modSeq.next())
-                .oldFlags(new Flags(Flags.Flag.FLAGGED))
-                .newFlags(new FlagsBuilder().add(Flags.Flag.SEEN, 
Flags.Flag.FLAGGED).build())
-                .build());
-    }
-
-    @Test
-    @Override
-    public void 
flagsReplacementShouldReturnAnUpdatedFlagHighlightingTheReplacement() throws 
MailboxException {
-        saveMessages();
-        ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox);
-        Optional<UpdatedFlags> updatedFlags = 
messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(),
-            new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), 
MessageManager.FlagsUpdateMode.REPLACE));
-
-        // JPA does not support MessageId
-        assertThat(updatedFlags)
-            .contains(UpdatedFlags.builder()
-                .uid(message1.getUid())
-                .modSeq(modSeq.next())
-                .oldFlags(new Flags())
-                .newFlags(new Flags(Flags.Flag.FLAGGED))
-                .build());
-    }
-
-    @Test
-    @Override
-    public void flagsRemovalShouldReturnAnUpdatedFlagHighlightingTheRemoval() 
throws MailboxException {
-        saveMessages();
-        messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new 
FlagsUpdateCalculator(new FlagsBuilder().add(Flags.Flag.FLAGGED, 
Flags.Flag.SEEN).build(), MessageManager.FlagsUpdateMode.REPLACE));
-        ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox);
-
-        // JPA does not support MessageId
-        assertThat(messageMapper.updateFlags(benwaInboxMailbox, 
message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), 
MessageManager.FlagsUpdateMode.REMOVE)))
-            .contains(
-                UpdatedFlags.builder()
-                    .uid(message1.getUid())
-                    .modSeq(modSeq.next())
-                    .oldFlags(new FlagsBuilder().add(Flags.Flag.SEEN, 
Flags.Flag.FLAGGED).build())
-                    .newFlags(new Flags(Flags.Flag.FLAGGED))
-                    .build());
-    }
-
-    @Test
-    @Override
-    public void userFlagsUpdateShouldReturnCorrectUpdatedFlags() throws 
MailboxException {
-        saveMessages();
-        ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox);
-
-        // JPA does not support MessageId
-        assertThat(messageMapper.updateFlags(benwaInboxMailbox, 
message1.getUid(), new FlagsUpdateCalculator(new Flags(USER_FLAG), 
MessageManager.FlagsUpdateMode.ADD)))
-            .contains(
-                UpdatedFlags.builder()
-                    .uid(message1.getUid())
-                    .modSeq(modSeq.next())
-                    .oldFlags(new Flags())
-                    .newFlags(new Flags(USER_FLAG))
-                    .build());
-    }
-
-    @Test
-    @Override
-    public void userFlagsUpdateShouldReturnCorrectUpdatedFlagsWhenNoop() 
throws MailboxException {
-        saveMessages();
-
-        // JPA does not support MessageId
-        assertThat(
-            messageMapper.updateFlags(benwaInboxMailbox,message1.getUid(),
-                new FlagsUpdateCalculator(new Flags(USER_FLAG), 
MessageManager.FlagsUpdateMode.REMOVE)))
-            .contains(
-                UpdatedFlags.builder()
-                    .uid(message1.getUid())
-                    .modSeq(message1.getModSeq())
-                    .oldFlags(new Flags())
-                    .newFlags(new Flags())
-                    .build());
-    }
-
-    @Nested
-    @Disabled("JPA does not support saveDate.")
-    class SaveDateTests {
-
-    }
-}
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java
new file mode 100644
index 0000000000..7258ba7a19
--- /dev/null
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java
@@ -0,0 +1,135 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import java.time.Instant;
+import java.util.List;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.PostgresMessageId;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
+import org.apache.james.mailbox.store.mail.AttachmentMapper;
+import org.apache.james.mailbox.store.mail.MailboxMapper;
+import org.apache.james.mailbox.store.mail.MessageIdMapper;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.MapperProvider;
+import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
+import org.apache.james.utils.UpdatableTickingClock;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * {@link MapperProvider} backed by a {@link PostgresExtension}. It supplies mailbox and message
+ * mappers; message-id and attachment mappers, as well as uid / mod-seq generation, throw
+ * {@link NotImplementedException}.
+ */
+public class PostgresMapperProvider implements MapperProvider {
+
+    private static final String NOT_IMPLEMENTED = "not implemented";
+
+    private final PostgresExtension postgresExtension;
+    private final UpdatableTickingClock updatableTickingClock;
+    private final PostgresMessageId.Factory messageIdFactory;
+    private final BlobId.Factory blobIdFactory;
+    private final BlobStore blobStore;
+
+    /**
+     * @param postgresExtension extension whose executor backs every DAO created by this provider
+     */
+    public PostgresMapperProvider(PostgresExtension postgresExtension) {
+        this.postgresExtension = postgresExtension;
+        this.updatableTickingClock = new UpdatableTickingClock(Instant.now());
+        this.messageIdFactory = new PostgresMessageId.Factory();
+        this.blobIdFactory = new HashBlobId.Factory();
+        // Blobs are kept in memory, in the default bucket.
+        this.blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory);
+    }
+
+    @Override
+    public List<Capabilities> getSupportedCapabilities() {
+        return ImmutableList.of(
+            Capabilities.ANNOTATION,
+            Capabilities.MAILBOX,
+            Capabilities.MESSAGE,
+            Capabilities.MOVE,
+            Capabilities.ATTACHMENT);
+    }
+
+    @Override
+    public MailboxMapper createMailboxMapper() {
+        return new PostgresMailboxMapper(newMailboxDAO());
+    }
+
+    @Override
+    public MessageMapper createMessageMapper() {
+        // The mod-seq and uid providers share a single mailbox DAO.
+        PostgresMailboxDAO sharedMailboxDAO = newMailboxDAO();
+
+        return new PostgresMessageMapper(
+            postgresExtension.getPostgresExecutor(),
+            new PostgresModSeqProvider(sharedMailboxDAO),
+            new PostgresUidProvider(sharedMailboxDAO),
+            blobStore,
+            updatableTickingClock,
+            blobIdFactory);
+    }
+
+    @Override
+    public MessageIdMapper createMessageIdMapper() {
+        throw new NotImplementedException(NOT_IMPLEMENTED);
+    }
+
+    @Override
+    public AttachmentMapper createAttachmentMapper() {
+        throw new NotImplementedException(NOT_IMPLEMENTED);
+    }
+
+    @Override
+    public MailboxId generateId() {
+        return PostgresMailboxId.generate();
+    }
+
+    @Override
+    public MessageUid generateMessageUid() {
+        throw new NotImplementedException(NOT_IMPLEMENTED);
+    }
+
+    @Override
+    public ModSeq generateModSeq(Mailbox mailbox) {
+        throw new NotImplementedException(NOT_IMPLEMENTED);
+    }
+
+    @Override
+    public ModSeq highestModSeq(Mailbox mailbox) {
+        throw new NotImplementedException(NOT_IMPLEMENTED);
+    }
+
+    @Override
+    public boolean supportPartialAttachmentFetch() {
+        return false;
+    }
+
+    @Override
+    public MessageId generateMessageId() {
+        return messageIdFactory.generate();
+    }
+
+    /** Returns the clock handed to the message mappers created by this provider. */
+    public UpdatableTickingClock getUpdatableTickingClock() {
+        return updatableTickingClock;
+    }
+
+    /** Creates a mailbox DAO on top of the extension's executor. */
+    private PostgresMailboxDAO newMailboxDAO() {
+        return new PostgresMailboxDAO(postgresExtension.getPostgresExecutor());
+    }
+}
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperTest.java
similarity index 61%
copy from 
mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java
copy to 
mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperTest.java
index a8499468f1..55a6864e88 100644
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperTest.java
@@ -19,24 +19,28 @@
 
 package org.apache.james.mailbox.postgres.mail;
 
-import org.apache.james.backends.jpa.JpaTestCluster;
-import org.apache.james.mailbox.postgres.JPAMailboxFixture;
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule;
 import org.apache.james.mailbox.store.mail.model.MapperProvider;
-import org.apache.james.mailbox.store.mail.model.MessageMoveTest;
-import org.junit.jupiter.api.AfterEach;
+import org.apache.james.mailbox.store.mail.model.MessageMapperTest;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-class JpaMessageMoveTest extends MessageMoveTest {
-    
-    static final JpaTestCluster JPA_TEST_CLUSTER = 
JpaTestCluster.create(JPAMailboxFixture.MAILBOX_PERSISTANCE_CLASSES);
+public class PostgresMessageMapperTest extends MessageMapperTest {
 
+    @RegisterExtension
+    static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE);
+
+    private PostgresMapperProvider postgresMapperProvider;
     @Override
     protected MapperProvider createMapperProvider() {
-        return new JPAMapperProvider(JPA_TEST_CLUSTER);
+        postgresMapperProvider = new PostgresMapperProvider(postgresExtension);
+        return postgresMapperProvider;
     }
-    
-    @AfterEach
-    void cleanUp() {
-        JPA_TEST_CLUSTER.clear(JPAMailboxFixture.MAILBOX_TABLE_NAMES);
+
+    @Override
+    protected UpdatableTickingClock updatableTickingClock() {
+        return postgresMapperProvider.getUpdatableTickingClock();
     }
 
 }
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMoveTest.java
similarity index 74%
rename from 
mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java
rename to 
mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMoveTest.java
index a8499468f1..b9c87c578f 100644
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMoveTest.java
@@ -19,24 +19,19 @@
 
 package org.apache.james.mailbox.postgres.mail;
 
-import org.apache.james.backends.jpa.JpaTestCluster;
-import org.apache.james.mailbox.postgres.JPAMailboxFixture;
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule;
 import org.apache.james.mailbox.store.mail.model.MapperProvider;
 import org.apache.james.mailbox.store.mail.model.MessageMoveTest;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-class JpaMessageMoveTest extends MessageMoveTest {
-    
-    static final JpaTestCluster JPA_TEST_CLUSTER = 
JpaTestCluster.create(JPAMailboxFixture.MAILBOX_PERSISTANCE_CLASSES);
+class PostgresMessageMoveTest extends MessageMoveTest {
+
+    @RegisterExtension
+    static PostgresExtension postgresExtension = 
PostgresExtension.withRowLevelSecurity(PostgresMailboxAggregateModule.MODULE);
 
     @Override
     protected MapperProvider createMapperProvider() {
-        return new JPAMapperProvider(JPA_TEST_CLUSTER);
-    }
-    
-    @AfterEach
-    void cleanUp() {
-        JPA_TEST_CLUSTER.clear(JPAMailboxFixture.MAILBOX_TABLE_NAMES);
+        return new PostgresMapperProvider(postgresExtension);
     }
-
 }
diff --git 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index ae93906c49..1eb901f3be 100644
--- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -1217,7 +1217,7 @@ public abstract class MessageMapperTest {
 
         messageMapper.updateFlags(benwaInboxMailbox,
             new FlagsUpdateCalculator(new Flags(Flag.DELETED), 
FlagsUpdateMode.ADD),
-            MessageRange.range(message2.getUid(), message4.getUid()));
+            MessageRange.range(message2.getUid(), 
message4.getUid())).forEachRemaining(any -> {});
         List<MessageUid> uids = 
messageMapper.retrieveMessagesMarkedForDeletion(benwaInboxMailbox, 
MessageRange.all());
         messageMapper.deleteMessages(benwaInboxMailbox, uids);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to