This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit dc4a5231ad2d7584b6d84f11ed66fc53892a9c66
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Wed Aug 21 17:04:20 2019 +0200

    JAMES-2873 Add Hostname into Created event
---
 .../init/CassandraZonedDateTimeModule.java         | 26 +++++----
 .../adapter/mailbox/ReIndexerManagementTest.java   |  4 +-
 .../cassandra/vacation/CassandraVacationDAO.java   |  2 +-
 .../routes/CassandraMappingsRoutesTest.java        |  3 +-
 .../routes/CassandraMigrationRoutesTest.java       |  3 +-
 .../james/webadmin/dto/ExecutionDetailsDto.java    |  2 +-
 .../james/webadmin/routes/TasksRoutesTest.java     |  3 +-
 .../routes/DeletedMessagesVaultRoutesTest.java     |  3 +-
 .../routes/EventDeadLettersRoutesTest.java         |  3 +-
 .../webadmin/routes/ReindexingRoutesTest.java      |  3 +-
 .../james/webadmin/routes/MailQueueRoutesTest.java |  3 +-
 .../webadmin/routes/MailQueueRoutesUnitTest.java   |  3 +-
 .../routes/MailRepositoriesRoutesTest.java         |  3 +-
 ...assandraTaskExecutionDetailsProjectionDAO.scala | 16 +++---
 ...andraTaskExecutionDetailsProjectionModule.scala |  2 +
 .../eventsourcing/distributed/TaskEventDTO.scala   |  7 +--
 .../distributed/DistributedTaskManagerTest.java    |  5 +-
 .../distributed/TaskEventsSerializationTest.java   |  4 +-
 .../org/apache/james/task/MemoryTaskManager.java   | 18 ++++---
 .../apache/james/task/TaskExecutionDetails.scala   | 62 ++++++++++++++--------
 .../james/task/eventsourcing/CommandHandlers.scala |  4 +-
 .../eventsourcing/EventSourcingTaskManager.scala   |  9 ++--
 .../apache/james/task/eventsourcing/Events.scala   |  6 ++-
 .../james/task/eventsourcing/TaskAggregate.scala   |  4 +-
 .../TaskExecutionDetailsProjection.scala           |  2 +-
 .../apache/james/task/MemoryTaskManagerTest.java   |  4 +-
 .../EventSourcingTaskManagerTest.java              | 25 +++++++--
 .../james/task/TaskExecutionDetailsFixture.scala   | 15 ++++--
 28 files changed, 163 insertions(+), 81 deletions(-)

diff --git 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraZonedDateTimeModule.java
 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraZonedDateTimeModule.java
index dfc0654..7c0a2da 100644
--- 
a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraZonedDateTimeModule.java
+++ 
b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraZonedDateTimeModule.java
@@ -42,19 +42,25 @@ public interface CassandraZonedDateTimeModule {
             .addColumn(TIME_ZONE, text()))
         .build();
 
+    static UDTValue toUDT(UserType zonedDateTimeUserType, ZonedDateTime 
zonedDateTime) {
+        ZonedDateTimeRepresentation representation = 
ZonedDateTimeRepresentation.fromZonedDateTime(zonedDateTime);
+        return zonedDateTimeUserType.newValue()
+            .setTimestamp(CassandraZonedDateTimeModule.DATE, 
representation.getDate())
+            .setString(CassandraZonedDateTimeModule.TIME_ZONE, 
representation.getSerializedZoneId());
+    }
+
     static Optional<UDTValue> toUDT(UserType zonedDateTimeUserType, 
Optional<ZonedDateTime> zonedDateTimeOptional) {
-        return 
zonedDateTimeOptional.map(ZonedDateTimeRepresentation::fromZonedDateTime)
-            .map(representation -> zonedDateTimeUserType.newValue()
-                .setTimestamp(CassandraZonedDateTimeModule.DATE, 
representation.getDate())
-                .setString(CassandraZonedDateTimeModule.TIME_ZONE, 
representation.getSerializedZoneId()));
+        return zonedDateTimeOptional.map(zonedDateTime -> 
toUDT(zonedDateTimeUserType, zonedDateTime));
     }
 
-    static Optional<ZonedDateTime> fromUDT(UDTValue value) {
-        return Optional.ofNullable(value)
-            .map(udtValue -> ZonedDateTimeRepresentation.fromDate(
-                    udtValue.getTimestamp(CassandraZonedDateTimeModule.DATE),
-                    udtValue.getString(CassandraZonedDateTimeModule.TIME_ZONE))
-                .getZonedDateTime());
+    static Optional<ZonedDateTime> fromUDTOptional(UDTValue value) {
+        return 
Optional.ofNullable(value).map(CassandraZonedDateTimeModule::fromUDT);
     }
 
+    static ZonedDateTime fromUDT(UDTValue udtValue) {
+        return ZonedDateTimeRepresentation.fromDate(
+                udtValue.getTimestamp(CassandraZonedDateTimeModule.DATE),
+                udtValue.getString(CassandraZonedDateTimeModule.TIME_ZONE))
+            .getZonedDateTime();
+    }
 }
