This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/postgresql by this push: new 691fdaa94e JAMES-2586 Implement PostgresMailboxMessageDAO (#1812) 691fdaa94e is described below commit 691fdaa94e2efbbcff603f3d92f083aa59ef69b4 Author: vttran <vtt...@linagora.com> AuthorDate: Wed Nov 29 17:47:32 2023 +0700 JAMES-2586 Implement PostgresMailboxMessageDAO (#1812) --- .../james/backends/postgres/PostgresCommons.java | 70 ++++ .../backends/postgres/utils/PostgresExecutor.java | 7 + mailbox/postgres/pom.xml | 14 + .../postgres/PostgresMailboxAggregateModule.java | 4 +- .../james/mailbox/postgres/PostgresMessageId.java | 88 ++++ .../postgres/mail/PostgresMessageMapper.java | 448 +++++++++++++++++++++ .../postgres/mail/PostgresMessageModule.java | 164 ++++++++ .../postgres/mail/dao/PostgresMailboxDAO.java | 11 + .../mail/dao/PostgresMailboxMessageDAO.java | 378 +++++++++++++++++ .../mail/dao/PostgresMailboxMessageDAOUtils.java | 186 +++++++++ .../postgres/mail/dao/PostgresMessageDAO.java | 91 +++++ .../postgres/mail/JpaMessageMapperTest.java | 156 ------- .../postgres/mail/PostgresMapperProvider.java | 135 +++++++ ...oveTest.java => PostgresMessageMapperTest.java} | 28 +- ...eMoveTest.java => PostgresMessageMoveTest.java} | 21 +- .../store/mail/model/MessageMapperTest.java | 2 +- 16 files changed, 1620 insertions(+), 183 deletions(-) diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java new file mode 100644 index 0000000000..ae4b8ebf5e --- /dev/null +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java @@ -0,0 +1,70 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.postgres;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Date;
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.jooq.DataType;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Table;
+import org.jooq.impl.DSL;
+import org.jooq.impl.DefaultDataType;
+import org.jooq.impl.SQLDataType;
+import org.jooq.postgres.extensions.bindings.HstoreBinding;
+import org.jooq.postgres.extensions.types.Hstore;
+
+/**
+ * Shared jOOQ helpers for the PostgreSQL backend: custom column data types, a
+ * "table.column" field factory, null-tolerant converters between {@link Date} and
+ * {@link LocalDateTime} (interpreted at UTC), and an {@code unnest(...)} wrapper.
+ */
+public class PostgresCommons {
+
+    /** Column data types that are not available as plain {@link SQLDataType} constants. */
+    public interface DataTypes {
+
+        /* hstore column, converted through HstoreBinding */
+        DataType<Hstore> HSTORE = DefaultDataType.getDefaultDataType("hstore").asConvertedDataType(new HstoreBinding());
+
+        /* timestamp(6) */
+        DataType<LocalDateTime> TIMESTAMP = SQLDataType.LOCALDATETIME(6);
+
+        /* text[] */
+        DataType<String[]> STRING_ARRAY = SQLDataType.CLOB.getArrayDataType();
+    }
+
+    /** Builds a field reference from a table and one of its columns. */
+    public interface SimpleTableField {
+        Field<Object> of(Table<Record> table, Field<?> field);
+    }
+
+    /** Renders the field as the plain SQL text {@code <table name>.<field name>}. */
+    public static final SimpleTableField TABLE_FIELD = (owner, column) -> DSL.field(owner.getName() + "." + column.getName());
+
+    /** Converts a {@link Date} to a UTC {@link LocalDateTime}; a null input yields null. */
+    public static final Function<Date, LocalDateTime> DATE_TO_LOCAL_DATE_TIME = date -> Optional.ofNullable(date)
+        .map(Date::toInstant)
+        .map(instant -> LocalDateTime.ofInstant(instant, ZoneOffset.UTC))
+        .orElse(null);
+    /** Converts a {@link LocalDateTime} read as UTC to a {@link Date}; a null input yields null. */
+    public static final Function<LocalDateTime, Date> LOCAL_DATE_TIME_DATE_FUNCTION = localDateTime -> Optional.ofNullable(localDateTime)
+        .map(value -> Date.from(value.toInstant(ZoneOffset.UTC)))
+        .orElse(null);
+
+    /** Wraps an array-typed field in {@code unnest(...)}, typed with the array's component type. */
+    public static final Function<Field<?>, Field<?>> UNNEST_FIELD = arrayField -> {
+        Class<?> elementType = arrayField.getType().getComponentType();
+        return DSL.function("unnest", elementType, arrayField);
+    };
+
+    /* NOTE(review): presumably the maximum number of values put in one SQL IN clause; no usage is visible here. */
+    public static final int IN_CLAUSE_MAX_SIZE = 32;
+
+}
diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index 7a6485108f..05a3556ad0 100644
--- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -28,6 +28,7 @@ import javax.inject.Inject;
 import org.apache.james.core.Domain;
 import org.jooq.DSLContext;
 import org.jooq.Record;
+import org.jooq.Record1;
 import org.jooq.SQLDialect;
 import org.jooq.conf.Settings;
 import org.jooq.conf.StatementType;
@@ -96,6 +97,12 @@ public class PostgresExecutor {
             .flatMap(queryFunction);
     }

+    public Mono<Integer> executeCount(Function<DSLContext, Mono<Record1<Integer>>> queryFunction) {
+        return dslContext()
+            .flatMap(queryFunction)
+            .map(Record1::value1);
+    }
+
     public Mono<Connection> connection() {
         return connection;
     }
diff --git a/mailbox/postgres/pom.xml b/mailbox/postgres/pom.xml
index 690389fab5..1c63841273 100644
--- a/mailbox/postgres/pom.xml
+++ b/mailbox/postgres/pom.xml
@@ -83,6 +83,20 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+
<artifactId>blob-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-memory</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-storage-strategy</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>event-bus-api</artifactId> diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxAggregateModule.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxAggregateModule.java index db208dd975..9ec68fd6fd 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxAggregateModule.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxAggregateModule.java @@ -21,11 +21,13 @@ package org.apache.james.mailbox.postgres; import org.apache.james.backends.postgres.PostgresModule; import org.apache.james.mailbox.postgres.mail.PostgresMailboxModule; +import org.apache.james.mailbox.postgres.mail.PostgresMessageModule; import org.apache.james.mailbox.postgres.user.PostgresSubscriptionModule; public interface PostgresMailboxAggregateModule { PostgresModule MODULE = PostgresModule.aggregateModules( PostgresMailboxModule.MODULE, - PostgresSubscriptionModule.MODULE); + PostgresSubscriptionModule.MODULE, + PostgresMessageModule.MODULE); } diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageId.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageId.java new file mode 100644 index 0000000000..c4012f1999 --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageId.java @@ -0,0 +1,88 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.james.mailbox.model.MessageId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * {@link MessageId} backed by a {@link UUID}. Instances are obtained through
+ * {@link Factory}; the serialized form is the UUID's canonical string.
+ */
+public class PostgresMessageId implements MessageId {
+
+    /** Creates ids from a random UUID, an existing UUID, or a serialized UUID string. */
+    public static class Factory implements MessageId.Factory {
+
+        /** Returns an id wrapping a freshly generated random UUID. */
+        @Override
+        public PostgresMessageId generate() {
+            UUID random = UUID.randomUUID();
+            return of(random);
+        }
+
+        /** Wraps the given UUID. */
+        public static PostgresMessageId of(UUID uuid) {
+            return new PostgresMessageId(uuid);
+        }
+
+        /** Parses {@code serialized} with {@link UUID#fromString(String)} and wraps the result. */
+        @Override
+        public PostgresMessageId fromString(String serialized) {
+            UUID parsed = UUID.fromString(serialized);
+            return of(parsed);
+        }
+    }
+
+    private final UUID uuid;
+
+    private PostgresMessageId(UUID uuid) {
+        this.uuid = uuid;
+    }
+
+    /** Returns the UUID's string form. */
+    @Override
+    public String serialize() {
+        return uuid.toString();
+    }
+
+    /** Returns the wrapped UUID. */
+    public UUID asUuid() {
+        return uuid;
+    }
+
+    @Override
+    public boolean isSerializable() {
+        return true;
+    }
+
+    /** Two ids are equal when both are {@code PostgresMessageId} and wrap equal UUIDs. */
+    @Override
+    public final boolean equals(Object o) {
+        if (!(o instanceof PostgresMessageId)) {
+            return false;
+        }
+        PostgresMessageId that = (PostgresMessageId) o;
+        return Objects.equals(this.uuid, that.uuid);
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(uuid);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("uuid", uuid)
+            .toString();
+    }
+}
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
new file mode 100644
index 0000000000..620d48df7c
--- /dev/null
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
@@ -0,0 +1,448 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.
* + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail; + +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.Clock; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +import javax.mail.Flags; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.mailbox.ApplicableFlagBuilder; +import org.apache.james.mailbox.FlagsBuilder; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.ModSeq; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.ComposedMessageId; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.Content; +import org.apache.james.mailbox.model.Mailbox; +import org.apache.james.mailbox.model.MailboxCounters; +import org.apache.james.mailbox.model.MessageMetaData; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.model.UpdatedFlags; +import org.apache.james.mailbox.postgres.PostgresMailboxId; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO; +import org.apache.james.mailbox.store.FlagsUpdateCalculator; +import org.apache.james.mailbox.store.MailboxReactorUtils; +import org.apache.james.mailbox.store.mail.MessageMapper; +import 
org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.streams.Limit;
+
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * {@link MessageMapper} backed by PostgreSQL.
+ *
+ * <p>Rows are read and written through {@link PostgresMessageDAO},
+ * {@link PostgresMailboxMessageDAO} and {@link PostgresMailboxDAO}; the full message
+ * content is stored in, and read back from, the injected {@link BlobStore}.
+ * The blocking methods delegate to their reactive counterparts and block on the result.
+ */
+public class PostgresMessageMapper implements MessageMapper {
+
+    /*
+     * Exposes a message's full content as a ByteSource so it can be handed to BlobStore.save.
+     * The size reported is the one recorded in the message metadata.
+     */
+    private static final Function<MailboxMessage, ByteSource> MESSAGE_FULL_CONTENT_LOADER = (mailboxMessage) -> new ByteSource() {
+        @Override
+        public InputStream openStream() {
+            try {
+                return mailboxMessage.getFullContent();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public long size() {
+            return mailboxMessage.metaData().getSize();
+        }
+    };
+
+
+    private final PostgresMessageDAO messageDAO;
+    private final PostgresMailboxMessageDAO mailboxMessageDAO;
+    private final PostgresMailboxDAO mailboxDAO;
+    private final PostgresModSeqProvider modSeqProvider;
+    private final PostgresUidProvider uidProvider;
+    private final BlobStore blobStore;
+    private final Clock clock;
+    private final BlobId.Factory blobIdFactory;
+
+    /**
+     * Creates the mapper; the three DAOs are instantiated here on top of {@code postgresExecutor}.
+     *
+     * @param clock source of the save date stamped on added and copied messages
+     */
+    public PostgresMessageMapper(PostgresExecutor postgresExecutor,
+                                 PostgresModSeqProvider modSeqProvider,
+                                 PostgresUidProvider uidProvider,
+                                 BlobStore blobStore,
+                                 Clock clock,
+                                 BlobId.Factory blobIdFactory) {
+        this.messageDAO = new PostgresMessageDAO(postgresExecutor);
+        this.mailboxMessageDAO = new PostgresMailboxMessageDAO(postgresExecutor);
+        this.mailboxDAO = new PostgresMailboxDAO(postgresExecutor);
+        this.modSeqProvider = modSeqProvider;
+        this.uidProvider = uidProvider;
+        this.blobStore = blobStore;
+        this.clock = clock;
+        this.blobIdFactory = blobIdFactory;
+    }
+
+
+    @Override
+    public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange set, FetchType type, int limit) {
+        return findInMailboxReactive(mailbox, set, type, limit)
+            .toIterable()
+            .iterator();
+    }
+
+    @Override
+    public Flux<ComposedMessageIdWithMetaData> listMessagesMetadata(Mailbox mailbox, MessageRange set) {
+        return mailboxMessageDAO.findMessagesMetadata((PostgresMailboxId) mailbox.getMailboxId(), set);
+    }
+
+    /**
+     * Streams the messages of {@code mailbox} selected by {@code messageRange}.
+     *
+     * <p>The blob content is only read for {@link FetchType#FULL}; for the other fetch
+     * types the message is built from the database row alone. {@code limitAsInt} is not
+     * applied to a {@code ONE} range.
+     */
+    @Override
+    public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType fetchType, int limitAsInt) {
+        return Mono.just(messageRange)
+            .flatMapMany(range -> {
+                Limit limit = Limit.from(limitAsInt);
+                switch (range.getType()) {
+                    case ALL:
+                        return mailboxMessageDAO.findMessagesByMailboxId((PostgresMailboxId) mailbox.getMailboxId(), limit);
+                    case FROM:
+                        return mailboxMessageDAO.findMessagesByMailboxIdAndAfterUID((PostgresMailboxId) mailbox.getMailboxId(), range.getUidFrom(), limit);
+                    case ONE:
+                        return mailboxMessageDAO.findMessageByMailboxIdAndUid((PostgresMailboxId) mailbox.getMailboxId(), range.getUidFrom())
+                            .flatMapMany(Flux::just);
+                    case RANGE:
+                        return mailboxMessageDAO.findMessagesByMailboxIdAndBetweenUIDs((PostgresMailboxId) mailbox.getMailboxId(), range.getUidFrom(), range.getUidTo(), limit);
+                    default:
+                        throw new RuntimeException("Unknown MessageRange range " + range.getType());
+                }
+            }).flatMap(messageBuilderAndBlobId -> {
+                SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndBlobId.getLeft();
+                String blobIdAsString = messageBuilderAndBlobId.getRight();
+                switch (fetchType) {
+                    case METADATA:
+                    case ATTACHMENTS_METADATA:
+                    case HEADERS:
+                        return Mono.just(messageBuilder.build());
+                    case FULL:
+                        return retrieveFullContent(blobIdAsString)
+                            .map(content -> messageBuilder.content(content).build());
+                    default:
+                        return Flux.error(new RuntimeException("Unknown FetchType " + fetchType));
+                }
+            });
+    }
+
+    /* Reads the whole blob from the default bucket into memory and exposes it as Content. */
+    private Mono<Content> retrieveFullContent(String blobIdString) {
+        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(blobIdString), SIZE_BASED))
+            .map(contentAsBytes -> new Content() {
+                @Override
+                public InputStream getInputStream() {
+                    return new ByteArrayInputStream(contentAsBytes);
+                }
+
+                @Override
+                public long size() {
+                    return contentAsBytes.length;
+                }
+            });
+    }
+
+    @Override
+    public List<MessageUid> retrieveMessagesMarkedForDeletion(Mailbox mailbox, MessageRange messageRange) {
+        return retrieveMessagesMarkedForDeletionReactive(mailbox, messageRange)
+            .collectList()
+            .block();
+    }
+
+    /** Streams the uids selected by {@code messageRange} that the DAO reports as deleted. */
+    @Override
+    public Flux<MessageUid> retrieveMessagesMarkedForDeletionReactive(Mailbox mailbox, MessageRange messageRange) {
+        return Mono.just(messageRange)
+            .flatMapMany(range -> {
+                switch (range.getType()) {
+                    case ALL:
+                        return mailboxMessageDAO.findDeletedMessagesByMailboxId((PostgresMailboxId) mailbox.getMailboxId());
+                    case FROM:
+                        return mailboxMessageDAO.findDeletedMessagesByMailboxIdAndAfterUID((PostgresMailboxId) mailbox.getMailboxId(), range.getUidFrom());
+                    case ONE:
+                        return mailboxMessageDAO.findDeletedMessageByMailboxIdAndUid((PostgresMailboxId) mailbox.getMailboxId(), range.getUidFrom())
+                            .flatMapMany(Flux::just);
+                    case RANGE:
+                        return mailboxMessageDAO.findDeletedMessagesByMailboxIdAndBetweenUIDs((PostgresMailboxId) mailbox.getMailboxId(), range.getUidFrom(), range.getUidTo());
+                    default:
+                        throw new RuntimeException("Unknown MessageRange type " + range.getType());
+                }
+            });
+    }
+
+    @Override
+    public long countMessagesInMailbox(Mailbox mailbox) {
+        return mailboxMessageDAO.countTotalMessagesByMailboxId((PostgresMailboxId) mailbox.getMailboxId())
+            .block();
+    }
+
+    @Override
+    public MailboxCounters getMailboxCounters(Mailbox mailbox) {
+        return getMailboxCountersReactive(mailbox).block();
+    }
+
+    /** Builds the counters from two DAO queries run one after the other: total count, then unseen count. */
+    @Override
+    public Mono<MailboxCounters> getMailboxCountersReactive(Mailbox mailbox) {
+        return mailboxMessageDAO.countTotalMessagesByMailboxId((PostgresMailboxId) mailbox.getMailboxId())
+            .flatMap(totalMessage -> mailboxMessageDAO.countUnseenMessagesByMailboxId((PostgresMailboxId) mailbox.getMailboxId())
+                .map(unseenMessage -> MailboxCounters.builder()
+                    .mailboxId(mailbox.getMailboxId())
+                    .count(totalMessage)
+                    .unseen(unseenMessage)
+                    .build()));
+    }
+
+    @Override
+    public void delete(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+        deleteMessages(mailbox, List.of(message.getUid()));
+    }
+
+    @Override
+    public Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> uids) {
+        return deleteMessagesReactive(mailbox, uids).block();
+    }
+
+    /**
+     * Deletes the mailbox rows for {@code uids} and returns the metadata those messages
+     * had, keyed by uid. The metadata is collected before the delete is issued.
+     */
+    @Override
+    public Mono<Map<MessageUid, MessageMetaData>> deleteMessagesReactive(Mailbox mailbox, List<MessageUid> uids) {
+        return mailboxMessageDAO.findMessagesByMailboxIdAndUIDs((PostgresMailboxId) mailbox.getMailboxId(), uids)
+            .map(SimpleMailboxMessage.Builder::build)
+            .collectMap(MailboxMessage::getUid, MailboxMessage::metaData)
+            .flatMap(map -> mailboxMessageDAO.deleteByMailboxIdAndMessageUids((PostgresMailboxId) mailbox.getMailboxId(), uids)
+                .then(Mono.just(map)));
+    }
+
+    @Override
+    public MessageUid findFirstUnseenMessageUid(Mailbox mailbox) {
+        return mailboxMessageDAO.findFirstUnseenMessageUid((PostgresMailboxId) mailbox.getMailboxId()).block();
+    }
+
+    @Override
+    public Mono<Optional<MessageUid>> findFirstUnseenMessageUidReactive(Mailbox mailbox) {
+        return mailboxMessageDAO.findFirstUnseenMessageUid((PostgresMailboxId) mailbox.getMailboxId())
+            .map(Optional::of)
+            .switchIfEmpty(Mono.just(Optional.empty()));
+    }
+
+    @Override
+    public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) {
+        return findRecentMessageUidsInMailboxReactive(mailbox).block();
+    }
+
+    @Override
+    public Mono<List<MessageUid>> findRecentMessageUidsInMailboxReactive(Mailbox mailbox) {
+        return mailboxMessageDAO.findAllRecentMessageUid((PostgresMailboxId) mailbox.getMailboxId())
+            .collectList();
+    }
+
+    @Override
+    public MessageMetaData add(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+        return addReactive(mailbox, message).block();
+    }
+
+    /**
+     * Persists a new message. In order: stamps the save date from the clock, assigns a new
+     * uid and mod-seq, saves the full content to the blob store, inserts the message row
+     * with the resulting blob id, inserts the mailbox row, then returns the metadata.
+     * {@code message} is mutated in place.
+     */
+    @Override
+    public Mono<MessageMetaData> addReactive(Mailbox mailbox, MailboxMessage message) {
+        return Mono.fromCallable(() -> {
+                message.setSaveDate(Date.from(clock.instant()));
+                return message;
+            })
+            .flatMap(this::setNewUidAndModSeq)
+            .then(saveFullContent(message)
+                .flatMap(blobId -> messageDAO.insert(message, blobId.asString())))
+            .then(Mono.defer(() -> mailboxMessageDAO.insert(message)))
+            .then(Mono.fromCallable(message::metaData));
+    }
+
+    /* Saves the full message content in the default bucket with the LOW_COST storage policy. */
+    private Mono<BlobId> saveFullContent(MailboxMessage message) {
+        return Mono.fromCallable(() -> MESSAGE_FULL_CONTENT_LOADER.apply(message))
+            .flatMap(bodyByteSource -> Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST)));
+    }
+
+    @Override
+    public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange range) {
+        return updateFlagsPublisher(mailbox, flagsUpdateCalculator, range)
+            .toIterable()
+            .iterator();
+    }
+
+    @Override
+    public Mono<List<UpdatedFlags>> updateFlagsReactive(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange range) {
+        return updateFlagsPublisher(mailbox, flagsUpdateCalculator, range)
+            .collectList();
+    }
+
+    /* Requests one new mod-seq per message in the range, then applies the flag update to it. */
+    private Flux<UpdatedFlags> updateFlagsPublisher(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange range) {
+        return mailboxMessageDAO.findMessagesMetadata((PostgresMailboxId) mailbox.getMailboxId(), range)
+            .flatMap(currentMetaData -> modSeqProvider.nextModSeqReactive(mailbox.getMailboxId())
+                .flatMap(newModSeq -> updateFlags(currentMetaData, flagsUpdateCalculator, newModSeq)));
+    }
+
+    /*
+     * Computes the new flags for one message. When they equal the old flags nothing is
+     * written and the returned UpdatedFlags keeps the message's current mod-seq; otherwise
+     * the row is updated and the result carries newModSeq.
+     */
+    private Mono<UpdatedFlags> updateFlags(ComposedMessageIdWithMetaData currentMetaData,
+                                           FlagsUpdateCalculator flagsUpdateCalculator,
+                                           ModSeq newModSeq) {
+        Flags oldFlags = currentMetaData.getFlags();
+        Flags newFlags = flagsUpdateCalculator.buildNewFlags(oldFlags);
+
+        ComposedMessageId composedMessageId = currentMetaData.getComposedMessageId();
+
+        return Mono.just(UpdatedFlags.builder()
+                .messageId(composedMessageId.getMessageId())
+                .oldFlags(oldFlags)
+                .newFlags(newFlags)
+                .uid(composedMessageId.getUid()))
+            .flatMap(builder -> {
+                if (oldFlags.equals(newFlags)) {
+                    return Mono.just(builder.modSeq(currentMetaData.getModSeq())
+                        .build());
+                }
+                return Mono.fromCallable(() -> builder.modSeq(newModSeq).build())
+                    .flatMap(updatedFlags -> mailboxMessageDAO.updateFlag((PostgresMailboxId) composedMessageId.getMailboxId(), composedMessageId.getUid(), updatedFlags)
+                        .thenReturn(updatedFlags));
+            });
+    }
+
+    @Override
+    public List<UpdatedFlags> resetRecent(Mailbox mailbox) {
+        return resetRecentReactive(mailbox).block();
+    }
+
+    @Override
+    public Mono<List<UpdatedFlags>> resetRecentReactive(Mailbox mailbox) {
+        return mailboxMessageDAO.findAllRecentMessageMetadata((PostgresMailboxId) mailbox.getMailboxId())
+            .collectList()
+            .flatMapMany(mailboxMessageList -> resetRecentFlag((PostgresMailboxId) mailbox.getMailboxId(), mailboxMessageList))
+            .collectList();
+    }
+
+    /*
+     * Clears the recent flag on the given messages under a single new mod-seq and reports,
+     * per message, the flags before and after.
+     */
+    private Flux<UpdatedFlags> resetRecentFlag(PostgresMailboxId mailboxId, List<ComposedMessageIdWithMetaData> messageIdWithMetaDataList) {
+        // Nothing is recent: return early so that no mod-seq is requested for a no-op reset.
+        if (messageIdWithMetaDataList.isEmpty()) {
+            return Flux.empty();
+        }
+        return Flux.fromIterable(messageIdWithMetaDataList)
+            .collectMap(m -> m.getComposedMessageId().getUid(), Function.identity())
+            .flatMapMany(uidMapping -> modSeqProvider.nextModSeqReactive(mailboxId)
+                .flatMapMany(newModSeq -> mailboxMessageDAO.resetRecentFlag(mailboxId, List.copyOf(uidMapping.keySet()), newModSeq))
+                .map(newMetaData -> UpdatedFlags.builder()
+                    .messageId(newMetaData.getMessageId())
+                    .modSeq(newMetaData.getModSeq())
+                    .oldFlags(uidMapping.get(newMetaData.getUid()).getFlags())
+                    .newFlags(newMetaData.getFlags())
+                    .uid(newMetaData.getUid())
+                    .build()));
+    }
+
+    @Override
+    public MessageMetaData copy(Mailbox mailbox, MailboxMessage original) throws MailboxException {
+        return copyReactive(mailbox, original).block();
+    }
+
+    /*
+     * Assigns the next uid and mod-seq of the message's mailbox to the message (mutating it).
+     * Falls back to MessageUid.MIN_VALUE / ModSeq.first() when the DAO returns nothing.
+     */
+    private Mono<Void> setNewUidAndModSeq(MailboxMessage mailboxMessage) {
+        return mailboxDAO.incrementAndGetLastUidAndModSeq(mailboxMessage.getMailboxId())
+            .defaultIfEmpty(Pair.of(MessageUid.MIN_VALUE, ModSeq.first()))
+            .map(pair -> {
+                mailboxMessage.setUid(pair.getLeft());
+                mailboxMessage.setModSeq(pair.getRight());
+                return pair;
+            }).then();
+    }
+
+
+    /**
+     * Copies {@code original} into {@code mailbox}: the copy keeps the original flags plus
+     * {@code RECENT}, gets a fresh save date, uid and mod-seq, and only a mailbox row is
+     * inserted (no message row, no blob write).
+     */
+    @Override
+    public Mono<MessageMetaData> copyReactive(Mailbox mailbox, MailboxMessage original) {
+        return Mono.fromCallable(() -> {
+                MailboxMessage copiedMessage = original.copy(mailbox);
+                copiedMessage.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flags.Flag.RECENT).build());
+                copiedMessage.setSaveDate(Date.from(clock.instant()));
+                return copiedMessage;
+            })
+            .flatMap(copiedMessage -> setNewUidAndModSeq(copiedMessage)
+                .then(Mono.defer(() -> mailboxMessageDAO.insert(copiedMessage))
+                    .thenReturn(copiedMessage))
+                .map(MailboxMessage::metaData));
+    }
+
+
+    @Override
+    public MessageMetaData move(Mailbox mailbox, MailboxMessage original) {
+        return moveReactive(mailbox, original).block();
+    }
+
+    @Override
+    public List<MessageMetaData> move(Mailbox mailbox, List<MailboxMessage> original) throws MailboxException {
+        return MailboxReactorUtils.block(moveReactive(mailbox, original));
+    }
+
+
+    /** Moves a message by copying it into {@code mailbox}, then deleting the original mailbox row. */
+    @Override
+    public Mono<MessageMetaData> moveReactive(Mailbox mailbox, MailboxMessage original) {
+        return copyReactive(mailbox, original)
+            .flatMap(copiedResult -> mailboxMessageDAO.deleteByMailboxIdAndMessageUid((PostgresMailboxId) original.getMailboxId(), original.getUid())
+                .thenReturn(copiedResult));
+    }
+
+    @Override
+    public Optional<MessageUid> getLastUid(Mailbox mailbox) {
+        return uidProvider.lastUid(mailbox);
+    }
+
+    @Override
+    public Mono<Optional<MessageUid>> getLastUidReactive(Mailbox mailbox) {
+        return uidProvider.lastUidReactive(mailbox);
+    }
+
+    @Override
+    public ModSeq getHighestModSeq(Mailbox mailbox) {
+        return modSeqProvider.highestModSeq(mailbox);
+    }
+
+    @Override
+    public Mono<ModSeq> getHighestModSeqReactive(Mailbox mailbox) {
+        return modSeqProvider.highestModSeqReactive(mailbox);
+    }
+
+    @Override
+    public Flags getApplicableFlag(Mailbox mailbox) {
+        return getApplicableFlagReactive(mailbox).block();
+    }
+
+    @Override
+    public Mono<Flags> getApplicableFlagReactive(Mailbox mailbox) {
+        return mailboxMessageDAO.listDistinctUserFlags((PostgresMailboxId) mailbox.getMailboxId())
+            .map(flags -> ApplicableFlagBuilder.builder().add(flags).build());
+    }
+
+    @Override
+    public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
+        return mailboxMessageDAO.listAllMessageUid((PostgresMailboxId) mailbox.getMailboxId());
+    }
+
+}
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageModule.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageModule.java
new file mode 100644
index 0000000000..dd3cde8727
--- /dev/null
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageModule.java
@@ -0,0 +1,164 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.
*
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import static org.jooq.impl.DSL.foreignKey;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+import org.apache.james.backends.postgres.PostgresCommons.DataTypes;
+import org.apache.james.backends.postgres.PostgresIndex;
+import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.backends.postgres.PostgresTable;
+import org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Table;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.jooq.postgres.extensions.types.Hstore;
+/**
+ * jOOQ schema declarations for message storage: the "message" table, the
+ * "message_mailbox" table and the indexes on "message_mailbox", gathered into
+ * {@link #MODULE}.
+ */
+public interface PostgresMessageModule {
+    /* Columns declared once here and reused by both tables below. */
+    Field<UUID> MESSAGE_ID = DSL.field("message_id", SQLDataType.UUID.notNull());
+    Field<LocalDateTime> INTERNAL_DATE = DSL.field("internal_date", DataTypes.TIMESTAMP);
+    Field<Long> SIZE = DSL.field("size", SQLDataType.BIGINT.notNull());
+    /**
+     * The "message" table: one row per message id, holding the blob id of the content,
+     * the raw header bytes and MIME/content properties. Primary key: message_id.
+     */
+    interface MessageTable {
+        Table<Record> TABLE_NAME = DSL.table("message");
+        Field<UUID> MESSAGE_ID = PostgresMessageModule.MESSAGE_ID;
+        Field<String> BLOB_ID = DSL.field("blob_id", SQLDataType.VARCHAR(200).notNull());
+        Field<String> MIME_TYPE = DSL.field("mime_type", SQLDataType.VARCHAR(200));
+        Field<String> MIME_SUBTYPE = DSL.field("mime_subtype", SQLDataType.VARCHAR(200));
+        Field<LocalDateTime> INTERNAL_DATE = PostgresMessageModule.INTERNAL_DATE;
+        Field<Long> SIZE = PostgresMessageModule.SIZE;
+        Field<Integer> BODY_START_OCTET = DSL.field("body_start_octet", SQLDataType.INTEGER.notNull());
+        Field<byte[]> HEADER_CONTENT = DSL.field("header_content", SQLDataType.BLOB.notNull());
+        Field<Integer> TEXTUAL_LINE_COUNT = DSL.field("textual_line_count", SQLDataType.INTEGER);
+
+        Field<String> CONTENT_DESCRIPTION = DSL.field("content_description", SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_LOCATION = DSL.field("content_location", SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_TRANSFER_ENCODING = DSL.field("content_transfer_encoding", SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_DISPOSITION_TYPE = DSL.field("content_disposition_type", SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_ID = DSL.field("content_id", SQLDataType.VARCHAR(200));
+        Field<String> CONTENT_MD5 = DSL.field("content_md5", SQLDataType.VARCHAR(200));
+        Field<String[]> CONTENT_LANGUAGE = DSL.field("content_language", DataTypes.STRING_ARRAY);
+        Field<Hstore> CONTENT_TYPE_PARAMETERS = DSL.field("content_type_parameters", DataTypes.HSTORE);
+        Field<Hstore> CONTENT_DISPOSITION_PARAMETERS = DSL.field("content_disposition_parameters", DataTypes.HSTORE);
+        /* CREATE TABLE IF NOT EXISTS step; the table is flagged as supporting row level security. */
+        PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName())
+            .createTableStep(((dsl, tableName) -> dsl.createTableIfNotExists(tableName)
+                .column(MESSAGE_ID)
+                .column(BLOB_ID)
+                .column(MIME_TYPE)
+                .column(MIME_SUBTYPE)
+                .column(INTERNAL_DATE)
+                .column(SIZE)
+                .column(BODY_START_OCTET)
+                .column(HEADER_CONTENT)
+                .column(TEXTUAL_LINE_COUNT)
+                .column(CONTENT_DESCRIPTION)
+                .column(CONTENT_LOCATION)
+                .column(CONTENT_TRANSFER_ENCODING)
+                .column(CONTENT_DISPOSITION_TYPE)
+                .column(CONTENT_ID)
+                .column(CONTENT_MD5)
+                .column(CONTENT_LANGUAGE)
+                .column(CONTENT_TYPE_PARAMETERS)
+                .column(CONTENT_DISPOSITION_PARAMETERS)
+                .constraint(DSL.primaryKey(MESSAGE_ID))
+                .comment("Holds the metadata of a mail")))
+            .supportsRowLevelSecurity();
+    }
+    /**
+     * The "message_mailbox" table: one row per (mailbox_id, message_uid), holding the
+     * mod-seq, the system flags as boolean columns, the user flags as a text array and
+     * the save date. Primary key: (mailbox_id, message_uid). Foreign keys: mailbox_id to
+     * the mailbox table and message_id to the "message" table.
+     */
+    interface MessageToMailboxTable {
+        Table<Record> TABLE_NAME = DSL.table("message_mailbox");
+        Field<UUID> MAILBOX_ID = DSL.field("mailbox_id", SQLDataType.UUID.notNull());
+        Field<Long> MESSAGE_UID = DSL.field("message_uid", SQLDataType.BIGINT.notNull());
+        Field<Long> MOD_SEQ = DSL.field("mod_seq", SQLDataType.BIGINT.notNull());
+        Field<UUID> MESSAGE_ID = PostgresMessageModule.MESSAGE_ID;
+        Field<UUID> THREAD_ID = DSL.field("thread_id", SQLDataType.UUID);
+        Field<LocalDateTime> INTERNAL_DATE = PostgresMessageModule.INTERNAL_DATE;
+        Field<Long> SIZE = PostgresMessageModule.SIZE;
+        Field<Boolean> IS_DELETED = DSL.field("is_deleted", SQLDataType.BOOLEAN.nullable(false)
+            .defaultValue(DSL.field("false", SQLDataType.BOOLEAN)));
+        Field<Boolean> IS_ANSWERED = DSL.field("is_answered", SQLDataType.BOOLEAN.nullable(false));
+        Field<Boolean> IS_DRAFT = DSL.field("is_draft", SQLDataType.BOOLEAN.nullable(false));
+        Field<Boolean> IS_FLAGGED = DSL.field("is_flagged", SQLDataType.BOOLEAN.nullable(false));
+        Field<Boolean> IS_RECENT = DSL.field("is_recent", SQLDataType.BOOLEAN.nullable(false));
+        Field<Boolean> IS_SEEN = DSL.field("is_seen", SQLDataType.BOOLEAN.nullable(false));
+        Field<String[]> USER_FLAGS = DSL.field("user_flags", DataTypes.STRING_ARRAY);
+        Field<LocalDateTime> SAVE_DATE = DSL.field("save_date", DataTypes.TIMESTAMP);
+
+        /* CREATE TABLE IF NOT EXISTS step; the table is flagged as supporting row level security. */
+        PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName())
+            .createTableStep(((dsl, tableName) -> dsl.createTableIfNotExists(tableName)
+                .column(MAILBOX_ID)
+                .column(MESSAGE_UID)
+                .column(MOD_SEQ)
+                .column(MESSAGE_ID)
+                .column(THREAD_ID)
+                .column(INTERNAL_DATE)
+                .column(SIZE)
+                .column(IS_DELETED)
+                .column(IS_ANSWERED)
+                .column(IS_DRAFT)
+                .column(IS_FLAGGED)
+                .column(IS_RECENT)
+                .column(IS_SEEN)
+                .column(USER_FLAGS)
+                .column(SAVE_DATE)
+                .constraints(DSL.primaryKey(MAILBOX_ID, MESSAGE_UID),
+                    foreignKey(MAILBOX_ID).references(PostgresMailboxTable.TABLE_NAME, PostgresMailboxTable.MAILBOX_ID),
+                    foreignKey(MESSAGE_ID).references(MessageTable.TABLE_NAME, MessageTable.MESSAGE_ID))
+                .comment("Holds mailbox and flags for each message")))
+            .supportsRowLevelSecurity();
+        /* Secondary indexes, each created with CREATE INDEX IF NOT EXISTS on message_mailbox. */
+        PostgresIndex MESSAGE_ID_INDEX = PostgresIndex.name("message_mailbox_message_id_index")
+            .createIndexStep((dsl, indexName) -> dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MESSAGE_ID));
+
+        PostgresIndex MAILBOX_ID_MESSAGE_UID_INDEX = PostgresIndex.name("mailbox_id_mail_uid_index")
+            .createIndexStep((dsl, indexName) -> dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MAILBOX_ID, MESSAGE_UID.asc()));
+        PostgresIndex MAILBOX_ID_IS_SEEN_MESSAGE_UID_INDEX = PostgresIndex.name("mailbox_id_is_seen_mail_uid_index")
+            .createIndexStep((dsl, indexName) -> dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MAILBOX_ID, IS_SEEN, MESSAGE_UID.asc()));
+        PostgresIndex MAILBOX_ID_IS_RECENT_MESSAGE_UID_INDEX = PostgresIndex.name("mailbox_id_is_recent_mail_uid_index")
+            .createIndexStep((dsl, indexName) -> dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MAILBOX_ID, IS_RECENT, MESSAGE_UID.asc()));
+        PostgresIndex MAILBOX_ID_IS_DELETE_MESSAGE_UID_INDEX = PostgresIndex.name("mailbox_id_is_delete_mail_uid_index")
+            .createIndexStep((dsl, indexName) -> dsl.createIndexIfNotExists(indexName)
+                .on(TABLE_NAME, MAILBOX_ID, IS_DELETED, MESSAGE_UID.asc()));
+
+    }
+    /* Registers both tables and the five message_mailbox indexes. */
+    PostgresModule MODULE = PostgresModule.builder()
+        .addTable(MessageTable.TABLE)
+        .addTable(MessageToMailboxTable.TABLE)
+        .addIndex(MessageToMailboxTable.MESSAGE_ID_INDEX)
+        .addIndex(MessageToMailboxTable.MAILBOX_ID_MESSAGE_UID_INDEX)
+        .addIndex(MessageToMailboxTable.MAILBOX_ID_IS_SEEN_MESSAGE_UID_INDEX)
+        .addIndex(MessageToMailboxTable.MAILBOX_ID_IS_RECENT_MESSAGE_UID_INDEX)
+        .addIndex(MessageToMailboxTable.MAILBOX_ID_IS_DELETE_MESSAGE_UID_INDEX)
+        .build();
+
+}
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
index f820a2ce07..22ccdf9e04 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
@@ -39,6 +39,7 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;

