This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ab140021 [FIX] Prevent WebSocket data race (#2550)
76ab140021 is described below

commit 76ab1400215eb9460ee045076a1c83f9bea29661
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Dec 6 21:16:31 2024 +0100

    [FIX] Prevent WebSocket data race (#2550)
---
 .../org/apache/james/jmap/change/StateChangeListener.scala     |  6 +++++-
 .../distributed/RabbitMQTerminationSubscriber.java             | 10 ++++++++--
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index 1cf9d5d9d4..43e362364c 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -34,7 +34,11 @@ case class StateChangeListener(types: Set[TypeName], sink: 
Sinks.Many[OutboundMe
       case stateChangeEvent: StateChangeEvent =>
         SMono.fromCallable(() =>
           stateChangeEvent.asStateChange.filter(types)
-            .foreach(next => sink.emitNext(next, FAIL_FAST)))
+            .foreach(next => {
+              sink.synchronized {
+                sink.emitNext(next, FAIL_FAST)
+              }
+            }))
           .asJava().`then`()
       case _ => SMono.empty
     }
diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 511c93c2dd..606a212f75 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -118,7 +118,11 @@ public class RabbitMQTerminationSubscriber implements 
TerminationSubscriber, Sta
             .subscribeOn(Schedulers.boundedElastic())
             .map(this::toEvent)
             .handle(publishIfPresent())
-            .subscribe(e -> listener.emitNext(e, FAIL_FAST));
+            .subscribe(e -> {
+                synchronized (listener) {
+                    listener.emitNext(e, FAIL_FAST);
+                }
+            });
     }
 
     @Override
@@ -127,7 +131,9 @@ public class RabbitMQTerminationSubscriber implements 
TerminationSubscriber, Sta
             byte[] payload = 
serializer.serialize(event).getBytes(StandardCharsets.UTF_8);
             AMQP.BasicProperties basicProperties = new 
AMQP.BasicProperties.Builder().build();
             OutboundMessage message = new OutboundMessage(EXCHANGE_NAME, 
ROUTING_KEY, basicProperties, payload);
-            sendQueue.emitNext(message, FAIL_FAST);
+            synchronized (sendQueue) {
+                sendQueue.emitNext(message, FAIL_FAST);
+            }
         } catch (JsonProcessingException e) {
             throw new RuntimeException(e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to