This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bc64574d21e7f703a2044744d79ce2a15ae45b0a
Author: Gautier DI FOLCO <gdifo...@linagora.com>
AuthorDate: Wed Aug 28 11:34:04 2019 +0200

    JAMES-2873 Add Hostname into CancelRequested event
---
 .../CassandraTaskExecutionDetailsProjectionDAO.scala    |  3 +++
 .../CassandraTaskExecutionDetailsProjectionModule.scala |  2 ++
 .../task/eventsourcing/distributed/TaskEventDTO.scala   |  7 ++++---
 .../distributed/TaskEventsSerializationTest.java        |  4 ++--
 .../java/org/apache/james/task/MemoryTaskManager.java   |  2 +-
 .../org/apache/james/task/TaskExecutionDetails.scala    | 17 +++++++++++++----
 .../james/task/eventsourcing/CommandHandlers.scala      |  7 ++++---
 .../task/eventsourcing/EventSourcingTaskManager.scala   |  4 ++--
 .../org/apache/james/task/eventsourcing/Events.scala    |  2 +-
 .../apache/james/task/eventsourcing/TaskAggregate.scala |  4 ++--
 .../eventsourcing/TaskExecutionDetailsProjection.scala  |  2 +-
 .../eventsourcing/EventSourcingTaskManagerTest.java     | 16 ++++++++++++++++
 12 files changed, 51 insertions(+), 19 deletions(-)

diff --git 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
index e7af442..f598aa5 100644
--- 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
+++ 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
@@ -47,6 +47,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: 
Session, typesProvider
     .value(RAN_NODE, bindMarker(RAN_NODE))
     .value(COMPLETED_DATE, bindMarker(COMPLETED_DATE))
     .value(CANCELED_DATE, bindMarker(CANCELED_DATE))
+    .value(CANCEL_REQUESTED_NODE, bindMarker(CANCEL_REQUESTED_NODE))
     .value(FAILED_DATE, bindMarker(FAILED_DATE)))
 
   private val selectStatement = session.prepare(select().from(TABLE_NAME)
@@ -65,6 +66,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: 
Session, typesProvider
       .setString(RAN_NODE, 
details.getRanNode.map[String](_.asString).orElse(null))
       .setUDTValue(COMPLETED_DATE, 
CassandraZonedDateTimeModule.toUDT(dateType, 
details.getCompletedDate).orElse(null))
       .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, 
details.getCanceledDate).orElse(null))
+      .setString(CANCEL_REQUESTED_NODE, 
details.getCancelRequestedNode.map[String](_.asString).orElse(null))
       .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, 
details.getStartedDate).orElse(null)))
 
   def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] = 
cassandraAsyncExecutor
@@ -85,6 +87,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: 
Session, typesProvider
     ranNode = Optional.ofNullable(row.getString(RAN_NODE)).map(Hostname(_)),
     completedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(COMPLETED_DATE)),
     canceledDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)),
+    cancelRequestedNode = 
Optional.ofNullable(row.getString(CANCEL_REQUESTED_NODE)).map(Hostname(_)),
     failedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)),
     additionalInformation = Optional.empty)
 }
diff --git 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
index 1ee0554..4baaa0b 100644
--- 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
+++ 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
@@ -36,6 +36,7 @@ object CassandraTaskExecutionDetailsProjectionTable {
   val RAN_NODE: String = "ranNode"
   val COMPLETED_DATE: String = "completedDate"
   val CANCELED_DATE: String = "canceledDate"
+  val CANCEL_REQUESTED_NODE: String = "cancelRequestedNode"
   val FAILED_DATE: String = "failedDate"
 }
 
@@ -58,6 +59,7 @@ object CassandraTaskExecutionDetailsProjectionModule {
       .addColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, text)
       
.addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       
.addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      
.addColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, 
text)
       .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)))
     .build
 }
diff --git 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
index 4e13454..8ff3093 100644
--- 
a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
+++ 
b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
@@ -60,14 +60,15 @@ object StartedDTO {
 
 case class CancelRequestedDTO(@JsonProperty("type") typeName: String,
                               @JsonProperty("aggregate") aggregateId: String,
-                              @JsonProperty("event") eventId: Int)
+                              @JsonProperty("event") eventId: Int,
+                              @JsonProperty("hostname") getHostname: String)
   extends TaskEventDTO(typeName, aggregateId, eventId) {
-  def toDomainObject: CancelRequested = CancelRequested(domainAggregateId, 
domainEventId)
+  def toDomainObject: CancelRequested = CancelRequested(domainAggregateId, 
domainEventId, Hostname(getHostname))
 }
 
 object CancelRequestedDTO {
   def fromDomainObject(event: CancelRequested, typeName: String): 
CancelRequestedDTO =
-    CancelRequestedDTO(typeName, event.aggregateId.taskId.asString(), 
event.eventId.serialize())
+    CancelRequestedDTO(typeName, event.aggregateId.taskId.asString(), 
event.eventId.serialize(), event.hostname.asString)
 }
 
 case class CompletedDTO(@JsonProperty("type") typeName: String,
diff --git 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
index b41c762..eaf03ec 100644
--- 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
+++ 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
@@ -81,7 +81,7 @@ class TaskEventsSerializationTest {
         return Stream.of(
             Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK, HOSTNAME), 
"{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"),
             Arguments.of(new Started(AGGREGATE_ID, EVENT_ID, HOSTNAME), 
"{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\",\"hostname\":\"foo\"}"),
-            Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID), 
"{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42}\n"),
+            Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID, 
HOSTNAME), 
"{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"),
             Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, 
