This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4965b71ff4d4ec81afb7100716195b76efbc8c20
Author: Matthieu Baechler <[email protected]>
AuthorDate: Thu Nov 28 16:44:17 2019 +0100

    JAMES-2813 support nested task in serialization of task events
---
 .../james/modules/TaskSerializationModule.java     | 47 ++++++++++++++++------
 .../distributed/TasksSerializationModule.java      | 30 ++++++++------
 .../eventsourcing/distributed/TaskEventDTO.scala   | 28 ++++++++-----
 .../distributed/DistributedTaskManagerTest.java    | 36 ++++++++++++-----
 .../RabbitMQTerminationSubscriberTest.java         |  2 +-
 .../distributed/TaskEventsSerializationTest.java   | 11 +++--
 .../james/server/task/json/JsonTaskSerializer.java |  4 +-
 7 files changed, 109 insertions(+), 49 deletions(-)

diff --git 
a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/TaskSerializationModule.java
 
b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/TaskSerializationModule.java
index a30fe93..f75f0d8 100644
--- 
a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/TaskSerializationModule.java
+++ 
b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/TaskSerializationModule.java
@@ -44,7 +44,9 @@ import 
org.apache.james.rrt.cassandra.migration.MappingsSourcesMigrationTaskAddi
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
 import org.apache.james.server.task.json.dto.TaskDTOModule;
+import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import 
org.apache.james.task.eventsourcing.distributed.TasksSerializationModule;
 import org.apache.james.vault.blob.BlobStoreVaultGarbageCollectionTask;
@@ -107,34 +109,53 @@ public class TaskSerializationModule extends 
AbstractModule {
         return new DTOConverter<>(modules);
     }
 
+    @Provides
+    @Singleton
+    public DTOConverter<Task, TaskDTO> taskDTOConverter(Set<TaskDTOModule<?, 
?>> taskDTOModules) {
+        return new DTOConverter<>(taskDTOModules);
+
+    }
+
     @ProvidesIntoSet
