Repository: james-project
Updated Branches:
  refs/heads/master 6e8d222ac -> 4ac0a34af


JAMES-2540 Split RabbitMQ Mail Queue class in smaller chunks

Have a package-visible class for each mail queue operation to help readability


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/994f5b1f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/994f5b1f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/994f5b1f

Branch: refs/heads/master
Commit: 994f5b1f27367f938156653f0d9c19e55532c189
Parents: fc03482
Author: Benoit Tellier <btell...@linagora.com>
Authored: Tue Sep 11 10:07:52 2018 +0700
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Fri Sep 14 11:05:42 2018 +0700

----------------------------------------------------------------------
 .../apache/james/queue/rabbitmq/Dequeuer.java   | 182 ++++++++++++++++++
 .../apache/james/queue/rabbitmq/Enqueuer.java   |  79 ++++++++
 .../james/queue/rabbitmq/RabbitMQMailQueue.java | 189 ++-----------------
 3 files changed, 278 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/994f5b1f/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
new file mode 100644
index 0000000..f5b690d
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -0,0 +1,182 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.Date;
+import java.util.concurrent.Executors;
+
+import javax.mail.MessagingException;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.core.MailAddress;
+import org.apache.james.metrics.api.Metric;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.server.core.MailImpl;
+import org.apache.james.util.SerializationUtil;
+import org.apache.mailet.Mail;
+import org.apache.mailet.PerRecipientHeaders;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+import com.rabbitmq.client.GetResponse;
+
+class Dequeuer {
+    private static class NoMailYetException extends RuntimeException {
+    }
+
+    private static class RabbitMQMailQueueItem implements 
MailQueue.MailQueueItem {
+        private final RabbitClient rabbitClient;
+        private final long deliveryTag;
+        private final Mail mail;
+
+        private RabbitMQMailQueueItem(RabbitClient rabbitClient, long 
deliveryTag, Mail mail) {
+            this.rabbitClient = rabbitClient;
+            this.deliveryTag = deliveryTag;
+            this.mail = mail;
+        }
+
+        @Override
+        public Mail getMail() {
+            return mail;
+        }
+
+        @Override
+        public void done(boolean success) throws MailQueue.MailQueueException {
+            try {
+                rabbitClient.ack(deliveryTag);
+            } catch (IOException e) {
+                throw new MailQueue.MailQueueException("Failed to ACK " + 
mail.getName() + " with delivery tag " + deliveryTag, e);
+            }
+        }
+    }
+
+    private static final int TEN_MS = 10;
+
+    private final MailQueueName name;
+    private final RabbitClient rabbitClient;
+    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
+    private final BlobId.Factory blobIdFactory;
+    private final ObjectMapper objectMapper;
+    private final Metric dequeueMetric;
+
+    Dequeuer(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, 
MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory, 
ObjectMapper objectMapper, MetricFactory metricFactory) {
+        this.name = name;
+        this.rabbitClient = rabbitClient;
+        this.mimeMessageStore = mimeMessageStore;
+        this.blobIdFactory = blobIdFactory;
+        this.objectMapper = objectMapper;
+        this.dequeueMetric = 
metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
+    }
+
+    MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException {
+        GetResponse getResponse = pollChannel();
+        MailDTO mailDTO = toDTO(getResponse);
+        Mail mail = toMail(mailDTO);
+        dequeueMetric.increment();
+        return new RabbitMQMailQueueItem(rabbitClient, 
getResponse.getEnvelope().getDeliveryTag(), mail);
+    }
+
+    private MailDTO toDTO(GetResponse getResponse) throws 
MailQueue.MailQueueException {
+        try {
+            return objectMapper.readValue(getResponse.getBody(), 
MailDTO.class);
+        } catch (IOException e) {
+            throw new MailQueue.MailQueueException("Failed to parse DTO", e);
+        }
+    }
+
+    private GetResponse pollChannel() {
+        return new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())
+            .withFixedRate()
+            .withMinDelay(TEN_MS)
+            .retryOn(NoMailYetException.class)
+            .getWithRetry(this::singleChannelRead)
+            .join();
+    }
+
+    private GetResponse singleChannelRead() throws IOException {
+        return rabbitClient.poll(name)
+            .filter(getResponse -> getResponse.getBody() != null)
+            .orElseThrow(NoMailYetException::new);
+    }
+
+    private Mail toMail(MailDTO dto) throws MailQueue.MailQueueException {
+        try {
+            MimeMessage mimeMessage = mimeMessageStore.read(
+                MimeMessagePartsId.builder()
+                    .headerBlobId(blobIdFactory.from(dto.getHeaderBlobId()))
+                    .bodyBlobId(blobIdFactory.from(dto.getBodyBlobId()))
+                    .build())
+                .join();
+
+            MailImpl mail = new MailImpl(
+                dto.getName(),
+                dto.getSender().map(MailAddress::getMailSender).orElse(null),
+                dto.getRecipients()
+                    .stream()
+                    .map(Throwing.<String, 
MailAddress>function(MailAddress::new).sneakyThrow())
+                    .collect(Guavate.toImmutableList()),
+                mimeMessage);
+
+            mail.setErrorMessage(dto.getErrorMessage());
+            mail.setRemoteAddr(dto.getRemoteAddr());
+            mail.setRemoteHost(dto.getRemoteHost());
+            mail.setState(dto.getState());
+            dto.getLastUpdated()
+                .map(Instant::toEpochMilli)
+                .map(Date::new)
+                .ifPresent(mail::setLastUpdated);
+
+            dto.getAttributes()
+                .forEach((name, value) -> mail.setAttribute(name, 
SerializationUtil.<Serializable>deserialize(value)));
+
+            
mail.addAllSpecificHeaderForRecipient(retrievePerRecipientHeaders(dto));
+
+            return mail;
+        } catch (AddressException e) {
+            throw new MailQueue.MailQueueException("Failed to parse mail 
address", e);
+        } catch (MessagingException e) {
+            throw new MailQueue.MailQueueException("Failed to generate mime 
message", e);
+        }
+    }
+
+    private PerRecipientHeaders retrievePerRecipientHeaders(MailDTO dto) {
+        PerRecipientHeaders perRecipientHeaders = new PerRecipientHeaders();
+        dto.getPerRecipientHeaders()
+            .entrySet()
+            .stream()
+            .flatMap(entry -> entry.getValue().toHeaders().stream()
+                .map(Throwing.function(header -> Pair.of(new 
MailAddress(entry.getKey()), header))))
+            .forEach(pair -> 
perRecipientHeaders.addHeaderForRecipient(pair.getValue(), pair.getKey()));
+        return perRecipientHeaders;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/994f5b1f/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
new file mode 100644
index 0000000..ba655f8
--- /dev/null
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -0,0 +1,79 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
+
+import java.util.concurrent.CompletableFuture;
+
+import javax.mail.MessagingException;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.metrics.api.Metric;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.mailet.Mail;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class Enqueuer {
+    private final MailQueueName name;
+    private final RabbitClient rabbitClient;
+    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
+    private final ObjectMapper objectMapper;
+    private final Metric enqueueMetric;
+
+    Enqueuer(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, 
MimeMessagePartsId> mimeMessageStore,
+             ObjectMapper objectMapper, MetricFactory metricFactory) {
+        this.name = name;
+        this.rabbitClient = rabbitClient;
+        this.mimeMessageStore = mimeMessageStore;
+        this.objectMapper = objectMapper;
+        this.enqueueMetric = 
metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString());
+    }
+
+    void enQueue(Mail mail) throws MailQueue.MailQueueException {
+        MimeMessagePartsId partsId = saveBlobs(mail).join();
+        MailDTO mailDTO = MailDTO.fromMail(mail, partsId);
+        byte[] message = getMessageBytes(mailDTO);
+        rabbitClient.publish(name, message);
+
+        enqueueMetric.increment();
+    }
+
+    private CompletableFuture<MimeMessagePartsId> saveBlobs(Mail mail) throws 
MailQueue.MailQueueException {
+        try {
+            return mimeMessageStore.save(mail.getMessage());
+        } catch (MessagingException e) {
+            throw new MailQueue.MailQueueException("Error while saving blob", 
e);
+        }
+    }
+
+    private byte[] getMessageBytes(MailDTO mailDTO) throws 
MailQueue.MailQueueException {
+        try {
+            return objectMapper.writeValueAsBytes(mailDTO);
+        } catch (JsonProcessingException e) {
+            throw new MailQueue.MailQueueException("Unable to serialize 
message", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/994f5b1f/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 8f9b68e..74f6ef7 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -19,85 +19,39 @@
 
 package org.apache.james.queue.rabbitmq;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.time.Instant;
-import java.util.Date;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
-import javax.mail.MessagingException;
-import javax.mail.internet.AddressException;
 import javax.mail.internet.MimeMessage;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
-import org.apache.james.core.MailAddress;
 import org.apache.james.metrics.api.GaugeRegistry;
-import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
-import org.apache.james.server.core.MailImpl;
-import org.apache.james.util.SerializationUtil;
 import org.apache.mailet.Mail;
-import org.apache.mailet.PerRecipientHeaders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.guava.GuavaModule;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import com.github.fge.lambdas.Throwing;
-import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
-import com.rabbitmq.client.GetResponse;
 
 public class RabbitMQMailQueue implements MailQueue {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQMailQueue.class);
 
-    private static class NoMailYetException extends RuntimeException {
-    }
-
-    private static class RabbitMQMailQueueItem implements MailQueueItem {
-        private final RabbitClient rabbitClient;
-        private final long deliveryTag;
-        private final Mail mail;
-
-        private RabbitMQMailQueueItem(RabbitClient rabbitClient, long 
deliveryTag, Mail mail) {
-            this.rabbitClient = rabbitClient;
-            this.deliveryTag = deliveryTag;
-            this.mail = mail;
-        }
-
-        @Override
-        public Mail getMail() {
-            return mail;
-        }
-
-        @Override
-        public void done(boolean success) throws MailQueueException {
-            try {
-                rabbitClient.ack(deliveryTag);
-            } catch (IOException e) {
-                throw new MailQueueException("Failed to ACK " + mail.getName() 
+ " with delivery tag " + deliveryTag, e);
-            }
-        }
-    }
-
     static class Factory {
         private final MetricFactory metricFactory;
         private final GaugeRegistry gaugeRegistry;
         private final RabbitClient rabbitClient;
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final BlobId.Factory blobIdFactory;
+        private final ObjectMapper objectMapper;
 
         @Inject
         @VisibleForTesting Factory(MetricFactory metricFactory, GaugeRegistry 
gaugeRegistry, RabbitClient rabbitClient,
@@ -107,40 +61,34 @@ public class RabbitMQMailQueue implements MailQueue {
             this.rabbitClient = rabbitClient;
             this.mimeMessageStore = mimeMessageStore;
             this.blobIdFactory = blobIdFactory;
+            this.objectMapper = new ObjectMapper()
+                .registerModule(new Jdk8Module())
+                .registerModule(new JavaTimeModule())
+                .registerModule(new GuavaModule());
         }
 
         RabbitMQMailQueue create(MailQueueName mailQueueName) {
-            return new RabbitMQMailQueue(metricFactory, gaugeRegistry, 
mailQueueName, rabbitClient, mimeMessageStore, blobIdFactory);
+            return new RabbitMQMailQueue(metricFactory, gaugeRegistry, 
mailQueueName,
+                new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, 
objectMapper, metricFactory),
+                new Dequeuer(mailQueueName, rabbitClient, mimeMessageStore, 
blobIdFactory, objectMapper, metricFactory));
         }
     }
 
-    private static final int TEN_MS = 10;
-
     private final MailQueueName name;
-    private final RabbitClient rabbitClient;
-    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
-    private final BlobId.Factory blobIdFactory;
-    private final ObjectMapper objectMapper;
     private final MetricFactory metricFactory;
-    private final Metric enqueueMetric;
-    private final Metric dequeueMetric;
     private final GaugeRegistry gaugeRegistry;
+    private final Enqueuer enqueuer;
+    private final Dequeuer dequeuer;
+
+    RabbitMQMailQueue(MetricFactory metricFactory, GaugeRegistry 
gaugeRegistry, MailQueueName name,
+                      Enqueuer enqueuer, Dequeuer dequeuer) {
 
-    RabbitMQMailQueue(MetricFactory metricFactory, GaugeRegistry 
gaugeRegistry, MailQueueName name, RabbitClient rabbitClient,
-                      Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, 
BlobId.Factory blobIdFactory) {
-        this.mimeMessageStore = mimeMessageStore;
-        this.blobIdFactory = blobIdFactory;
         this.name = name;
-        this.rabbitClient = rabbitClient;
-        this.objectMapper = new ObjectMapper()
-            .registerModule(new Jdk8Module())
-            .registerModule(new JavaTimeModule())
-            .registerModule(new GuavaModule());
+        this.enqueuer = enqueuer;
+        this.dequeuer = dequeuer;
 
         this.metricFactory = metricFactory;
         this.gaugeRegistry = gaugeRegistry;
-        this.enqueueMetric = 
metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString());
-        this.dequeueMetric = 
metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
     }
 
     @Override
@@ -159,115 +107,12 @@ public class RabbitMQMailQueue implements MailQueue {
     @Override
     public void enQueue(Mail mail) throws MailQueueException {
         metricFactory.withMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + 
name.asString(),
-            Throwing.runnable(() -> {
-                MimeMessagePartsId partsId = saveBlobs(mail).join();
-                MailDTO mailDTO = MailDTO.fromMail(mail, partsId);
-                byte[] message = getMessageBytes(mailDTO);
-                rabbitClient.publish(name, message);
-
-                enqueueMetric.increment();
-            }).sneakyThrow());
-    }
-
-    private CompletableFuture<MimeMessagePartsId> saveBlobs(Mail mail) throws 
MailQueueException {
-        try {
-            return mimeMessageStore.save(mail.getMessage());
-        } catch (MessagingException e) {
-            throw new MailQueueException("Error while saving blob", e);
-        }
-    }
-
-    private byte[] getMessageBytes(MailDTO mailDTO) throws MailQueueException {
-        try {
-            return objectMapper.writeValueAsBytes(mailDTO);
-        } catch (JsonProcessingException e) {
-            throw new MailQueueException("Unable to serialize message", e);
-        }
+            Throwing.runnable(() -> enqueuer.enQueue(mail)).sneakyThrow());
     }
 
     @Override
     public MailQueueItem deQueue() throws MailQueueException {
         return metricFactory.withMetric(DEQUEUED_TIMER_METRIC_NAME_PREFIX + 
name.asString(),
-            Throwing.supplier(() -> {
-                GetResponse getResponse = pollChannel();
-                MailDTO mailDTO = toDTO(getResponse);
-                Mail mail = toMail(mailDTO);
-                dequeueMetric.increment();
-                return new RabbitMQMailQueueItem(rabbitClient, 
getResponse.getEnvelope().getDeliveryTag(), mail);
-            }).sneakyThrow());
-    }
-
-    private MailDTO toDTO(GetResponse getResponse) throws MailQueueException {
-        try {
-            return objectMapper.readValue(getResponse.getBody(), 
MailDTO.class);
-        } catch (IOException e) {
-            throw new MailQueueException("Failed to parse DTO", e);
-        }
-    }
-
-    private GetResponse pollChannel() {
-        return new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())
-            .withFixedRate()
-            .withMinDelay(TEN_MS)
-            .retryOn(NoMailYetException.class)
-            .getWithRetry(this::singleChannelRead)
-            .join();
-    }
-
-    private GetResponse singleChannelRead() throws IOException {
-        return rabbitClient.poll(name)
-            .filter(getResponse -> getResponse.getBody() != null)
-            .orElseThrow(NoMailYetException::new);
-    }
-
-    private Mail toMail(MailDTO dto) throws MailQueueException {
-        try {
-            MimeMessage mimeMessage = mimeMessageStore.read(
-                MimeMessagePartsId.builder()
-                    .headerBlobId(blobIdFactory.from(dto.getHeaderBlobId()))
-                    .bodyBlobId(blobIdFactory.from(dto.getBodyBlobId()))
-                    .build())
-                .join();
-
-            MailImpl mail = new MailImpl(
-                dto.getName(),
-                dto.getSender().map(MailAddress::getMailSender).orElse(null),
-                dto.getRecipients()
-                    .stream()
-                    .map(Throwing.<String, 
MailAddress>function(MailAddress::new).sneakyThrow())
-                    .collect(Guavate.toImmutableList()),
-                mimeMessage);
-
-            mail.setErrorMessage(dto.getErrorMessage());
-            mail.setRemoteAddr(dto.getRemoteAddr());
-            mail.setRemoteHost(dto.getRemoteHost());
-            mail.setState(dto.getState());
-            dto.getLastUpdated()
-                .map(Instant::toEpochMilli)
-                .map(Date::new)
-                .ifPresent(mail::setLastUpdated);
-
-            dto.getAttributes()
-                .forEach((name, value) -> mail.setAttribute(name, 
SerializationUtil.<Serializable>deserialize(value)));
-
-            
mail.addAllSpecificHeaderForRecipient(retrievePerRecipientHeaders(dto));
-
-            return mail;
-        } catch (AddressException e) {
-            throw new MailQueueException("Failed to parse mail address", e);
-        } catch (MessagingException e) {
-            throw new MailQueueException("Failed to generate mime message", e);
-        }
-    }
-
-    private PerRecipientHeaders retrievePerRecipientHeaders(MailDTO dto) {
-        PerRecipientHeaders perRecipientHeaders = new PerRecipientHeaders();
-        dto.getPerRecipientHeaders()
-            .entrySet()
-            .stream()
-            .flatMap(entry -> entry.getValue().toHeaders().stream()
-                .map(Throwing.function(header -> Pair.of(new 
MailAddress(entry.getKey()), header))))
-            .forEach(pair -> 
perRecipientHeaders.addHeaderForRecipient(pair.getValue(), pair.getKey()));
-        return perRecipientHeaders;
+            Throwing.supplier(dequeuer::deQueue).sneakyThrow());
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to