This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 76ab140021 [FIX] Prevent WebSocket data race (#2550)
76ab140021 is described below
commit 76ab1400215eb9460ee045076a1c83f9bea29661
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Dec 6 21:16:31 2024 +0100
[FIX] Prevent WebSocket data race (#2550)
---
.../org/apache/james/jmap/change/StateChangeListener.scala | 6 +++++-
.../distributed/RabbitMQTerminationSubscriber.java | 10 ++++++++--
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index 1cf9d5d9d4..43e362364c 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -34,7 +34,11 @@ case class StateChangeListener(types: Set[TypeName], sink:
Sinks.Many[OutboundMe
case stateChangeEvent: StateChangeEvent =>
SMono.fromCallable(() =>
stateChangeEvent.asStateChange.filter(types)
- .foreach(next => sink.emitNext(next, FAIL_FAST)))
+ .foreach(next => {
+ sink.synchronized {
+ sink.emitNext(next, FAIL_FAST)
+ }
+ }))
.asJava().`then`()
case _ => SMono.empty
}
diff --git
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 511c93c2dd..606a212f75 100644
---
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -118,7 +118,11 @@ public class RabbitMQTerminationSubscriber implements
TerminationSubscriber, Sta
.subscribeOn(Schedulers.boundedElastic())
.map(this::toEvent)
.handle(publishIfPresent())
- .subscribe(e -> listener.emitNext(e, FAIL_FAST));
+ .subscribe(e -> {
+ synchronized (listener) {
+ listener.emitNext(e, FAIL_FAST);
+ }
+ });
}
@Override
@@ -127,7 +131,9 @@ public class RabbitMQTerminationSubscriber implements
TerminationSubscriber, Sta
byte[] payload =
serializer.serialize(event).getBytes(StandardCharsets.UTF_8);
AMQP.BasicProperties basicProperties = new
AMQP.BasicProperties.Builder().build();
OutboundMessage message = new OutboundMessage(EXCHANGE_NAME,
ROUTING_KEY, basicProperties, payload);
- sendQueue.emitNext(message, FAIL_FAST);
+ synchronized (sendQueue) {
+ sendQueue.emitNext(message, FAIL_FAST);
+ }
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]