diff --git 
a/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java
 
b/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java
index 276b613..db04843 100644
--- 
a/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java
+++ 
b/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java
@@ -31,6 +31,8 @@ import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -41,7 +43,7 @@ public class ReIndexerManagementTest {
 
     @BeforeEach
     void setUp() {
-        taskManager = new MemoryTaskManager();
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
         reIndexer = mock(ReIndexer.class);
         testee = new ReIndexerManagement(taskManager, reIndexer);
     }
diff --git 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraVacationDAO.java
 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraVacationDAO.java
index 6ef717d..ef8bac3 100644
--- 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraVacationDAO.java
+++ 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraVacationDAO.java
@@ -98,7 +98,7 @@ public class CassandraVacationDAO {
     }
 
     private Optional<ZonedDateTime> retrieveDate(Row row, String dateField) {
-        return 
CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(dateField));
+        return 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(dateField));
     }
 
     private Insert createSpecificUpdate(VacationPatch vacationPatch, Insert 
baseInsert) {
diff --git 
a/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java
 
b/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java
index 2ffa702..1e8e5ed 100644
--- 
a/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java
@@ -37,6 +37,7 @@ import 
org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration;
 import org.apache.james.rrt.lib.Mapping;
 import org.apache.james.rrt.lib.MappingSource;
 import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.james.webadmin.service.CassandraMappingsService;
@@ -76,7 +77,7 @@ class CassandraMappingsRoutesTest {
         CassandraMappingsService cassandraMappingsService = new 
CassandraMappingsService(mappingsSourcesMigration, cassandraMappingsSourcesDAO);
 
         JsonTransformer jsonTransformer = new JsonTransformer();
-        taskManager = new MemoryTaskManager();
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
         webAdminServer = WebAdminUtils.createWebAdminServer(
                 new CassandraMappingsRoutes(cassandraMappingsService, 
taskManager, jsonTransformer),
                 new TasksRoutes(taskManager, jsonTransformer))
diff --git 
a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
 
b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
index ff1126b..0bd7a1d 100644
--- 
a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java
@@ -45,6 +45,7 @@ import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import org.apache.james.backends.cassandra.versions.SchemaTransition;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.james.webadmin.utils.JsonTransformer;
@@ -80,7 +81,7 @@ public class CassandraMigrationRoutesTest {
         
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.empty()));
         when(schemaVersionDAO.updateVersion(any())).thenReturn(Mono.empty());
 
-        taskManager = new MemoryTaskManager();
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
         JsonTransformer jsonTransformer = new JsonTransformer();
         webAdminServer = WebAdminUtils.createWebAdminServer(
                 new CassandraMigrationRoutes(new 
CassandraMigrationService(schemaVersionDAO, transitions, version -> new 
MigrationTask(schemaVersionDAO, transitions, version), LATEST_VERSION),
diff --git 
a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/ExecutionDetailsDto.java
 
b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/ExecutionDetailsDto.java
index 7069e8b..243083c 100644
--- 
a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/ExecutionDetailsDto.java
+++ 
b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/ExecutionDetailsDto.java
@@ -65,7 +65,7 @@ public class ExecutionDetailsDto {
 
     @JsonInclude(JsonInclude.Include.NON_ABSENT)
     @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = 
"yyyy-MM-dd'T'HH:mm:ss.SSSZ")
-    public Optional<ZonedDateTime> getSubmitDate() {
+    public ZonedDateTime getSubmitDate() {
         return executionDetails.getSubmitDate();
     }
 
diff --git 
a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
 
b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
index cb1becf..a99feda 100644
--- 
a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
@@ -35,6 +35,7 @@ import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.james.webadmin.utils.JsonTransformer;
@@ -53,7 +54,7 @@ class TasksRoutesTest {
 
     @BeforeEach
     void setUp() {
-        taskManager = new MemoryTaskManager();
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
 
         webAdminServer = WebAdminUtils.createWebAdminServer(new 
TasksRoutes(taskManager, new JsonTransformer()))
             .start();
diff --git 
a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
index 3f948f5..b64d477 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
@@ -102,6 +102,7 @@ import 
org.apache.james.mailbox.model.MultimailboxesSearchQuery;
 import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.user.memory.MemoryUsersRepository;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.apache.james.vault.DeletedMessage;
@@ -181,7 +182,7 @@ class DeletedMessagesVaultRoutesTest {
         InMemoryIntegrationResources inMemoryResource = 
InMemoryIntegrationResources.defaultResources();
         mailboxManager = spy(inMemoryResource.getMailboxManager());
 
-        taskManager = new MemoryTaskManager();
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
         JsonTransformer jsonTransformer = new JsonTransformer();
 
         RestoreService vaultRestore = new RestoreService(vault, 
mailboxManager);
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
index 67998c8..1fe7d60 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
@@ -51,6 +51,7 @@ import 
org.apache.james.mailbox.store.quota.DefaultUserQuotaRootResolver;
 import org.apache.james.mailbox.util.EventCollector;
 import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.james.webadmin.service.EventDeadLettersRedeliverService;
@@ -121,7 +122,7 @@ class EventDeadLettersRoutesTest {
         EventDeadLettersRedeliverService redeliverService = new 
EventDeadLettersRedeliverService(eventBus, deadLetters);
         EventDeadLettersService service = new 
EventDeadLettersService(redeliverService, deadLetters);
 
-        taskManager = new MemoryTaskManager();
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
         webAdminServer = WebAdminUtils.createWebAdminServer(
                 new EventDeadLettersRoutes(service, eventSerializer, 
taskManager, jsonTransformer),
                 new TasksRoutes(taskManager, jsonTransformer))
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ReindexingRoutesTest.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ReindexingRoutesTest.java
index 65ea398..e5925b6 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ReindexingRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ReindexingRoutesTest.java
@@ -47,6 +47,7 @@ import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.james.webadmin.service.PreviousReIndexingService;
@@ -79,7 +80,7 @@ class ReindexingRoutesTest {
     @BeforeEach
     void beforeEach() {
         mailboxManager = 
InMemoryIntegrationResources.defaultResources().getMailboxManager();
-        MemoryTaskManager taskManager = new MemoryTaskManager();
+        MemoryTaskManager taskManager = new MemoryTaskManager(new 
Hostname("foo"));
         InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
         searchIndex = mock(ListeningMessageSearchIndex.class);
         ReIndexer reIndexer = new ReIndexerImpl(
diff --git 
a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
 
b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
index 83fe246..ec62b10 100644
--- 
a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java
@@ -45,6 +45,7 @@ import org.apache.james.queue.memory.MemoryMailQueueFactory;
 import org.apache.james.queue.memory.MemoryMailQueueFactory.MemoryMailQueue;
 import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.task.TaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
 import org.apache.james.webadmin.service.ClearMailQueueTask;
@@ -85,7 +86,7 @@ class MailQueueRoutesTest {
 
 
     WebAdminServer createServer(MemoryMailQueueFactory mailQueueFactory) {
-        TaskManager taskManager = new MemoryTaskManager();
+        TaskManager taskManager = new MemoryTaskManager(new Hostname("foo"));
         JsonTransformer jsonTransformer = new JsonTransformer();
 
         return WebAdminUtils.createWebAdminServer(
diff --git 
a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesUnitTest.java
 
b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesUnitTest.java
index 5e1ef9e..872c213 100644
--- 
a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesUnitTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesUnitTest.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.webadmin.utils.JsonTransformer;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,7 +37,7 @@ public class MailQueueRoutesUnitTest {
 
     @Before
     public void setup() {
-        MemoryTaskManager taskManager = new MemoryTaskManager();
+        MemoryTaskManager taskManager = new MemoryTaskManager(new 
Hostname("foo"));
         MailQueueFactory<ManageableMailQueue> mailQueueFactory = null;
         testee = new MailQueueRoutes(mailQueueFactory, new JsonTransformer(), 
taskManager);
     }
diff --git 
a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
 
b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index 05c713c..6ff48e5 100644
--- 
a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -64,6 +64,7 @@ import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
 import org.apache.james.queue.memory.MemoryMailQueueFactory;
 import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.util.ClassLoaderUtils;
 import org.apache.james.webadmin.Constants;
 import org.apache.james.webadmin.WebAdminServer;
@@ -111,7 +112,7 @@ public class MailRepositoriesRoutesTest {
     public void setUp() throws Exception {
         createMailRepositoryStore();
 
-        MemoryTaskManager taskManager = new MemoryTaskManager();
+        MemoryTaskManager taskManager = new MemoryTaskManager(new 
Hostname("foo"));
         JsonTransformer jsonTransformer = new JsonTransformer();
         MailQueueFactory<ManageableMailQueue> queueFactory = new 
MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
         spoolQueue = queueFactory.createQueue(MailQueueFactory.SPOOL);
diff --git 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
index 1dff42f..e6b7aef 100644
--- 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
+++ 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
@@ -27,6 +27,7 @@ import javax.inject.Inject
 import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, 
CassandraZonedDateTimeModule}
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
 import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._
+import org.apache.james.task.eventsourcing.Hostname
 import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManager}
 import reactor.core.publisher.{Flux, Mono}
 
@@ -41,6 +42,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: 
Session, typesProvider
     .value(TYPE, bindMarker(TYPE))
     .value(STATUS, bindMarker(STATUS))
     .value(SUBMITTED_DATE, bindMarker(SUBMITTED_DATE))
+    .value(SUBMITTED_NODE, bindMarker(SUBMITTED_NODE))
     .value(STARTED_DATE, bindMarker(STARTED_DATE))
     .value(COMPLETED_DATE, bindMarker(COMPLETED_DATE))
     .value(CANCELED_DATE, bindMarker(CANCELED_DATE))
@@ -56,7 +58,8 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: 
Session, typesProvider
       .setUUID(TASK_ID, details.getTaskId.getValue)
       .setString(TYPE, details.getType)
       .setString(STATUS, details.getStatus.getValue)
-      .setUDTValue(SUBMITTED_DATE, 
CassandraZonedDateTimeModule.toUDT(dateType, 
details.getSubmitDate).orElse(null))
+      .setUDTValue(SUBMITTED_DATE, 
CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmitDate))
+      .setString(SUBMITTED_NODE, details.getSubmittedNode.asString)
       .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, 
details.getStartedDate).orElse(null))
       .setUDTValue(COMPLETED_DATE, 
CassandraZonedDateTimeModule.toUDT(dateType, 
details.getCompletedDate).orElse(null))
       .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, 
details.getCanceledDate).orElse(null))
@@ -74,10 +77,11 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: 
Session, typesProvider
     taskId = TaskId.fromUUID(row.getUUID(TASK_ID)),
     `type` = row.getString(TYPE),
     status = TaskManager.Status.fromString(row.getString(STATUS)),
-    submitDate = 
CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)),
-    startedDate = 
CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(STARTED_DATE)),
-    completedDate = 
CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(COMPLETED_DATE)),
-    canceledDate = 
CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(CANCELED_DATE)),
-    failedDate = 
CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(FAILED_DATE)),
+    submittedDate = 
CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)),
+    submittedNode = Hostname(row.getString(SUBMITTED_NODE)),
+    startedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(STARTED_DATE)),
+    completedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(COMPLETED_DATE)),
+    canceledDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)),
+    failedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)),
     additionalInformation = Optional.empty)
 }
