This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bc64574d21e7f703a2044744d79ce2a15ae45b0a Author: Gautier DI FOLCO <gdifo...@linagora.com> AuthorDate: Wed Aug 28 11:34:04 2019 +0200 JAMES-2873 Add Hostname into CancelRequested event --- .../CassandraTaskExecutionDetailsProjectionDAO.scala | 3 +++ .../CassandraTaskExecutionDetailsProjectionModule.scala | 2 ++ .../task/eventsourcing/distributed/TaskEventDTO.scala | 7 ++++--- .../distributed/TaskEventsSerializationTest.java | 4 ++-- .../java/org/apache/james/task/MemoryTaskManager.java | 2 +- .../org/apache/james/task/TaskExecutionDetails.scala | 17 +++++++++++++---- .../james/task/eventsourcing/CommandHandlers.scala | 7 ++++--- .../task/eventsourcing/EventSourcingTaskManager.scala | 4 ++-- .../org/apache/james/task/eventsourcing/Events.scala | 2 +- .../apache/james/task/eventsourcing/TaskAggregate.scala | 4 ++-- .../eventsourcing/TaskExecutionDetailsProjection.scala | 2 +- .../eventsourcing/EventSourcingTaskManagerTest.java | 16 ++++++++++++++++ 12 files changed, 51 insertions(+), 19 deletions(-) diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala index e7af442..f598aa5 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala @@ -47,6 +47,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider .value(RAN_NODE, bindMarker(RAN_NODE)) .value(COMPLETED_DATE, bindMarker(COMPLETED_DATE)) .value(CANCELED_DATE, bindMarker(CANCELED_DATE)) + .value(CANCEL_REQUESTED_NODE, bindMarker(CANCEL_REQUESTED_NODE)) .value(FAILED_DATE, bindMarker(FAILED_DATE))) private val selectStatement = session.prepare(select().from(TABLE_NAME) @@ -65,6 +66,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider .setString(RAN_NODE, details.getRanNode.map[String](_.asString).orElse(null)) .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null)) .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null)) + .setString(CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString).orElse(null)) .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))) def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] = cassandraAsyncExecutor @@ -85,6 +87,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider ranNode = Optional.ofNullable(row.getString(RAN_NODE)).map(Hostname(_)), completedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(COMPLETED_DATE)), canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)), + cancelRequestedNode = Optional.ofNullable(row.getString(CANCEL_REQUESTED_NODE)).map(Hostname(_)), failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)), additionalInformation = Optional.empty) } diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala index 1ee0554..4baaa0b 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala @@ -36,6 +36,7 @@ object CassandraTaskExecutionDetailsProjectionTable { val RAN_NODE: String = "ranNode" val COMPLETED_DATE: String = "completedDate" val CANCELED_DATE: String = "canceledDate" + val CANCEL_REQUESTED_NODE: String = "cancelRequestedNode" val FAILED_DATE: String = "failedDate" } @@ -58,6 +59,7 @@ object CassandraTaskExecutionDetailsProjectionModule { .addColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, text) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) + .addColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, text) .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))) .build } diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala index 4e13454..8ff3093 100644 --- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala +++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala @@ -60,14 +60,15 @@ object StartedDTO { case class CancelRequestedDTO(@JsonProperty("type") typeName: String, @JsonProperty("aggregate") aggregateId: String, - @JsonProperty("event") eventId: Int) + @JsonProperty("event") eventId: Int, + @JsonProperty("hostname") getHostname: String) extends TaskEventDTO(typeName, aggregateId, eventId) { - def toDomainObject: CancelRequested = CancelRequested(domainAggregateId, domainEventId) + def toDomainObject: CancelRequested = CancelRequested(domainAggregateId, domainEventId, Hostname(getHostname)) } object CancelRequestedDTO { def fromDomainObject(event: CancelRequested, typeName: String): CancelRequestedDTO = - CancelRequestedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize()) + CancelRequestedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), event.hostname.asString) } case class CompletedDTO(@JsonProperty("type") typeName: String, diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java index b41c762..eaf03ec 100644 --- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java @@ -81,7 +81,7 @@ class TaskEventsSerializationTest { return Stream.of( Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK, HOSTNAME), "{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"), Arguments.of(new Started(AGGREGATE_ID, EVENT_ID, HOSTNAME), "{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\",\"hostname\":\"foo\"}"), - Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID), "{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42}\n"), + Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID, HOSTNAME), "{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"), Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, Task.Result.COMPLETED), "{\"result\":\"COMPLETED\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"), Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, Task.Result.PARTIAL), "{\"result\":\"PARTIAL\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"), Arguments.of(new Failed(AGGREGATE_ID, EVENT_ID), "{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-failed\"}"), @@ -89,4 +89,4 @@ class TaskEventsSerializationTest { ); } -} +} \ No newline at end of file diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index b065edd..90e88c0 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -133,7 +133,7 @@ public class MemoryTaskManager implements TaskManager { @Override public void cancel(TaskId id) { Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> { - updateDetails(id).accept(TaskExecutionDetails::cancelRequested); + updateDetails(id).accept(taskExecutionDetails -> taskExecutionDetails.cancelRequested(hostname)); workQueue.cancel(id); } ); diff --git a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala index 95b0d99..d08d879 100644 --- a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala +++ b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala @@ -44,6 +44,7 @@ class TaskExecutionDetails(val taskId: TaskId, private val ranNode: Optional[Hostname] = Optional.empty(), private val completedDate: Optional[ZonedDateTime] = Optional.empty(), private val canceledDate: Optional[ZonedDateTime] = Optional.empty(), + private val cancelRequestedNode: Optional[Hostname] = Optional.empty(), private val failedDate: Optional[ZonedDateTime] = Optional.empty()) { def getTaskId: TaskId = taskId @@ -65,6 +66,8 @@ class TaskExecutionDetails(val taskId: TaskId, def getCanceledDate: Optional[ZonedDateTime] = canceledDate + def getCancelRequestedNode: Optional[Hostname] = cancelRequestedNode + def getFailedDate: Optional[ZonedDateTime] = failedDate def started(hostname: Hostname): TaskExecutionDetails = status match { @@ -86,8 +89,8 @@ class TaskExecutionDetails(val taskId: TaskId, } def cancelRequested(hostname: Hostname): TaskExecutionDetails = status match { - case IN_PROGRESS => requestCancel - case WAITING => requestCancel + case IN_PROGRESS => requestCancel(hostname) + case WAITING => requestCancel(hostname) case _ => this } @@ -113,12 +116,13 @@ class TaskExecutionDetails(val taskId: TaskId, Objects.equals(ranNode, that.ranNode) && Objects.equals(completedDate, that.completedDate) && Objects.equals(canceledDate, that.canceledDate) && + Objects.equals(cancelRequestedNode, that.cancelRequestedNode) && Objects.equals(failedDate, that.failedDate) case _ => false } override def hashCode(): Int = - Objects.hash(taskId, `type`, additionalInformation(), status, submittedDate, submittedNode, startedDate, ranNode, completedDate, canceledDate, failedDate) + Objects.hash(taskId, `type`, additionalInformation(), status, submittedDate, submittedNode, startedDate, ranNode, completedDate, canceledDate, cancelRequestedNode, failedDate) override def toString: String = MoreObjects.toStringHelper(this) @@ -132,6 +136,7 @@ class TaskExecutionDetails(val taskId: TaskId, .add("ranNode", ranNode) .add("completedDate", completedDate) .add("canceledDate", canceledDate) + .add("cancelRequestedNode", cancelRequestedNode) .add("failedDate", failedDate) .toString @@ -147,6 +152,7 @@ class TaskExecutionDetails(val taskId: TaskId, additionalInformation = additionalInformation, startedDate = startedDate, ranNode = ranNode, + cancelRequestedNode = cancelRequestedNode, completedDate = Optional.of(ZonedDateTime.now)) private def fail = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.FAILED, submittedDate = submittedDate, @@ -154,13 +160,15 @@ class TaskExecutionDetails(val taskId: TaskId, additionalInformation = additionalInformation, startedDate = startedDate, ranNode = ranNode, + cancelRequestedNode = cancelRequestedNode, failedDate = Optional.of(ZonedDateTime.now)) - private def requestCancel = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCEL_REQUESTED, + private def requestCancel(hostname: Hostname) = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCEL_REQUESTED, submittedDate = submittedDate, submittedNode = submittedNode, additionalInformation = additionalInformation, startedDate = startedDate, ranNode = ranNode, + cancelRequestedNode = Optional.of(hostname), canceledDate = Optional.of(ZonedDateTime.now)) private def cancel = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCELLED, submittedDate = submittedDate, @@ -168,5 +176,6 @@ class TaskExecutionDetails(val taskId: TaskId, additionalInformation = additionalInformation, startedDate = startedDate, ranNode = ranNode, + cancelRequestedNode = cancelRequestedNode, canceledDate = Optional.of(ZonedDateTime.now)) } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala index cdc241e..d585e74 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala @@ -50,11 +50,12 @@ class StartCommandHandler(private val loadHistory: TaskAggregateId => History, } } -class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[RequestCancel] { +class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => History, + private val hostname: Hostname) extends TaskCommandHandler[RequestCancel] { override def handledClass: Class[RequestCancel] = classOf[RequestCancel] override def handle(command: RequestCancel): util.List[_ <: Event] = { - loadAggregate(loadHistory, command.id).requestCancel() + loadAggregate(loadHistory, command.id).requestCancel(hostname) } } @@ -80,4 +81,4 @@ class FailCommandHandler(private val loadHistory: TaskAggregateId => History) ex override def handle(command: Fail): util.List[_ <: Event] = { loadAggregate(loadHistory, command.id).fail() } -} +} \ No newline at end of file diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 7302b4b..f57c947 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -42,7 +42,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] case Created(aggregateId, _, task, _) => val taskWithId = new TaskWithId(aggregateId.taskId, task) workQueue.submit(taskWithId) - case CancelRequested(aggregateId, _) => + case CancelRequested(aggregateId, _, _) => workQueue.cancel(aggregateId.taskId) case _ => } @@ -54,7 +54,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] handlers = Set( new CreateCommandHandler(loadHistory, hostname), new StartCommandHandler(loadHistory, hostname), - new RequestCancelCommandHandler(loadHistory), + new RequestCancelCommandHandler(loadHistory, hostname), new CompleteCommandHandler(loadHistory), new CancelCommandHandler(loadHistory), new FailCommandHandler(loadHistory)), diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala index 61d7d8f..fa0607e 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala @@ -34,7 +34,7 @@ case class Created(aggregateId: TaskAggregateId, override val eventId: EventId, case class Started(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId) -case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) +case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId) case class Completed(aggregateId: TaskAggregateId, override val eventId: EventId, result: Result) extends TaskEvent(aggregateId, eventId) diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala index 24aef85..ae86433 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala @@ -50,9 +50,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor } } - def requestCancel(): util.List[Event] = { + def requestCancel(hostname: Hostname): util.List[Event] = { currentStatus match { - case Some(status) if !status.isFinished => createEventWithId(CancelRequested(aggregateId, _)) + case Some(status) if !status.isFinished => createEventWithId(CancelRequested(aggregateId, _, hostname)) case _ => Nil.asJava } } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala index 5654e0f..a6ffff2 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala @@ -29,7 +29,7 @@ trait TaskExecutionDetailsProjection { case created: Created => update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname)) case cancelRequested: CancelRequested => - update(cancelRequested.aggregateId.taskId)(_.cancelRequested) + update(cancelRequested.aggregateId.taskId)(_.cancelRequested(hostname)) case started: Started => update(started.aggregateId.taskId)(_.started(hostname)) case completed: Completed => diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index b8773a8..281e217 100644 --- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -95,4 +95,20 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { .extracting("hostname") .containsOnly(HOSTNAME)); } + + @Test + void cancelRequestedTaskShouldKeepOriginHostname() { + TaskId taskId = taskManager.submit(() -> { + Thread.sleep(100); + return Task.Result.COMPLETED; + }); + taskManager.cancel(taskId); + + TaskAggregateId aggregateId = new TaskAggregateId(taskId); + CALMLY_AWAIT.untilAsserted(() -> + assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents()) + .filteredOn(event -> event instanceof CancelRequested) + .extracting("hostname") + .containsOnly(HOSTNAME)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org