This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b2a0fd184619f4c6bbddec6062e6b162b1d45b21
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Mon Jun 6 15:31:31 2022 +0700

    JAMES-3774 Migrate server/task/task-distributed to Cassandra driver 4
---
 .../eventstore/cassandra/EventStoreDao.scala       |   6 +-
 .../distributed/RabbitMQWorkQueue.java             |   6 +-
 ...assandraTaskExecutionDetailsProjectionDAO.scala |  67 +++++++------
 ...andraTaskExecutionDetailsProjectionModule.scala |  35 +++----
 .../distributed/DistributedTaskManagerTest.java    | 109 ++++++++++++++++++++-
 .../apache/james/task/SerialTaskManagerWorker.java |  41 +++++---
 .../eventsourcing/EventSourcingTaskManager.scala   |   5 +-
 .../TaskExecutionDetailsProjection.scala           |  39 +++++---
 8 files changed, 223 insertions(+), 85 deletions(-)

diff --git 
a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
 
b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
index 33c8219bbe..f99a231d02 100644
--- 
a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
+++ 
b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
@@ -29,6 +29,7 @@ import 
org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
 import org.apache.james.eventsourcing.eventstore.History
 import 
org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.{AGGREGATE_ID,
 EVENT, EVENTS_TABLE, EVENT_ID}
 import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.apache.james.util.ReactorUtils
 import reactor.core.scala.publisher.{SFlux, SMono}
 
 class EventStoreDao @Inject() (val session: CqlSession,
@@ -80,12 +81,13 @@ class EventStoreDao @Inject() (val session: CqlSession,
       .setExecutionProfile(executionProfile)
     val rows: SFlux[Row] = 
SFlux[Row](cassandraAsyncExecutor.executeRows(preparedStatement))
 
-    val events: SFlux[Event] = rows.map(toEvent)
+    val events: SFlux[Event] = rows.concatMap(toEvent)
     val listEvents: SMono[List[Event]] = events.collectSeq()
       .map(_.toList)
 
     listEvents.map(History.of(_))
   }
 
-  private def toEvent(row: Row): Event = 
jsonEventSerializer.deserialize(row.getString(EVENT))
+  private def toEvent(row: Row): SMono[Event] = SMono.fromCallable(() => 
jsonEventSerializer.deserialize(row.getString(EVENT)))
+    .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
 }
\ No newline at end of file
diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index f74f5a81d3..3d6497b642 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -182,8 +182,10 @@ public class RabbitMQWorkQueue implements WorkQueue {
             .onErrorResume(error -> {
                 String errorMessage = String.format("Unable to run submitted 
Task %s", taskId.asString());
                 LOGGER.warn(errorMessage, error);
-                return Mono.from(worker.fail(taskId, task.details(), 
errorMessage, error))
-                    .then(Mono.empty());
+                return Mono.fromCallable(task::details)
+                    .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+                    .flatMap(details ->  Mono.from(worker.fail(taskId, 
details, errorMessage, error))
+                    .then(Mono.empty()));
             });
     }
 
diff --git 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
index f2332c19da..8e2422dd01 100644
--- 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
+++ 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
@@ -20,20 +20,23 @@ package org.apache.james.task.eventsourcing.cassandra
 
 import java.util.Optional
 
-import com.datastax.driver.core.querybuilder.QueryBuilder
-import com.datastax.driver.core.querybuilder.QueryBuilder.{bindMarker, 
insertInto, select}
-import com.datastax.driver.core.{BoundStatement, Row, Session, UDTValue}
+import com.datastax.oss.driver.api.core.CqlSession
+import com.datastax.oss.driver.api.core.cql.{BoundStatement, Row}
+import com.datastax.oss.driver.api.core.data.UdtValue
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, 
insertInto, selectFrom}
 import javax.inject.Inject
 import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, 
CassandraZonedDateTimeModule}
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
 import 
org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer
 import org.apache.james.task._
 import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._
+import org.apache.james.util.ReactorUtils
 import reactor.core.publisher.{Flux, Mono}
+import reactor.core.scala.publisher.SMono
 
 import scala.compat.java8.OptionConverters._
 