diff --git 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
index 4b3e681..2f7300d 100644
--- 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
+++ 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
@@ -31,6 +31,7 @@ object CassandraTaskExecutionDetailsProjectionTable {
   val TYPE: String = "type"
   val STATUS: String = "status"
   val SUBMITTED_DATE: String = "submittedDate"
+  val SUBMITTED_NODE: String = "submittedNode"
   val STARTED_DATE: String = "startedDate"
   val COMPLETED_DATE: String = "completedDate"
   val CANCELED_DATE: String = "canceledDate"
@@ -51,6 +52,7 @@ object CassandraTaskExecutionDetailsProjectionModule {
       .addColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, text)
       .addColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, text)
       
.addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .addColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, 
text)
       .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       
.addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       
.addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
diff --git 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
index 3bbcdda..63d9afe 100644
--- 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
+++ 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
@@ -34,14 +34,15 @@ sealed abstract class TaskEventDTO(val getType: String, val 
getAggregate: String
 case class CreatedDTO(@JsonProperty("type") typeName: String,
                       @JsonProperty("aggregate") aggregateId: String,
                       @JsonProperty("event") eventId: Int,
-                      @JsonProperty("task") getTask: String)
+                      @JsonProperty("task") getTask: String,
+                      @JsonProperty("hostname") getHostname: String)
   extends TaskEventDTO(typeName, aggregateId, eventId) {
-  def toDomainObject(serializer: JsonTaskSerializer): Created = 
Created(domainAggregateId, domainEventId, serializer.deserialize(getTask))
+  def toDomainObject(serializer: JsonTaskSerializer): Created = 
Created(domainAggregateId, domainEventId, serializer.deserialize(getTask), 
Hostname(getHostname))
 }
 
 object CreatedDTO {
   def fromDomainObject(event: Created, typeName: String, serializer: 
JsonTaskSerializer): CreatedDTO =
-    CreatedDTO(typeName, event.aggregateId.taskId.asString(), 
event.eventId.serialize(), serializer.serialize(event.task))
+    CreatedDTO(typeName, event.aggregateId.taskId.asString(), 
event.eventId.serialize(), serializer.serialize(event.task), 
event.hostname.asString)
 }
 
 case class StartedDTO(@JsonProperty("type") typeName: String,
diff --git 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index ce43266..d84243c 100644
--- 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
 import org.apache.james.task.eventsourcing.WorkQueueSupplier;
 import org.apache.james.task.eventsourcing.WorkerStatusListener;
@@ -82,8 +83,8 @@ class DistributedTaskManagerTest {
             TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
             return new MemoryWorkQueue(worker);
         };
-        TaskManager taskManager1 = new 
EventSourcingTaskManager(workQueueSupplier, eventStore, 
executionDetailsProjection);
-        TaskManager taskManager2 = new 
EventSourcingTaskManager(workQueueSupplier, eventStore, 
executionDetailsProjection);
+        TaskManager taskManager1 = new 
EventSourcingTaskManager(workQueueSupplier, eventStore, 
executionDetailsProjection, new Hostname("foo"));
+        TaskManager taskManager2 = new 
EventSourcingTaskManager(workQueueSupplier, eventStore, 
executionDetailsProjection, new Hostname("bar"));
 
         TaskId taskId = taskManager1.submit(new CompletedTask());
         Awaitility.await()
diff --git 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
index 53952fd..183e10e 100644
--- 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
+++ 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
@@ -40,6 +40,7 @@ import org.apache.james.task.eventsourcing.Cancelled;
 import org.apache.james.task.eventsourcing.CancelRequested;
 import org.apache.james.task.eventsourcing.Completed;
 import org.apache.james.task.eventsourcing.Failed;
+import org.apache.james.task.eventsourcing.Hostname;
 import org.apache.james.task.eventsourcing.Started;
 import org.apache.james.task.eventsourcing.TaskAggregateId;
 import org.apache.james.task.eventsourcing.TaskEvent;
@@ -54,6 +55,7 @@ class TaskEventsSerializationTest {
     private static final TaskAggregateId AGGREGATE_ID = new 
TaskAggregateId(TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd"));
     private static final EventId EVENT_ID = EventId.fromSerialized(42);
     private static final Task TASK = new CompletedTask();
+    private static final Hostname HOSTNAME = new Hostname("foo");
 
     @ParameterizedTest
     @MethodSource
@@ -77,7 +79,7 @@ class TaskEventsSerializationTest {
 
     private static Stream<Arguments> validTasks() throws Exception {
         return Stream.of(
-            Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK), 
"{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42}\n"),
+            Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK, HOSTNAME), 
"{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"),
             Arguments.of(new Started(AGGREGATE_ID, EVENT_ID), 
"{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\"}"),
             Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID), 
"{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42}\n"),
             Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, 