Task.Result.COMPLETED), 
"{\"result\":\"COMPLETED\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"),
             Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, 
Task.Result.PARTIAL), 
"{\"result\":\"PARTIAL\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"),
             Arguments.of(new Failed(AGGREGATE_ID, EVENT_ID), 
"{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-failed\"}"),
@@ -89,4 +89,4 @@ class TaskEventsSerializationTest {
         );
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java 
b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index b065edd..90e88c0 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -133,7 +133,7 @@ public class MemoryTaskManager implements TaskManager {
     @Override
     public void cancel(TaskId id) {
         Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> 
{
-                
updateDetails(id).accept(TaskExecutionDetails::cancelRequested);
+                updateDetails(id).accept(taskExecutionDetails -> 
taskExecutionDetails.cancelRequested(hostname));
                 workQueue.cancel(id);
             }
         );
diff --git 
a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala 
b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
index 95b0d99..d08d879 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
@@ -44,6 +44,7 @@ class TaskExecutionDetails(val taskId: TaskId,
                            private val ranNode: Optional[Hostname] = 
Optional.empty(),
                            private val completedDate: Optional[ZonedDateTime] 
= Optional.empty(),
                            private val canceledDate: Optional[ZonedDateTime] = 
Optional.empty(),
+                           private val cancelRequestedNode: Optional[Hostname] 
= Optional.empty(),
                            private val failedDate: Optional[ZonedDateTime] = 
Optional.empty()) {
   def getTaskId: TaskId = taskId
 
@@ -65,6 +66,8 @@ class TaskExecutionDetails(val taskId: TaskId,
 
   def getCanceledDate: Optional[ZonedDateTime] = canceledDate
 
+  def getCancelRequestedNode: Optional[Hostname] = cancelRequestedNode
+
   def getFailedDate: Optional[ZonedDateTime] = failedDate
 
   def started(hostname: Hostname): TaskExecutionDetails = status match {
@@ -86,8 +89,8 @@ class TaskExecutionDetails(val taskId: TaskId,
   }
 
   def cancelRequested(hostname: Hostname): TaskExecutionDetails = status match 
{
-    case IN_PROGRESS => requestCancel
-    case WAITING => requestCancel
+    case IN_PROGRESS => requestCancel(hostname)
+    case WAITING => requestCancel(hostname)
     case _ => this
   }
 
@@ -113,12 +116,13 @@ class TaskExecutionDetails(val taskId: TaskId,
         Objects.equals(ranNode, that.ranNode) &&
         Objects.equals(completedDate, that.completedDate) &&
         Objects.equals(canceledDate, that.canceledDate) &&
+        Objects.equals(cancelRequestedNode, that.cancelRequestedNode) &&
         Objects.equals(failedDate, that.failedDate)
     case _ => false
   }
 
   override def hashCode(): Int =
-    Objects.hash(taskId, `type`, additionalInformation(), status, 
submittedDate, submittedNode, startedDate, ranNode, completedDate, 
canceledDate, failedDate)
+    Objects.hash(taskId, `type`, additionalInformation(), status, 
submittedDate, submittedNode, startedDate, ranNode, completedDate, 
canceledDate, cancelRequestedNode, failedDate)
 
   override def toString: String =
     MoreObjects.toStringHelper(this)
@@ -132,6 +136,7 @@ class TaskExecutionDetails(val taskId: TaskId,
       .add("ranNode", ranNode)
       .add("completedDate", completedDate)
       .add("canceledDate", canceledDate)
+      .add("cancelRequestedNode", cancelRequestedNode)
       .add("failedDate", failedDate)
       .toString
 
@@ -147,6 +152,7 @@ class TaskExecutionDetails(val taskId: TaskId,
     additionalInformation = additionalInformation,
     startedDate = startedDate,
     ranNode = ranNode,
+    cancelRequestedNode = cancelRequestedNode,
     completedDate = Optional.of(ZonedDateTime.now))
   private def fail = new TaskExecutionDetails(taskId, `type`, 
TaskManager.Status.FAILED,
     submittedDate = submittedDate,
@@ -154,13 +160,15 @@ class TaskExecutionDetails(val taskId: TaskId,
     additionalInformation = additionalInformation,
     startedDate = startedDate,
     ranNode = ranNode,
+    cancelRequestedNode = cancelRequestedNode,
     failedDate = Optional.of(ZonedDateTime.now))
-  private def requestCancel = new TaskExecutionDetails(taskId, `type`, 
TaskManager.Status.CANCEL_REQUESTED,
+  private def requestCancel(hostname: Hostname) = new 
TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCEL_REQUESTED,
     submittedDate = submittedDate,
     submittedNode = submittedNode,
     additionalInformation = additionalInformation,
     startedDate = startedDate,
     ranNode = ranNode,
+    cancelRequestedNode = Optional.of(hostname),
     canceledDate = Optional.of(ZonedDateTime.now))
   private def cancel = new TaskExecutionDetails(taskId, `type`, 
TaskManager.Status.CANCELLED,
     submittedDate = submittedDate,
@@ -168,5 +176,6 @@ class TaskExecutionDetails(val taskId: TaskId,
     additionalInformation = additionalInformation,
     startedDate = startedDate,
     ranNode = ranNode,
+    cancelRequestedNode = cancelRequestedNode,
     canceledDate = Optional.of(ZonedDateTime.now))
 }
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index cdc241e..d585e74 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -50,11 +50,12 @@ class StartCommandHandler(private val loadHistory: 
TaskAggregateId => History,
   }
 }
 
-class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => 
History) extends TaskCommandHandler[RequestCancel] {
+class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => 
History,
+                                  private val hostname: Hostname) extends 
TaskCommandHandler[RequestCancel] {
   override def handledClass: Class[RequestCancel] = classOf[RequestCancel]
 
   override def handle(command: RequestCancel): util.List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).requestCancel()
+    loadAggregate(loadHistory, command.id).requestCancel(hostname)
   }
 }
 
@@ -80,4 +81,4 @@ class FailCommandHandler(private val loadHistory: 
TaskAggregateId => History) ex
   override def handle(command: Fail): util.List[_ <: Event] = {
     loadAggregate(loadHistory, command.id).fail()
   }
-}
+}
\ No newline at end of file
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 7302b4b..f57c947 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -42,7 +42,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing]
     case Created(aggregateId, _, task, _) =>
       val taskWithId = new TaskWithId(aggregateId.taskId, task)
       workQueue.submit(taskWithId)