-    public EventDTOModule<?, ?> taskCreatedSerialization(JsonTaskSerializer 
jsonTaskSerializer, DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter) {
-        return TasksSerializationModule.CREATED.create(jsonTaskSerializer, 
additionalInformationConverter);
+    public EventDTOModule<?, ?> taskCreatedSerialization(JsonTaskSerializer 
jsonTaskSerializer,
+                                                         
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter,
+                                                         DTOConverter<Task, 
TaskDTO> taskConverter) {
+        return TasksSerializationModule.CREATED.create(jsonTaskSerializer, 
additionalInformationConverter, taskConverter);
     }
 
     @ProvidesIntoSet
-    public EventDTOModule<?, ?> taskStartedSerialization(JsonTaskSerializer 
jsonTaskSerializer, DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter) {
-        return TasksSerializationModule.STARTED.create(jsonTaskSerializer, 
additionalInformationConverter);
+    public EventDTOModule<?, ?> taskStartedSerialization(JsonTaskSerializer 
jsonTaskSerializer,
+                                                         
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter,
+                                                         DTOConverter<Task, 
TaskDTO> taskConverter) {
+        return TasksSerializationModule.STARTED.create(jsonTaskSerializer, 
additionalInformationConverter, taskConverter);
     }
 
     @ProvidesIntoSet
-    public EventDTOModule<?, ?> 
taskCancelRequestedSerialization(JsonTaskSerializer jsonTaskSerializer, 
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter) {
-        return 
TasksSerializationModule.CANCEL_REQUESTED.create(jsonTaskSerializer, 
additionalInformationConverter);
+    public EventDTOModule<?, ?> 
taskCancelRequestedSerialization(JsonTaskSerializer jsonTaskSerializer,
+                                                                 
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter,
+                                                                 
DTOConverter<Task, TaskDTO> taskConverter) {
+        return 
TasksSerializationModule.CANCEL_REQUESTED.create(jsonTaskSerializer, 
additionalInformationConverter, taskConverter);
     }
 
     @ProvidesIntoSet
-    public EventDTOModule<?, ?> taskCancelledSerialization(JsonTaskSerializer 
jsonTaskSerializer, DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter) {
-        return TasksSerializationModule.CANCELLED.create(jsonTaskSerializer, 
additionalInformationConverter);
+    public EventDTOModule<?, ?> taskCancelledSerialization(JsonTaskSerializer 
jsonTaskSerializer,
+                                                           
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter,
+                                                           DTOConverter<Task, 
TaskDTO> taskConverter) {
+        return TasksSerializationModule.CANCELLED.create(jsonTaskSerializer, 
additionalInformationConverter, taskConverter);
     }
 
     @ProvidesIntoSet
-    public EventDTOModule<?, ?> taskCompletedSerialization(JsonTaskSerializer 
jsonTaskSerializer, DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter) {
-        return TasksSerializationModule.COMPLETED.create(jsonTaskSerializer, 
additionalInformationConverter);
+    public EventDTOModule<?, ?> taskCompletedSerialization(JsonTaskSerializer 
jsonTaskSerializer,
+                                                           
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter,
+                                                           DTOConverter<Task, 
TaskDTO> taskConverter) {
+        return TasksSerializationModule.COMPLETED.create(jsonTaskSerializer, 
additionalInformationConverter, taskConverter);
     }
 
     @ProvidesIntoSet
-    public EventDTOModule<?, ?> taskFailedSerialization(JsonTaskSerializer 
jsonTaskSerializer, DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter) {
-        return TasksSerializationModule.FAILED.create(jsonTaskSerializer, 
additionalInformationConverter);
+    public EventDTOModule<?, ?> taskFailedSerialization(JsonTaskSerializer 
jsonTaskSerializer,
+                                                        
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter,
+                                                        DTOConverter<Task, 
TaskDTO> taskConverter) {
+        return TasksSerializationModule.FAILED.create(jsonTaskSerializer, 
additionalInformationConverter, taskConverter);
     }
 
     @ProvidesIntoSet
@@ -350,7 +371,7 @@ public class TaskSerializationModule extends AbstractModule 
{
     @Named(EVENT_NESTED_TYPES_INJECTION_NAME)
     @Provides
     public Set<DTOModule<?, ?>> 
eventNestedTypes(Set<AdditionalInformationDTOModule<?, ?>> 
additionalInformationDTOModules,
-                                            Set<TaskDTOModule<?, ?>> 
taskDTOModules) {
+                                                 Set<TaskDTOModule<?, ?>> 
taskDTOModules) {
         return Sets.union(additionalInformationDTOModules, taskDTOModules);
     }
 }
diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
index f613479..f158d0f 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
@@ -26,6 +26,8 @@ import 
org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.json.DTOConverter;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.eventsourcing.AdditionalInformationUpdated;
 import org.apache.james.task.eventsourcing.CancelRequested;
@@ -40,18 +42,20 @@ import com.github.steveash.guavate.Guavate;
 public interface TasksSerializationModule {
     @FunctionalInterface
     interface TaskSerializationModuleFactory {
-        EventDTOModule<?, ?> create(JsonTaskSerializer taskSerializer, 
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter);
+        EventDTOModule<?, ?> create(JsonTaskSerializer taskSerializer,
+                                    
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter,
+                                    DTOConverter<Task, TaskDTO> dtoConverter);
     }
 
-    TaskSerializationModuleFactory CREATED = (jsonTaskSerializer, 
additionalInformationConverter) -> EventDTOModule
+    TaskSerializationModuleFactory CREATED = (jsonTaskSerializer, 
additionalInformationConverter, dtoConverter) -> EventDTOModule
         .forEvent(Created.class)
         .convertToDTO(CreatedDTO.class)
-        .toDomainObjectConverter(dto -> dto.toDomainObject(jsonTaskSerializer))
-        .toDTOConverter((event, typeName) -> 
CreatedDTO.fromDomainObject(event, typeName, jsonTaskSerializer))
+        .toDomainObjectConverter(dto -> dto.toDomainObject(dtoConverter))
+        .toDTOConverter((event, typeName) -> 
CreatedDTO.fromDomainObject(dtoConverter, event, typeName))
         .typeName("task-manager-created")
         .withFactory(EventDTOModule::new);
 
-    TaskSerializationModuleFactory STARTED = (jsonTaskSerializer, 
additionalInformationConverter) -> EventDTOModule
+    TaskSerializationModuleFactory STARTED = (jsonTaskSerializer, 
additionalInformationConverter, dtoConverter) -> EventDTOModule
         .forEvent(Started.class)
         .convertToDTO(StartedDTO.class)
         .toDomainObjectConverter(StartedDTO::toDomainObject)
@@ -59,7 +63,7 @@ public interface TasksSerializationModule {
         .typeName("task-manager-started")
         .withFactory(EventDTOModule::new);
 
-    TaskSerializationModuleFactory CANCEL_REQUESTED = (jsonTaskSerializer, 
additionalInformationConverter) -> EventDTOModule
+    TaskSerializationModuleFactory CANCEL_REQUESTED = (jsonTaskSerializer, 
additionalInformationConverter, dtoConverter) -> EventDTOModule
         .forEvent(CancelRequested.class)
         .convertToDTO(CancelRequestedDTO.class)
         .toDomainObjectConverter(CancelRequestedDTO::toDomainObject)
@@ -67,7 +71,7 @@ public interface TasksSerializationModule {
         .typeName("task-manager-cancel-requested")
         .withFactory(EventDTOModule::new);
 
-    TaskSerializationModuleFactory COMPLETED = (jsonTaskSerializer, 
additionalInformationConverter) -> EventDTOModule
+    TaskSerializationModuleFactory COMPLETED = (jsonTaskSerializer, 
additionalInformationConverter, dtoConverter) -> EventDTOModule
         .forEvent(Completed.class)
         .convertToDTO(CompletedDTO.class)
         .toDomainObjectConverter(dto -> 
dto.toDomainObject(additionalInformationConverter))
@@ -75,7 +79,7 @@ public interface TasksSerializationModule {
         .typeName("task-manager-completed")
         .withFactory(EventDTOModule::new);
 
-    TaskSerializationModuleFactory FAILED = (jsonTaskSerializer, 
additionalInformationConverter) -> EventDTOModule
+    TaskSerializationModuleFactory FAILED = (jsonTaskSerializer, 
additionalInformationConverter, dtoConverter) -> EventDTOModule
         .forEvent(Failed.class)
         .convertToDTO(FailedDTO.class)
         .toDomainObjectConverter(dto -> 
dto.toDomainObject(additionalInformationConverter))
@@ -83,7 +87,7 @@ public interface TasksSerializationModule {
         .typeName("task-manager-failed")
         .withFactory(EventDTOModule::new);
 
-    TaskSerializationModuleFactory CANCELLED = (jsonTaskSerializer, 
additionalInformationConverter) -> EventDTOModule
+    TaskSerializationModuleFactory CANCELLED = (jsonTaskSerializer, 
additionalInformationConverter, dtoConverter) -> EventDTOModule
         .forEvent(Cancelled.class)
         .convertToDTO(CancelledDTO.class)
         .toDomainObjectConverter(dto -> 
dto.toDomainObject(additionalInformationConverter))
@@ -91,7 +95,7 @@ public interface TasksSerializationModule {
         .typeName("task-manager-cancelled")
         .withFactory(EventDTOModule::new);
 
-    TaskSerializationModuleFactory UPDATED = (jsonTaskSerializer, 
additionalInformationConverter) -> EventDTOModule
+    TaskSerializationModuleFactory UPDATED = (jsonTaskSerializer, 
additionalInformationConverter, dtoConverter) -> EventDTOModule
         .forEvent(AdditionalInformationUpdated.class)
         .convertToDTO(AdditionalInformationUpdatedDTO.class)
         .toDomainObjectConverter(dto -> 
dto.toDomainObject(additionalInformationConverter))
@@ -99,10 +103,12 @@ public interface TasksSerializationModule {
         .typeName("task-manager-updated")
         .withFactory(EventDTOModule::new);
 
-    static Set<EventDTOModule<?, ?>> list(JsonTaskSerializer 
jsonTaskSerializer, DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter) {
+    static Set<EventDTOModule<?, ?>> list(JsonTaskSerializer 
jsonTaskSerializer,
+                                          
DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> additionalInformationConverter,
+                                          DTOConverter<Task, TaskDTO> 
dtoConverter) {
         return Stream
             .of(CREATED, STARTED, CANCEL_REQUESTED, CANCELLED, COMPLETED, 
FAILED, UPDATED)
-            .map(moduleFactory -> moduleFactory.create(jsonTaskSerializer, 
additionalInformationConverter))
+            .map(moduleFactory -> moduleFactory.create(jsonTaskSerializer, 
additionalInformationConverter, dtoConverter))
             .collect(Guavate.toImmutableSet());
     }
 }
diff --git 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
index a886427..fa058c1 100644
--- 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
+++ 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
@@ -27,23 +27,26 @@ import org.apache.james.eventsourcing.EventId
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO
 import org.apache.james.json.DTOConverter
 import org.apache.james.server.task.json.JsonTaskSerializer
-import org.apache.james.server.task.json.dto.AdditionalInformationDTO
+import org.apache.james.server.task.json.dto.{AdditionalInformationDTO, 
TaskDTO}
 import org.apache.james.task.TaskExecutionDetails.AdditionalInformation
 import org.apache.james.task.eventsourcing._
-import 
org.apache.james.task.eventsourcing.distributed.distributed.AdditionalInformationConverter
+import 
org.apache.james.task.eventsourcing.distributed.distributed.{AdditionalInformationConverter,
 TaskConverter}
 import org.apache.james.task.{Hostname, Task, TaskExecutionDetails, TaskId}
 
 import scala.compat.java8.OptionConverters._
 
 package object distributed {
   type AdditionalInformationConverter = 
DTOConverter[TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO]
+  type TaskConverter = DTOConverter[Task, TaskDTO]
 }
 
-class NestedAdditionalInformationDTODeserializerNotFound(val dto: 
AdditionalInformationDTO) extends RuntimeException("Unable to find a 
deserializer for " + dto) {
-}
+class NestedTaskDTOSerializerNotFound(val domainObject: Task) extends 
RuntimeException("Unable to find a serializer for " + domainObject) { }
 
-class NestedAdditionalInformationDTOSerializerNotFound(val domainObject: 
AdditionalInformation) extends RuntimeException("Unable to find a serializer 
for " + domainObject) {
-}
+class NestedTaskDTODeserializerNotFound(val dto: TaskDTO) extends 
RuntimeException("Unable to find a deserializer for " + dto) { }
+
+class NestedAdditionalInformationDTODeserializerNotFound(val dto: 
AdditionalInformationDTO) extends RuntimeException("Unable to find a 
deserializer for " + dto) { }
+
+class NestedAdditionalInformationDTOSerializerNotFound(val domainObject: 
AdditionalInformation) extends RuntimeException("Unable to find a serializer 
for " + domainObject) { }
 
 sealed abstract class TaskEventDTO(val getType: String, val getAggregate: 
String, val getEvent: Int) extends EventDTO {
   protected def domainAggregateId: TaskAggregateId = 
TaskAggregateId(TaskId.fromString(getAggregate))
@@ -54,15 +57,20 @@ sealed abstract class TaskEventDTO(val getType: String, val 
getAggregate: String
 case class CreatedDTO(@JsonProperty("type") typeName: String,
                       @JsonProperty("aggregate") aggregateId: String,
                       @JsonProperty("event") eventId: Int,
-                      @JsonProperty("task") getTask: String,
+                      @JsonProperty("task") getTask: TaskDTO,
                       @JsonProperty("hostname") getHostname: String)
   extends TaskEventDTO(typeName, aggregateId, eventId) {
-  def toDomainObject(serializer: JsonTaskSerializer): Created = 
Created(domainAggregateId, domainEventId, serializer.deserialize(getTask), 
Hostname(getHostname))
+  def toDomainObject(taskConverter: TaskConverter): Created = {
+    val task: Task = taskConverter.toDomainObject(getTask).orElseThrow(() => 
new NestedTaskDTODeserializerNotFound(getTask))
+    Created(domainAggregateId, domainEventId, task, Hostname(getHostname))
+  }
 }
 
 object CreatedDTO {
-  def fromDomainObject(event: Created, typeName: String, serializer: 
JsonTaskSerializer): CreatedDTO =
-    CreatedDTO(typeName, event.aggregateId.taskId.asString(), 
event.eventId.serialize(), serializer.serialize(event.task), 
event.hostname.asString)
+  def fromDomainObject(taskConverter: TaskConverter)(event: Created, typeName: 
String): CreatedDTO = {
+    val taskDTO = taskConverter.toDTO(event.task).orElseThrow(() => new 
NestedTaskDTOSerializerNotFound(event.task))
+    CreatedDTO(typeName, event.aggregateId.taskId.asString(), 
event.eventId.serialize(), taskDTO, event.hostname.asString)
+  }
 }
 
 case class StartedDTO(@JsonProperty("type") typeName: String,
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 1b5a5de..d479199 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -49,6 +49,8 @@ import 
org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
 import org.apache.james.server.task.json.dto.MemoryReferenceTaskStore;
 import 
org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskAdditionalInformationDTO;
 import 
org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskStore;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
 import org.apache.james.task.CountDownLatchExtension;
@@ -74,6 +76,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 class DistributedTaskManagerTest implements TaskManagerContract {
 
@@ -101,8 +105,9 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
     }
 
     public static final AdditionalInformationDTOModule<?, ?> 
ADDITIONAL_INFORMATION_MODULE = 
MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE;
+
     static final JsonTaskAdditionalInformationSerializer 
JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER = 
JsonTaskAdditionalInformationSerializer.of(ADDITIONAL_INFORMATION_MODULE);
-    static final DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> DTO_CONVERTER = 
DTOConverter.of(ADDITIONAL_INFORMATION_MODULE);
+    static final DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> TASK_ADDITIONAL_INFORMATION_DTO_CONVERTER = 
DTOConverter.of(ADDITIONAL_INFORMATION_MODULE);
     static final Hostname HOSTNAME = new Hostname("foo");
     static final Hostname HOSTNAME_2 = new Hostname("bar");
 
@@ -118,18 +123,31 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
             CassandraZonedDateTimeModule.MODULE,
             CassandraTaskExecutionDetailsProjectionModule.MODULE()));
 
-    JsonTaskSerializer taskSerializer = JsonTaskSerializer.of(
-        TestTaskDTOModules.COMPLETED_TASK_MODULE,
-        TestTaskDTOModules.FAILED_TASK_MODULE,
-        TestTaskDTOModules.THROWING_TASK_MODULE,
-        TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(new 
MemoryReferenceTaskStore()),
-        TestTaskDTOModules.MEMORY_REFERENCE_WITH_COUNTER_TASK_MODULE.apply(new 
MemoryReferenceWithCounterTaskStore()));
+    MemoryReferenceTaskStore memoryReferenceTaskStore = new 
MemoryReferenceTaskStore();
+    MemoryReferenceWithCounterTaskStore memoryReferenceWithCounterTaskStore = 
new MemoryReferenceWithCounterTaskStore();
+
+    ImmutableSet<TaskDTOModule<?, ?>> taskDTOModules =
+        ImmutableSet.of(
+            TestTaskDTOModules.COMPLETED_TASK_MODULE,
+            TestTaskDTOModules.FAILED_TASK_MODULE,
+            TestTaskDTOModules.THROWING_TASK_MODULE,
+            
TestTaskDTOModules.MEMORY_REFERENCE_TASK_MODULE.apply(memoryReferenceTaskStore),
+            
TestTaskDTOModules.MEMORY_REFERENCE_WITH_COUNTER_TASK_MODULE.apply(memoryReferenceWithCounterTaskStore));
+
+    JsonTaskSerializer taskSerializer = new JsonTaskSerializer(taskDTOModules);
+
+    DTOConverter<Task, TaskDTO> taskDTOConverter = new 
DTOConverter<>(taskDTOModules);
 
-    Set<EventDTOModule<?, ?>> eventDtoModule = 
TasksSerializationModule.list(taskSerializer, DTO_CONVERTER);
+    Set<EventDTOModule<?, ?>> eventDtoModule = 
TasksSerializationModule.list(taskSerializer, 
TASK_ADDITIONAL_INFORMATION_DTO_CONVERTER, taskDTOConverter);
 
     @RegisterExtension
     CassandraEventStoreExtension eventStoreExtension = new 
CassandraEventStoreExtension(CASSANDRA_CLUSTER,
-        
JsonEventSerializer.forModules(eventDtoModule).withNestedTypeModules(ADDITIONAL_INFORMATION_MODULE));
+        JsonEventSerializer.forModules(eventDtoModule)
+            .withNestedTypeModules(
+                Sets.union(
+                    ImmutableSet.of(ADDITIONAL_INFORMATION_MODULE),
+                    taskDTOModules
+                )));
 
     @RegisterExtension
     CountDownLatchExtension countDownLatchExtension = new 
CountDownLatchExtension();
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 7e358cd..dcb5a36 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -43,7 +43,7 @@ import reactor.core.publisher.Flux;
 
 class RabbitMQTerminationSubscriberTest implements 
TerminationSubscriberContract {
     private static final JsonTaskSerializer TASK_SERIALIZER = 
JsonTaskSerializer.of();
-    private static final Set<EventDTOModule<?, ?>> MODULES = 
TasksSerializationModule.list(TASK_SERIALIZER, DTOConverter.of());
+    private static final Set<EventDTOModule<?, ?>> MODULES = 
TasksSerializationModule.list(TASK_SERIALIZER, DTOConverter.of(), 
DTOConverter.of());
     private static final JsonEventSerializer SERIALIZER = 
JsonEventSerializer.forModules(MODULES).withoutNestedType();
 
     @RegisterExtension
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
index 0b5d1e0..bad55dd 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
@@ -34,6 +34,7 @@ import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
 import 
org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskAdditionalInformationDTO;
 import 
org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskStore;
+import org.apache.james.server.task.json.dto.TaskDTO;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
 import org.apache.james.task.CompletedTask;
 import org.apache.james.task.Hostname;
@@ -59,6 +60,7 @@ import scala.Option;
 class TaskEventsSerializationTest {
     static final Instant TIMESTAMP = Instant.parse("2018-11-13T12:00:55Z");
     static final DTOConverter<TaskExecutionDetails.AdditionalInformation, 
AdditionalInformationDTO> ADDITIONAL_INFORMATION_CONVERTER = 
DTOConverter.of(MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE);
+    static final DTOConverter<Task, TaskDTO> TASK_CONVERTER = 
DTOConverter.of(TestTaskDTOModules.COMPLETED_TASK_MODULE);
     static final JsonTaskAdditionalInformationSerializer 
TASK_ADDITIONNAL_INFORMATION_SERIALIZER = 
JsonTaskAdditionalInformationSerializer.of(MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE);
     static final TaskAggregateId AGGREGATE_ID = new 
TaskAggregateId(TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd"));
     static final EventId EVENT_ID = EventId.fromSerialized(42);
@@ -70,11 +72,14 @@ class TaskEventsSerializationTest {
         JsonTaskSerializer.of(
             TestTaskDTOModules.COMPLETED_TASK_MODULE,
             
TestTaskDTOModules.MEMORY_REFERENCE_WITH_COUNTER_TASK_MODULE.apply(new 
MemoryReferenceWithCounterTaskStore())),
-        ADDITIONAL_INFORMATION_CONVERTER);
+        ADDITIONAL_INFORMATION_CONVERTER,
+        TASK_CONVERTER);
 
     JsonEventSerializer serializer = JsonEventSerializer
         .forModules(list)
-        
.withNestedTypeModules(MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE);
+        .withNestedTypeModules(
+            
MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE,
+            TestTaskDTOModules.COMPLETED_TASK_MODULE);
 
     @ParameterizedTest
     @MethodSource
@@ -98,7 +103,7 @@ class TaskEventsSerializationTest {
 
     static Stream<Arguments> validTasks() throws Exception {
         return Stream.of(
-            Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK, HOSTNAME), 
"{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"),
+            Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK, HOSTNAME), 
"{\"task\":{\"type\":\"completed-task\"},\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"),
             Arguments.of(new Started(AGGREGATE_ID, EVENT_ID, HOSTNAME), 
"{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\",\"hostname\":\"foo\"}"),
             Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID, 
HOSTNAME), 
"{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"),
             Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, 
Task.Result.COMPLETED, Option.empty()), 
"{\"result\":\"COMPLETED\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"),
diff --git 
a/server/task/task-json/src/main/java/org/apache/james/server/task/json/JsonTaskSerializer.java
 
b/server/task/task-json/src/main/java/org/apache/james/server/task/json/JsonTaskSerializer.java
index 31f4a77..f87bcff 100644
--- 
a/server/task/task-json/src/main/java/org/apache/james/server/task/json/JsonTaskSerializer.java
+++ 
b/server/task/task-json/src/main/java/org/apache/james/server/task/json/JsonTaskSerializer.java
@@ -30,6 +30,7 @@ import org.apache.james.server.task.json.dto.TaskDTOModule;
 import org.apache.james.task.Task;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
 public class JsonTaskSerializer {
@@ -53,7 +54,8 @@ public class JsonTaskSerializer {
     private JsonGenericSerializer<Task, TaskDTO> jsonGenericSerializer;
 
     @Inject
-    private JsonTaskSerializer(Set<TaskDTOModule<?, ?>> modules) {
+    @VisibleForTesting
+    public JsonTaskSerializer(Set<TaskDTOModule<?, ?>> modules) {
         jsonGenericSerializer = 
JsonGenericSerializer.forModules(modules).withoutNestedType();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to