This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit dc4a5231ad2d7584b6d84f11ed66fc53892a9c66 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Wed Aug 21 17:04:20 2019 +0200 JAMES-2873 Add Hostname into Created event --- .../init/CassandraZonedDateTimeModule.java | 26 +++++---- .../adapter/mailbox/ReIndexerManagementTest.java | 4 +- .../cassandra/vacation/CassandraVacationDAO.java | 2 +- .../routes/CassandraMappingsRoutesTest.java | 3 +- .../routes/CassandraMigrationRoutesTest.java | 3 +- .../james/webadmin/dto/ExecutionDetailsDto.java | 2 +- .../james/webadmin/routes/TasksRoutesTest.java | 3 +- .../routes/DeletedMessagesVaultRoutesTest.java | 3 +- .../routes/EventDeadLettersRoutesTest.java | 3 +- .../webadmin/routes/ReindexingRoutesTest.java | 3 +- .../james/webadmin/routes/MailQueueRoutesTest.java | 3 +- .../webadmin/routes/MailQueueRoutesUnitTest.java | 3 +- .../routes/MailRepositoriesRoutesTest.java | 3 +- ...assandraTaskExecutionDetailsProjectionDAO.scala | 16 +++--- ...andraTaskExecutionDetailsProjectionModule.scala | 2 + .../eventsourcing/distributed/TaskEventDTO.scala | 7 +-- .../distributed/DistributedTaskManagerTest.java | 5 +- .../distributed/TaskEventsSerializationTest.java | 4 +- .../org/apache/james/task/MemoryTaskManager.java | 18 ++++--- .../apache/james/task/TaskExecutionDetails.scala | 62 ++++++++++++++-------- .../james/task/eventsourcing/CommandHandlers.scala | 4 +- .../eventsourcing/EventSourcingTaskManager.scala | 9 ++-- .../apache/james/task/eventsourcing/Events.scala | 6 ++- .../james/task/eventsourcing/TaskAggregate.scala | 4 +- .../TaskExecutionDetailsProjection.scala | 2 +- .../apache/james/task/MemoryTaskManagerTest.java | 4 +- .../EventSourcingTaskManagerTest.java | 25 +++++++-- .../james/task/TaskExecutionDetailsFixture.scala | 15 ++++-- 28 files changed, 163 insertions(+), 81 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraZonedDateTimeModule.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraZonedDateTimeModule.java index dfc0654..7c0a2da 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraZonedDateTimeModule.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/CassandraZonedDateTimeModule.java @@ -42,19 +42,25 @@ public interface CassandraZonedDateTimeModule { .addColumn(TIME_ZONE, text())) .build(); + static UDTValue toUDT(UserType zonedDateTimeUserType, ZonedDateTime zonedDateTime) { + ZonedDateTimeRepresentation representation = ZonedDateTimeRepresentation.fromZonedDateTime(zonedDateTime); + return zonedDateTimeUserType.newValue() + .setTimestamp(CassandraZonedDateTimeModule.DATE, representation.getDate()) + .setString(CassandraZonedDateTimeModule.TIME_ZONE, representation.getSerializedZoneId()); + } + static Optional<UDTValue> toUDT(UserType zonedDateTimeUserType, Optional<ZonedDateTime> zonedDateTimeOptional) { - return zonedDateTimeOptional.map(ZonedDateTimeRepresentation::fromZonedDateTime) - .map(representation -> zonedDateTimeUserType.newValue() - .setTimestamp(CassandraZonedDateTimeModule.DATE, representation.getDate()) - .setString(CassandraZonedDateTimeModule.TIME_ZONE, representation.getSerializedZoneId())); + return zonedDateTimeOptional.map(zonedDateTime -> toUDT(zonedDateTimeUserType, zonedDateTime)); } - static Optional<ZonedDateTime> fromUDT(UDTValue value) { - return Optional.ofNullable(value) - .map(udtValue -> ZonedDateTimeRepresentation.fromDate( - udtValue.getTimestamp(CassandraZonedDateTimeModule.DATE), - udtValue.getString(CassandraZonedDateTimeModule.TIME_ZONE)) - .getZonedDateTime()); + static Optional<ZonedDateTime> fromUDTOptional(UDTValue value) { + return Optional.ofNullable(value).map(CassandraZonedDateTimeModule::fromUDT); } + static ZonedDateTime fromUDT(UDTValue udtValue) { + return ZonedDateTimeRepresentation.fromDate( + udtValue.getTimestamp(CassandraZonedDateTimeModule.DATE), + udtValue.getString(CassandraZonedDateTimeModule.TIME_ZONE)) + .getZonedDateTime(); + } } diff --git a/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java b/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java index 276b613..db04843 100644 --- a/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java +++ b/server/container/mailbox-jmx/src/test/java/org/apache/james/adapter/mailbox/ReIndexerManagementTest.java @@ -31,6 +31,8 @@ import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.task.MemoryTaskManager; import org.apache.james.task.Task; import org.apache.james.task.TaskManager; +import org.apache.james.task.eventsourcing.Hostname; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,7 +43,7 @@ public class ReIndexerManagementTest { @BeforeEach void setUp() { - taskManager = new MemoryTaskManager(); + taskManager = new MemoryTaskManager(new Hostname("foo")); reIndexer = mock(ReIndexer.class); testee = new ReIndexerManagement(taskManager, reIndexer); } diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraVacationDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraVacationDAO.java index 6ef717d..ef8bac3 100644 --- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraVacationDAO.java +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/vacation/CassandraVacationDAO.java @@ -98,7 +98,7 @@ public class CassandraVacationDAO { } private Optional<ZonedDateTime> retrieveDate(Row row, String dateField) { - return CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(dateField)); + return CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(dateField)); } private Insert createSpecificUpdate(VacationPatch vacationPatch, Insert baseInsert) { diff --git a/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java index 2ffa702..1e8e5ed 100644 --- a/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java +++ b/server/protocols/webadmin/webadmin-cassandra-data/src/test/java/org/apache/james/webadmin/routes/CassandraMappingsRoutesTest.java @@ -37,6 +37,7 @@ import org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration; import org.apache.james.rrt.lib.Mapping; import org.apache.james.rrt.lib.MappingSource; import org.apache.james.task.MemoryTaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; import org.apache.james.webadmin.service.CassandraMappingsService; @@ -76,7 +77,7 @@ class CassandraMappingsRoutesTest { CassandraMappingsService cassandraMappingsService = new CassandraMappingsService(mappingsSourcesMigration, cassandraMappingsSourcesDAO); JsonTransformer jsonTransformer = new JsonTransformer(); - taskManager = new MemoryTaskManager(); + taskManager = new MemoryTaskManager(new Hostname("foo")); webAdminServer = WebAdminUtils.createWebAdminServer( new CassandraMappingsRoutes(cassandraMappingsService, taskManager, jsonTransformer), new TasksRoutes(taskManager, jsonTransformer)) diff --git a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java index ff1126b..0bd7a1d 100644 --- a/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java +++ b/server/protocols/webadmin/webadmin-cassandra/src/test/java/org/apache/james/webadmin/routes/CassandraMigrationRoutesTest.java @@ -45,6 +45,7 @@ import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; import org.apache.james.backends.cassandra.versions.SchemaTransition; import org.apache.james.backends.cassandra.versions.SchemaVersion; import org.apache.james.task.MemoryTaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; import org.apache.james.webadmin.utils.JsonTransformer; @@ -80,7 +81,7 @@ public class CassandraMigrationRoutesTest { when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.empty())); when(schemaVersionDAO.updateVersion(any())).thenReturn(Mono.empty()); - taskManager = new MemoryTaskManager(); + taskManager = new MemoryTaskManager(new Hostname("foo")); JsonTransformer jsonTransformer = new JsonTransformer(); webAdminServer = WebAdminUtils.createWebAdminServer( new CassandraMigrationRoutes(new CassandraMigrationService(schemaVersionDAO, transitions, version -> new MigrationTask(schemaVersionDAO, transitions, version), LATEST_VERSION), diff --git a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/ExecutionDetailsDto.java b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/ExecutionDetailsDto.java index 7069e8b..243083c 100644 --- a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/ExecutionDetailsDto.java +++ b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/dto/ExecutionDetailsDto.java @@ -65,7 +65,7 @@ public class ExecutionDetailsDto { @JsonInclude(JsonInclude.Include.NON_ABSENT) @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ") - public Optional<ZonedDateTime> getSubmitDate() { + public ZonedDateTime getSubmitDate() { return executionDetails.getSubmitDate(); } diff --git a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java index cb1becf..a99feda 100644 --- a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java +++ b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java @@ -35,6 +35,7 @@ import org.apache.james.task.MemoryTaskManager; import org.apache.james.task.Task; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; import org.apache.james.webadmin.utils.JsonTransformer; @@ -53,7 +54,7 @@ class TasksRoutesTest { @BeforeEach void setUp() { - taskManager = new MemoryTaskManager(); + taskManager = new MemoryTaskManager(new Hostname("foo")); webAdminServer = WebAdminUtils.createWebAdminServer(new TasksRoutes(taskManager, new JsonTransformer())) .start(); diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java index 3f948f5..b64d477 100644 --- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java @@ -102,6 +102,7 @@ import org.apache.james.mailbox.model.MultimailboxesSearchQuery; import org.apache.james.mailbox.model.SearchQuery; import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.task.MemoryTaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.user.memory.MemoryUsersRepository; import org.apache.james.utils.UpdatableTickingClock; import org.apache.james.vault.DeletedMessage; @@ -181,7 +182,7 @@ class DeletedMessagesVaultRoutesTest { InMemoryIntegrationResources inMemoryResource = InMemoryIntegrationResources.defaultResources(); mailboxManager = spy(inMemoryResource.getMailboxManager()); - taskManager = new MemoryTaskManager(); + taskManager = new MemoryTaskManager(new Hostname("foo")); JsonTransformer jsonTransformer = new JsonTransformer(); RestoreService vaultRestore = new RestoreService(vault, mailboxManager); diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java index 67998c8..1fe7d60 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java @@ -51,6 +51,7 @@ import org.apache.james.mailbox.store.quota.DefaultUserQuotaRootResolver; import org.apache.james.mailbox.util.EventCollector; import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.task.MemoryTaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; import org.apache.james.webadmin.service.EventDeadLettersRedeliverService; @@ -121,7 +122,7 @@ class EventDeadLettersRoutesTest { EventDeadLettersRedeliverService redeliverService = new EventDeadLettersRedeliverService(eventBus, deadLetters); EventDeadLettersService service = new EventDeadLettersService(redeliverService, deadLetters); - taskManager = new MemoryTaskManager(); + taskManager = new MemoryTaskManager(new Hostname("foo")); webAdminServer = WebAdminUtils.createWebAdminServer( new EventDeadLettersRoutes(service, eventSerializer, taskManager, jsonTransformer), new TasksRoutes(taskManager, jsonTransformer)) diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ReindexingRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ReindexingRoutesTest.java index 65ea398..e5925b6 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ReindexingRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ReindexingRoutesTest.java @@ -47,6 +47,7 @@ import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; import org.apache.james.task.MemoryTaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; import org.apache.james.webadmin.service.PreviousReIndexingService; @@ -79,7 +80,7 @@ class ReindexingRoutesTest { @BeforeEach void beforeEach() { mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager(); - MemoryTaskManager taskManager = new MemoryTaskManager(); + MemoryTaskManager taskManager = new MemoryTaskManager(new Hostname("foo")); InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory(); searchIndex = mock(ListeningMessageSearchIndex.class); ReIndexer reIndexer = new ReIndexerImpl( diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java index 83fe246..ec62b10 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesTest.java @@ -45,6 +45,7 @@ import org.apache.james.queue.memory.MemoryMailQueueFactory; import org.apache.james.queue.memory.MemoryMailQueueFactory.MemoryMailQueue; import org.apache.james.task.MemoryTaskManager; import org.apache.james.task.TaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; import org.apache.james.webadmin.service.ClearMailQueueTask; @@ -85,7 +86,7 @@ class MailQueueRoutesTest { WebAdminServer createServer(MemoryMailQueueFactory mailQueueFactory) { - TaskManager taskManager = new MemoryTaskManager(); + TaskManager taskManager = new MemoryTaskManager(new Hostname("foo")); JsonTransformer jsonTransformer = new JsonTransformer(); return WebAdminUtils.createWebAdminServer( diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesUnitTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesUnitTest.java index 5e1ef9e..872c213 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesUnitTest.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/routes/MailQueueRoutesUnitTest.java @@ -26,6 +26,7 @@ import java.util.Optional; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.task.MemoryTaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.webadmin.utils.JsonTransformer; import org.junit.Before; import org.junit.Test; @@ -36,7 +37,7 @@ public class MailQueueRoutesUnitTest { @Before public void setup() { - MemoryTaskManager taskManager = new MemoryTaskManager(); + MemoryTaskManager taskManager = new MemoryTaskManager(new Hostname("foo")); MailQueueFactory<ManageableMailQueue> mailQueueFactory = null; testee = new MailQueueRoutes(mailQueueFactory, new JsonTransformer(), taskManager); } diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java index 05c713c..6ff48e5 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java @@ -64,6 +64,7 @@ import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; import org.apache.james.queue.memory.MemoryMailQueueFactory; import org.apache.james.task.MemoryTaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.util.ClassLoaderUtils; import org.apache.james.webadmin.Constants; import org.apache.james.webadmin.WebAdminServer; @@ -111,7 +112,7 @@ public class MailRepositoriesRoutesTest { public void setUp() throws Exception { createMailRepositoryStore(); - MemoryTaskManager taskManager = new MemoryTaskManager(); + MemoryTaskManager taskManager = new MemoryTaskManager(new Hostname("foo")); JsonTransformer jsonTransformer = new JsonTransformer(); MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory()); spoolQueue = queueFactory.createQueue(MailQueueFactory.SPOOL); diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala index 1dff42f..e6b7aef 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala @@ -27,6 +27,7 @@ import javax.inject.Inject import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, CassandraZonedDateTimeModule} import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._ +import org.apache.james.task.eventsourcing.Hostname import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManager} import reactor.core.publisher.{Flux, Mono} @@ -41,6 +42,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider .value(TYPE, bindMarker(TYPE)) .value(STATUS, bindMarker(STATUS)) .value(SUBMITTED_DATE, bindMarker(SUBMITTED_DATE)) + .value(SUBMITTED_NODE, bindMarker(SUBMITTED_NODE)) .value(STARTED_DATE, bindMarker(STARTED_DATE)) .value(COMPLETED_DATE, bindMarker(COMPLETED_DATE)) .value(CANCELED_DATE, bindMarker(CANCELED_DATE)) @@ -56,7 +58,8 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider .setUUID(TASK_ID, details.getTaskId.getValue) .setString(TYPE, details.getType) .setString(STATUS, details.getStatus.getValue) - .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmitDate).orElse(null)) + .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmitDate)) + .setString(SUBMITTED_NODE, details.getSubmittedNode.asString) .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null)) .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null)) .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null)) @@ -74,10 +77,11 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider taskId = TaskId.fromUUID(row.getUUID(TASK_ID)), `type` = row.getString(TYPE), status = TaskManager.Status.fromString(row.getString(STATUS)), - submitDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)), - startedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(STARTED_DATE)), - completedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(COMPLETED_DATE)), - canceledDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(CANCELED_DATE)), - failedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(FAILED_DATE)), + submittedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)), + submittedNode = Hostname(row.getString(SUBMITTED_NODE)), + startedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(STARTED_DATE)), + completedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(COMPLETED_DATE)), + canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)), + failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)), additionalInformation = Optional.empty) } diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala index 4b3e681..2f7300d 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala @@ -31,6 +31,7 @@ object CassandraTaskExecutionDetailsProjectionTable { val TYPE: String = "type" val STATUS: String = "status" val SUBMITTED_DATE: String = "submittedDate" + val SUBMITTED_NODE: String = "submittedNode" val STARTED_DATE: String = "startedDate" val COMPLETED_DATE: String = "completedDate" val CANCELED_DATE: String = "canceledDate" @@ -51,6 +52,7 @@ object CassandraTaskExecutionDetailsProjectionModule { .addColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, text) .addColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, text) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) + .addColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, text) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala index 3bbcdda..63d9afe 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala @@ -34,14 +34,15 @@ sealed abstract class TaskEventDTO(val getType: String, val getAggregate: String case class CreatedDTO(@JsonProperty("type") typeName: String, @JsonProperty("aggregate") aggregateId: String, @JsonProperty("event") eventId: Int, - @JsonProperty("task") getTask: String) + @JsonProperty("task") getTask: String, + @JsonProperty("hostname") getHostname: String) extends TaskEventDTO(typeName, aggregateId, eventId) { - def toDomainObject(serializer: JsonTaskSerializer): Created = Created(domainAggregateId, domainEventId, serializer.deserialize(getTask)) + def toDomainObject(serializer: JsonTaskSerializer): Created = Created(domainAggregateId, domainEventId, serializer.deserialize(getTask), Hostname(getHostname)) } object CreatedDTO { def fromDomainObject(event: Created, typeName: String, serializer: JsonTaskSerializer): CreatedDTO = - CreatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), serializer.serialize(event.task)) + CreatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), serializer.serialize(event.task), event.hostname.asString) } case class StartedDTO(@JsonProperty("type") typeName: String, diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index ce43266..d84243c 100644 --- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -43,6 +43,7 @@ import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerWorker; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.WorkQueueSupplier; import org.apache.james.task.eventsourcing.WorkerStatusListener; @@ -82,8 +83,8 @@ class DistributedTaskManagerTest { TaskManagerWorker worker = new SerialTaskManagerWorker(listener); return new MemoryWorkQueue(worker); }; - TaskManager taskManager1 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection); - TaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection); + TaskManager taskManager1 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, new Hostname("foo")); + TaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, new Hostname("bar")); TaskId taskId = taskManager1.submit(new CompletedTask()); Awaitility.await() diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java index 53952fd..183e10e 100644 --- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java @@ -40,6 +40,7 @@ import org.apache.james.task.eventsourcing.Cancelled; import org.apache.james.task.eventsourcing.CancelRequested; import org.apache.james.task.eventsourcing.Completed; import org.apache.james.task.eventsourcing.Failed; +import org.apache.james.task.eventsourcing.Hostname; import org.apache.james.task.eventsourcing.Started; import org.apache.james.task.eventsourcing.TaskAggregateId; import org.apache.james.task.eventsourcing.TaskEvent; @@ -54,6 +55,7 @@ class TaskEventsSerializationTest { private static final TaskAggregateId AGGREGATE_ID = new TaskAggregateId(TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd")); private static final EventId EVENT_ID = EventId.fromSerialized(42); private static final Task TASK = new CompletedTask(); + private static final Hostname HOSTNAME = new Hostname("foo"); @ParameterizedTest @MethodSource @@ -77,7 +79,7 @@ class TaskEventsSerializationTest { private static Stream<Arguments> validTasks() throws Exception { return Stream.of( - Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK), "{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42}\n"), + Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK, HOSTNAME), "{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"), Arguments.of(new Started(AGGREGATE_ID, EVENT_ID), "{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\"}"), Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID), "{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42}\n"), Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, Task.Result.COMPLETED), "{\"result\":\"COMPLETED\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"), diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index f972e39..2e71938 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -28,6 +28,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import org.apache.james.task.eventsourcing.Hostname; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; @@ -78,22 +81,25 @@ public class MemoryTaskManager implements TaskManager { } } - public static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500); + private static final Duration AWAIT_POLLING_DURATION = Duration.ofMillis(500); public static final Duration NOW = Duration.ZERO; + + private final Hostname hostname; private final WorkQueue workQueue; private final TaskManagerWorker worker; private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails; - public MemoryTaskManager() { - - idToExecutionDetails = new ConcurrentHashMap<>(); - worker = new SerialTaskManagerWorker(updater()); + @Inject + public MemoryTaskManager(Hostname hostname) { + this.hostname = hostname; + this.idToExecutionDetails = new ConcurrentHashMap<>(); + this.worker = new SerialTaskManagerWorker(updater()); workQueue = new MemoryWorkQueue(worker); } public TaskId submit(Task task) { TaskId taskId = TaskId.generateTaskId(); - TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId); + TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId, hostname); idToExecutionDetails.put(taskId, executionDetails); workQueue.submit(new TaskWithId(taskId, task)); return taskId; diff --git a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala index 082694c..4e9f20d 100644 --- a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala +++ b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala @@ -23,6 +23,7 @@ import java.time.ZonedDateTime import java.util.{Objects, Optional} import org.apache.james.task.TaskManager.Status._ +import org.apache.james.task.eventsourcing.Hostname import com.google.common.base.MoreObjects @@ -30,14 +31,15 @@ object TaskExecutionDetails { trait AdditionalInformation {} - def from(task: Task, id: TaskId) = new TaskExecutionDetails(id, task.`type`, () => task.details, WAITING, submitDate = Optional.of(ZonedDateTime.now)) + def from(task: Task, id: TaskId, hostname: Hostname) = new TaskExecutionDetails(id, task.`type`, WAITING, submittedDate = ZonedDateTime.now, submittedNode = hostname, () => task.details) } class TaskExecutionDetails(val taskId: TaskId, private val `type`: String, - private val additionalInformation: () => Optional[TaskExecutionDetails.AdditionalInformation], private val status: TaskManager.Status, - private val submitDate: Optional[ZonedDateTime] = Optional.empty(), + private val submittedDate: ZonedDateTime, + private val submittedNode: Hostname, + private val additionalInformation: () => Optional[TaskExecutionDetails.AdditionalInformation], private val startedDate: Optional[ZonedDateTime] = Optional.empty(), private val completedDate: Optional[ZonedDateTime] = Optional.empty(), private val canceledDate: Optional[ZonedDateTime] = Optional.empty(), @@ -50,7 +52,9 @@ class TaskExecutionDetails(val taskId: TaskId, def getAdditionalInformation: Optional[TaskExecutionDetails.AdditionalInformation] = additionalInformation() - def getSubmitDate: Optional[ZonedDateTime] = submitDate + def getSubmitDate: ZonedDateTime = submittedDate + + def getSubmittedNode: Hostname = submittedNode def getStartedDate: Optional[ZonedDateTime] = startedDate @@ -100,7 +104,8 @@ class TaskExecutionDetails(val taskId: TaskId, Objects.equals(`type`, that.`type`) && Objects.equals(additionalInformation(), that.additionalInformation()) && Objects.equals(status, that.status) && - Objects.equals(submitDate, that.submitDate) && + Objects.equals(submittedDate, that.submittedDate) && + Objects.equals(submittedNode, that.submittedNode) && Objects.equals(startedDate, that.startedDate) && Objects.equals(completedDate, that.completedDate) && Objects.equals(canceledDate, that.canceledDate) && @@ -109,38 +114,49 @@ class TaskExecutionDetails(val taskId: TaskId, } override def hashCode(): Int = - Objects.hash(taskId, `type`, additionalInformation(), status, submitDate, startedDate, completedDate, canceledDate, failedDate) + Objects.hash(taskId, `type`, additionalInformation(), status, submittedDate, submittedNode, startedDate, completedDate, canceledDate, failedDate) override def toString: String = MoreObjects.toStringHelper(this) .add("taskId", taskId) .add("type", `type`) - .add("", additionalInformation()) - .add("", status) - .add("", submitDate) - .add("", startedDate) - .add("", completedDate) - .add("", canceledDate) - .add("", failedDate) + .add("additionalInformation", additionalInformation()) + .add("status", status) + .add("submittedDate", submittedDate) + .add("submittedNode", submittedNode) + .add("startedDate", startedDate) + .add("completedDate", completedDate) + .add("canceledDate", canceledDate) + .add("failedDate", failedDate) .toString - private def start = new TaskExecutionDetails(taskId, `type`, additionalInformation, IN_PROGRESS, - submitDate = submitDate, + private def start = new TaskExecutionDetails(taskId, `type`, IN_PROGRESS, + submittedDate = submittedDate, + submittedNode = submittedNode, + additionalInformation = additionalInformation, startedDate = Optional.of(ZonedDateTime.now)) - private def complete = new TaskExecutionDetails(taskId, `type`, additionalInformation, TaskManager.Status.COMPLETED, - submitDate = submitDate, + private def complete = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.COMPLETED, + submittedDate = submittedDate, + submittedNode = submittedNode, startedDate = startedDate, + additionalInformation = additionalInformation, completedDate = Optional.of(ZonedDateTime.now)) - private def fail = new TaskExecutionDetails(taskId, `type`, additionalInformation, TaskManager.Status.FAILED, - submitDate = submitDate, + private def fail = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.FAILED, + submittedDate = submittedDate, + submittedNode = submittedNode, startedDate = startedDate, + additionalInformation = additionalInformation, failedDate = Optional.of(ZonedDateTime.now)) - private def requestCancel = new TaskExecutionDetails(taskId, `type`, additionalInformation, TaskManager.Status.CANCEL_REQUESTED, - submitDate = submitDate, + private def requestCancel = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCEL_REQUESTED, + submittedDate = submittedDate, + submittedNode = submittedNode, + additionalInformation = additionalInformation, startedDate = startedDate, canceledDate = Optional.of(ZonedDateTime.now)) - private def cancel = new TaskExecutionDetails(taskId, `type`, additionalInformation, TaskManager.Status.CANCELLED, - submitDate = submitDate, + private def cancel = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCELLED, + submittedDate = submittedDate, + submittedNode = submittedNode, + additionalInformation = additionalInformation, startedDate = startedDate, canceledDate = Optional.of(ZonedDateTime.now)) } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala index 1bb8abf..905f1ec 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala @@ -33,11 +33,11 @@ sealed abstract class TaskCommandHandler[T <: TaskCommand] extends CommandHandle } } -class CreateCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Create] { +class CreateCommandHandler(private val loadHistory: TaskAggregateId => History, hostname: Hostname) extends TaskCommandHandler[Create] { override def handledClass: Class[Create] = classOf[Create] override def handle(command: Create): util.List[_ <: Event] = { - loadAggregate(loadHistory, command.id).create(command.task) + loadAggregate(loadHistory, command.id).create(command.task, hostname) } } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 34f55cf..d091197 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -33,12 +33,13 @@ import scala.annotation.tailrec class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]( workQueueSupplier: WorkQueueSupplier, val eventStore: EventStore, - val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable { + val executionDetailsProjection: TaskExecutionDetailsProjection, + val hostname: Hostname) extends TaskManager with Closeable { private val delayBetweenPollingInMs = 500 private def workDispatcher: Subscriber = { - case Created(aggregateId, _, task) => + case Created(aggregateId, _, task, _) => val taskWithId = new TaskWithId(aggregateId.taskId, task) workQueue.submit(taskWithId) case CancelRequested(aggregateId, _) => @@ -51,7 +52,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] private val loadHistory: AggregateId => History = eventStore.getEventsOfAggregate _ private val eventSourcingSystem = ScalaEventSourcingSystem( handlers = Set( - new CreateCommandHandler(loadHistory), + new CreateCommandHandler(loadHistory, hostname), new StartCommandHandler(loadHistory), new RequestCancelCommandHandler(loadHistory), new CompleteCommandHandler(loadHistory), @@ -103,4 +104,4 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] override def close(): Unit = { workQueue.close() } -} \ No newline at end of file +} diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala index 2536b38..8f8bb40 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala @@ -26,7 +26,11 @@ sealed abstract class TaskEvent(aggregateId: TaskAggregateId, val eventId: Event override def getAggregateId: TaskAggregateId = aggregateId } -case class Created(aggregateId: TaskAggregateId, override val eventId: EventId, task: Task) extends TaskEvent(aggregateId, eventId) +case class Hostname(private val value: String) { + def asString: String = value +} + +case class Created(aggregateId: TaskAggregateId, override val eventId: EventId, task: Task, hostname: Hostname) extends TaskEvent(aggregateId, eventId) case class Started(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala index 5fd5875..2013d91 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala @@ -37,9 +37,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor .status - def create(task: Task): util.List[Event] = { + def create(task: Task, hostname: Hostname): util.List[Event] = { if (currentStatus.isEmpty) { - createEventWithId(Created(aggregateId, _, task)) + createEventWithId(Created(aggregateId, _, task, hostname)) } else Nil.asJava } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala index 918a5e0..65f0aca 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala @@ -27,7 +27,7 @@ import collection.JavaConverters._ trait TaskExecutionDetailsProjection { val asSubscriber: Subscriber = { case created: Created => - update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId)) + update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname)) case cancelRequested: CancelRequested => update(cancelRequested.aggregateId.taskId)(_.cancelRequested) case started: Started => diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java index 50cdf51..01c5eb1 100644 --- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java @@ -19,6 +19,8 @@ package org.apache.james.task; +import org.apache.james.task.eventsourcing.Hostname; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @@ -30,7 +32,7 @@ class MemoryTaskManagerTest implements TaskManagerContract { @BeforeEach void setUp() { - memoryTaskManager = new MemoryTaskManager(); + memoryTaskManager = new MemoryTaskManager(new Hostname("foo")); } @AfterEach diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index 63c62b0..867665f 100644 --- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -19,33 +19,41 @@ package org.apache.james.task.eventsourcing; +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.james.eventsourcing.EventSourcingSystem; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore; import org.apache.james.task.CountDownLatchExtension; import org.apache.james.task.MemoryWorkQueue; import org.apache.james.task.SerialTaskManagerWorker; +import org.apache.james.task.Task; +import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; import org.apache.james.task.TaskManagerWorker; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(CountDownLatchExtension.class) class EventSourcingTaskManagerTest implements TaskManagerContract { + private static final Hostname HOSTNAME = new Hostname("foo"); private EventSourcingTaskManager taskManager; + private EventStore eventStore; @BeforeEach void setUp() { - EventStore eventStore = new InMemoryEventStore(); + eventStore = new InMemoryEventStore(); TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection(); WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> { WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem); TaskManagerWorker worker = new SerialTaskManagerWorker(listener); return new MemoryWorkQueue(worker); }; - taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection); + taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME); } @AfterEach @@ -57,4 +65,15 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { public TaskManager taskManager() { return taskManager; } -} \ No newline at end of file + + @Test + void createdTaskShouldKeepOriginHostname() { + TaskId taskId = taskManager.submit(() -> Task.Result.COMPLETED); + TaskAggregateId aggregateId = new TaskAggregateId(taskId); + assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents()) + .filteredOn(event -> event instanceof Created) + .extracting("hostname") + .containsOnly(HOSTNAME); + } + +} diff --git a/server/task/src/test/scala/org/apache/james/task/TaskExecutionDetailsFixture.scala b/server/task/src/test/scala/org/apache/james/task/TaskExecutionDetailsFixture.scala index a3c5b38..ddcf176 100644 --- a/server/task/src/test/scala/org/apache/james/task/TaskExecutionDetailsFixture.scala +++ b/server/task/src/test/scala/org/apache/james/task/TaskExecutionDetailsFixture.scala @@ -18,22 +18,29 @@ * ***************************************************************/ package org.apache.james.task +import java.time.{LocalDateTime, ZoneId, ZonedDateTime} import java.util.Optional import org.apache.james.task.TaskExecutionDetails.AdditionalInformation +import org.apache.james.task.eventsourcing.Hostname object TaskExecutionDetailsFixture { + val SUBMITTED_DATE = ZonedDateTime.of(LocalDateTime.of(2000, 1, 1, 0, 0), ZoneId.of("Europe/Paris")) + val SUBMITTED_NODE = Hostname("foo") + val SUBMITTED_DATE_2 = ZonedDateTime.of(LocalDateTime.of(2011, 11, 11, 11, 11), ZoneId.of("Europe/Paris")) + val SUBMITTED_NODE_2 = Hostname("bar") val TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd") val TASK_ID_2 = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafe") val ADDITIONAL_INFORMATION: () => Optional[AdditionalInformation] = Optional.empty - val TASK_EXECUTION_DETAILS = new TaskExecutionDetails(TASK_ID, "type", ADDITIONAL_INFORMATION, TaskManager.Status.COMPLETED) - val TASK_EXECUTION_DETAILS_2 = new TaskExecutionDetails(TASK_ID_2, "type", ADDITIONAL_INFORMATION, TaskManager.Status.COMPLETED) - val TASK_EXECUTION_DETAILS_UPDATED = new TaskExecutionDetails(TASK_ID, "type", ADDITIONAL_INFORMATION, TaskManager.Status.FAILED) + val TASK_EXECUTION_DETAILS = new TaskExecutionDetails(TASK_ID, "type", TaskManager.Status.COMPLETED, SUBMITTED_DATE, SUBMITTED_NODE, ADDITIONAL_INFORMATION) + val TASK_EXECUTION_DETAILS_2 = new TaskExecutionDetails(TASK_ID_2, "type", TaskManager.Status.COMPLETED, SUBMITTED_DATE, SUBMITTED_NODE, ADDITIONAL_INFORMATION) + val TASK_EXECUTION_DETAILS_UPDATED = new TaskExecutionDetails(TASK_ID, "type", TaskManager.Status.FAILED, SUBMITTED_DATE, SUBMITTED_NODE, ADDITIONAL_INFORMATION) val ADDITIONAL_INFORMATION_2: () => Optional[AdditionalInformation] = () => Optional.of(new CustomAdditionalInformation("hello")) - val TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION = new TaskExecutionDetails(TASK_ID, "type", ADDITIONAL_INFORMATION_2, TaskManager.Status.COMPLETED) + val TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION = new TaskExecutionDetails(TASK_ID, "type", TaskManager.Status.COMPLETED, SUBMITTED_DATE_2, SUBMITTED_NODE_2, ADDITIONAL_INFORMATION) + } case class CustomAdditionalInformation(value: String) extends AdditionalInformation --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org