+import org.apache.commons.lang3.tuple.Pair;
 import
org.apache.james.backends.postgres.utils.PostgresExecutor; import org.apache.james.core.Username; import org.apache.james.mailbox.MessageUid; @@ -238,4 +239,14 @@ public class PostgresMailboxDAO { .map(record -> record.get(MAILBOX_HIGHEST_MODSEQ)) .map(ModSeq::of); } + + public Mono<Pair<MessageUid, ModSeq>> incrementAndGetLastUidAndModSeq(MailboxId mailboxId) { + int increment = 1; + return postgresExecutor.executeRow(dsl -> Mono.from(dsl.update(TABLE_NAME) + .set(MAILBOX_LAST_UID, coalesce(MAILBOX_LAST_UID, 0L).add(increment)) + .set(MAILBOX_HIGHEST_MODSEQ, coalesce(MAILBOX_HIGHEST_MODSEQ, 0L).add(increment)) + .where(MAILBOX_ID.eq(getMailboxId(mailboxId).asUuid())) + .returning(MAILBOX_LAST_UID, MAILBOX_HIGHEST_MODSEQ))) + .map(record -> Pair.of(MessageUid.of(record.get(MAILBOX_LAST_UID)), ModSeq.of(record.get(MAILBOX_HIGHEST_MODSEQ)))); + } } diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java new file mode 100644 index 0000000000..c55132b59f --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java @@ -0,0 +1,378 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail.dao; + + +import static org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME; +import static org.apache.james.backends.postgres.PostgresCommons.IN_CLAUSE_MAX_SIZE; +import static org.apache.james.backends.postgres.PostgresCommons.TABLE_FIELD; +import static org.apache.james.backends.postgres.PostgresCommons.UNNEST_FIELD; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BLOB_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.INTERNAL_DATE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.SIZE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_ANSWERED; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_DELETED; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_DRAFT; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_FLAGGED; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_RECENT; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_SEEN; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MAILBOX_ID; +import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MESSAGE_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MESSAGE_UID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MOD_SEQ; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.SAVE_DATE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.TABLE_NAME; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.THREAD_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.USER_FLAGS; +import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.BOOLEAN_FLAGS_MAPPING; +import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.MESSAGE_METADATA_FIELDS_REQUIRE; +import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION; +import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION; +import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_METADATA_FUNCTION; +import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_UID_FUNCTION; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import javax.mail.Flags; +import javax.mail.Flags.Flag; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.ModSeq; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import 
org.apache.james.mailbox.model.MessageMetaData; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.model.UpdatedFlags; +import org.apache.james.mailbox.postgres.PostgresMailboxId; +import org.apache.james.mailbox.postgres.PostgresMessageId; +import org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable; +import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.apache.james.util.streams.Limit; +import org.jooq.Condition; +import org.jooq.DSLContext; +import org.jooq.Record; +import org.jooq.Record1; +import org.jooq.SelectFinalStep; +import org.jooq.SelectSeekStep1; +import org.jooq.SortField; +import org.jooq.TableOnConditionStep; +import org.jooq.UpdateConditionStep; +import org.jooq.UpdateSetStep; +import org.jooq.impl.DSL; + +import com.google.common.collect.Iterables; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono;

