This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 23243b83b7 JAMES-4080 Refactor EventDeadLetters redeliver code to work
with multiple serializers (#2465)
23243b83b7 is described below
commit 23243b83b776cab8a0cd6d11963622a3ea683ce7
Author: hungphan227 <[email protected]>
AuthorDate: Mon Oct 28 10:30:49 2024 +0700
JAMES-4080 Refactor EventDeadLetters redeliver code to work with multiple
serializers (#2465)
---
.../james/events/EventSerializersAggregator.java | 70 ++++++++++++++++++++++
.../org/apache/james/events/RabbitMQEventBus.java | 2 -
.../james/modules/event/JMAPEventBusModule.java | 6 ++
.../modules/event/RabbitMQEventBusModule.java | 43 +++++++++----
.../james/modules/mailbox/DefaultEventModule.java | 16 ++---
5 files changed, 117 insertions(+), 20 deletions(-)
diff --git
a/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java
b/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java
new file mode 100644
index 0000000000..ca2e6b383d
--- /dev/null
+++
b/event-bus/api/src/main/java/org/apache/james/events/EventSerializersAggregator.java
@@ -0,0 +1,70 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+
+@Singleton
+public class EventSerializersAggregator implements EventSerializer {
+ private final Set<EventSerializer> allEventSerializers;
+
+ @Inject
+ public EventSerializersAggregator(Set<EventSerializer>
allEventSerializers) {
+ this.allEventSerializers = allEventSerializers;
+ }
+
+ @Override
+ public String toJson(Event event) {
+ return allEventSerializers.stream()
+ .map(eventSerializer -> serialize(event, eventSerializer))
+ .flatMap(Optional::stream)
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Could not serialize
event: " + event));
+ }
+
+ @Override
+ public Event asEvent(String serialized) {
+ return allEventSerializers.stream()
+ .map(eventSerializer -> deserialize(serialized, eventSerializer))
+ .flatMap(Optional::stream)
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Could not deserialize
event: " + serialized));
+ }
+
+ private Optional<String> serialize(Event event, EventSerializer
eventSerializer) {
+ try {
+ return Optional.of(eventSerializer.toJson(event));
+ } catch (Exception ex) {
+ return Optional.empty();
+ }
+ }
+
+ private Optional<Event> deserialize(String json, EventSerializer
eventSerializer) {
+ try {
+ return Optional.of(eventSerializer.asEvent(json));
+ } catch (Exception ex) {
+ return Optional.empty();
+ }
+ }
+}
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index f115fd5eca..a81ec7a8a3 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.Set;
import jakarta.annotation.PreDestroy;
-import jakarta.inject.Inject;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
@@ -62,7 +61,6 @@ public class RabbitMQEventBus implements EventBus, Startable {
private KeyRegistrationHandler keyRegistrationHandler;
private EventDispatcher eventDispatcher;
- @Inject
public RabbitMQEventBus(NamingStrategy namingStrategy, Sender sender,
ReceiverProvider receiverProvider, EventSerializer eventSerializer,
RetryBackoffConfiguration retryBackoff,
RoutingKeyConverter routingKeyConverter,
diff --git
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
index ec41bf49ec..3a50397202 100644
---
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
+++
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -32,6 +32,7 @@ import org.apache.james.events.EventBus;
import org.apache.james.events.EventBusId;
import org.apache.james.events.EventBusReconnectionHandler;
import org.apache.james.events.EventDeadLetters;
+import org.apache.james.events.EventSerializer;
import org.apache.james.events.KeyReconnectionHandler;
import org.apache.james.events.RabbitEventBusConsumerHealthCheck;
import org.apache.james.events.RabbitMQEventBus;
@@ -120,4 +121,9 @@ public class JMAPEventBusModule extends AbstractModule {
EventBus registerEventBus(@Named(InjectionKeys.JMAP) EventBus eventBus) {
return eventBus;
}
+
+    /**
+     * Contributes the JMAP event serializer to the multibound set of
+     * {@link EventSerializer}s.
+     *
+     * @param eventSerializer the JMAP serializer instance supplied by the injector
+     * @return the same instance, exposed as an {@link EventSerializer} set element
+     */
+    @ProvidesIntoSet
+    EventSerializer registerEventSerializers(JmapEventSerializer eventSerializer) {
+        return eventSerializer;
+    }
}
diff --git
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
index 599eb666a1..37522182a1 100644
---
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
@@ -21,13 +21,16 @@ package org.apache.james.modules.event;
import static
org.apache.james.events.NamingStrategy.MAILBOX_EVENT_NAMING_STRATEGY;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.event.json.MailboxEventSerializer;
import org.apache.james.events.EventBus;
import org.apache.james.events.EventBusId;
import org.apache.james.events.EventBusReconnectionHandler;
-import org.apache.james.events.EventSerializer;
+import org.apache.james.events.EventDeadLetters;
import org.apache.james.events.KeyReconnectionHandler;
import org.apache.james.events.NamingStrategy;
import org.apache.james.events.RabbitEventBusConsumerHealthCheck;
@@ -35,29 +38,25 @@ import org.apache.james.events.RabbitMQEventBus;
import
org.apache.james.events.RabbitMQMailboxEventBusDeadLetterQueueHealthCheck;
import org.apache.james.events.RegistrationKey;
import org.apache.james.events.RetryBackoffConfiguration;
+import org.apache.james.events.RoutingKeyConverter;
import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
+import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
import com.google.inject.AbstractModule;
-import com.google.inject.Scopes;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.multibindings.ProvidesIntoSet;
+import reactor.rabbitmq.Sender;
+
public class RabbitMQEventBusModule extends AbstractModule {
@Override
protected void configure() {
- bind(MailboxEventSerializer.class).in(Scopes.SINGLETON);
- bind(EventSerializer.class).to(MailboxEventSerializer.class);
-
bind(NamingStrategy.class).toInstance(MAILBOX_EVENT_NAMING_STRATEGY);
- bind(RabbitMQEventBus.class).in(Scopes.SINGLETON);
- bind(EventBus.class).to(RabbitMQEventBus.class);
-
- Multibinder.newSetBinder(binder(), EventBus.class)
- .addBinding()
- .to(EventBus.class);
Multibinder.newSetBinder(binder(), RegistrationKey.Factory.class)
.addBinding().to(MailboxIdRegistrationKey.Factory.class);
@@ -85,4 +84,26 @@ public class RabbitMQEventBusModule extends AbstractModule {
.forClass(RabbitMQEventBus.class)
.init(instance::start);
}
+
+    /**
+     * Builds the singleton {@link RabbitMQEventBus} by calling its constructor directly.
+     *
+     * <p>The bus is given the concrete {@link MailboxEventSerializer} rather than
+     * whatever is bound to the generic serializer interface.
+     * NOTE(review): presumably so the mailbox bus keeps its dedicated serializer now
+     * that the generic binding can point to an aggregator; confirm against the
+     * bindings of the other modules.
+     *
+     * @return the event bus wired with the injected collaborators, in constructor order
+     */
+    @Provides
+    @Singleton
+    RabbitMQEventBus provideRabbitMQEventBus(NamingStrategy namingStrategy,
+                                             Sender sender,
+                                             ReceiverProvider receiverProvider,
+                                             MailboxEventSerializer eventSerializer,
+                                             RetryBackoffConfiguration retryBackoff,
+                                             RoutingKeyConverter routingKeyConverter,
+                                             EventDeadLetters eventDeadLetters,
+                                             MetricFactory metricFactory,
+                                             ReactorRabbitMQChannelPool channelPool,
+                                             EventBusId eventBusId,
+                                             RabbitMQConfiguration configuration) {
+        return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, eventSerializer, retryBackoff, routingKeyConverter,
+            eventDeadLetters, metricFactory, channelPool, eventBusId, configuration);
+    }
+
+ /**
+  * Exposes the {@link RabbitMQEventBus} instance under the {@link EventBus} type.
+  *
+  * @param rabbitMQEventBus the RabbitMQ-backed bus supplied by the injector
+  * @return the same instance, typed as {@link EventBus}
+  */
+ @Provides
+ @Singleton
+ EventBus provideEventBus(RabbitMQEventBus rabbitMQEventBus) {
+ return rabbitMQEventBus;
+ }
+
+ /**
+  * Contributes the injected {@link EventBus} to the multibound set of event buses.
+  *
+  * @param eventBus the event bus supplied by the injector
+  * @return the same instance, exposed as a set element
+  */
+ @ProvidesIntoSet
+ EventBus registerEventBus(EventBus eventBus) {
+ return eventBus;
+ }
}
diff --git
a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
index c71c90a301..e68106e868 100644
---
a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
+++
b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/mailbox/DefaultEventModule.java
@@ -24,6 +24,7 @@ import org.apache.james.event.json.MailboxEventSerializer;
import org.apache.james.events.EventBus;
import org.apache.james.events.EventListener;
import org.apache.james.events.EventSerializer;
+import org.apache.james.events.EventSerializersAggregator;
import org.apache.james.events.InVMEventBus;
import org.apache.james.events.RetryBackoffConfiguration;
import org.apache.james.events.delivery.EventDelivery;
@@ -44,25 +45,26 @@ import com.google.inject.multibindings.ProvidesIntoSet;
public class DefaultEventModule extends AbstractModule {
@Override
protected void configure() {
- bind(MailboxEventSerializer.class).in(Scopes.SINGLETON);
- bind(EventSerializer.class).to(MailboxEventSerializer.class);
-
bind(MailboxListenerFactory.class).in(Scopes.SINGLETON);
bind(MailboxListenersLoaderImpl.class).in(Scopes.SINGLETON);
bind(InVmEventDelivery.class).in(Scopes.SINGLETON);
bind(InVMEventBus.class).in(Scopes.SINGLETON);
+ bind(MailboxEventSerializer.class).in(Scopes.SINGLETON);
+
+ bind(EventSerializer.class).to(EventSerializersAggregator.class);
Multibinder.newSetBinder(binder(),
GuiceProbe.class).addBinding().to(EventDeadLettersProbe.class);
bind(MailboxListenersLoader.class).to(MailboxListenersLoaderImpl.class);
bind(EventDelivery.class).to(InVmEventDelivery.class);
bind(EventBus.class).to(InVMEventBus.class);
- Multibinder.newSetBinder(binder(), EventBus.class)
- .addBinding()
- .to(EventBus.class);
-
bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT);
+ Multibinder.newSetBinder(binder(), EventSerializer.class)
+ .addBinding()
+ .to(MailboxEventSerializer.class);
+
+ Multibinder.newSetBinder(binder(), EventBus.class);
Multibinder.newSetBinder(binder(),
EventListener.GroupEventListener.class);
Multibinder.newSetBinder(binder(),
EventListener.ReactiveGroupEventListener.class);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]