Task.Result.COMPLETED), 
"{\"result\":\"COMPLETED\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"),
diff --git 
a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java 
b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index f972e39..2e71938 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -28,6 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
 import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
+import org.apache.james.task.eventsourcing.Hostname;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
@@ -78,22 +81,25 @@ public class MemoryTaskManager implements TaskManager {
         }
     }
 
-    public static final Duration AWAIT_POLLING_DURATION = 
Duration.ofMillis(500);
+    private static final Duration AWAIT_POLLING_DURATION = 
Duration.ofMillis(500);
     public static final Duration NOW = Duration.ZERO;
+
+    private final Hostname hostname;
     private final WorkQueue workQueue;
     private final TaskManagerWorker worker;
     private final ConcurrentHashMap<TaskId, TaskExecutionDetails> 
idToExecutionDetails;
 
-    public MemoryTaskManager() {
-
-        idToExecutionDetails = new ConcurrentHashMap<>();
-        worker = new SerialTaskManagerWorker(updater());
+    @Inject
+    public MemoryTaskManager(Hostname hostname) {
+        this.hostname = hostname;
+        this.idToExecutionDetails = new ConcurrentHashMap<>();
+        this.worker = new SerialTaskManagerWorker(updater());
         workQueue = new MemoryWorkQueue(worker);
     }
 
     public TaskId submit(Task task) {
         TaskId taskId = TaskId.generateTaskId();
-        TaskExecutionDetails executionDetails = 
TaskExecutionDetails.from(task, taskId);
+        TaskExecutionDetails executionDetails = 
TaskExecutionDetails.from(task, taskId, hostname);
         idToExecutionDetails.put(taskId, executionDetails);
         workQueue.submit(new TaskWithId(taskId, task));
         return taskId;
diff --git 
a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala 
b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
index 082694c..4e9f20d 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
@@ -23,6 +23,7 @@ import java.time.ZonedDateTime
 import java.util.{Objects, Optional}
 
 import org.apache.james.task.TaskManager.Status._
+import org.apache.james.task.eventsourcing.Hostname
 
 import com.google.common.base.MoreObjects
 
@@ -30,14 +31,15 @@ object TaskExecutionDetails {
 
   trait AdditionalInformation {}
 
-  def from(task: Task, id: TaskId) = new TaskExecutionDetails(id, task.`type`, 
() => task.details, WAITING, submitDate = Optional.of(ZonedDateTime.now))
+  def from(task: Task, id: TaskId, hostname: Hostname) = new 
TaskExecutionDetails(id, task.`type`, WAITING, submittedDate = 
ZonedDateTime.now, submittedNode = hostname, () => task.details)
 }
 
 class TaskExecutionDetails(val taskId: TaskId,
                            private val `type`: String,
-                           private val additionalInformation: () => 
Optional[TaskExecutionDetails.AdditionalInformation],
                            private val status: TaskManager.Status,
-                           private val submitDate: Optional[ZonedDateTime] = 
Optional.empty(),
+                           private val submittedDate: ZonedDateTime,
+                           private val submittedNode: Hostname,
+                           private val additionalInformation: () => 
Optional[TaskExecutionDetails.AdditionalInformation],
                            private val startedDate: Optional[ZonedDateTime] = 
Optional.empty(),
                            private val completedDate: Optional[ZonedDateTime] 
= Optional.empty(),
                            private val canceledDate: Optional[ZonedDateTime] = 
Optional.empty(),
@@ -50,7 +52,9 @@ class TaskExecutionDetails(val taskId: TaskId,
 
   def getAdditionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation] = additionalInformation()
 
-  def getSubmitDate: Optional[ZonedDateTime] = submitDate
+  def getSubmitDate: ZonedDateTime = submittedDate
+
+  def getSubmittedNode: Hostname = submittedNode
 
   def getStartedDate: Optional[ZonedDateTime] = startedDate
 
@@ -100,7 +104,8 @@ class TaskExecutionDetails(val taskId: TaskId,
         Objects.equals(`type`, that.`type`) &&
         Objects.equals(additionalInformation(), that.additionalInformation()) 
&&
         Objects.equals(status, that.status) &&
-        Objects.equals(submitDate, that.submitDate) &&
+        Objects.equals(submittedDate, that.submittedDate) &&
+        Objects.equals(submittedNode, that.submittedNode) &&
         Objects.equals(startedDate, that.startedDate) &&
         Objects.equals(completedDate, that.completedDate) &&
         Objects.equals(canceledDate, that.canceledDate) &&
@@ -109,38 +114,49 @@ class TaskExecutionDetails(val taskId: TaskId,
   }
 
   override def hashCode(): Int =
-    Objects.hash(taskId, `type`, additionalInformation(), status, submitDate, 
startedDate, completedDate, canceledDate, failedDate)
+    Objects.hash(taskId, `type`, additionalInformation(), status, 
submittedDate, submittedNode, startedDate, completedDate, canceledDate, 
failedDate)
 
   override def toString: String =
     MoreObjects.toStringHelper(this)
       .add("taskId", taskId)
       .add("type", `type`)
-      .add("", additionalInformation())
-      .add("", status)
-      .add("", submitDate)
-      .add("", startedDate)
-      .add("", completedDate)
-      .add("", canceledDate)
-      .add("", failedDate)
+      .add("additionalInformation", additionalInformation())
+      .add("status", status)
+      .add("submittedDate", submittedDate)
+      .add("submittedNode", submittedNode)
+      .add("startedDate", startedDate)
+      .add("completedDate", completedDate)
+      .add("canceledDate", canceledDate)
+      .add("failedDate", failedDate)
       .toString
 
-  private def start = new TaskExecutionDetails(taskId, `type`, 
additionalInformation, IN_PROGRESS,
-    submitDate = submitDate,
+  private def start = new TaskExecutionDetails(taskId, `type`, IN_PROGRESS,
+    submittedDate = submittedDate,
+    submittedNode = submittedNode,
+    additionalInformation = additionalInformation,
     startedDate = Optional.of(ZonedDateTime.now))
-  private def complete = new TaskExecutionDetails(taskId, `type`, 
additionalInformation, TaskManager.Status.COMPLETED,
-    submitDate = submitDate,
+  private def complete = new TaskExecutionDetails(taskId, `type`, 
TaskManager.Status.COMPLETED,
+    submittedDate = submittedDate,
+    submittedNode = submittedNode,
     startedDate = startedDate,
+    additionalInformation = additionalInformation,
     completedDate = Optional.of(ZonedDateTime.now))
-  private def fail = new TaskExecutionDetails(taskId, `type`, 
additionalInformation, TaskManager.Status.FAILED,
-    submitDate = submitDate,
+  private def fail = new TaskExecutionDetails(taskId, `type`, 
TaskManager.Status.FAILED,
+    submittedDate = submittedDate,
+    submittedNode = submittedNode,
     startedDate = startedDate,
+    additionalInformation = additionalInformation,
     failedDate = Optional.of(ZonedDateTime.now))
-  private def requestCancel = new TaskExecutionDetails(taskId, `type`, 
additionalInformation, TaskManager.Status.CANCEL_REQUESTED,
-    submitDate = submitDate,
+  private def requestCancel = new TaskExecutionDetails(taskId, `type`, 
TaskManager.Status.CANCEL_REQUESTED,
+    submittedDate = submittedDate,
+    submittedNode = submittedNode,
+    additionalInformation = additionalInformation,
     startedDate = startedDate,
     canceledDate = Optional.of(ZonedDateTime.now))
-  private def cancel = new TaskExecutionDetails(taskId, `type`, 
additionalInformation, TaskManager.Status.CANCELLED,
-    submitDate = submitDate,
+  private def cancel = new TaskExecutionDetails(taskId, `type`, 
TaskManager.Status.CANCELLED,
+    submittedDate = submittedDate,
+    submittedNode = submittedNode,
+    additionalInformation = additionalInformation,
     startedDate = startedDate,
     canceledDate = Optional.of(ZonedDateTime.now))
 }
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index 1bb8abf..905f1ec 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -33,11 +33,11 @@ sealed abstract class TaskCommandHandler[T <: TaskCommand] 
extends CommandHandle
   }
 }
 
-class CreateCommandHandler(private val loadHistory: TaskAggregateId => 
History) extends TaskCommandHandler[Create] {
+class CreateCommandHandler(private val loadHistory: TaskAggregateId => 
History, hostname: Hostname) extends TaskCommandHandler[Create] {
   override def handledClass: Class[Create] = classOf[Create]
 
   override def handle(command: Create): util.List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).create(command.task)
+    loadAggregate(loadHistory, command.id).create(command.task, hostname)
   }
 }
 
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 34f55cf..d091197 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -33,12 +33,13 @@ import scala.annotation.tailrec
 class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing](
                                                                                
   workQueueSupplier: WorkQueueSupplier,
                                                                                
   val eventStore: EventStore,
-                                                                               
   val executionDetailsProjection: TaskExecutionDetailsProjection) extends 
TaskManager with Closeable {
+                                                                               
   val executionDetailsProjection: TaskExecutionDetailsProjection,
+                                                                               
   val hostname: Hostname) extends TaskManager with Closeable {
 
   private val delayBetweenPollingInMs = 500
 
   private def workDispatcher: Subscriber = {
-    case Created(aggregateId, _, task) =>
+    case Created(aggregateId, _, task, _) =>
       val taskWithId = new TaskWithId(aggregateId.taskId, task)
       workQueue.submit(taskWithId)
     case CancelRequested(aggregateId, _) =>
@@ -51,7 +52,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing]
   private val loadHistory: AggregateId => History = 
eventStore.getEventsOfAggregate _
   private val eventSourcingSystem = ScalaEventSourcingSystem(
     handlers = Set(
-      new CreateCommandHandler(loadHistory),
+      new CreateCommandHandler(loadHistory, hostname),
       new StartCommandHandler(loadHistory),
       new RequestCancelCommandHandler(loadHistory),
       new CompleteCommandHandler(loadHistory),
@@ -103,4 +104,4 @@ class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing]
   override def close(): Unit = {
     workQueue.close()
   }
-}
\ No newline at end of file
+}
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
index 2536b38..8f8bb40 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
@@ -26,7 +26,11 @@ sealed abstract class TaskEvent(aggregateId: 
TaskAggregateId, val eventId: Event
   override def getAggregateId: TaskAggregateId = aggregateId
 }
 
-case class Created(aggregateId: TaskAggregateId, override val eventId: 
EventId, task: Task) extends TaskEvent(aggregateId, eventId)
+case class Hostname(private val value: String) {
+  def asString: String = value
+}
+
+case class Created(aggregateId: TaskAggregateId, override val eventId: 
EventId, task: Task, hostname: Hostname) extends TaskEvent(aggregateId, eventId)
 
 case class Started(aggregateId: TaskAggregateId, override val eventId: 
EventId) extends TaskEvent(aggregateId, eventId)
 
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 5fd5875..2013d91 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -37,9 +37,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, 
private val histor
     .status
 
 
-  def create(task: Task): util.List[Event] = {
+  def create(task: Task, hostname: Hostname): util.List[Event] = {
     if (currentStatus.isEmpty) {
-      createEventWithId(Created(aggregateId, _, task))
+      createEventWithId(Created(aggregateId, _, task, hostname))
     } else Nil.asJava
   }
 
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
index 918a5e0..65f0aca 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -27,7 +27,7 @@ import collection.JavaConverters._
 trait TaskExecutionDetailsProjection {
   val asSubscriber: Subscriber = {
     case created: Created =>
-      update(TaskExecutionDetails.from(created.task, 
created.aggregateId.taskId))
+      update(TaskExecutionDetails.from(created.task, 
created.aggregateId.taskId, created.hostname))
     case cancelRequested: CancelRequested =>
       update(cancelRequested.aggregateId.taskId)(_.cancelRequested)
     case started: Started =>
diff --git 
a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java 
b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index 50cdf51..01c5eb1 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.task;
 
+import org.apache.james.task.eventsourcing.Hostname;
+
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -30,7 +32,7 @@ class MemoryTaskManagerTest implements TaskManagerContract {
 
     @BeforeEach
     void setUp() {
-        memoryTaskManager = new MemoryTaskManager();
+        memoryTaskManager = new MemoryTaskManager(new Hostname("foo"));
     }
 
     @AfterEach
diff --git 
a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
 
b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index 63c62b0..867665f 100644
--- 
a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ 
b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -19,33 +19,41 @@
 
 package org.apache.james.task.eventsourcing;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore;
 import org.apache.james.task.CountDownLatchExtension;
 import org.apache.james.task.MemoryWorkQueue;
 import org.apache.james.task.SerialTaskManagerWorker;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
 import org.apache.james.task.TaskManagerWorker;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(CountDownLatchExtension.class)
 class EventSourcingTaskManagerTest implements TaskManagerContract {
 
+    private static final Hostname HOSTNAME = new Hostname("foo");
     private EventSourcingTaskManager taskManager;
+    private EventStore eventStore;
 
     @BeforeEach
     void setUp() {
-        EventStore eventStore = new InMemoryEventStore();
+        eventStore = new InMemoryEventStore();
         TaskExecutionDetailsProjection executionDetailsProjection = new 
MemoryTaskExecutionDetailsProjection();
         WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
             WorkerStatusListener listener = new 
WorkerStatusListener(eventSourcingSystem);
             TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
             return new MemoryWorkQueue(worker);
         };
-        taskManager = new EventSourcingTaskManager(workQueueSupplier, 
eventStore, executionDetailsProjection);
+        taskManager = new EventSourcingTaskManager(workQueueSupplier, 
eventStore, executionDetailsProjection, HOSTNAME);
     }
 
     @AfterEach
@@ -57,4 +65,15 @@ class EventSourcingTaskManagerTest implements 
TaskManagerContract {
     public TaskManager taskManager() {
         return taskManager;
     }
-}
\ No newline at end of file
+
+    @Test
+    void createdTaskShouldKeepOriginHostname() {
+        TaskId taskId = taskManager.submit(() -> Task.Result.COMPLETED);
+        TaskAggregateId aggregateId = new TaskAggregateId(taskId);
+        assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents())
+                .filteredOn(event -> event instanceof Created)
+                .extracting("hostname")
+                .containsOnly(HOSTNAME);
+    }
+
+}
diff --git 
a/server/task/src/test/scala/org/apache/james/task/TaskExecutionDetailsFixture.scala
 
b/server/task/src/test/scala/org/apache/james/task/TaskExecutionDetailsFixture.scala
index a3c5b38..ddcf176 100644
--- 
a/server/task/src/test/scala/org/apache/james/task/TaskExecutionDetailsFixture.scala
+++ 
b/server/task/src/test/scala/org/apache/james/task/TaskExecutionDetailsFixture.scala
@@ -18,22 +18,29 @@
  * ***************************************************************/
 package org.apache.james.task
 
+import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
 import java.util.Optional
 
 import org.apache.james.task.TaskExecutionDetails.AdditionalInformation
+import org.apache.james.task.eventsourcing.Hostname
 
 object TaskExecutionDetailsFixture {
+  val SUBMITTED_DATE = ZonedDateTime.of(LocalDateTime.of(2000, 1, 1, 0, 0), 
ZoneId.of("Europe/Paris"))
+  val SUBMITTED_NODE = Hostname("foo")
+  val SUBMITTED_DATE_2 = ZonedDateTime.of(LocalDateTime.of(2011, 11, 11, 11, 
11), ZoneId.of("Europe/Paris"))
+  val SUBMITTED_NODE_2 = Hostname("bar")
   val TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd")
   val TASK_ID_2 = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafe")
   val ADDITIONAL_INFORMATION: () => Optional[AdditionalInformation] = 
Optional.empty
 
-  val TASK_EXECUTION_DETAILS = new TaskExecutionDetails(TASK_ID, "type", 
ADDITIONAL_INFORMATION, TaskManager.Status.COMPLETED)
-  val TASK_EXECUTION_DETAILS_2 = new TaskExecutionDetails(TASK_ID_2, "type", 
ADDITIONAL_INFORMATION, TaskManager.Status.COMPLETED)
-  val TASK_EXECUTION_DETAILS_UPDATED = new TaskExecutionDetails(TASK_ID, 
"type", ADDITIONAL_INFORMATION, TaskManager.Status.FAILED)
+  val TASK_EXECUTION_DETAILS = new TaskExecutionDetails(TASK_ID, "type", 
TaskManager.Status.COMPLETED, SUBMITTED_DATE, SUBMITTED_NODE, 
ADDITIONAL_INFORMATION)
+  val TASK_EXECUTION_DETAILS_2 = new TaskExecutionDetails(TASK_ID_2, "type", 
TaskManager.Status.COMPLETED, SUBMITTED_DATE, SUBMITTED_NODE, 
ADDITIONAL_INFORMATION)
+  val TASK_EXECUTION_DETAILS_UPDATED = new TaskExecutionDetails(TASK_ID, 
"type", TaskManager.Status.FAILED, SUBMITTED_DATE, SUBMITTED_NODE, 
ADDITIONAL_INFORMATION)
 
 
   val ADDITIONAL_INFORMATION_2: () => Optional[AdditionalInformation] = () => 
Optional.of(new CustomAdditionalInformation("hello"))
-  val TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION = new 
TaskExecutionDetails(TASK_ID, "type", ADDITIONAL_INFORMATION_2, 
TaskManager.Status.COMPLETED)
+  val TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION = new 
TaskExecutionDetails(TASK_ID, "type", TaskManager.Status.COMPLETED, 
SUBMITTED_DATE_2, SUBMITTED_NODE_2, ADDITIONAL_INFORMATION)
+
 }
 
 case class CustomAdditionalInformation(value: String) extends 
AdditionalInformation


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to