-class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, 
typesProvider: CassandraTypesProvider, jsonTaskAdditionalInformationSerializer: 
JsonTaskAdditionalInformationSerializer) {
+class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: 
CqlSession, typesProvider: CassandraTypesProvider, 
jsonTaskAdditionalInformationSerializer: 
JsonTaskAdditionalInformationSerializer) {
   private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session)
   private val dateType = 
typesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)
 
@@ -50,19 +53,23 @@ class CassandraTaskExecutionDetailsProjectionDAO 
@Inject()(session: Session, typ
     .value(CANCEL_REQUESTED_NODE, bindMarker(CANCEL_REQUESTED_NODE))
     .value(FAILED_DATE, bindMarker(FAILED_DATE))
     .value(ADDITIONAL_INFORMATION, bindMarker(ADDITIONAL_INFORMATION))
-  )
+    .build())
 
-  private val selectStatement = session.prepare(select().from(TABLE_NAME)
-    .where(QueryBuilder.eq(TASK_ID, bindMarker(TASK_ID))))
+  private val selectStatement = session.prepare(selectFrom(TABLE_NAME)
+    .all()
+    .whereColumn(TASK_ID).isEqualTo(bindMarker(TASK_ID))
+    .build())
 
-  private val listStatement = session.prepare(select().from(TABLE_NAME))
+  private val listStatement = 
session.prepare(selectFrom(TABLE_NAME).all().build())
 
-  def saveDetails(details: TaskExecutionDetails): Mono[Void] = {
-    val boundStatement =  insertStatement.bind
-      .setUUID(TASK_ID, details.getTaskId.getValue)
+  def saveDetails(details: TaskExecutionDetails): Mono[Void] =
+    Mono.from(serializeAdditionalInformation(details)
+      .flatMap(serializeAdditionalInformation => {
+    val boundStatement =  insertStatement.bind()
+      .setUuid(TASK_ID, details.getTaskId.getValue)
       .setString(TYPE, details.getType.asString())
       .setString(STATUS, details.getStatus.getValue)
-      .setUDTValue(SUBMITTED_DATE, 
CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate))
+      .setUdtValue(SUBMITTED_DATE, 
CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate))
       .setString(SUBMITTED_NODE, details.getSubmittedNode.asString)
 
     val bindOptionalFieldOperations = List(
@@ -72,36 +79,35 @@ class CassandraTaskExecutionDetailsProjectionDAO 
@Inject()(session: Session, typ
       (statement: BoundStatement) => bindOptionalUDTValue(statement, 
CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, 
details.getCanceledDate)),
       (statement: BoundStatement) => bindOptionalStringValue(statement, 
CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString)),
       (statement: BoundStatement) => bindOptionalUDTValue(statement, 
FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, 
details.getFailedDate)),
-      (statement: BoundStatement) => bindOptionalStringValue(statement, 
ADDITIONAL_INFORMATION, serializeAdditionalInformation(details)),
+      (statement: BoundStatement) => bindOptionalStringValue(statement, 
ADDITIONAL_INFORMATION, serializeAdditionalInformation),
     )
 
     val fullyBoundStatement = 
bindOptionalFieldOperations.foldLeft(boundStatement)((statement, 
bindFieldOperation) => {
       bindFieldOperation(statement)
     })
 
-    cassandraAsyncExecutor.executeVoid(fullyBoundStatement);
-  }
+    SMono(cassandraAsyncExecutor.executeVoid(fullyBoundStatement))
+  }))
 
-  private def bindOptionalStringValue(statement: BoundStatement, fieldName: 
String, fieldValue: Optional[String]) = {
+  private def bindOptionalStringValue(statement: BoundStatement, fieldName: 
String, fieldValue: Optional[String]) =
     fieldValue.asScala match {
       case Some(value) => statement.setString(fieldName, value)
       case None => statement
     }
-  }
 
-  private def bindOptionalUDTValue(statement: BoundStatement, fieldName: 
String, fieldValue: Optional[UDTValue]) = {
+  private def bindOptionalUDTValue(statement: BoundStatement, fieldName: 
String, fieldValue: Optional[UdtValue]) =
     fieldValue.asScala match {
-      case Some(value) => statement.setUDTValue(fieldName, value)
+      case Some(value) => statement.setUdtValue(fieldName, value)
       case None => statement
     }
-  }
 
