This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1da890c47c4cc0eae8686a304bf2a923bec60819 Author: Rémi KOWALSKI <[email protected]> AuthorDate: Tue Oct 15 17:54:38 2019 +0200 JAMES-2813 use timestamp in additional information to refuse stalled update event --- .../task/eventsourcing/DecisionProjection.scala | 24 +++++++------ .../james/task/eventsourcing/TaskAggregate.scala | 21 ++++++----- .../task/eventsourcing/TaskAggregateTest.java | 41 ++++++++++++++++++---- 3 files changed, 58 insertions(+), 28 deletions(-) diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala index b5a460f..ce2f6d2 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala @@ -18,27 +18,29 @@ * ***************************************************************/ package org.apache.james.task.eventsourcing +import java.time.Instant + import org.apache.james.eventsourcing.Event import org.apache.james.task.TaskManager.Status -case class DecisionProjection(status: Option[Status]) { +case class DecisionProjection(status: Status, latestUpdateAdditionalInformationUpdate : Option[Instant]) { def update(event: Event): DecisionProjection = { - DecisionProjection( event match { - case event: Created => Some(Status.WAITING) - case event: Started => Some(Status.IN_PROGRESS) - case event: CancelRequested => Some(Status.CANCEL_REQUESTED) - case event: Cancelled => Some(Status.CANCELLED) - case event: Completed => Some(Status.COMPLETED) - case event: Failed => Some(Status.FAILED) - case event: AdditionalInformationUpdated => status + case _: Created => this + case _: Started => DecisionProjection(Status.IN_PROGRESS, None) + case _: CancelRequested => DecisionProjection(Status.CANCEL_REQUESTED,latestUpdateAdditionalInformationUpdate) + case event: Cancelled => DecisionProjection(Status.CANCELLED, event.additionalInformation.map(_.timestamp)) + case event: Completed => DecisionProjection(Status.COMPLETED, event.additionalInformation.map(_.timestamp)) + case event: Failed => DecisionProjection(Status.FAILED, event.additionalInformation.map(_.timestamp)) + case event: AdditionalInformationUpdated => DecisionProjection(status, Some(event.additionalInformation.timestamp)) } - ) } + def additionalInformationIsOlderThan(timestamp: Instant) : Boolean = latestUpdateAdditionalInformationUpdate.forall(timestamp.isAfter) + } object DecisionProjection { - def empty: DecisionProjection = DecisionProjection(None) + def initial(created : Created): DecisionProjection = DecisionProjection(Status.WAITING, None) } diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala index 1a4bbbb..d73cf65 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala @@ -31,22 +31,21 @@ import scala.collection.JavaConverters._ class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) { - history.getEvents.asScala.headOption match { - case Some(Created(_, _, _, _)) => + val initialEvent = history.getEvents.asScala.headOption match { + case Some(created @ Created(_, _, _, _)) => created case _ => throw new IllegalArgumentException("History must start with Created event") } - private val currentStatus: Status = history + private val currentDecisionProjection: DecisionProjection = history .getEvents .asScala - .foldLeft(DecisionProjection.empty)((decision, event) => decision.update(event)) - .status - .get + .tail + .foldLeft(DecisionProjection.initial(initialEvent))((decision, event) => decision.update(event)) private def optionToJavaList[T](element: Option[T]): util.List[T] = element.toList.asJava private def createEventIfNotFinished(event: EventId => Event): Option[Event] = { - if (!currentStatus.isFinished) { + if (!currentDecisionProjection.status.isFinished) { Some(event(history.getNextEventId)) } else None @@ -63,11 +62,11 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor createEventIfNotFinishedAsJavaList(CancelRequested(aggregateId, _, hostname)) private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] = - (currentStatus match { - case Status.IN_PROGRESS => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) - case Status.CANCEL_REQUESTED => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) + optionToJavaList(currentDecisionProjection.status match { + case Status.IN_PROGRESS if currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp) => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) + case Status.CANCEL_REQUESTED if currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp) => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) case _ => None - }).toList.asJava + }) private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] = createEventIfNotFinishedAsJavaList(Completed(aggregateId, _, result, additionalInformation)) diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java index bb98b26..e12f8bc 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java @@ -66,7 +66,7 @@ class TaskAggregateTest { } @Test - void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { + void givenNoStartedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() { History history = buildHistory( eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME) ); @@ -75,7 +75,7 @@ class TaskAggregateTest { } @Test - void givenInProgressTaskEmitEventWhenUpdateAdditionalInformationCommand() { + void givenInProgressTaskShouldEmitEventWhenUpdateAdditionalInformationCommand() { History history = buildHistory( eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME), eventId -> Started.apply(ID, eventId, HOSTNAME) @@ -86,7 +86,36 @@ class TaskAggregateTest { } @Test - void givenCancelRequestedTaskEmitEventWhenUpdateAdditionalInformationCommand() { + void givenInProgressTaskWithOneNewerUpdateShouldEmitEventWhenUpdateAdditionalInformationCommand() { + History history = buildHistory( + eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME), + eventId -> Started.apply(ID, eventId, HOSTNAME), + eventId -> AdditionalInformationUpdated.apply(ID, eventId, new MemoryReferenceWithCounterTask.AdditionalInformation(1, timestamp)) + ); + TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); + Instant newEventTime = TaskAggregateTest.timestamp.plusSeconds(3); + MemoryReferenceWithCounterTask.AdditionalInformation youngerAdditionalInformation = new MemoryReferenceWithCounterTask.AdditionalInformation(3, newEventTime); + assertThat(aggregate.update(youngerAdditionalInformation)) + .isNotEmpty() + .anySatisfy(event -> assertThat(event) + .isInstanceOfSatisfying(AdditionalInformationUpdated.class, + additionalInformationUpdated -> assertThat(additionalInformationUpdated.additionalInformation().timestamp()).isEqualTo(newEventTime))); + } + + @Test + void givenInProgressTaskWithOneStalledUpdateShouldEmitEventWhenUpdateAdditionalInformationCommand() { + History history = buildHistory( + eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME), + eventId -> Started.apply(ID, eventId, HOSTNAME), + eventId -> AdditionalInformationUpdated.apply(ID, eventId, new MemoryReferenceWithCounterTask.AdditionalInformation(1, timestamp)) + ); + TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); + MemoryReferenceWithCounterTask.AdditionalInformation olderAdditionalInformation = new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp.minusSeconds(3)); + assertThat(aggregate.update(olderAdditionalInformation)).isEmpty(); + } + + @Test + void givenCancelRequestedTaskShouldEmitEventWhenUpdateAdditionalInformationCommand() { History history = buildHistory( eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME), eventId -> Started.apply(ID, eventId, HOSTNAME), @@ -98,7 +127,7 @@ class TaskAggregateTest { } @Test - void givenCompletedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { + void givenCompletedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() { MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED); History history = buildHistory( eventId -> Created.apply(ID, eventId, task, HOSTNAME), @@ -110,7 +139,7 @@ class TaskAggregateTest { } @Test - void givenFailedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { + void givenFailedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() { MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED); History history = buildHistory( eventId -> Created.apply(ID, eventId, task, HOSTNAME), @@ -122,7 +151,7 @@ class TaskAggregateTest { } @Test - void givenCancelTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { + void givenCancelTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() { MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED); History history = buildHistory( eventId -> Created.apply(ID, eventId, task, HOSTNAME), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