/**
 * Reactive data-access object for the {@code message_mailbox} table, joined with the
 * {@code message} table whenever full message content is needed.
 *
 * <p>Queries that take a list of uids are split into chunks of at most
 * {@code IN_CLAUSE_MAX_SIZE} uids so that no single {@code IN (...)} clause exceeds that size.
 */
public class PostgresMailboxMessageDAO {

    private static final TableOnConditionStep<Record> MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP = TABLE_NAME.join(MessageTable.TABLE_NAME)
        .on(TABLE_FIELD.of(TABLE_NAME, MESSAGE_ID).eq(TABLE_FIELD.of(MessageTable.TABLE_NAME, MessageTable.MESSAGE_ID)));

    public static final SortField<Long> DEFAULT_SORT_ORDER_BY = MESSAGE_UID.asc();

    /**
     * Builds {@code SELECT message_uid FROM message_mailbox WHERE mailbox_id = ? AND <extraCondition>
     * ORDER BY message_uid ASC}, with a {@code LIMIT} only when {@code limit} carries a value.
     */
    private static SelectFinalStep<Record1<Long>> selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, Condition extraCondition, Limit limit, DSLContext dslContext) {
        SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = dslContext.select(MESSAGE_UID)
            .from(TABLE_NAME)
            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
            .and(extraCondition)
            .orderBy(DEFAULT_SORT_ORDER_BY);
        return limit.getLimit().map(limitValue -> (SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue))
            .orElse(queryWithoutLimit);
    }

    /** Converts uids to the boxed array expected by jOOQ's varargs {@code in(...)}. */
    private static Long[] toLongArray(List<MessageUid> uids) {
        return uids.stream().map(MessageUid::asLong).toArray(Long[]::new);
    }

    /**
     * Runs {@code queryPublisherFunction} once when {@code uids} fits in a single IN clause,
     * otherwise once per chunk of {@code IN_CLAUSE_MAX_SIZE} uids.
     *
     * <p>NOTE(review): chunks are merged with {@code flatMap}, so an ordering applied inside
     * each chunk query is not guaranteed across chunks.
     */
    private static <T> Flux<T> executeInChunks(List<MessageUid> uids, Function<List<MessageUid>, Flux<T>> queryPublisherFunction) {
        if (uids.size() <= IN_CLAUSE_MAX_SIZE) {
            return queryPublisherFunction.apply(uids);
        }
        return Flux.fromIterable(Iterables.partition(uids, IN_CLAUSE_MAX_SIZE))
            .flatMap(queryPublisherFunction);
    }

    private final PostgresExecutor postgresExecutor;

    public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) {
        this.postgresExecutor = postgresExecutor;
    }

    /** Emits the lowest uid of the mailbox whose {@code is_seen} column is false. */
    public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId mailboxId) {
        return postgresExecutor.executeRow(dslContext -> Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
                IS_SEEN.eq(false), Limit.limit(1), dslContext)))
            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
    }

    /** Emits, in ascending order, the uids of the mailbox whose {@code is_recent} column is true. */
    public Flux<MessageUid> findAllRecentMessageUid(PostgresMailboxId mailboxId) {
        return findMessageUids(mailboxId, IS_RECENT.eq(true));
    }

    /** Emits every uid of the mailbox in ascending order. */
    public Flux<MessageUid> listAllMessageUid(PostgresMailboxId mailboxId) {
        return findMessageUids(mailboxId, DSL.noCondition());
    }

    /** Deletes one row and emits the metadata of the deleted row. */
    public Mono<MessageMetaData> deleteByMailboxIdAndMessageUid(PostgresMailboxId mailboxId, MessageUid messageUid) {
        return postgresExecutor.executeRow(dslContext -> Mono.from(dslContext.deleteFrom(TABLE_NAME)
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                .and(MESSAGE_UID.eq(messageUid.asLong()))
                .returning(MESSAGE_METADATA_FIELDS_REQUIRE)))
            .map(RECORD_TO_MESSAGE_METADATA_FUNCTION);
    }

    /** Deletes the rows matching {@code uids} (chunked) and emits the metadata of each deleted row. */
    public Flux<MessageMetaData> deleteByMailboxIdAndMessageUids(PostgresMailboxId mailboxId, List<MessageUid> uids) {
        return executeInChunks(uids, uidsToDelete -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.deleteFrom(TABLE_NAME)
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                .and(MESSAGE_UID.in(toLongArray(uidsToDelete)))
                .returning(MESSAGE_METADATA_FIELDS_REQUIRE)))
            .map(RECORD_TO_MESSAGE_METADATA_FUNCTION));
    }

    public Mono<Integer> countUnseenMessagesByMailboxId(PostgresMailboxId mailboxId) {
        return postgresExecutor.executeCount(dslContext -> Mono.from(dslContext.selectCount()
            .from(TABLE_NAME)
            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
            .and(IS_SEEN.eq(false))));
    }

    public Mono<Integer> countTotalMessagesByMailboxId(PostgresMailboxId mailboxId) {
        return postgresExecutor.executeCount(dslContext -> Mono.from(dslContext.selectCount()
            .from(TABLE_NAME)
            .where(MAILBOX_ID.eq(mailboxId.asUuid()))));
    }

    /** Emits (message builder, blob id) pairs for the whole mailbox, ordered by uid. */
    public Flux<Pair<SimpleMailboxMessage.Builder, String>> findMessagesByMailboxId(PostgresMailboxId mailboxId, Limit limit) {
        return findMessagesWithBlobId(MAILBOX_ID.eq(mailboxId.asUuid()), limit);
    }

    /** Emits (message builder, blob id) pairs for uids in {@code [from, to]}, ordered by uid. */
    public Flux<Pair<SimpleMailboxMessage.Builder, String>> findMessagesByMailboxIdAndBetweenUIDs(PostgresMailboxId mailboxId, MessageUid from, MessageUid to, Limit limit) {
        Condition condition = MAILBOX_ID.eq(mailboxId.asUuid())
            .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
            .and(MESSAGE_UID.lessOrEqual(to.asLong()));
        return findMessagesWithBlobId(condition, limit);
    }

    /** Emits the (message builder, blob id) pair for a single uid, if present. */
    public Mono<Pair<SimpleMailboxMessage.Builder, String>> findMessageByMailboxIdAndUid(PostgresMailboxId mailboxId, MessageUid uid) {
        return postgresExecutor.executeRow(dslContext -> Mono.from(dslContext.select()
                .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                .and(MESSAGE_UID.eq(uid.asLong()))))
            .map(record -> Pair.of(RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION.apply(record), record.get(BLOB_ID)));
    }

    /** Emits (message builder, blob id) pairs for uids greater than or equal to {@code from}, ordered by uid. */
    public Flux<Pair<SimpleMailboxMessage.Builder, String>> findMessagesByMailboxIdAndAfterUID(PostgresMailboxId mailboxId, MessageUid from, Limit limit) {
        Condition condition = MAILBOX_ID.eq(mailboxId.asUuid())
            .and(MESSAGE_UID.greaterOrEqual(from.asLong()));
        return findMessagesWithBlobId(condition, limit);
    }

    /** Emits message builders for the given uids (chunked); each chunk is ordered by uid. */
    public Flux<SimpleMailboxMessage.Builder> findMessagesByMailboxIdAndUIDs(PostgresMailboxId mailboxId, List<MessageUid> uids) {
        return executeInChunks(uids, uidsToFetch -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select()
                .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                .and(MESSAGE_UID.in(toLongArray(uidsToFetch)))
                .orderBy(DEFAULT_SORT_ORDER_BY)))
            .map(RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION));
    }

    // The findDeleted* queries only read and filter message_mailbox columns, so they query that
    // table alone. The foreign key on message_id means joining "message" cannot change the result.

    public Flux<MessageUid> findDeletedMessagesByMailboxId(PostgresMailboxId mailboxId) {
        return findMessageUids(mailboxId, IS_DELETED.eq(true));
    }

    public Flux<MessageUid> findDeletedMessagesByMailboxIdAndBetweenUIDs(PostgresMailboxId mailboxId, MessageUid from, MessageUid to) {
        return findMessageUids(mailboxId, IS_DELETED.eq(true)
            .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
            .and(MESSAGE_UID.lessOrEqual(to.asLong())));
    }

    public Flux<MessageUid> findDeletedMessagesByMailboxIdAndAfterUID(PostgresMailboxId mailboxId, MessageUid from) {
        return findMessageUids(mailboxId, IS_DELETED.eq(true)
            .and(MESSAGE_UID.greaterOrEqual(from.asLong())));
    }

    public Mono<MessageUid> findDeletedMessageByMailboxIdAndUid(PostgresMailboxId mailboxId, MessageUid uid) {
        return postgresExecutor.executeRow(dslContext -> Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
                IS_DELETED.eq(true).and(MESSAGE_UID.eq(uid.asLong())), Limit.unlimited(), dslContext)))
            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
    }

    /** Emits metadata for every row whose uid lies in the (inclusive) bounds of {@code range}, ordered by uid. */
    public Flux<ComposedMessageIdWithMetaData> findMessagesMetadata(PostgresMailboxId mailboxId, MessageRange range) {
        return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select()
                .from(TABLE_NAME)
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
                .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
                .orderBy(DEFAULT_SORT_ORDER_BY)))
            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
    }

    /** Emits metadata for the given uids (chunked); each chunk is ordered by uid. */
    public Flux<ComposedMessageIdWithMetaData> findMessagesMetadata(PostgresMailboxId mailboxId, List<MessageUid> messageUids) {
        return executeInChunks(messageUids, uidsToFetch -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select()
                .from(TABLE_NAME)
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                .and(MESSAGE_UID.in(toLongArray(uidsToFetch)))
                .orderBy(DEFAULT_SORT_ORDER_BY)))
            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION));
    }

    public Flux<ComposedMessageIdWithMetaData> findAllRecentMessageMetadata(PostgresMailboxId mailboxId) {
        return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select()
                .from(TABLE_NAME)
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                .and(IS_RECENT.eq(true))
                .orderBy(DEFAULT_SORT_ORDER_BY)))
            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
    }

    /** Writes the changed system flags and the new user flags of one message. */
    public Mono<Void> updateFlag(PostgresMailboxId mailboxId, MessageUid uid, UpdatedFlags updatedFlags) {
        return postgresExecutor.executeVoid(dslContext ->
            Mono.from(buildUpdateFlagStatement(dslContext, updatedFlags, mailboxId, uid)));
    }

    /** Collects the distinct user flags used in the mailbox into a single {@link Flags}. */
    public Mono<Flags> listDistinctUserFlags(PostgresMailboxId mailboxId) {
        return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.selectDistinct(UNNEST_FIELD.apply(USER_FLAGS))
                .from(TABLE_NAME)
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))))
            .map(record -> record.get(0, String.class))
            .collectList()
            .map(flagList -> {
                Flags flags = new Flags();
                flagList.forEach(flags::add);
                return flags;
            });
    }

    /**
     * Builds the UPDATE for {@link #updateFlag}: one SET per changed system flag, then the user
     * flags, restricted to the (mailbox, uid) row. This statement does not touch {@code mod_seq}.
     */
    private UpdateConditionStep<Record> buildUpdateFlagStatement(DSLContext dslContext, UpdatedFlags updatedFlags,
                                                                 PostgresMailboxId mailboxId, MessageUid uid) {
        // A holder is needed because the statement is reassigned from inside the forEach lambda.
        AtomicReference<UpdateSetStep<Record>> updateStatement = new AtomicReference<>(dslContext.update(TABLE_NAME));

        BOOLEAN_FLAGS_MAPPING.forEach((flagColumn, flagMapped) -> {
            if (updatedFlags.isChanged(flagMapped)) {
                updateStatement.getAndUpdate(currentStatement -> {
                    if (flagMapped.equals(Flag.RECENT)) {
                        // RECENT takes its value from the new flag set, not from isModifiedToSet.
                        return currentStatement.set(flagColumn, updatedFlags.getNewFlags().contains(Flag.RECENT));
                    }
                    return currentStatement.set(flagColumn, updatedFlags.isModifiedToSet(flagMapped));
                });
            }
        });

        return updateStatement.get()
            .set(USER_FLAGS, updatedFlags.getNewFlags().getUserFlags())
            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
            .and(MESSAGE_UID.eq(uid.asLong()));
    }

    /**
     * Clears {@code is_recent} and sets {@code mod_seq} to {@code newModSeq} for the given uids
     * (chunked), skipping rows already at {@code newModSeq}. Emits the metadata of each updated row.
     */
    public Flux<MessageMetaData> resetRecentFlag(PostgresMailboxId mailboxId, List<MessageUid> uids, ModSeq newModSeq) {
        return executeInChunks(uids, uidsMatching -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.update(TABLE_NAME)
                .set(IS_RECENT, false)
                .set(MOD_SEQ, newModSeq.asLong())
                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                .and(MESSAGE_UID.in(toLongArray(uidsMatching)))
                .and(MOD_SEQ.notEqual(newModSeq.asLong()))
                .returning(MESSAGE_METADATA_FIELDS_REQUIRE)))
            .map(RECORD_TO_MESSAGE_METADATA_FUNCTION));
    }

    /** Inserts the {@code message_mailbox} row of a message; a missing save date is stored as NULL. */
    public Mono<Void> insert(MailboxMessage mailboxMessage) {
        return postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.insertInto(TABLE_NAME)
            .set(MAILBOX_ID, ((PostgresMailboxId) mailboxMessage.getMailboxId()).asUuid())
            .set(MESSAGE_UID, mailboxMessage.getUid().asLong())
            .set(MOD_SEQ, mailboxMessage.getModSeq().asLong())
            .set(MESSAGE_ID, ((PostgresMessageId) mailboxMessage.getMessageId()).asUuid())
            .set(THREAD_ID, ((PostgresMessageId) mailboxMessage.getThreadId().getBaseMessageId()).asUuid())
            .set(INTERNAL_DATE, DATE_TO_LOCAL_DATE_TIME.apply(mailboxMessage.getInternalDate()))
            .set(SIZE, mailboxMessage.getFullContentOctets())
            .set(IS_DELETED, mailboxMessage.isDeleted())
            .set(IS_ANSWERED, mailboxMessage.isAnswered())
            .set(IS_DRAFT, mailboxMessage.isDraft())
            .set(IS_FLAGGED, mailboxMessage.isFlagged())
            .set(IS_RECENT, mailboxMessage.isRecent())
            .set(IS_SEEN, mailboxMessage.isSeen())
            .set(USER_FLAGS, mailboxMessage.createFlags().getUserFlags())
            .set(SAVE_DATE, mailboxMessage.getSaveDate().map(DATE_TO_LOCAL_DATE_TIME).orElse(null))));
    }

    /** Emits the uids of the mailbox matching {@code extraCondition}, in ascending order, without limit. */
    private Flux<MessageUid> findMessageUids(PostgresMailboxId mailboxId, Condition extraCondition) {
        return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
                extraCondition, Limit.unlimited(), dslContext)))
            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
    }

    /**
     * Selects from the message/message_mailbox join, ordered by uid, applying {@code limit} when
     * it carries a value. The query is built once per execution; only the needed variant is created.
     */
    private Flux<Pair<SimpleMailboxMessage.Builder, String>> findMessagesWithBlobId(Condition condition, Limit limit) {
        return postgresExecutor.executeRows(dslContext -> {
                SelectSeekStep1<Record, Long> queryWithoutLimit = dslContext.select()
                    .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
                    .where(condition)
                    .orderBy(DEFAULT_SORT_ORDER_BY);
                return limit.getLimit()
                    .map(limitValue -> Flux.from(queryWithoutLimit.limit(limitValue)))
                    .orElseGet(() -> Flux.from(queryWithoutLimit));
            })
            .map(record -> Pair.of(RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION.apply(record), record.get(BLOB_ID)));
    }

}

diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOUtils.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOUtils.java new file mode 100644 index 0000000000..1d832e20c5 --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOUtils.java @@ -0,0 +1,186 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor
license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail.dao; + +import static org.apache.james.backends.postgres.PostgresCommons.LOCAL_DATE_TIME_DATE_FUNCTION; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_START_OCTET; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_DISPOSITION_PARAMETERS; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_LANGUAGE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_LOCATION; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_MD5; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_TRANSFER_ENCODING; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_TYPE_PARAMETERS; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT; +import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.INTERNAL_DATE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.SIZE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_ANSWERED; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_DELETED; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_DRAFT; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_FLAGGED; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_RECENT; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_SEEN; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MAILBOX_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MESSAGE_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MESSAGE_UID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.MOD_SEQ; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.SAVE_DATE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.THREAD_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.USER_FLAGS; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +import javax.mail.Flags; + +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.ModSeq; 
+import org.apache.james.mailbox.model.ComposedMessageId; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.Content; +import org.apache.james.mailbox.model.MessageMetaData; +import org.apache.james.mailbox.model.ThreadId; +import org.apache.james.mailbox.postgres.PostgresMailboxId; +import org.apache.james.mailbox.postgres.PostgresMessageId; +import org.apache.james.mailbox.postgres.mail.PostgresMessageModule; +import org.apache.james.mailbox.store.mail.model.impl.Properties; +import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; +import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.jooq.Field; +import org.jooq.Record;