-  private def serializeAdditionalInformation(details: TaskExecutionDetails): 
Optional[String] = details
+  private def serializeAdditionalInformation(details: TaskExecutionDetails): 
SMono[Optional[String]] = SMono.fromCallable(() =>details
     .getAdditionalInformation
-    .map(jsonTaskAdditionalInformationSerializer.serialize)
+    .map(jsonTaskAdditionalInformationSerializer.serialize(_)))
+    .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
 
   def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] = 
cassandraAsyncExecutor
-    .executeSingleRow(selectStatement.bind().setUUID(TASK_ID, taskId.getValue))
+    .executeSingleRow(selectStatement.bind().setUuid(TASK_ID, taskId.getValue))
     .map(readRow)
 
   def listDetails(): Flux[TaskExecutionDetails] = cassandraAsyncExecutor
@@ -111,22 +117,21 @@ class CassandraTaskExecutionDetailsProjectionDAO 
@Inject()(session: Session, typ
   private def readRow(row: Row): TaskExecutionDetails = {
     val taskType = TaskType.of(row.getString(TYPE))
     new TaskExecutionDetails(
-      taskId = TaskId.fromUUID(row.getUUID(TASK_ID)),
+      taskId = TaskId.fromUUID(row.getUuid(TASK_ID)),
       `type` = TaskType.of(row.getString(TYPE)),
       status = TaskManager.Status.fromString(row.getString(STATUS)),
-      submittedDate = 
CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)),
+      submittedDate = 
CassandraZonedDateTimeModule.fromUDT(row.getUdtValue(SUBMITTED_DATE)),
       submittedNode = Hostname(row.getString(SUBMITTED_NODE)),
-      startedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(STARTED_DATE)),
+      startedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(STARTED_DATE)),
       ranNode = Optional.ofNullable(row.getString(RAN_NODE)).map(Hostname(_)),
-      completedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(COMPLETED_DATE)),
-      canceledDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)),
+      completedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(COMPLETED_DATE)),
+      canceledDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(CANCELED_DATE)),
       cancelRequestedNode = 
Optional.ofNullable(row.getString(CANCEL_REQUESTED_NODE)).map(Hostname(_)),
-      failedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)),
+      failedDate = 
CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(FAILED_DATE)),
       additionalInformation = () => deserializeAdditionalInformation(taskType, 
row))
   }
 
-  private def deserializeAdditionalInformation(taskType: TaskType, row: Row): 
Optional[TaskExecutionDetails.AdditionalInformation] = {
+  private def deserializeAdditionalInformation(taskType: TaskType, row: Row): 
Optional[TaskExecutionDetails.AdditionalInformation] =
     Optional.ofNullable(row.getString(ADDITIONAL_INFORMATION))
       .map(additionalInformation => 
jsonTaskAdditionalInformationSerializer.deserialize(additionalInformation))
-  }
 }
diff --git 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
index 229cbe67fa..080ff13e23 100644
--- 
a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
+++ 
b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
@@ -18,8 +18,8 @@
  ****************************************************************/
 package org.apache.james.task.eventsourcing.cassandra
 
-import com.datastax.driver.core.DataType.{text, uuid}
-import com.datastax.driver.core.schemabuilder.{Create, SchemaBuilder}
+import com.datastax.oss.driver.api.core.`type`.DataTypes
+import com.datastax.oss.driver.api.querybuilder.SchemaBuilder
 import org.apache.james.backends.cassandra.components.CassandraModule
 import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule
 
