This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 978540c07b Refactor some event bus handlers so that they do not
tightly bind to RabbitMQEventBus (#2656)
978540c07b is described below
commit 978540c07b3fa7afbf2ff0d28633f896548f0a2e
Author: hungphan227 <[email protected]>
AuthorDate: Mon Mar 3 08:02:40 2025 +0700
Refactor some event bus handlers so that they do not tightly bind to
RabbitMQEventBus (#2656)
Co-authored-by: hung phan <[email protected]>
---
.../java/org/apache/james/events/EventBus.java | 9 +++++++++
.../james/events/EventBusReconnectionHandler.java | 7 ++-----
.../james/events/GroupRegistrationHandler.java | 4 ----
...ler.java => GroupRegistrationHandlerGroup.java} | 22 +---------------------
.../james/events/KeyReconnectionHandler.java | 3 ---
.../events/RabbitEventBusConsumerHealthCheck.java | 6 +++---
.../org/apache/james/events/RabbitMQEventBus.java | 2 ++
.../james/CassandraRabbitMQJamesServerMain.java | 4 ++--
.../james/DistributedPOP3JamesServerMain.java | 4 ++--
.../org/apache/james/PostgresJamesServerMain.java | 4 ++--
...ntBusModule.java => MailboxEventBusModule.java} | 16 +++++++++++-----
11 files changed, 34 insertions(+), 47 deletions(-)
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index 52b7ef76b4..f46ba06e05 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -100,4 +100,13 @@ public interface EventBus {
default Collection<Group> listRegisteredGroups() {
return ImmutableList.of();
}
+
+ default void start() {
+ }
+
+ default void restart() {
+ }
+
+ default void stop() {
+ }
}
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
b/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
index 98c0b03670..1510e635ce 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
@@ -19,8 +19,6 @@
package org.apache.james.events;
-import jakarta.inject.Inject;
-
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
import org.reactivestreams.Publisher;
@@ -29,10 +27,9 @@ import com.rabbitmq.client.Connection;
import reactor.core.publisher.Mono;
public class EventBusReconnectionHandler implements
SimpleConnectionPool.ReconnectionHandler {
- private final RabbitMQEventBus rabbitMQEventBus;
+ private final EventBus rabbitMQEventBus;
- @Inject
- public EventBusReconnectionHandler(RabbitMQEventBus rabbitMQEventBus) {
+ public EventBusReconnectionHandler(EventBus rabbitMQEventBus) {
this.rabbitMQEventBus = rabbitMQEventBus;
}
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index 5480af0a8f..415bc8355f 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -63,10 +63,6 @@ import reactor.util.retry.Retry;
class GroupRegistrationHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(GroupRegistrationHandler.class);
- public static class GroupRegistrationHandlerGroup extends Group {
-
- }
-
static final Group GROUP = new GroupRegistrationHandlerGroup();
private final NamingStrategy namingStrategy;
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java
similarity index 65%
copy from
event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
copy to
event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java
index 98c0b03670..77626bd3f2 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/EventBusReconnectionHandler.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandlerGroup.java
@@ -19,25 +19,5 @@
package org.apache.james.events;
-import jakarta.inject.Inject;
-
-import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
-import org.reactivestreams.Publisher;
-
-import com.rabbitmq.client.Connection;
-
-import reactor.core.publisher.Mono;
-
-public class EventBusReconnectionHandler implements
SimpleConnectionPool.ReconnectionHandler {
- private final RabbitMQEventBus rabbitMQEventBus;
-
- @Inject
- public EventBusReconnectionHandler(RabbitMQEventBus rabbitMQEventBus) {
- this.rabbitMQEventBus = rabbitMQEventBus;
- }
-
- @Override
- public Publisher<Void> handleReconnection(Connection connection) {
- return Mono.fromRunnable(rabbitMQEventBus::restart);
- }
+public class GroupRegistrationHandlerGroup extends Group {
}
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
index 34cf26046e..aefa2ce7e9 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
@@ -26,8 +26,6 @@ import static
org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete;
import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable;
import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive;
-import jakarta.inject.Inject;
-
import org.apache.james.backends.rabbitmq.QueueArguments;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
@@ -47,7 +45,6 @@ public class KeyReconnectionHandler implements
SimpleConnectionPool.Reconnection
private final EventBusId eventBusId;
private final RabbitMQConfiguration configuration;
- @Inject
public KeyReconnectionHandler(NamingStrategy namingStrategy, EventBusId
eventBusId, RabbitMQConfiguration configuration) {
this.namingStrategy = namingStrategy;
this.eventBusId = eventBusId;
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
index 8478561121..ed0e462cc3 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitEventBusConsumerHealthCheck.java
@@ -36,11 +36,11 @@ import reactor.core.publisher.Mono;
public class RabbitEventBusConsumerHealthCheck implements HealthCheck {
public static final String COMPONENT = "EventbusConsumers";
- private final RabbitMQEventBus eventBus;
+ private final EventBus eventBus;
private final NamingStrategy namingStrategy;
private final SimpleConnectionPool connectionPool;
- public RabbitEventBusConsumerHealthCheck(RabbitMQEventBus eventBus,
NamingStrategy namingStrategy,
+ public RabbitEventBusConsumerHealthCheck(EventBus eventBus, NamingStrategy
namingStrategy,
SimpleConnectionPool
connectionPool) {
this.eventBus = eventBus;
this.namingStrategy = namingStrategy;
@@ -65,7 +65,7 @@ public class RabbitEventBusConsumerHealthCheck implements
HealthCheck {
private Result check(Channel channel) {
Stream<Group> groups = Stream.concat(
eventBus.listRegisteredGroups().stream(),
- Stream.of(new
GroupRegistrationHandler.GroupRegistrationHandlerGroup()));
+ Stream.of(new GroupRegistrationHandlerGroup()));
Optional<String> queueWithoutConsumers = groups
.map(namingStrategy::workQueue)
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index ccd8df7bac..71e3ea757d 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -83,6 +83,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
this.isStopping = false;
}
+ @Override
public void start() {
if (!isRunning && !isStopping) {
@@ -97,6 +98,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
}
}
+ @Override
public void restart() {
keyRegistrationHandler.restart();
groupRegistrationHandler.restart();
diff --git
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index 2abeb519fe..6d0ae07e4e 100644
---
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -51,7 +51,7 @@ import
org.apache.james.modules.data.CassandraSieveRepositoryModule;
import org.apache.james.modules.data.CassandraUsersRepositoryModule;
import org.apache.james.modules.data.CassandraVacationModule;
import org.apache.james.modules.event.JMAPEventBusModule;
-import org.apache.james.modules.event.RabbitMQEventBusModule;
+import org.apache.james.modules.event.MailboxEventBusModule;
import org.apache.james.modules.eventstore.CassandraEventStoreModule;
import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
import org.apache.james.modules.mailbox.CassandraMailboxModule;
@@ -181,7 +181,7 @@ public class CassandraRabbitMQJamesServerMain implements
JamesServerMain {
protected static final Module MODULES =
Modules.override(REQUIRE_TASK_MANAGER_MODULE, new
DistributedTaskManagerModule())
.with(new RabbitMQModule(),
- new RabbitMQEventBusModule(),
+ new MailboxEventBusModule(),
new DistributedTaskSerializationModule());
public static void main(String[] args) throws Exception {
diff --git
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
index 9ecac2a5fb..c1247e8474 100644
---
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
+++
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
@@ -57,7 +57,7 @@ import
org.apache.james.modules.data.CassandraSieveRepositoryModule;
import org.apache.james.modules.data.CassandraUsersRepositoryModule;
import org.apache.james.modules.data.CassandraVacationModule;
import org.apache.james.modules.event.JMAPEventBusModule;
-import org.apache.james.modules.event.RabbitMQEventBusModule;
+import org.apache.james.modules.event.MailboxEventBusModule;
import org.apache.james.modules.eventstore.CassandraEventStoreModule;
import org.apache.james.modules.mailbox.CassandraBlobStoreDependenciesModule;
import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
@@ -175,7 +175,7 @@ public class DistributedPOP3JamesServerMain implements
JamesServerMain {
.with(new RabbitMQModule(),
new RabbitMQMailQueueModule(),
new RabbitMailQueueRoutesModule(),
- new RabbitMQEventBusModule(),
+ new MailboxEventBusModule(),
new DistributedTaskSerializationModule());
public static void main(String[] args) throws Exception {
diff --git
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
index 772d321f50..3d7df65509 100644
---
a/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
+++
b/server/apps/postgres-app/src/main/java/org/apache/james/PostgresJamesServerMain.java
@@ -49,7 +49,7 @@ import
org.apache.james.modules.data.PostgresUsersRepositoryModule;
import org.apache.james.modules.data.PostgresVacationModule;
import org.apache.james.modules.data.SievePostgresRepositoryModules;
import org.apache.james.modules.event.JMAPEventBusModule;
-import org.apache.james.modules.event.RabbitMQEventBusModule;
+import org.apache.james.modules.event.MailboxEventBusModule;
import org.apache.james.modules.events.PostgresDeadLetterModule;
import org.apache.james.modules.mailbox.DefaultEventModule;
import org.apache.james.modules.mailbox.PostgresDeletedMessageVaultModule;
@@ -231,7 +231,7 @@ public class PostgresJamesServerMain implements
JamesServerMain {
new ActiveMQQueueModule());
case RABBITMQ:
return List.of(
- Modules.override(new DefaultEventModule()).with(new
RabbitMQEventBusModule()),
+ Modules.override(new DefaultEventModule()).with(new
MailboxEventBusModule()),
new RabbitMQModule(),
new RabbitMQMailQueueModule(),
new FakeMailQueueViewModule(),
diff --git
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
similarity index 90%
rename from
server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
rename to
server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
index 37522182a1..2e8d3edb0b 100644
---
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java
@@ -52,7 +52,7 @@ import com.google.inject.multibindings.ProvidesIntoSet;
import reactor.rabbitmq.Sender;
-public class RabbitMQEventBusModule extends AbstractModule {
+public class MailboxEventBusModule extends AbstractModule {
@Override
protected void configure() {
@@ -64,10 +64,6 @@ public class RabbitMQEventBusModule extends AbstractModule {
bind(RetryBackoffConfiguration.class).toInstance(RetryBackoffConfiguration.DEFAULT);
bind(EventBusId.class).toInstance(EventBusId.random());
- Multibinder<SimpleConnectionPool.ReconnectionHandler>
reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(),
SimpleConnectionPool.ReconnectionHandler.class);
-
reconnectionHandlerMultibinder.addBinding().to(KeyReconnectionHandler.class);
-
reconnectionHandlerMultibinder.addBinding().to(EventBusReconnectionHandler.class);
-
Multibinder.newSetBinder(binder(), HealthCheck.class)
.addBinding().to(RabbitMQMailboxEventBusDeadLetterQueueHealthCheck.class);
}
@@ -78,6 +74,16 @@ public class RabbitMQEventBusModule extends AbstractModule {
return new RabbitEventBusConsumerHealthCheck(eventBus, namingStrategy,
connectionPool);
}
+ @ProvidesIntoSet
+ SimpleConnectionPool.ReconnectionHandler
provideReconnectionHandler(RabbitMQEventBus eventBus) {
+ return new EventBusReconnectionHandler(eventBus);
+ }
+
+ @ProvidesIntoSet
+ SimpleConnectionPool.ReconnectionHandler
provideReconnectionHandler(NamingStrategy namingStrategy, EventBusId
eventBusId, RabbitMQConfiguration configuration) {
+ return new KeyReconnectionHandler(namingStrategy, eventBusId,
configuration);
+ }
+
@ProvidesIntoSet
InitializationOperation workQueue(RabbitMQEventBus instance) {
return InitilizationOperationBuilder
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org