/**
 * Shared mappers that turn jOOQ {@link Record}s read from the {@code message} and
 * {@code message_mailbox} tables into mailbox model objects.
 */
interface PostgresMailboxMessageDAOUtils {
    /** Boolean flag column to the system flag it stores. */
    Map<Field<Boolean>, Flags.Flag> BOOLEAN_FLAGS_MAPPING = Map.of(
        IS_ANSWERED, Flags.Flag.ANSWERED,
        IS_DELETED, Flags.Flag.DELETED,
        IS_DRAFT, Flags.Flag.DRAFT,
        IS_FLAGGED, Flags.Flag.FLAGGED,
        IS_RECENT, Flags.Flag.RECENT,
        IS_SEEN, Flags.Flag.SEEN);

    Function<Record, MessageUid> RECORD_TO_MESSAGE_UID_FUNCTION = record -> MessageUid.of(record.get(MESSAGE_UID));

    /** Rebuilds {@link Flags} from the six boolean columns plus the (possibly NULL) user-flag array. */
    Function<Record, Flags> RECORD_TO_FLAGS_FUNCTION = record -> {
        Flags flags = new Flags();
        BOOLEAN_FLAGS_MAPPING.forEach((flagColumn, flagMapped) -> {
            if (record.get(flagColumn)) {
                flags.add(flagMapped);
            }
        });

        Optional.ofNullable(record.get(USER_FLAGS)).stream()
            .flatMap(Arrays::stream)
            .forEach(flags::add);
        return flags;
    };