@@ -44,22 +44,19 @@ object CassandraTaskExecutionDetailsProjectionModule {
 
   val MODULE: CassandraModule = 
CassandraModule.table(CassandraTaskExecutionDetailsProjectionTable.TABLE_NAME)
     .comment("Projection of TaskExecutionDetails used by the distributed task 
manager")
-    .options((options: Create.Options) => options
-      .caching(
-        SchemaBuilder.KeyCaching.ALL,
-        SchemaBuilder.noRows()))
-    .statement((statement: Create) => statement
-      .addPartitionKey(CassandraTaskExecutionDetailsProjectionTable.TASK_ID, 
uuid)
-      
.addColumn(CassandraTaskExecutionDetailsProjectionTable.ADDITIONAL_INFORMATION, 
text)
-      .addColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, text)
-      .addColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, text)
-      
.addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
-      .addColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, 
text)
-      .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
-      .addColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, text)
-      
.addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
-      
.addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
-      
.addColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, 
text)
-      .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, 
SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)))
+    .options(options => options.withCaching(true, 
SchemaBuilder.RowsPerPartition.NONE))
+    .statement(statement => types => statement
+      .withPartitionKey(CassandraTaskExecutionDetailsProjectionTable.TASK_ID, 
DataTypes.UUID)
+      
.withColumn(CassandraTaskExecutionDetailsProjectionTable.ADDITIONAL_INFORMATION,
 DataTypes.TEXT)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, 
DataTypes.TEXT)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, 
DataTypes.TEXT)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, 
types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, 
DataTypes.TEXT)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, 
types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, 
DataTypes.TEXT)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, 
types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, 
types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      
.withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, 
DataTypes.TEXT)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, 
types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)))
     .build
 }
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index de2a3caf4d..10c500c012 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -31,8 +31,10 @@ import static org.awaitility.Durations.FIVE_SECONDS;
 import static org.awaitility.Durations.ONE_SECOND;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -56,6 +58,7 @@ import 
org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.json.DTOConverter;
+import org.apache.james.json.DTOModule;
 import 
org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
@@ -77,6 +80,7 @@ import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
+import org.apache.james.task.TaskType;
 import org.apache.james.task.TaskWithId;
 import org.apache.james.task.WorkQueue;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
