This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cc8f433ad7c5c39a207b00621fada3d66998b4fd Author: Gautier DI FOLCO <gdifo...@linagora.com> AuthorDate: Fri Aug 23 16:21:03 2019 +0200 JAMES-2873 Add Hostname into Started event --- ...{TaskManagerModule.java => HostnameModule.java} | 37 ++++++++++++++++------ .../james/modules/server/TaskManagerModule.java | 1 + ...assandraTaskExecutionDetailsProjectionDAO.scala | 3 ++ ...andraTaskExecutionDetailsProjectionModule.scala | 2 ++ .../eventsourcing/distributed/TaskEventDTO.scala | 7 ++-- .../distributed/TaskEventsSerializationTest.java | 4 +-- .../org/apache/james/task/MemoryTaskManager.java | 8 +++-- .../apache/james/task/TaskExecutionDetails.scala | 26 ++++++++++----- .../james/task/eventsourcing/CommandHandlers.scala | 7 ++-- .../eventsourcing/EventSourcingTaskManager.scala | 4 +-- .../apache/james/task/eventsourcing/Events.scala | 2 +- .../james/task/eventsourcing/TaskAggregate.scala | 4 +-- .../TaskExecutionDetailsProjection.scala | 4 +-- .../EventSourcingTaskManagerTest.java | 19 +++++++++++ 14 files changed, 92 insertions(+), 36 deletions(-) diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/HostnameModule.java similarity index 52% copy from server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java copy to server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/HostnameModule.java index b3d1e4f..6607a74 100644 --- a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java +++ b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/HostnameModule.java @@ -1,4 +1,5 @@ -/**************************************************************** +/** + * ************************************************************* * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -6,29 +7,45 @@ * to you under the Apache License, Version 2.0 (the * * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - ****************************************************************/ + ***************************************************************/ package org.apache.james.modules.server; -import org.apache.james.task.MemoryTaskManager; -import org.apache.james.task.TaskManager; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.james.task.eventsourcing.Hostname; import com.google.inject.AbstractModule; import com.google.inject.Scopes; -public class TaskManagerModule extends AbstractModule { +public class HostnameModule extends AbstractModule { + private static class UnconfigurableHostnameException extends RuntimeException { + UnconfigurableHostnameException(String message, Exception originException) { + super(message, originException); + } + } + @Override protected void configure() { - bind(MemoryTaskManager.class).in(Scopes.SINGLETON); - bind(TaskManager.class).to(MemoryTaskManager.class); + bind(Hostname.class).in(Scopes.SINGLETON); + bind(Hostname.class).toInstance(getHostname()); + } + + private Hostname getHostname() { + try { + return new Hostname(InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new UnconfigurableHostnameException("Hostname can not be retrieved, unable to initialize the distributed task manager", e); + } } } diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java index b3d1e4f..484ad4c 100644 --- a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java +++ b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java @@ -28,6 +28,7 @@ import com.google.inject.Scopes; public class TaskManagerModule extends AbstractModule { @Override protected void configure() { + install(new HostnameModule()); bind(MemoryTaskManager.class).in(Scopes.SINGLETON); bind(TaskManager.class).to(MemoryTaskManager.class); } diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala index e6b7aef..e7af442 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala @@ -44,6 +44,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider .value(SUBMITTED_DATE, bindMarker(SUBMITTED_DATE)) .value(SUBMITTED_NODE, bindMarker(SUBMITTED_NODE)) .value(STARTED_DATE, bindMarker(STARTED_DATE)) + .value(RAN_NODE, bindMarker(RAN_NODE)) .value(COMPLETED_DATE, bindMarker(COMPLETED_DATE)) .value(CANCELED_DATE, bindMarker(CANCELED_DATE)) .value(FAILED_DATE, bindMarker(FAILED_DATE))) @@ -61,6 +62,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmitDate)) .setString(SUBMITTED_NODE, details.getSubmittedNode.asString) .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null)) + .setString(RAN_NODE, details.getRanNode.map[String](_.asString).orElse(null)) .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null)) .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null)) .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))) @@ -80,6 +82,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider submittedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)), submittedNode = Hostname(row.getString(SUBMITTED_NODE)), startedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(STARTED_DATE)), + ranNode = Optional.ofNullable(row.getString(RAN_NODE)).map(Hostname(_)), completedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(COMPLETED_DATE)), canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)), failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)), diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala index 2f7300d..1ee0554 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala @@ -33,6 +33,7 @@ object CassandraTaskExecutionDetailsProjectionTable { val SUBMITTED_DATE: String = "submittedDate" val SUBMITTED_NODE: String = "submittedNode" val STARTED_DATE: String = "startedDate" + val RAN_NODE: String = "ranNode" val COMPLETED_DATE: String = "completedDate" val CANCELED_DATE: String = "canceledDate" val FAILED_DATE: String = "failedDate" @@ -54,6 +55,7 @@ object CassandraTaskExecutionDetailsProjectionModule { .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) .addColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, text) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) + .addColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, text) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))) diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala index 63d9afe..4e13454 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala @@ -47,14 +47,15 @@ object CreatedDTO { case class StartedDTO(@JsonProperty("type") typeName: String, @JsonProperty("aggregate") aggregateId: String, - @JsonProperty("event") eventId: Int) + @JsonProperty("event") eventId: Int, + @JsonProperty("hostname") getHostname: String) extends TaskEventDTO(typeName, aggregateId, eventId) { - def toDomainObject: Started = Started(domainAggregateId, domainEventId) + def toDomainObject: Started = Started(domainAggregateId, domainEventId, Hostname(getHostname)) } object StartedDTO { def fromDomainObject(event: Started, typeName: String): StartedDTO = - StartedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize()) + StartedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), event.hostname.asString) } case class CancelRequestedDTO(@JsonProperty("type") typeName: String, diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java index 183e10e..b41c762 100644 --- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java @@ -80,7 +80,7 @@ class TaskEventsSerializationTest { private static Stream<Arguments> validTasks() throws Exception { return Stream.of( Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK, HOSTNAME), "{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"), - Arguments.of(new Started(AGGREGATE_ID, EVENT_ID), "{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\"}"), + Arguments.of(new Started(AGGREGATE_ID, EVENT_ID, HOSTNAME), "{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\",\"hostname\":\"foo\"}"), Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID), "{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42}\n"), Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, Task.Result.COMPLETED), "{\"result\":\"COMPLETED\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"), Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, Task.Result.PARTIAL), "{\"result\":\"PARTIAL\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"), @@ -89,4 +89,4 @@ class TaskEventsSerializationTest { ); } -} \ No newline at end of file +} diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index 2e71938..b065edd 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -47,14 +47,16 @@ public class MemoryTaskManager implements TaskManager { private static class DetailsUpdater implements TaskManagerWorker.Listener { private final TaskExecutionDetailsUpdaterFactory updaterFactory; + private final Hostname hostname; - DetailsUpdater(TaskExecutionDetailsUpdaterFactory updaterFactory) { + DetailsUpdater(TaskExecutionDetailsUpdaterFactory updaterFactory, Hostname hostname) { this.updaterFactory = updaterFactory; + this.hostname = hostname; } @Override public void started(TaskId taskId) { - updaterFactory.apply(taskId).accept(TaskExecutionDetails::started); + updaterFactory.apply(taskId).accept(details -> details.started(hostname)); } @Override @@ -162,7 +164,7 @@ public class MemoryTaskManager implements TaskManager { } private DetailsUpdater updater() { - return new DetailsUpdater(this::updateDetails); + return new DetailsUpdater(this::updateDetails, hostname); } private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) { diff --git a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala index 4e9f20d..95b0d99 100644 --- a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala +++ b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala @@ -41,6 +41,7 @@ class TaskExecutionDetails(val taskId: TaskId, private val submittedNode: Hostname, private val additionalInformation: () => Optional[TaskExecutionDetails.AdditionalInformation], private val startedDate: Optional[ZonedDateTime] = Optional.empty(), + private val ranNode: Optional[Hostname] = Optional.empty(), private val completedDate: Optional[ZonedDateTime] = Optional.empty(), private val canceledDate: Optional[ZonedDateTime] = Optional.empty(), private val failedDate: Optional[ZonedDateTime] = Optional.empty()) { @@ -58,14 +59,16 @@ class TaskExecutionDetails(val taskId: TaskId, def getStartedDate: Optional[ZonedDateTime] = startedDate + def getRanNode: Optional[Hostname] = ranNode + def getCompletedDate: Optional[ZonedDateTime] = completedDate def getCanceledDate: Optional[ZonedDateTime] = canceledDate def getFailedDate: Optional[ZonedDateTime] = failedDate - def started: TaskExecutionDetails = status match { - case WAITING => start + def started(hostname: Hostname): TaskExecutionDetails = status match { + case WAITING => start(hostname) case _ => this } @@ -82,7 +85,7 @@ class TaskExecutionDetails(val taskId: TaskId, case _ => this } - def cancelRequested: TaskExecutionDetails = status match { + def cancelRequested(hostname: Hostname): TaskExecutionDetails = status match { case IN_PROGRESS => requestCancel case WAITING => requestCancel case _ => this @@ -107,6 +110,7 @@ class TaskExecutionDetails(val taskId: TaskId, Objects.equals(submittedDate, that.submittedDate) && Objects.equals(submittedNode, that.submittedNode) && Objects.equals(startedDate, that.startedDate) && + Objects.equals(ranNode, that.ranNode) && Objects.equals(completedDate, that.completedDate) && Objects.equals(canceledDate, that.canceledDate) && Objects.equals(failedDate, that.failedDate) @@ -114,7 +118,7 @@ class TaskExecutionDetails(val taskId: TaskId, } override def hashCode(): Int = - Objects.hash(taskId, `type`, additionalInformation(), status, submittedDate, submittedNode, startedDate, completedDate, canceledDate, failedDate) + Objects.hash(taskId, `type`, additionalInformation(), status, submittedDate, submittedNode, startedDate, ranNode, completedDate, canceledDate, failedDate) override def toString: String = MoreObjects.toStringHelper(this) @@ -125,38 +129,44 @@ class TaskExecutionDetails(val taskId: TaskId, .add("submittedDate", submittedDate) .add("submittedNode", submittedNode) .add("startedDate", startedDate) + .add("ranNode", ranNode) .add("completedDate", completedDate) .add("canceledDate", canceledDate) .add("failedDate", failedDate) .toString - private def start = new TaskExecutionDetails(taskId, `type`, IN_PROGRESS, + private def start(hostname: Hostname) = new TaskExecutionDetails(taskId, `type`, IN_PROGRESS, submittedDate = submittedDate, submittedNode = submittedNode, additionalInformation = additionalInformation, - startedDate = Optional.of(ZonedDateTime.now)) + startedDate = Optional.of(ZonedDateTime.now), + ranNode = Optional.of(hostname)) private def complete = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.COMPLETED, submittedDate = submittedDate, submittedNode = submittedNode, - startedDate = startedDate, additionalInformation = additionalInformation, + startedDate = startedDate, + ranNode = ranNode, completedDate = Optional.of(ZonedDateTime.now)) private def fail = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.FAILED, submittedDate = submittedDate, submittedNode = submittedNode, - startedDate = startedDate, additionalInformation = additionalInformation, + startedDate = startedDate, + ranNode = ranNode, failedDate = Optional.of(ZonedDateTime.now)) private def requestCancel = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCEL_REQUESTED, submittedDate = submittedDate, submittedNode = submittedNode, additionalInformation = additionalInformation, startedDate = startedDate, + ranNode = ranNode, canceledDate = Optional.of(ZonedDateTime.now)) private def cancel = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCELLED, submittedDate = submittedDate, submittedNode = submittedNode, additionalInformation = additionalInformation, startedDate = startedDate, + ranNode = ranNode, canceledDate = Optional.of(ZonedDateTime.now)) } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala index 905f1ec..cdc241e 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala @@ -41,11 +41,12 @@ class CreateCommandHandler(private val loadHistory: TaskAggregateId => History, } } -class StartCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Start] { +class StartCommandHandler(private val loadHistory: TaskAggregateId => History, + private val hostname: Hostname) extends TaskCommandHandler[Start] { override def handledClass: Class[Start] = classOf[Start] override def handle(command: Start): util.List[_ <: Event] = { - loadAggregate(loadHistory, command.id).start() + loadAggregate(loadHistory, command.id).start(hostname) } } @@ -79,4 +80,4 @@ class FailCommandHandler(private val loadHistory: TaskAggregateId => History) ex override def handle(command: Fail): util.List[_ <: Event] = { loadAggregate(loadHistory, command.id).fail() } -} \ No newline at end of file +} diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index d091197..7302b4b 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -53,13 +53,13 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] private val eventSourcingSystem = ScalaEventSourcingSystem( handlers = Set( new CreateCommandHandler(loadHistory, hostname), - new StartCommandHandler(loadHistory), + new StartCommandHandler(loadHistory, hostname), new RequestCancelCommandHandler(loadHistory), new CompleteCommandHandler(loadHistory), new CancelCommandHandler(loadHistory), new FailCommandHandler(loadHistory)), subscribers = Set( - executionDetailsProjection.asSubscriber, + executionDetailsProjection.asSubscriber(hostname), workDispatcher), eventStore = eventStore) diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala index 8f8bb40..61d7d8f 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala @@ -32,7 +32,7 @@ case class Hostname(private val value: String) { case class Created(aggregateId: TaskAggregateId, override val eventId: EventId, task: Task, hostname: Hostname) extends TaskEvent(aggregateId, eventId) -case class Started(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) +case class Started(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId) case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala index 2013d91..24aef85 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala @@ -43,9 +43,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor } else Nil.asJava } - private[eventsourcing] def start(): util.List[Event] = { + private[eventsourcing] def start(hostname: Hostname): util.List[Event] = { currentStatus match { - case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _)) + case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _, hostname)) case _ => Nil.asJava } } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala index 65f0aca..5654e0f 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala @@ -25,13 +25,13 @@ import org.apache.james.task.{TaskExecutionDetails, TaskId} import collection.JavaConverters._ trait TaskExecutionDetailsProjection { - val asSubscriber: Subscriber = { + def asSubscriber(hostname: Hostname): Subscriber = { case created: Created => update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname)) case cancelRequested: CancelRequested => update(cancelRequested.aggregateId.taskId)(_.cancelRequested) case started: Started => - update(started.aggregateId.taskId)(_.started) + update(started.aggregateId.taskId)(_.started(hostname)) case completed: Completed => update(completed.aggregateId.taskId)(_.completed) case failed: Failed => diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index 867665f..b8773a8 100644 --- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -20,6 +20,7 @@ package org.apache.james.task.eventsourcing; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; import org.apache.james.eventsourcing.EventSourcingSystem; import org.apache.james.eventsourcing.eventstore.EventStore; @@ -32,6 +33,9 @@ import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; import org.apache.james.task.TaskManagerWorker; + +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,6 +43,10 @@ import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(CountDownLatchExtension.class) class EventSourcingTaskManagerTest implements TaskManagerContract { + ConditionFactory CALMLY_AWAIT = Awaitility + .with().pollInterval(ONE_HUNDRED_MILLISECONDS) + .and().pollDelay(ONE_HUNDRED_MILLISECONDS) + .await(); private static final Hostname HOSTNAME = new Hostname("foo"); private EventSourcingTaskManager taskManager; @@ -76,4 +84,15 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { .containsOnly(HOSTNAME); } + @Test + void startedTaskShouldKeepOriginHostname() { + TaskId taskId = taskManager.submit(() -> Task.Result.COMPLETED); + TaskAggregateId aggregateId = new TaskAggregateId(taskId); + + CALMLY_AWAIT.untilAsserted(() -> + assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents()) + .filteredOn(event -> event instanceof Started) + .extracting("hostname") + .containsOnly(HOSTNAME)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org