This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.9.x in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 735e6cd7928ef93896c651b7aae2c9c1a040c31b
Author: Benoit TELLIER <[email protected]>
AuthorDate: Sat Dec 20 14:58:24 2025 +0100

    [ENHANCEMENT] Distributed task manager logs
---
 .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 79a93d3134..d2ac9f53c2 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -165,7 +165,8 @@ public class RabbitMQWorkQueue implements WorkQueue {
             .flatMap(taskId -> Mono.fromCallable(() -> new String(delivery.getBody(), StandardCharsets.UTF_8))
                 .flatMap(bodyValue -> deserialize(bodyValue, taskId))
                 .doOnNext(task -> delivery.ack())
-                .flatMap(task -> executeOnWorker(taskId, task)))
+                .flatMap(task -> executeOnWorker(taskId, task))
+                .doOnSuccess(result -> LOGGER.info("Executed task {} yield {}", taskId, result)))
             .onErrorResume(error -> {
                 Optional<Object> taskId = Optional.ofNullable(delivery.getProperties())
                     .flatMap(props -> Optional.ofNullable(props.getHeaders()))
@@ -187,6 +188,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
     }
 
     private Mono<Task.Result> executeOnWorker(TaskId taskId, Task task) {
+        LOGGER.info("Executing task {} ({}) ", taskId, task.getClass());
         return worker.executeTask(new TaskWithId(taskId, task))
             .timeout(rabbitMQConfiguration.getTaskQueueConsumerTimeout())
             .onErrorResume(error -> {
@@ -251,6 +253,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
             OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
 
             sender.send(Mono.just(outboundMessage)).block();
+            LOGGER.info("Submitted task {} ({})", taskWithId.getId(), taskWithId.getTask().getClass());
         } catch (JsonProcessingException e) {
             throw new RuntimeException(e);
         }
@@ -258,6 +261,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     @Override
     public void cancel(TaskId taskId) {
+        LOGGER.info("Requesting cancel for task {}", taskId);
         sendCancelRequestsQueue.emitNext(taskId, FAIL_FAST);
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