@@ -91,6 +95,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -156,6 +162,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
 
     ImmutableSet<TaskDTOModule<?, ?>> taskDTOModules =
         ImmutableSet.of(
+            
CassandraExecutingTask.module(CASSANDRA_CLUSTER.getCassandraCluster().getConf()),
             TestTaskDTOModules.FAILS_DESERIALIZATION_TASK_MODULE,
             TestTaskDTOModules.COMPLETED_TASK_MODULE,
             TestTaskDTOModules.FAILED_TASK_MODULE,
@@ -538,16 +545,16 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
         cassandra.getConf().registerScenario(Scenario.combine(
             executeNormally()
                 .times(2) // submit + inProgress
-                .whenQueryStartsWith("INSERT INTO eventStore"),
+                .whenQueryStartsWith("INSERT INTO eventstore"),
             executeNormally()
                 .times(2) // submit + inProgress
-                .whenQueryStartsWith("INSERT INTO 
taskExecutionDetailsProjection"),
+                .whenQueryStartsWith("INSERT INTO 
taskexecutiondetailsprojection"),
             fail()
                 .forever()
-                .whenQueryStartsWith("INSERT INTO eventStore"),
+                .whenQueryStartsWith("INSERT INTO eventstore"),
             fail()
                 .forever()
-                .whenQueryStartsWith("INSERT INTO 
taskExecutionDetailsProjection")));
+                .whenQueryStartsWith("INSERT INTO 
taskexecutiondetailsprojection")));
         taskManager.submit(new FailedTask());
 
         Thread.sleep(1000);
@@ -559,6 +566,100 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
         awaitUntilTaskHasStatus(id2, TaskManager.Status.COMPLETED, 
taskManager);
     }
 
+    @Test
+    void cassandraTasksShouldSucceed(CassandraCluster cassandra) throws 
Exception {
+        TaskManager taskManager = taskManager(HOSTNAME);
+
+        TaskId taskId = taskManager.submit(new 
CassandraExecutingTask(cassandra.getConf(), false));
+
+        TaskExecutionDetails await = taskManager.await(taskId, 
Duration.ofSeconds(30));
+
+        assertThat(await.getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
+    void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) {
+        TaskManager taskManager = taskManager(HOSTNAME);
+
+        TaskId taskId = taskManager.submit(new 
CassandraExecutingTask(cassandra.getConf(), true));
+
+        taskManager.cancel(taskId);
+
+        awaitAtMostTwoSeconds.untilAsserted(() ->
+            assertThat(taskManager.getExecutionDetails(taskId).getStatus())
+                .isIn(TaskManager.Status.CANCELLED, 
TaskManager.Status.CANCEL_REQUESTED));
+    }
+
+    static class CassandraExecutingTask implements Task {
+        public static class CassandraExecutingTaskDTO implements TaskDTO {
+            private final String type;
+            private final boolean pause;
+
+            public CassandraExecutingTaskDTO(@JsonProperty("type") String 
type, @JsonProperty("pause") boolean pause) {
+                this.type = type;
+                this.pause = pause;
+            }
+
+            public boolean isPause() {
+                return pause;
+            }
+
+            @Override
+            public String getType() {
+                return type;
+            }
+        }
+
+        public static TaskDTOModule<CassandraExecutingTask, 
CassandraExecutingTaskDTO> module(CqlSession session) {
+            return DTOModule
+                .forDomainObject(CassandraExecutingTask.class)
+                .convertToDTO(CassandraExecutingTaskDTO.class)
+                .toDomainObjectConverter(dto -> new 
CassandraExecutingTask(session, dto.isPause()))
+                .toDTOConverter((task, typeName) -> new 
CassandraExecutingTaskDTO(typeName, task.pause))
+                .typeName("CassandraExecutingTask")
+                .withFactory(TaskDTOModule::new);
+        }
+
+        private final CqlSession session;
+        private final boolean pause;
+
+        CassandraExecutingTask(CqlSession session, boolean pause) {
+            this.session = session;
+            this.pause = pause;
+
+            // Some tasks require Cassandra query execution upon their creation
+            Mono.from(session.executeReactive("SELECT dateof(now()) FROM 
system.local ;"))
+                .block();
+        }
+
+        @Override
+        public Result run() throws InterruptedException {
+            // Tasks often execute Cassandra logic
+            Mono.from(session.executeReactive("SELECT dateof(now()) FROM 
system.local ;"))
+                .block();
+
+            if (pause) {
+                Thread.sleep(120000);
+            }
+
+            return Result.COMPLETED;
+        }
+
+        @Override
+        public TaskType type() {
+            return TaskType.of("CassandraExecutingTask");
+        }
+
+        @Override
+        public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+            // Some tasks require Cassandra query execution upon detail 
generation
+            Mono.from(session.executeReactive("SELECT dateof(now()) FROM 
system.local ;"))
+                .block();
+
+            return Optional.empty();
+        }
+    }
+
     private Hostname getOtherNode(ImmutableBiMap<EventSourcingTaskManager, 
Hostname> hostNameByTaskManager, Hostname node) {
         return hostNameByTaskManager
             .values()
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 2a8f4aa207..06d77c20ef 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
 import org.apache.james.util.MDCBuilder;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
@@ -68,7 +69,7 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
         if (!cancelledTasks.remove(taskWithId.getId())) {
-            Mono<Task.Result> taskMono = Mono.fromCallable(() -> 
runWithMdc(taskWithId, listener)).subscribeOn(taskExecutor);
+            Mono<Task.Result> taskMono = runWithMdc(taskWithId, 
listener).subscribeOn(taskExecutor);
             CompletableFuture<Task.Result> future = taskMono.toFuture();
             runningTask.set(Tuples.of(taskWithId.getId(), future));
 
@@ -79,21 +80,28 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
                             .thenReturn(Task.Result.PARTIAL)),
                 Disposable::dispose);
         } else {
-            return Mono.from(listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details()))
+            return Mono.fromCallable(() -> taskWithId.getTask().details())
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+                .flatMap(details -> 
Mono.from(listener.cancelled(taskWithId.getId(), details)))
                 .then(Mono.empty());
         }
     }
 
     private Publisher<Void> handleExecutionError(TaskWithId taskWithId, 
Listener listener, Throwable exception) {
         if (exception instanceof CancellationException) {
-            return listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details());
+            return Mono.fromCallable(() -> taskWithId.getTask().details())
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+                .flatMap(details -> 
Mono.from(listener.cancelled(taskWithId.getId(),details)));
         } else {
-            return listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), exception);
+            return Mono.fromCallable(() -> taskWithId.getTask().details())
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+                .flatMap(details -> 
Mono.from(listener.failed(taskWithId.getId(), details, exception)));
         }
     }
 
     private Flux<TaskExecutionDetails.AdditionalInformation> 
pollAdditionalInformation(TaskWithId taskWithId) {
         return Mono.fromCallable(() -> taskWithId.getTask().details())
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
             .delayElement(pollingInterval, Schedulers.parallel())
             .repeat()
             .handle(publishIfPresent())
@@ -101,13 +109,12 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     }
 
 
-    private Task.Result runWithMdc(TaskWithId taskWithId, Listener listener) {
-        return MDCBuilder.withMdc(
-            MDCBuilder.create()
-                .addToContext(Task.TASK_ID, taskWithId.getId().asString())
-                .addToContext(Task.TASK_TYPE, 
taskWithId.getTask().type().asString())
-                .addToContext(Task.TASK_DETAILS, 
taskWithId.getTask().details().toString()),
-            () -> run(taskWithId, listener).block());
+    private Mono<Task.Result> runWithMdc(TaskWithId taskWithId, Listener 
listener) {
+        return run(taskWithId, listener)
+            .contextWrite(ReactorUtils.context("task",
+                MDCBuilder.create()
+                    .addToContext(Task.TASK_ID, taskWithId.getId().asString())
+                    .addToContext(Task.TASK_TYPE, 
taskWithId.getTask().type().asString())));
     }
 
     private Mono<Task.Result> run(TaskWithId taskWithId, Listener listener) {
@@ -116,7 +123,11 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
             .onErrorResume(this::isCausedByInterruptedException, e -> 
cancelled(taskWithId, listener))
             .onErrorResume(Exception.class, e -> {
                 LOGGER.error("Error while running task {}", 
taskWithId.getId(), e);
-                return Mono.from(listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), e)).thenReturn(Task.Result.PARTIAL);
+
+                return Mono.fromCallable(() -> taskWithId.getTask().details())
+                    .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+                    .flatMap(details -> 
Mono.from(listener.failed(taskWithId.getId(), details, e)))
+                    .thenReturn(Task.Result.PARTIAL);
             });
     }
 
@@ -130,14 +141,16 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
 
     private Mono<Task.Result> cancelled(TaskWithId taskWithId, Listener 
listener) {
         TaskId id = taskWithId.getId();
-        Optional<TaskExecutionDetails.AdditionalInformation> details = 
taskWithId.getTask().details();
 
-        return Mono.from(listener.cancelled(id, details))
+        return Mono.fromCallable(taskWithId.getTask()::details)
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+            .flatMap(details ->  Mono.from(listener.cancelled(id, details)))
             .thenReturn(Task.Result.PARTIAL);
     }
 
     private Mono<Task.Result> runTask(TaskWithId taskWithId, Listener 
listener) {
         return Mono.fromCallable(() -> taskWithId.getTask().run())
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
             .doOnNext(result -> result
                 .onComplete(any -> 
Mono.from(listener.completed(taskWithId.getId(), result, 
taskWithId.getTask().details())).block())
                 .onFailure(() -> {
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 287a25bd79..4af13f7e09 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -84,6 +84,9 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
   override def getExecutionDetails(id: TaskId): TaskExecutionDetails = executionDetailsProjection.load(id)
     .getOrElse(throw new TaskNotFoundException())
 
+  private def getExecutionDetailsReactive(id: TaskId): SMono[TaskExecutionDetails] = SMono(executionDetailsProjection.loadReactive(id))
+    .switchIfEmpty(SMono.error(new TaskNotFoundException()))
+
   override def list: util.List[TaskExecutionDetails] = listScala.asJava
 
   override def list(status: TaskManager.Status): util.List[TaskExecutionDetails] = listScala
@@ -102,7 +105,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
   @throws(classOf[ReachedTimeoutException])
   override def await(id: TaskId, timeout: Duration): TaskExecutionDetails = {
     try {
-      val details = Mono.fromSupplier[TaskExecutionDetails](() => getExecutionDetails(id))
+      val details = Mono.from(getExecutionDetailsReactive(id))
         .filter(_.getStatus.isFinished)
 
       val findEvent = Flux.from(terminationSubscriber.listenEvents)
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
index 42fb4ab2c7..6094b6825b 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -20,38 +20,46 @@ package org.apache.james.task.eventsourcing
 
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.james.eventsourcing.Subscriber
+import org.apache.james.eventsourcing.ReactiveSubscriber
 import org.apache.james.task.{Hostname, TaskExecutionDetails, TaskId}
+import org.reactivestreams.Publisher
+import reactor.core.scala.publisher.{SFlux, SMono}
 
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 trait TaskExecutionDetailsProjection {
-  def asSubscriber(hostname: Hostname): Subscriber = {
+  def asSubscriber(hostname: Hostname): ReactiveSubscriber = {
     case created: Created =>
-      update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname))
+      updateReactive(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname))
     case cancelRequested: CancelRequested =>
-      update(cancelRequested.aggregateId.taskId)(_.cancelRequested(hostname))
+      updateReactive(cancelRequested.aggregateId.taskId)(_.cancelRequested(hostname))
     case started: Started =>
-      update(started.aggregateId.taskId)(_.started(hostname))
+      updateReactive(started.aggregateId.taskId)(_.started(hostname))
     case completed: Completed =>
-      update(completed.aggregateId.taskId)(_.completed(completed.additionalInformation.asJava))
+      updateReactive(completed.aggregateId.taskId)(_.completed(completed.additionalInformation.asJava))
     case failed: Failed =>
-      update(failed.aggregateId.taskId)(_.failed(failed.additionalInformation.asJava))
+      updateReactive(failed.aggregateId.taskId)(_.failed(failed.additionalInformation.asJava))
     case canceled: Cancelled =>
-      update(canceled.aggregateId.taskId)(_.cancelEffectively(canceled.additionalInformation.asJava))
+      updateReactive(canceled.aggregateId.taskId)(_.cancelEffectively(canceled.additionalInformation.asJava))
     case updated: AdditionalInformationUpdated =>
-      update(updated.aggregateId.taskId)(_.updateInformation(updated.additionalInformation))
+      updateReactive(updated.aggregateId.taskId)(_.updateInformation(updated.additionalInformation))
   }
 
-  private def update(taskId: TaskId)(updater: TaskExecutionDetails => TaskExecutionDetails): Unit =
-    load(taskId)
+  private def updateReactive(taskId: TaskId)(updater: TaskExecutionDetails => TaskExecutionDetails): Publisher[Void] =
+    SMono.fromPublisher(loadReactive(taskId))
       .map(updater)
-      .foreach(update)
+      .flatMap(taskExecutionDetails => SMono.fromPublisher(updateReactive(taskExecutionDetails)))
 
   def load(taskId: TaskId): Option[TaskExecutionDetails]
   def list: List[TaskExecutionDetails]
   def update(details: TaskExecutionDetails): Unit
+
+  def loadReactive(taskId: TaskId): Publisher[TaskExecutionDetails]
+
+  def listReactive(): Publisher[TaskExecutionDetails]
+
+  def updateReactive(details: TaskExecutionDetails): Publisher[Void]
 }
 
 class MemoryTaskExecutionDetailsProjection() extends TaskExecutionDetailsProjection {
@@ -62,4 +70,11 @@ class MemoryTaskExecutionDetailsProjection() extends TaskExecutionDetailsProject
   override def list: List[TaskExecutionDetails] = this.details.values().asScala.toList
 
   override def update(details: TaskExecutionDetails): Unit = this.details.put(details.taskId, details)
+
+
+  override def loadReactive(taskId: TaskId): Publisher[TaskExecutionDetails] = SMono.fromCallable(() => this.details.get(taskId))
+
+  override def listReactive(): Publisher[TaskExecutionDetails] = SFlux.fromIterable(this.details.values().asScala)
+
+  override def updateReactive(details: TaskExecutionDetails): Publisher[Void] = SMono.fromCallable(() => this.details.put(details.taskId, details)).`then`()
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to