This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b2a0fd184619f4c6bbddec6062e6b162b1d45b21 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Mon Jun 6 15:31:31 2022 +0700 JAMES-3774 Migrate server/task/task-distributed to Cassandra driver 4 --- .../eventstore/cassandra/EventStoreDao.scala | 6 +- .../distributed/RabbitMQWorkQueue.java | 6 +- ...assandraTaskExecutionDetailsProjectionDAO.scala | 67 +++++++------ ...andraTaskExecutionDetailsProjectionModule.scala | 35 +++---- .../distributed/DistributedTaskManagerTest.java | 109 ++++++++++++++++++++- .../apache/james/task/SerialTaskManagerWorker.java | 41 +++++--- .../eventsourcing/EventSourcingTaskManager.scala | 5 +- .../TaskExecutionDetailsProjection.scala | 39 +++++--- 8 files changed, 223 insertions(+), 85 deletions(-) diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala index 33c8219bbe..f99a231d02 100644 --- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala +++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala @@ -29,6 +29,7 @@ import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor import org.apache.james.eventsourcing.eventstore.History import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.{AGGREGATE_ID, EVENT, EVENTS_TABLE, EVENT_ID} import org.apache.james.eventsourcing.{AggregateId, Event} +import org.apache.james.util.ReactorUtils import reactor.core.scala.publisher.{SFlux, SMono} class EventStoreDao @Inject() (val session: CqlSession, @@ -80,12 +81,13 @@ class EventStoreDao @Inject() (val session: CqlSession, .setExecutionProfile(executionProfile) val rows: SFlux[Row] = SFlux[Row](cassandraAsyncExecutor.executeRows(preparedStatement)) - val events: SFlux[Event] = rows.map(toEvent) + val events: SFlux[Event] = rows.concatMap(toEvent) val listEvents: SMono[List[Event]] = events.collectSeq() .map(_.toList) listEvents.map(History.of(_)) } - private def toEvent(row: Row): Event = jsonEventSerializer.deserialize(row.getString(EVENT)) + private def toEvent(row: Row): SMono[Event] = SMono.fromCallable(() => jsonEventSerializer.deserialize(row.getString(EVENT))) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) } \ No newline at end of file diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index f74f5a81d3..3d6497b642 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -182,8 +182,10 @@ public class RabbitMQWorkQueue implements WorkQueue { .onErrorResume(error -> { String errorMessage = String.format("Unable to run submitted Task %s", taskId.asString()); LOGGER.warn(errorMessage, error); - return Mono.from(worker.fail(taskId, task.details(), errorMessage, error)) - .then(Mono.empty()); + return Mono.fromCallable(task::details) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) + .flatMap(details -> Mono.from(worker.fail(taskId, details, errorMessage, error)) + .then(Mono.empty())); }); } diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala index f2332c19da..8e2422dd01 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala @@ -20,20 +20,23 @@ package org.apache.james.task.eventsourcing.cassandra import java.util.Optional -import com.datastax.driver.core.querybuilder.QueryBuilder -import com.datastax.driver.core.querybuilder.QueryBuilder.{bindMarker, insertInto, select} -import com.datastax.driver.core.{BoundStatement, Row, Session, UDTValue} +import com.datastax.oss.driver.api.core.CqlSession +import com.datastax.oss.driver.api.core.cql.{BoundStatement, Row} +import com.datastax.oss.driver.api.core.data.UdtValue +import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, insertInto, selectFrom} import javax.inject.Inject import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, CassandraZonedDateTimeModule} import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer import org.apache.james.task._ import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._ +import org.apache.james.util.ReactorUtils import reactor.core.publisher.{Flux, Mono} +import reactor.core.scala.publisher.SMono import scala.compat.java8.OptionConverters._ -class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typesProvider: CassandraTypesProvider, jsonTaskAdditionalInformationSerializer: JsonTaskAdditionalInformationSerializer) { +class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: CqlSession, typesProvider: CassandraTypesProvider, jsonTaskAdditionalInformationSerializer: JsonTaskAdditionalInformationSerializer) { private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session) private val dateType = typesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME) @@ -50,19 +53,23 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ .value(CANCEL_REQUESTED_NODE, bindMarker(CANCEL_REQUESTED_NODE)) .value(FAILED_DATE, bindMarker(FAILED_DATE)) .value(ADDITIONAL_INFORMATION, bindMarker(ADDITIONAL_INFORMATION)) - ) + .build()) - private val selectStatement = session.prepare(select().from(TABLE_NAME) - .where(QueryBuilder.eq(TASK_ID, bindMarker(TASK_ID)))) + private val selectStatement = session.prepare(selectFrom(TABLE_NAME) + .all() + .whereColumn(TASK_ID).isEqualTo(bindMarker(TASK_ID)) + .build()) - private val listStatement = session.prepare(select().from(TABLE_NAME)) + private val listStatement = session.prepare(selectFrom(TABLE_NAME).all().build()) - def saveDetails(details: TaskExecutionDetails): Mono[Void] = { - val boundStatement = insertStatement.bind - .setUUID(TASK_ID, details.getTaskId.getValue) + def saveDetails(details: TaskExecutionDetails): Mono[Void] = + Mono.from(serializeAdditionalInformation(details) + .flatMap(serializeAdditionalInformation => { + val boundStatement = insertStatement.bind() + .setUuid(TASK_ID, details.getTaskId.getValue) .setString(TYPE, details.getType.asString()) .setString(STATUS, details.getStatus.getValue) - .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate)) + .setUdtValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate)) .setString(SUBMITTED_NODE, details.getSubmittedNode.asString) val bindOptionalFieldOperations = List( @@ -72,36 +79,35 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ (statement: BoundStatement) => bindOptionalUDTValue(statement, CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate)), (statement: BoundStatement) => bindOptionalStringValue(statement, CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString)), (statement: BoundStatement) => bindOptionalUDTValue(statement, FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getFailedDate)), - (statement: BoundStatement) => bindOptionalStringValue(statement, ADDITIONAL_INFORMATION, serializeAdditionalInformation(details)), + (statement: BoundStatement) => bindOptionalStringValue(statement, ADDITIONAL_INFORMATION, serializeAdditionalInformation), ) val fullyBoundStatement = bindOptionalFieldOperations.foldLeft(boundStatement)((statement, bindFieldOperation) => { bindFieldOperation(statement) }) - cassandraAsyncExecutor.executeVoid(fullyBoundStatement); - } + SMono(cassandraAsyncExecutor.executeVoid(fullyBoundStatement)) + })) - private def bindOptionalStringValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[String]) = { + private def bindOptionalStringValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[String]) = fieldValue.asScala match { case Some(value) => statement.setString(fieldName, value) case None => statement } - } - private def bindOptionalUDTValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[UDTValue]) = { + private def bindOptionalUDTValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[UdtValue]) = fieldValue.asScala match { - case Some(value) => statement.setUDTValue(fieldName, value) + case Some(value) => statement.setUdtValue(fieldName, value) case None => statement } - } - private def serializeAdditionalInformation(details: TaskExecutionDetails): Optional[String] = details + private def serializeAdditionalInformation(details: TaskExecutionDetails): SMono[Optional[String]] = SMono.fromCallable(() =>details .getAdditionalInformation - .map(jsonTaskAdditionalInformationSerializer.serialize) + .map(jsonTaskAdditionalInformationSerializer.serialize(_))) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] = cassandraAsyncExecutor - .executeSingleRow(selectStatement.bind().setUUID(TASK_ID, taskId.getValue)) + .executeSingleRow(selectStatement.bind().setUuid(TASK_ID, taskId.getValue)) .map(readRow) def listDetails(): Flux[TaskExecutionDetails] = cassandraAsyncExecutor @@ -111,22 +117,21 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ private def readRow(row: Row): TaskExecutionDetails = { val taskType = TaskType.of(row.getString(TYPE)) new TaskExecutionDetails( - taskId = TaskId.fromUUID(row.getUUID(TASK_ID)), + taskId = TaskId.fromUUID(row.getUuid(TASK_ID)), `type` = TaskType.of(row.getString(TYPE)), status = TaskManager.Status.fromString(row.getString(STATUS)), - submittedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)), + submittedDate = CassandraZonedDateTimeModule.fromUDT(row.getUdtValue(SUBMITTED_DATE)), submittedNode = Hostname(row.getString(SUBMITTED_NODE)), - startedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(STARTED_DATE)), + startedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(STARTED_DATE)), ranNode = Optional.ofNullable(row.getString(RAN_NODE)).map(Hostname(_)), - completedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(COMPLETED_DATE)), - canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)), + completedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(COMPLETED_DATE)), + canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(CANCELED_DATE)), cancelRequestedNode = Optional.ofNullable(row.getString(CANCEL_REQUESTED_NODE)).map(Hostname(_)), - failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)), + failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUdtValue(FAILED_DATE)), additionalInformation = () => deserializeAdditionalInformation(taskType, row)) } - private def deserializeAdditionalInformation(taskType: TaskType, row: Row): Optional[TaskExecutionDetails.AdditionalInformation] = { + private def deserializeAdditionalInformation(taskType: TaskType, row: Row): Optional[TaskExecutionDetails.AdditionalInformation] = Optional.ofNullable(row.getString(ADDITIONAL_INFORMATION)) .map(additionalInformation => jsonTaskAdditionalInformationSerializer.deserialize(additionalInformation)) - } } diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala index 229cbe67fa..080ff13e23 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala @@ -18,8 +18,8 @@ ****************************************************************/ package org.apache.james.task.eventsourcing.cassandra -import com.datastax.driver.core.DataType.{text, uuid} -import com.datastax.driver.core.schemabuilder.{Create, SchemaBuilder} +import com.datastax.oss.driver.api.core.`type`.DataTypes +import com.datastax.oss.driver.api.querybuilder.SchemaBuilder import org.apache.james.backends.cassandra.components.CassandraModule import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule @@ -44,22 +44,19 @@ object CassandraTaskExecutionDetailsProjectionModule { val MODULE: CassandraModule = CassandraModule.table(CassandraTaskExecutionDetailsProjectionTable.TABLE_NAME) .comment("Projection of TaskExecutionDetails used by the distributed task manager") - .options((options: Create.Options) => options - .caching( - SchemaBuilder.KeyCaching.ALL, - SchemaBuilder.noRows())) - .statement((statement: Create) => statement - .addPartitionKey(CassandraTaskExecutionDetailsProjectionTable.TASK_ID, uuid) - .addColumn(CassandraTaskExecutionDetailsProjectionTable.ADDITIONAL_INFORMATION, text) - .addColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, text) - .addColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, text) - .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) - .addColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, text) - .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) - .addColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, text) - .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) - .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) - .addColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, text) - .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))) + .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE)) + .statement(statement => types => statement + .withPartitionKey(CassandraTaskExecutionDetailsProjectionTable.TASK_ID, DataTypes.UUID) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.ADDITIONAL_INFORMATION, DataTypes.TEXT) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, DataTypes.TEXT) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, DataTypes.TEXT) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, DataTypes.TEXT) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, DataTypes.TEXT) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, DataTypes.TEXT) + .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))) .build } diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index de2a3caf4d..10c500c012 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -31,8 +31,10 @@ import static org.awaitility.Durations.FIVE_SECONDS; import static org.awaitility.Durations.ONE_SECOND; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -56,6 +58,7 @@ import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO; import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; import org.apache.james.json.DTOConverter; +import org.apache.james.json.DTOModule; import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer; import org.apache.james.server.task.json.JsonTaskSerializer; import org.apache.james.server.task.json.dto.AdditionalInformationDTO; @@ -77,6 +80,7 @@ import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; +import org.apache.james.task.TaskType; import org.apache.james.task.TaskWithId; import org.apache.james.task.WorkQueue; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; @@ -91,6 +95,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import com.datastax.oss.driver.api.core.CqlSession; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -156,6 +162,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { ImmutableSet<TaskDTOModule<?, ?>> taskDTOModules = ImmutableSet.of( + CassandraExecutingTask.module(CASSANDRA_CLUSTER.getCassandraCluster().getConf()), TestTaskDTOModules.FAILS_DESERIALIZATION_TASK_MODULE, TestTaskDTOModules.COMPLETED_TASK_MODULE, TestTaskDTOModules.FAILED_TASK_MODULE, @@ -538,16 +545,16 @@ class DistributedTaskManagerTest implements TaskManagerContract { cassandra.getConf().registerScenario(Scenario.combine( executeNormally() .times(2) // submit + inProgress - .whenQueryStartsWith("INSERT INTO eventStore"), + .whenQueryStartsWith("INSERT INTO eventstore"), executeNormally() .times(2) // submit + inProgress - .whenQueryStartsWith("INSERT INTO taskExecutionDetailsProjection"), + .whenQueryStartsWith("INSERT INTO taskexecutiondetailsprojection"), fail() .forever() - .whenQueryStartsWith("INSERT INTO eventStore"), + .whenQueryStartsWith("INSERT INTO eventstore"), fail() .forever() - .whenQueryStartsWith("INSERT INTO taskExecutionDetailsProjection"))); + .whenQueryStartsWith("INSERT INTO taskexecutiondetailsprojection"))); taskManager.submit(new FailedTask()); Thread.sleep(1000); @@ -559,6 +566,100 @@ class DistributedTaskManagerTest implements TaskManagerContract { awaitUntilTaskHasStatus(id2, TaskManager.Status.COMPLETED, taskManager); } + @Test + void cassandraTasksShouldSucceed(CassandraCluster cassandra) throws Exception { + TaskManager taskManager = taskManager(HOSTNAME); + + TaskId taskId = taskManager.submit(new CassandraExecutingTask(cassandra.getConf(), false)); + + TaskExecutionDetails await = taskManager.await(taskId, Duration.ofSeconds(30)); + + assertThat(await.getStatus()).isEqualTo(TaskManager.Status.COMPLETED); + } + + @Test + void cassandraTasksShouldBeCancealable(CassandraCluster cassandra) { + TaskManager taskManager = taskManager(HOSTNAME); + + TaskId taskId = taskManager.submit(new CassandraExecutingTask(cassandra.getConf(), true)); + + taskManager.cancel(taskId); + + awaitAtMostTwoSeconds.untilAsserted(() -> + assertThat(taskManager.getExecutionDetails(taskId).getStatus()) + .isIn(TaskManager.Status.CANCELLED, TaskManager.Status.CANCEL_REQUESTED)); + } + + static class CassandraExecutingTask implements Task { + public static class CassandraExecutingTaskDTO implements TaskDTO { + private final String type; + private final boolean pause; + + public CassandraExecutingTaskDTO(@JsonProperty("type") String type, @JsonProperty("pause") boolean pause) { + this.type = type; + this.pause = pause; + } + + public boolean isPause() { + return pause; + } + + @Override + public String getType() { + return type; + } + } + + public static TaskDTOModule<CassandraExecutingTask, CassandraExecutingTaskDTO> module(CqlSession session) { + return DTOModule + .forDomainObject(CassandraExecutingTask.class) + .convertToDTO(CassandraExecutingTaskDTO.class) + .toDomainObjectConverter(dto -> new CassandraExecutingTask(session, dto.isPause())) + .toDTOConverter((task, typeName) -> new CassandraExecutingTaskDTO(typeName, task.pause)) + .typeName("CassandraExecutingTask") + .withFactory(TaskDTOModule::new); + } + + private final CqlSession session; + private final boolean pause; + + CassandraExecutingTask(CqlSession session, boolean pause) { + this.session = session; + this.pause = pause; + + // Some task requires cassandra query execution upon their creation + Mono.from(session.executeReactive("SELECT dateof(now()) FROM system.local ;")) + .block(); + } + + @Override + public Result run() throws InterruptedException { + // Task often execute Cassandra logic + Mono.from(session.executeReactive("SELECT dateof(now()) FROM system.local ;")) + .block(); + + if (pause) { + Thread.sleep(120000); + } + + return Result.COMPLETED; + } + + @Override + public TaskType type() { + return TaskType.of("CassandraExecutingTask"); + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + // Some task requires cassandra query execution upon detail generation + Mono.from(session.executeReactive("SELECT dateof(now()) FROM system.local ;")) + .block(); + + return Optional.empty(); + } + } + private Hostname getOtherNode(ImmutableBiMap<EventSourcingTaskManager, Hostname> hostNameByTaskManager, Hostname node) { return hostNameByTaskManager .values() diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index 2a8f4aa207..06d77c20ef 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import org.apache.james.util.MDCBuilder; +import org.apache.james.util.ReactorUtils; import org.apache.james.util.concurrent.NamedThreadFactory; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -68,7 +69,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { @Override public Mono<Task.Result> executeTask(TaskWithId taskWithId) { if (!cancelledTasks.remove(taskWithId.getId())) { - Mono<Task.Result> taskMono = Mono.fromCallable(() -> runWithMdc(taskWithId, listener)).subscribeOn(taskExecutor); + Mono<Task.Result> taskMono = runWithMdc(taskWithId, listener).subscribeOn(taskExecutor); CompletableFuture<Task.Result> future = taskMono.toFuture(); runningTask.set(Tuples.of(taskWithId.getId(), future)); @@ -79,21 +80,28 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { .thenReturn(Task.Result.PARTIAL)), Disposable::dispose); } else { - return Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())) + return Mono.fromCallable(() -> taskWithId.getTask().details()) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) + .flatMap(details -> Mono.from(listener.cancelled(taskWithId.getId(), details))) .then(Mono.empty()); } } private Publisher<Void> handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) { if (exception instanceof CancellationException) { - return listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()); + return Mono.fromCallable(() -> taskWithId.getTask().details()) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) + .flatMap(details -> Mono.from(listener.cancelled(taskWithId.getId(),details))); } else { - return listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception); + return Mono.fromCallable(() -> taskWithId.getTask().details()) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) + .flatMap(details -> Mono.from(listener.failed(taskWithId.getId(), details, exception))); } } private Flux<TaskExecutionDetails.AdditionalInformation> pollAdditionalInformation(TaskWithId taskWithId) { return Mono.fromCallable(() -> taskWithId.getTask().details()) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) .delayElement(pollingInterval, Schedulers.parallel()) .repeat() .handle(publishIfPresent()) @@ -101,13 +109,12 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { } - private Task.Result runWithMdc(TaskWithId taskWithId, Listener listener) { - return MDCBuilder.withMdc( - MDCBuilder.create() - .addToContext(Task.TASK_ID, taskWithId.getId().asString()) - .addToContext(Task.TASK_TYPE, taskWithId.getTask().type().asString()) - .addToContext(Task.TASK_DETAILS, taskWithId.getTask().details().toString()), - () -> run(taskWithId, listener).block()); + private Mono<Task.Result> runWithMdc(TaskWithId taskWithId, Listener listener) { + return run(taskWithId, listener) + .contextWrite(ReactorUtils.context("task", + MDCBuilder.create() + .addToContext(Task.TASK_ID, taskWithId.getId().asString()) + .addToContext(Task.TASK_TYPE, taskWithId.getTask().type().asString()))); } private Mono<Task.Result> run(TaskWithId taskWithId, Listener listener) { @@ -116,7 +123,11 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { .onErrorResume(this::isCausedByInterruptedException, e -> cancelled(taskWithId, listener)) .onErrorResume(Exception.class, e -> { LOGGER.error("Error while running task {}", taskWithId.getId(), e); - return Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details(), e)).thenReturn(Task.Result.PARTIAL); + + return Mono.fromCallable(() -> taskWithId.getTask().details()) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) + .flatMap(details -> Mono.from(listener.failed(taskWithId.getId(), details, e))) + .thenReturn(Task.Result.PARTIAL); }); } @@ -130,14 +141,16 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { private Mono<Task.Result> cancelled(TaskWithId taskWithId, Listener listener) { TaskId id = taskWithId.getId(); - Optional<TaskExecutionDetails.AdditionalInformation> details = taskWithId.getTask().details(); - return Mono.from(listener.cancelled(id, details)) + return Mono.fromCallable(taskWithId.getTask()::details) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) + .flatMap(details -> Mono.from(listener.cancelled(id, details))) .thenReturn(Task.Result.PARTIAL); } private Mono<Task.Result> runTask(TaskWithId taskWithId, Listener listener) { return Mono.fromCallable(() -> taskWithId.getTask().run()) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) .doOnNext(result -> result .onComplete(any -> Mono.from(listener.completed(taskWithId.getId(), result, taskWithId.getTask().details())).block()) .onFailure(() -> { diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 287a25bd79..4af13f7e09 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -84,6 +84,9 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] override def getExecutionDetails(id: TaskId): TaskExecutionDetails = executionDetailsProjection.load(id) .getOrElse(throw new TaskNotFoundException()) + private def getExecutionDetailsReactive(id: TaskId): SMono[TaskExecutionDetails] = SMono(executionDetailsProjection.loadReactive(id)) + .switchIfEmpty(SMono.error(new TaskNotFoundException())) + override def list: util.List[TaskExecutionDetails] = listScala.asJava override def list(status: TaskManager.Status): util.List[TaskExecutionDetails] = listScala @@ -102,7 +105,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] @throws(classOf[ReachedTimeoutException]) override def await(id: TaskId, timeout: Duration): TaskExecutionDetails = { try { - val details = Mono.fromSupplier[TaskExecutionDetails](() => getExecutionDetails(id)) + val details = Mono.from(getExecutionDetailsReactive(id)) .filter(_.getStatus.isFinished) val findEvent = Flux.from(terminationSubscriber.listenEvents) diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala index 42fb4ab2c7..6094b6825b 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala @@ -20,38 +20,46 @@ package org.apache.james.task.eventsourcing import java.util.concurrent.ConcurrentHashMap -import org.apache.james.eventsourcing.Subscriber +import org.apache.james.eventsourcing.ReactiveSubscriber import org.apache.james.task.{Hostname, TaskExecutionDetails, TaskId} +import org.reactivestreams.Publisher +import reactor.core.scala.publisher.{SFlux, SMono} import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ trait TaskExecutionDetailsProjection { - def asSubscriber(hostname: Hostname): Subscriber = { + def asSubscriber(hostname: Hostname): ReactiveSubscriber = { case created: Created => - update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname)) + updateReactive(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname)) case cancelRequested: CancelRequested => - update(cancelRequested.aggregateId.taskId)(_.cancelRequested(hostname)) + updateReactive(cancelRequested.aggregateId.taskId)(_.cancelRequested(hostname)) case started: Started => - update(started.aggregateId.taskId)(_.started(hostname)) + updateReactive(started.aggregateId.taskId)(_.started(hostname)) case completed: Completed => - update(completed.aggregateId.taskId)(_.completed(completed.additionalInformation.asJava)) + updateReactive(completed.aggregateId.taskId)(_.completed(completed.additionalInformation.asJava)) case failed: Failed => - update(failed.aggregateId.taskId)(_.failed(failed.additionalInformation.asJava)) + updateReactive(failed.aggregateId.taskId)(_.failed(failed.additionalInformation.asJava)) case canceled: Cancelled => - update(canceled.aggregateId.taskId)(_.cancelEffectively(canceled.additionalInformation.asJava)) + updateReactive(canceled.aggregateId.taskId)(_.cancelEffectively(canceled.additionalInformation.asJava)) case updated: AdditionalInformationUpdated => - update(updated.aggregateId.taskId)(_.updateInformation(updated.additionalInformation)) + updateReactive(updated.aggregateId.taskId)(_.updateInformation(updated.additionalInformation)) } - private def update(taskId: TaskId)(updater: TaskExecutionDetails => TaskExecutionDetails): Unit = - load(taskId) + private def updateReactive(taskId: TaskId)(updater: TaskExecutionDetails => TaskExecutionDetails): Publisher[Void] = + SMono.fromPublisher(loadReactive(taskId)) .map(updater) - .foreach(update) + .flatMap(taskExecutionDetails => SMono.fromPublisher(updateReactive(taskExecutionDetails))) def load(taskId: TaskId): Option[TaskExecutionDetails] def list: List[TaskExecutionDetails] def update(details: TaskExecutionDetails): Unit + + def loadReactive(taskId: TaskId): Publisher[TaskExecutionDetails] + + def listReactive(): Publisher[TaskExecutionDetails] + + def updateReactive(details: TaskExecutionDetails): Publisher[Void] } class MemoryTaskExecutionDetailsProjection() extends TaskExecutionDetailsProjection { @@ -62,4 +70,11 @@ class MemoryTaskExecutionDetailsProjection() extends TaskExecutionDetailsProject override def list: List[TaskExecutionDetails] = this.details.values().asScala.toList override def update(details: TaskExecutionDetails): Unit = this.details.put(details.taskId, details) + + + override def loadReactive(taskId: TaskId): Publisher[TaskExecutionDetails] = SMono.fromCallable(() => this.details.get(taskId)) + + override def listReactive(): Publisher[TaskExecutionDetails] = SFlux.fromIterable(this.details.values().asScala) + + override def updateReactive(details: TaskExecutionDetails): Publisher[Void] = SMono.fromCallable(() => this.details.put(details.taskId, details)).`then`() } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org