    /**
     * Reads the thread id, falling back to a thread id derived from the message id when the
     * {@code thread_id} column is NULL. The fallback is built lazily, only when it is needed.
     */
    Function<Record, ThreadId> RECORD_TO_THREAD_ID_FUNCTION = record -> Optional.ofNullable(record.get(THREAD_ID))
        .map(threadIdAsUuid -> ThreadId.fromBaseMessageId(PostgresMessageId.Factory.of(threadIdAsUuid)))
        .orElseGet(() -> ThreadId.fromBaseMessageId(PostgresMessageId.Factory.of(record.get(MESSAGE_ID))));

    /** Columns a statement must return for {@link #RECORD_TO_MESSAGE_METADATA_FUNCTION} to work. */
    Field<?>[] MESSAGE_METADATA_FIELDS_REQUIRE = new Field[] {
        MESSAGE_UID,
        MOD_SEQ,
        SIZE,
        INTERNAL_DATE,
        SAVE_DATE,
        MESSAGE_ID,
        THREAD_ID,
        IS_ANSWERED,
        IS_DELETED,
        IS_DRAFT,
        IS_FLAGGED,
        IS_RECENT,
        IS_SEEN,
        USER_FLAGS
    };

    Function<Record, MessageMetaData> RECORD_TO_MESSAGE_METADATA_FUNCTION = record ->
        new MessageMetaData(MessageUid.of(record.get(MESSAGE_UID)),
            ModSeq.of(record.get(MOD_SEQ)),
            RECORD_TO_FLAGS_FUNCTION.apply(record),
            record.get(SIZE),
            LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(INTERNAL_DATE)),
            Optional.ofNullable(record.get(SAVE_DATE)).map(LOCAL_DATE_TIME_DATE_FUNCTION),
            PostgresMessageId.Factory.of(record.get(MESSAGE_ID)),
            RECORD_TO_THREAD_ID_FUNCTION.apply(record));

    Function<Record, ComposedMessageIdWithMetaData> RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION = record -> ComposedMessageIdWithMetaData
        .builder()
        .composedMessageId(new ComposedMessageId(PostgresMailboxId.of(record.get(MAILBOX_ID)),
            PostgresMessageId.Factory.of(record.get(MESSAGE_ID)),
            MessageUid.of(record.get(MESSAGE_UID))))
        .threadId(RECORD_TO_THREAD_ID_FUNCTION.apply(record))
        .flags(RECORD_TO_FLAGS_FUNCTION.apply(record))
        .modSeq(ModSeq.of(record.get(MOD_SEQ)))
        .build();

    /**
     * Rebuilds the MIME properties from the {@code message} columns, including
     * {@code content_description} and {@code content_disposition_type}.
     */
    Function<Record, Properties> RECORD_TO_PROPERTIES_FUNCTION = record -> {
        PropertyBuilder property = new PropertyBuilder();

        property.setMediaType(record.get(PostgresMessageModule.MessageTable.MIME_TYPE));
        property.setSubType(record.get(PostgresMessageModule.MessageTable.MIME_SUBTYPE));
        property.setTextualLineCount(Optional.ofNullable(record.get(PostgresMessageModule.MessageTable.TEXTUAL_LINE_COUNT))
            .map(Long::valueOf)
            .orElse(null));

        property.setContentID(record.get(CONTENT_ID));
        property.setContentMD5(record.get(CONTENT_MD5));
        property.setContentTransferEncoding(record.get(CONTENT_TRANSFER_ENCODING));
        property.setContentLocation(record.get(CONTENT_LOCATION));
        // These two columns are part of the table definition and must be read back as well.
        property.setContentDescription(record.get(PostgresMessageModule.MessageTable.CONTENT_DESCRIPTION));
        property.setContentDispositionType(record.get(PostgresMessageModule.MessageTable.CONTENT_DISPOSITION_TYPE));
        property.setContentLanguage(Optional.ofNullable(record.get(CONTENT_LANGUAGE)).map(List::of).orElse(null));
        property.setContentDispositionParameters(record.get(CONTENT_DISPOSITION_PARAMETERS, LinkedHashMap.class));
        property.setContentTypeParameters(record.get(CONTENT_TYPE_PARAMETERS, LinkedHashMap.class));
        return property.build();
    };

    /** Wraps an in-memory byte array as a {@link Content}. */
    Function<byte[], Content> BYTE_TO_CONTENT_FUNCTION = contentAsBytes -> new Content() {
        @Override
        public InputStream getInputStream() {
            return new ByteArrayInputStream(contentAsBytes);
        }

        @Override
        public long size() {
            return contentAsBytes.length;
        }
    };

    /**
     * Starts a message builder from a joined message/message_mailbox record. The builder's
     * content is the stored header bytes only.
     *
     * <p>NOTE(review): {@code mod_seq} is not copied onto the builder here; confirm callers set it.
     */
    Function<Record, SimpleMailboxMessage.Builder> RECORD_TO_MAILBOX_MESSAGE_BUILDER_FUNCTION = record -> SimpleMailboxMessage.builder()
        .messageId(PostgresMessageId.Factory.of(record.get(MESSAGE_ID)))
        .mailboxId(PostgresMailboxId.of(record.get(MAILBOX_ID)))
        .uid(MessageUid.of(record.get(MESSAGE_UID)))
        .threadId(RECORD_TO_THREAD_ID_FUNCTION.apply(record))
        .internalDate(LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(PostgresMessageModule.MessageTable.INTERNAL_DATE, LocalDateTime.class)))
        .saveDate(LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(SAVE_DATE, LocalDateTime.class)))
        .flags(RECORD_TO_FLAGS_FUNCTION.apply(record))
        .size(record.get(PostgresMessageModule.MessageTable.SIZE))
        .bodyStartOctet(record.get(BODY_START_OCTET))
        .content(BYTE_TO_CONTENT_FUNCTION.apply(record.get(HEADER_CONTENT)))
        .properties(RECORD_TO_PROPERTIES_FUNCTION.apply(record));

}

diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java new file mode 100644 index 0000000000..5437307788 --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMessageDAO.java @@ -0,0 +1,91 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership.
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail.dao; + +import static org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BLOB_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_START_OCTET; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_DESCRIPTION; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_DISPOSITION_PARAMETERS; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_DISPOSITION_TYPE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_LANGUAGE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_LOCATION; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_MD5; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_TRANSFER_ENCODING; +import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.CONTENT_TYPE_PARAMETERS; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.INTERNAL_DATE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.MESSAGE_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.MIME_SUBTYPE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.MIME_TYPE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.SIZE; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.TABLE_NAME; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.TEXTUAL_LINE_COUNT; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.mailbox.postgres.PostgresMessageId; +import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.jooq.postgres.extensions.types.Hstore; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class PostgresMessageDAO { + public static final long DEFAULT_LONG_VALUE = 0L; + private final PostgresExecutor postgresExecutor; + + public PostgresMessageDAO(PostgresExecutor postgresExecutor) { + this.postgresExecutor = postgresExecutor; + } + + public Mono<Void> insert(MailboxMessage message, String blobId) { + return Mono.fromCallable(() -> IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets())) + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(headerContentAsByte -> postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.insertInto(TABLE_NAME) + .set(MESSAGE_ID, ((PostgresMessageId) message.getMessageId()).asUuid()) + 
.set(BLOB_ID, blobId) + .set(MIME_TYPE, message.getMediaType()) + .set(MIME_SUBTYPE, message.getSubType()) + .set(INTERNAL_DATE, DATE_TO_LOCAL_DATE_TIME.apply(message.getInternalDate())) + .set(SIZE, message.getFullContentOctets()) + .set(BODY_START_OCTET, (int) (message.getFullContentOctets() - message.getBodyOctets())) + .set(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE).intValue()) + .set(CONTENT_DESCRIPTION, message.getProperties().getContentDescription()) + .set(CONTENT_DISPOSITION_TYPE, message.getProperties().getContentDispositionType()) + .set(CONTENT_ID, message.getProperties().getContentID()) + .set(CONTENT_MD5, message.getProperties().getContentMD5()) + .set(CONTENT_LANGUAGE, message.getProperties().getContentLanguage().toArray(new String[0])) + .set(CONTENT_LOCATION, message.getProperties().getContentLocation()) + .set(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding()) + .set(CONTENT_TYPE_PARAMETERS, Hstore.hstore(message.getProperties().getContentTypeParameters())) + .set(CONTENT_DISPOSITION_PARAMETERS, Hstore.hstore(message.getProperties().getContentDispositionParameters())) + .set(HEADER_CONTENT, headerContentAsByte)))); + } + + public Mono<Void> deleteByMessageId(PostgresMessageId messageId) { + return postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.deleteFrom(TABLE_NAME) + .where(MESSAGE_ID.eq(messageId.asUuid())))); + } + +} diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMapperTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMapperTest.java deleted file mode 100644 index 5041b74302..0000000000 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMapperTest.java +++ /dev/null @@ -1,156 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or 
more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.postgres.mail; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.Optional; - -import javax.mail.Flags; - -import org.apache.james.backends.jpa.JpaTestCluster; -import org.apache.james.mailbox.FlagsBuilder; -import org.apache.james.mailbox.MessageManager; -import org.apache.james.mailbox.ModSeq; -import org.apache.james.mailbox.exception.MailboxException; -import org.apache.james.mailbox.postgres.JPAMailboxFixture; -import org.apache.james.mailbox.model.UpdatedFlags; -import org.apache.james.mailbox.store.FlagsUpdateCalculator; -import org.apache.james.mailbox.store.mail.model.MapperProvider; -import org.apache.james.mailbox.store.mail.model.MessageMapperTest; -import org.apache.james.utils.UpdatableTickingClock; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -class JpaMessageMapperTest extends MessageMapperTest { - - static final JpaTestCluster JPA_TEST_CLUSTER = JpaTestCluster.create(JPAMailboxFixture.MAILBOX_PERSISTANCE_CLASSES); - - @Override - protected 
MapperProvider createMapperProvider() { - return new JPAMapperProvider(JPA_TEST_CLUSTER); - } - - @Override - protected UpdatableTickingClock updatableTickingClock() { - return null; - } - - @AfterEach - void cleanUp() { - JPA_TEST_CLUSTER.clear(JPAMailboxFixture.MAILBOX_TABLE_NAMES); - } - - @Test - @Override - public void flagsAdditionShouldReturnAnUpdatedFlagHighlightingTheAddition() throws MailboxException { - saveMessages(); - messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.REPLACE)); - ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox); - - // JPA does not support MessageId - assertThat(messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.ADD))) - .contains(UpdatedFlags.builder() - .uid(message1.getUid()) - .modSeq(modSeq.next()) - .oldFlags(new Flags(Flags.Flag.FLAGGED)) - .newFlags(new FlagsBuilder().add(Flags.Flag.SEEN, Flags.Flag.FLAGGED).build()) - .build()); - } - - @Test - @Override - public void flagsReplacementShouldReturnAnUpdatedFlagHighlightingTheReplacement() throws MailboxException { - saveMessages(); - ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox); - Optional<UpdatedFlags> updatedFlags = messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), - new FlagsUpdateCalculator(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.REPLACE)); - - // JPA does not support MessageId - assertThat(updatedFlags) - .contains(UpdatedFlags.builder() - .uid(message1.getUid()) - .modSeq(modSeq.next()) - .oldFlags(new Flags()) - .newFlags(new Flags(Flags.Flag.FLAGGED)) - .build()); - } - - @Test - @Override - public void flagsRemovalShouldReturnAnUpdatedFlagHighlightingTheRemoval() throws MailboxException { - saveMessages(); - messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new 
FlagsBuilder().add(Flags.Flag.FLAGGED, Flags.Flag.SEEN).build(), MessageManager.FlagsUpdateMode.REPLACE)); - ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox); - - // JPA does not support MessageId - assertThat(messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REMOVE))) - .contains( - UpdatedFlags.builder() - .uid(message1.getUid()) - .modSeq(modSeq.next()) - .oldFlags(new FlagsBuilder().add(Flags.Flag.SEEN, Flags.Flag.FLAGGED).build()) - .newFlags(new Flags(Flags.Flag.FLAGGED)) - .build()); - } - - @Test - @Override - public void userFlagsUpdateShouldReturnCorrectUpdatedFlags() throws MailboxException { - saveMessages(); - ModSeq modSeq = messageMapper.getHighestModSeq(benwaInboxMailbox); - - // JPA does not support MessageId - assertThat(messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), new FlagsUpdateCalculator(new Flags(USER_FLAG), MessageManager.FlagsUpdateMode.ADD))) - .contains( - UpdatedFlags.builder() - .uid(message1.getUid()) - .modSeq(modSeq.next()) - .oldFlags(new Flags()) - .newFlags(new Flags(USER_FLAG)) - .build()); - } - - @Test - @Override - public void userFlagsUpdateShouldReturnCorrectUpdatedFlagsWhenNoop() throws MailboxException { - saveMessages(); - - // JPA does not support MessageId - assertThat( - messageMapper.updateFlags(benwaInboxMailbox,message1.getUid(), - new FlagsUpdateCalculator(new Flags(USER_FLAG), MessageManager.FlagsUpdateMode.REMOVE))) - .contains( - UpdatedFlags.builder() - .uid(message1.getUid()) - .modSeq(message1.getModSeq()) - .oldFlags(new Flags()) - .newFlags(new Flags()) - .build()); - } - - @Nested - @Disabled("JPA does not support saveDate.") - class SaveDateTests { - - } -} diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java new file 
mode 100644 index 0000000000..7258ba7a19 --- /dev/null +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java @@ -0,0 +1,135 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail; + +import java.time.Instant; +import java.util.List; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.memory.MemoryBlobStoreDAO; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.ModSeq; +import org.apache.james.mailbox.model.Mailbox; +import org.apache.james.mailbox.model.MailboxId; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.postgres.PostgresMailboxId; +import org.apache.james.mailbox.postgres.PostgresMessageId; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; +import org.apache.james.mailbox.store.mail.AttachmentMapper; +import org.apache.james.mailbox.store.mail.MailboxMapper; +import org.apache.james.mailbox.store.mail.MessageIdMapper; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.model.MapperProvider; +import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; +import org.apache.james.utils.UpdatableTickingClock; + +import com.google.common.collect.ImmutableList; + +public class PostgresMapperProvider implements MapperProvider { + + private final PostgresMessageId.Factory messageIdFactory; + private final PostgresExtension postgresExtension; + private final UpdatableTickingClock updatableTickingClock; + private final BlobStore blobStore; + private final BlobId.Factory blobIdFactory; + + public PostgresMapperProvider(PostgresExtension postgresExtension) { + this.postgresExtension = postgresExtension; + this.updatableTickingClock = new UpdatableTickingClock(Instant.now()); + this.messageIdFactory = new 
PostgresMessageId.Factory(); + this.blobIdFactory = new HashBlobId.Factory(); + this.blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory); + } + + @Override + public List<Capabilities> getSupportedCapabilities() { + return ImmutableList.of(Capabilities.ANNOTATION, Capabilities.MAILBOX, Capabilities.MESSAGE, Capabilities.MOVE, Capabilities.ATTACHMENT); + } + + @Override + public MailboxMapper createMailboxMapper() { + return new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getPostgresExecutor())); + } + + @Override + public MessageMapper createMessageMapper() { + PostgresMailboxDAO mailboxDAO = new PostgresMailboxDAO(postgresExtension.getPostgresExecutor()); + + PostgresModSeqProvider modSeqProvider = new PostgresModSeqProvider(mailboxDAO); + PostgresUidProvider uidProvider = new PostgresUidProvider(mailboxDAO); + + return new PostgresMessageMapper( + postgresExtension.getPostgresExecutor(), + modSeqProvider, + uidProvider, + blobStore, + updatableTickingClock, + blobIdFactory); + } + + @Override + public MessageIdMapper createMessageIdMapper() { + throw new NotImplementedException("not implemented"); + } + + @Override + public AttachmentMapper createAttachmentMapper() { + throw new NotImplementedException("not implemented"); + } + + @Override + public MailboxId generateId() { + return PostgresMailboxId.generate(); + } + + @Override + public MessageUid generateMessageUid() { + throw new NotImplementedException("not implemented"); + } + + @Override + public ModSeq generateModSeq(Mailbox mailbox) { + throw new NotImplementedException("not implemented"); + } + + @Override + public ModSeq highestModSeq(Mailbox mailbox) { + throw new NotImplementedException("not implemented"); + } + + @Override + public boolean supportPartialAttachmentFetch() { + return false; + } + + @Override + public MessageId generateMessageId() { + return messageIdFactory.generate(); + } + + public UpdatableTickingClock 
getUpdatableTickingClock() { + return updatableTickingClock; + } +} diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperTest.java similarity index 61% copy from mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java copy to mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperTest.java index a8499468f1..55a6864e88 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapperTest.java @@ -19,24 +19,28 @@ package org.apache.james.mailbox.postgres.mail; -import org.apache.james.backends.jpa.JpaTestCluster; -import org.apache.james.mailbox.postgres.JPAMailboxFixture; +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule; import org.apache.james.mailbox.store.mail.model.MapperProvider; -import org.apache.james.mailbox.store.mail.model.MessageMoveTest; -import org.junit.jupiter.api.AfterEach; +import org.apache.james.mailbox.store.mail.model.MessageMapperTest; +import org.apache.james.utils.UpdatableTickingClock; +import org.junit.jupiter.api.extension.RegisterExtension; -class JpaMessageMoveTest extends MessageMoveTest { - - static final JpaTestCluster JPA_TEST_CLUSTER = JpaTestCluster.create(JPAMailboxFixture.MAILBOX_PERSISTANCE_CLASSES); +public class PostgresMessageMapperTest extends MessageMapperTest { + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE); + + private PostgresMapperProvider postgresMapperProvider; @Override protected MapperProvider createMapperProvider() { - return new JPAMapperProvider(JPA_TEST_CLUSTER); + 
postgresMapperProvider = new PostgresMapperProvider(postgresExtension); + return postgresMapperProvider; } - - @AfterEach - void cleanUp() { - JPA_TEST_CLUSTER.clear(JPAMailboxFixture.MAILBOX_TABLE_NAMES); + + @Override + protected UpdatableTickingClock updatableTickingClock() { + return postgresMapperProvider.getUpdatableTickingClock(); } } diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMoveTest.java similarity index 74% rename from mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java rename to mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMoveTest.java index a8499468f1..b9c87c578f 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/JpaMessageMoveTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMoveTest.java @@ -19,24 +19,19 @@ package org.apache.james.mailbox.postgres.mail; -import org.apache.james.backends.jpa.JpaTestCluster; -import org.apache.james.mailbox.postgres.JPAMailboxFixture; +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule; import org.apache.james.mailbox.store.mail.model.MapperProvider; import org.apache.james.mailbox.store.mail.model.MessageMoveTest; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; -class JpaMessageMoveTest extends MessageMoveTest { - - static final JpaTestCluster JPA_TEST_CLUSTER = JpaTestCluster.create(JPAMailboxFixture.MAILBOX_PERSISTANCE_CLASSES); +class PostgresMessageMoveTest extends MessageMoveTest { + + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.withRowLevelSecurity(PostgresMailboxAggregateModule.MODULE); @Override protected MapperProvider 
createMapperProvider() { - return new JPAMapperProvider(JPA_TEST_CLUSTER); - } - - @AfterEach - void cleanUp() { - JPA_TEST_CLUSTER.clear(JPAMailboxFixture.MAILBOX_TABLE_NAMES); + return new PostgresMapperProvider(postgresExtension); } - } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java index ae93906c49..1eb901f3be 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java @@ -1217,7 +1217,7 @@ public abstract class MessageMapperTest { messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flag.DELETED), FlagsUpdateMode.ADD), - MessageRange.range(message2.getUid(), message4.getUid())); + MessageRange.range(message2.getUid(), message4.getUid())).forEachRemaining(any -> {}); List<MessageUid> uids = messageMapper.retrieveMessagesMarkedForDeletion(benwaInboxMailbox, MessageRange.all()); messageMapper.deleteMessages(benwaInboxMailbox, uids); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org