-    case CancelRequested(aggregateId, _) =>
+    case CancelRequested(aggregateId, _, _) =>
       workQueue.cancel(aggregateId.taskId)
     case _ =>
   }
@@ -54,7 +54,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing]
     handlers = Set(
       new CreateCommandHandler(loadHistory, hostname),
       new StartCommandHandler(loadHistory, hostname),
-      new RequestCancelCommandHandler(loadHistory),
+      new RequestCancelCommandHandler(loadHistory, hostname),
       new CompleteCommandHandler(loadHistory),
       new CancelCommandHandler(loadHistory),
       new FailCommandHandler(loadHistory)),
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
index 61d7d8f..fa0607e 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
@@ -34,7 +34,7 @@ case class Created(aggregateId: TaskAggregateId, override val 
eventId: EventId,
 
 case class Started(aggregateId: TaskAggregateId, override val eventId: 
EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId)
 
-case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: 
EventId) extends TaskEvent(aggregateId, eventId)
+case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: 
EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId)
 
 case class Completed(aggregateId: TaskAggregateId, override val eventId: 
EventId, result: Result) extends TaskEvent(aggregateId, eventId)
 
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 24aef85..ae86433 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -50,9 +50,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, 
private val histor
     }
   }
 
-  def requestCancel(): util.List[Event] = {
+  def requestCancel(hostname: Hostname): util.List[Event] = {
     currentStatus match {
-      case Some(status) if !status.isFinished => 
createEventWithId(CancelRequested(aggregateId, _))
+      case Some(status) if !status.isFinished => 
createEventWithId(CancelRequested(aggregateId, _, hostname))
       case _ => Nil.asJava
     }
   }
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
index 5654e0f..a6ffff2 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -29,7 +29,7 @@ trait TaskExecutionDetailsProjection {
     case created: Created =>
       update(TaskExecutionDetails.from(created.task, 
created.aggregateId.taskId, created.hostname))
     case cancelRequested: CancelRequested =>
-      update(cancelRequested.aggregateId.taskId)(_.cancelRequested)
+      update(cancelRequested.aggregateId.taskId)(_.cancelRequested(hostname))
     case started: Started =>
       update(started.aggregateId.taskId)(_.started(hostname))
     case completed: Completed =>
diff --git 
a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
 
b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index b8773a8..281e217 100644
--- 
a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ 
b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -95,4 +95,20 @@ class EventSourcingTaskManagerTest implements 
TaskManagerContract {
                 .extracting("hostname")
                 .containsOnly(HOSTNAME));
     }
+
+    @Test
+    void cancelRequestedTaskShouldKeepOriginHostname() {
+        TaskId taskId = taskManager.submit(() -> {
+            Thread.sleep(100);
+            return Task.Result.COMPLETED;
+        });
+        taskManager.cancel(taskId);
+
+        TaskAggregateId aggregateId = new TaskAggregateId(taskId);
+        CALMLY_AWAIT.untilAsserted(() ->
+            
assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents())
+                .filteredOn(event -> event instanceof CancelRequested)
+                .extracting("hostname")
+                .containsOnly(HOSTNAME));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to