This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1da890c47c4cc0eae8686a304bf2a923bec60819
Author: RĂ©mi KOWALSKI <[email protected]>
AuthorDate: Tue Oct 15 17:54:38 2019 +0200

    JAMES-2813 use timestamp in additional information to refuse stalled update 
event
---
 .../task/eventsourcing/DecisionProjection.scala    | 24 +++++++------
 .../james/task/eventsourcing/TaskAggregate.scala   | 21 ++++++-----
 .../task/eventsourcing/TaskAggregateTest.java      | 41 ++++++++++++++++++----
 3 files changed, 58 insertions(+), 28 deletions(-)

diff --git 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
index b5a460f..ce2f6d2 100644
--- 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
+++ 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
@@ -18,27 +18,29 @@
   * ***************************************************************/
 package org.apache.james.task.eventsourcing
 
+import java.time.Instant
+
 import org.apache.james.eventsourcing.Event
 import org.apache.james.task.TaskManager.Status
 
-case class DecisionProjection(status: Option[Status]) {
+case class DecisionProjection(status: Status, 
latestUpdateAdditionalInformationUpdate : Option[Instant]) {
   def update(event: Event): DecisionProjection = {
-    DecisionProjection(
       event match {
-        case event: Created => Some(Status.WAITING)
-        case event: Started => Some(Status.IN_PROGRESS)
-        case event: CancelRequested => Some(Status.CANCEL_REQUESTED)
-        case event: Cancelled => Some(Status.CANCELLED)
-        case event: Completed => Some(Status.COMPLETED)
-        case event: Failed => Some(Status.FAILED)
-        case event: AdditionalInformationUpdated => status
+        case _: Created => this
+        case _: Started => DecisionProjection(Status.IN_PROGRESS, None)
+        case _: CancelRequested => 
DecisionProjection(Status.CANCEL_REQUESTED,latestUpdateAdditionalInformationUpdate)
+        case event: Cancelled => DecisionProjection(Status.CANCELLED, 
event.additionalInformation.map(_.timestamp))
+        case event: Completed => DecisionProjection(Status.COMPLETED, 
event.additionalInformation.map(_.timestamp))
+        case event: Failed => DecisionProjection(Status.FAILED, 
event.additionalInformation.map(_.timestamp))
+        case event: AdditionalInformationUpdated => DecisionProjection(status, 
Some(event.additionalInformation.timestamp))
       }
-    )
   }
 
+  def additionalInformationIsOlderThan(timestamp: Instant) : Boolean = 
latestUpdateAdditionalInformationUpdate.forall(timestamp.isAfter)
+
 }
 
 object DecisionProjection {
-  def empty: DecisionProjection = DecisionProjection(None)
+  def initial(created : Created): DecisionProjection = 
DecisionProjection(Status.WAITING, None)
 }
 
diff --git 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 1a4bbbb..d73cf65 100644
--- 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -31,22 +31,21 @@ import scala.collection.JavaConverters._
 
 class TaskAggregate private(val aggregateId: TaskAggregateId, private val 
history: History) {
 
-  history.getEvents.asScala.headOption match {
-    case Some(Created(_, _, _, _)) =>
+  val initialEvent = history.getEvents.asScala.headOption match {
+    case Some(created @ Created(_, _, _, _)) => created
     case _ => throw new IllegalArgumentException("History must start with 
Created event")
   }
 
-  private val currentStatus: Status = history
+  private val currentDecisionProjection: DecisionProjection = history
     .getEvents
     .asScala
-    .foldLeft(DecisionProjection.empty)((decision, event) => 
decision.update(event))
-    .status
-    .get
+    .tail
+    .foldLeft(DecisionProjection.initial(initialEvent))((decision, event) => 
decision.update(event))
 
   private def optionToJavaList[T](element: Option[T]): util.List[T] = 
element.toList.asJava
 
   private def createEventIfNotFinished(event: EventId => Event): Option[Event] 
= {
-    if (!currentStatus.isFinished) {
+    if (!currentDecisionProjection.status.isFinished) {
       Some(event(history.getNextEventId))
     } else
       None
@@ -63,11 +62,11 @@ class TaskAggregate private(val aggregateId: 
TaskAggregateId, private val histor
     createEventIfNotFinishedAsJavaList(CancelRequested(aggregateId, _, 
hostname))
 
   private[eventsourcing] def update(additionalInformation: 
AdditionalInformation): util.List[Event] =
-    (currentStatus match {
-      case Status.IN_PROGRESS => 
createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
-      case Status.CANCEL_REQUESTED => 
createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+    optionToJavaList(currentDecisionProjection.status match {
+      case Status.IN_PROGRESS if 
currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp)
 => createEvent(AdditionalInformationUpdated(aggregateId, _, 
additionalInformation))
+      case Status.CANCEL_REQUESTED if 
currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp)
 => createEvent(AdditionalInformationUpdated(aggregateId, _, 
additionalInformation))
       case _ => None
-    }).toList.asJava
+    })
 
   private[eventsourcing] def complete(result: Result, additionalInformation: 
Option[AdditionalInformation]): util.List[Event] =
     createEventIfNotFinishedAsJavaList(Completed(aggregateId, _, result, 
additionalInformation))
diff --git 
a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
 
b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
index bb98b26..e12f8bc 100644
--- 
a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
+++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
@@ -66,7 +66,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() 
{
+    void 
givenNoStartedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, new 
MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME)
         );
@@ -75,7 +75,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenInProgressTaskEmitEventWhenUpdateAdditionalInformationCommand() {
+    void 
givenInProgressTaskShouldEmitEventWhenUpdateAdditionalInformationCommand() {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, new 
MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
             eventId -> Started.apply(ID, eventId, HOSTNAME)
@@ -86,7 +86,36 @@ class TaskAggregateTest {
     }
 
     @Test
-    void 
givenCancelRequestedTaskEmitEventWhenUpdateAdditionalInformationCommand() {
+    void 
givenInProgressTaskWithOneNewerUpdateShouldEmitEventWhenUpdateAdditionalInformationCommand()
 {
+        History history = buildHistory(
+            eventId -> Created.apply(ID, eventId, new 
MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
+            eventId -> Started.apply(ID, eventId, HOSTNAME),
+            eventId -> AdditionalInformationUpdated.apply(ID, eventId, new 
MemoryReferenceWithCounterTask.AdditionalInformation(1, timestamp))
+        );
+        TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+        Instant newEventTime = TaskAggregateTest.timestamp.plusSeconds(3);
+        MemoryReferenceWithCounterTask.AdditionalInformation 
youngerAdditionalInformation = new 
MemoryReferenceWithCounterTask.AdditionalInformation(3, newEventTime);
+        assertThat(aggregate.update(youngerAdditionalInformation))
+            .isNotEmpty()
+            .anySatisfy(event -> assertThat(event)
+                .isInstanceOfSatisfying(AdditionalInformationUpdated.class,
+                    additionalInformationUpdated -> 
assertThat(additionalInformationUpdated.additionalInformation().timestamp()).isEqualTo(newEventTime)));
+    }
+
+    @Test
+    void 
givenInProgressTaskWithOneStalledUpdateShouldEmitEventWhenUpdateAdditionalInformationCommand()
 {
+        History history = buildHistory(
+            eventId -> Created.apply(ID, eventId, new 
MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
+            eventId -> Started.apply(ID, eventId, HOSTNAME),
+            eventId -> AdditionalInformationUpdated.apply(ID, eventId, new 
MemoryReferenceWithCounterTask.AdditionalInformation(1, timestamp))
+        );
+        TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+        MemoryReferenceWithCounterTask.AdditionalInformation 
olderAdditionalInformation = new 
MemoryReferenceWithCounterTask.AdditionalInformation(3, 
timestamp.minusSeconds(3));
+        assertThat(aggregate.update(olderAdditionalInformation)).isEmpty();
+    }
+
+    @Test
+    void 
givenCancelRequestedTaskShouldEmitEventWhenUpdateAdditionalInformationCommand() 
{
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, new 
MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
             eventId -> Started.apply(ID, eventId, HOSTNAME),
@@ -98,7 +127,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenCompletedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() 
{
+    void 
givenCompletedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
         MemoryReferenceWithCounterTask task = new 
MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),
@@ -110,7 +139,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenFailedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+    void 
givenFailedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
         MemoryReferenceWithCounterTask task = new 
MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),
@@ -122,7 +151,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenCancelTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+    void 
givenCancelTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
         MemoryReferenceWithCounterTask task = new 
MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to