This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.9.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 735e6cd7928ef93896c651b7aae2c9c1a040c31b
Author: Benoit TELLIER <[email protected]>
AuthorDate: Sat Dec 20 14:58:24 2025 +0100

    [ENHANCEMENT] Distributed task manager logs
---
 .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java     | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index 79a93d3134..d2ac9f53c2 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -165,7 +165,8 @@ public class RabbitMQWorkQueue implements WorkQueue {
             .flatMap(taskId -> Mono.fromCallable(() -> new String(delivery.getBody(), StandardCharsets.UTF_8))
                 .flatMap(bodyValue -> deserialize(bodyValue, taskId))
                 .doOnNext(task -> delivery.ack())
-                .flatMap(task -> executeOnWorker(taskId, task)))
+                .flatMap(task -> executeOnWorker(taskId, task))
+                .doOnSuccess(result -> LOGGER.info("Executed task {} yield {}", taskId, result)))
             .onErrorResume(error -> {
                 Optional<Object> taskId = Optional.ofNullable(delivery.getProperties())
                     .flatMap(props -> Optional.ofNullable(props.getHeaders()))
@@ -187,6 +188,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
     }
 
     private Mono<Task.Result> executeOnWorker(TaskId taskId, Task task) {
+        LOGGER.info("Executing task {} ({}) ", taskId, task.getClass());
         return worker.executeTask(new TaskWithId(taskId, task))
             .timeout(rabbitMQConfiguration.getTaskQueueConsumerTimeout())
             .onErrorResume(error -> {
@@ -251,6 +253,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
             OutboundMessage outboundMessage = new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, basicProperties, payload);
             sender.send(Mono.just(outboundMessage)).block();
+            LOGGER.info("Submitted task {} ({})", taskWithId.getId(), taskWithId.getTask().getClass());
         } catch (JsonProcessingException e) {
             throw new RuntimeException(e);
         }
@@ -258,6 +261,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     @Override
     public void cancel(TaskId taskId) {
+        LOGGER.info("Requesting cancel for task {}", taskId);
         sendCancelRequestsQueue.emitNext(taskId, FAIL_FAST);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to