This is an automated email from the ASF dual-hosted git repository. chibenwa pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8b7faca047a9acacf525a747a6f2ff3a801d15f6 Author: Benoit TELLIER <[email protected]> AuthorDate: Thu Apr 30 10:50:36 2026 +0200 JAMES-4203 Plug Identity events into CustomIdentityDAO --- server/data/data-jmap-cassandra/pom.xml | 5 + .../identity/CassandraCustomIdentityDAO.scala | 39 +++++-- .../identity/CassandraCustomIdentityTest.java | 20 +++- server/data/data-jmap-postgres/pom.xml | 5 + .../identity/PostgresCustomIdentityDAO.java | 58 ++++++++-- .../identity/PostgresCustomIdentityDAOTest.java | 18 ++- server/data/data-jmap/pom.xml | 5 + .../james/jmap/api/identity}/IdentityEvents.scala | 2 +- .../jmap/change/AccountIdRegistrationKey.scala | 0 .../memory/identity/MemoryCustomIdentityDAO.scala | 42 +++++-- .../api/identity/CustomIdentityDAOContract.scala | 126 +++++++++++++++++++++ .../jmap/api/identity/IdentityRepositoryTest.scala | 8 +- .../IdentityUserDeletionTaskStepTest.scala | 8 +- .../memory/identity/MemoryCustomIdentityTest.scala | 20 ++-- .../jmap/change/IdentityEventsSerializer.scala | 2 +- .../jmap/change/IdentityEventsSerializerTest.scala | 14 +-- server/protocols/webadmin/webadmin-jmap/pom.xml | 5 + .../data/jmap/UserIdentitiesRoutesTest.java | 8 +- 18 files changed, 333 insertions(+), 52 deletions(-) diff --git a/server/data/data-jmap-cassandra/pom.xml b/server/data/data-jmap-cassandra/pom.xml index d8a5efe126..7155076b62 100644 --- a/server/data/data-jmap-cassandra/pom.xml +++ b/server/data/data-jmap-cassandra/pom.xml @@ -38,6 +38,11 @@ <groupId>${james.groupId}</groupId> <artifactId>apache-james-backends-cassandra</artifactId> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>event-bus-in-vm</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>apache-james-backends-cassandra</artifactId> diff --git a/server/data/data-jmap-cassandra/src/main/scala/org/apache/james/jmap/cassandra/identity/CassandraCustomIdentityDAO.scala 
b/server/data/data-jmap-cassandra/src/main/scala/org/apache/james/jmap/cassandra/identity/CassandraCustomIdentityDAO.scala index 6652f09134..b947446ac9 100644 --- a/server/data/data-jmap-cassandra/src/main/scala/org/apache/james/jmap/cassandra/identity/CassandraCustomIdentityDAO.scala +++ b/server/data/data-jmap-cassandra/src/main/scala/org/apache/james/jmap/cassandra/identity/CassandraCustomIdentityDAO.scala @@ -23,22 +23,27 @@ import com.datastax.oss.driver.api.core.CqlSession import com.datastax.oss.driver.api.core.cql.{BoundStatement, PreparedStatement, Row} import com.datastax.oss.driver.api.core.data.UdtValue import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, deleteFrom, insertInto, selectFrom} +import com.google.inject.name.Named import jakarta.inject.Inject import org.apache.james.backends.cassandra.init.CassandraTypesProvider import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor import org.apache.james.core.{MailAddress, Username} -import org.apache.james.jmap.api.identity.{CustomIdentityDAO, IdentityCreationRequest, IdentityNotFoundException, IdentityUpdate} -import org.apache.james.jmap.api.model.{EmailAddress, EmailerName, HtmlSignature, Identity, IdentityId, IdentityName, MayDeleteIdentity, TextSignature} +import org.apache.james.events.Event +import org.apache.james.events.EventBus +import org.apache.james.jmap.api.identity.{AllCustomIdentitiesDeleted, CustomIdentityCreated, CustomIdentityDAO, CustomIdentityDeleted, CustomIdentityUpdated, IdentityCreationRequest, IdentityNotFoundException, IdentityUpdate} +import org.apache.james.jmap.api.model.{AccountId, EmailAddress, EmailerName, HtmlSignature, Identity, IdentityId, IdentityName, MayDeleteIdentity, TextSignature} import org.apache.james.jmap.cassandra.identity.tables.CassandraCustomIdentityTable import org.apache.james.jmap.cassandra.identity.tables.CassandraCustomIdentityTable.{BCC, EMAIL, HTML_SIGNATURE, ID, MAY_DELETE, NAME, REPLY_TO, SORT_ORDER, 
TABLE_NAME, TEXT_SIGNATURE, USER} import org.apache.james.jmap.cassandra.utils.EmailAddressTupleUtil +import org.apache.james.jmap.change.AccountIdRegistrationKey import reactor.core.publisher.Mono import reactor.core.scala.publisher.{SFlux, SMono} import scala.jdk.javaapi.CollectionConverters case class CassandraCustomIdentityDAO @Inject()(session: CqlSession, - typesProvider: CassandraTypesProvider) extends CustomIdentityDAO { + typesProvider: CassandraTypesProvider, + @Named("JMAP") eventBus: EventBus) extends CustomIdentityDAO { val executor: CassandraAsyncExecutor = new CassandraAsyncExecutor(session) val emailAddressTupleUtil: EmailAddressTupleUtil = EmailAddressTupleUtil(typesProvider) @@ -82,6 +87,9 @@ case class CassandraCustomIdentityDAO @Inject()(session: CqlSession, SMono.just(identityId) .map(creationRequest.asIdentity) .flatMap(identity => insert(user, identity)) + .flatMap(identity => + SMono.fromPublisher(eventBus.dispatch(CustomIdentityCreated(Event.EventId.random(), user, identity), AccountIdRegistrationKey(AccountId.fromUsername(user)))) + .`then`(SMono.just(identity))) override def list(user: Username): SFlux[Identity] = SFlux.fromPublisher(executor.executeRows(selectAllStatement.bind().setString(USER, user.asString())) @@ -98,22 +106,33 @@ case class CassandraCustomIdentityDAO @Inject()(session: CqlSession, .switchIfEmpty(Mono.error(() => IdentityNotFoundException(identityId))) .map(toIdentity) .map(identityUpdate.update(_)) - .flatMap(patch => insert(user, patch).`then`().asJava())) + .flatMap(updatedIdentity => insert(user, updatedIdentity) + .flatMap(i => SMono.fromPublisher(eventBus.dispatch(CustomIdentityUpdated(Event.EventId.random(), user, i), AccountIdRegistrationKey(AccountId.fromUsername(user)))) + .`then`()).asJava())) override def upsert(user: Username, patch: Identity): SMono[Unit] = - insert(user, patch).`then`() + insert(user, patch) + .flatMap(_ => 
SMono.fromPublisher(eventBus.dispatch(CustomIdentityUpdated(Event.EventId.random(), user, patch), AccountIdRegistrationKey(AccountId.fromUsername(user)))) + .`then`()) override def delete(username: Username, ids: Set[IdentityId]): SMono[Unit] = SFlux.fromIterable(ids) .flatMap(id => executor.executeVoid(deleteOneStatement.bind() .setString(USER, username.asString()) .setUuid(ID, id.id))) - .`then`() + .collectSeq() + .flatMap(_ => SMono.fromPublisher(eventBus.dispatch(CustomIdentityDeleted(Event.EventId.random(), username, ids), AccountIdRegistrationKey(AccountId.fromUsername(username)))) + .`then`()) override def delete(username: Username): SMono[Unit] = - SMono(executor.executeVoid(deleteAllStatement.bind() - .setString(USER, username.asString()))) - .`then`() + list(username) + .map(_.id) + .collectSeq() + .flatMap(ids => + SMono.fromPublisher(executor.executeVoid(deleteAllStatement.bind() + .setString(USER, username.asString())) + .then(Mono.from(eventBus.dispatch(AllCustomIdentitiesDeleted(Event.EventId.random(), username, ids.toSet), AccountIdRegistrationKey(AccountId.fromUsername(username)))))) + .`then`()) private def insert(username: Username, identity: Identity): SMono[Identity] = { val replyTo: java.util.Set[UdtValue] = toJavaSet(identity.replyTo.getOrElse(List())) @@ -163,4 +182,4 @@ case class CassandraCustomIdentityDAO @Inject()(session: CqlSession, private def toEmailAddress(udtValue: UdtValue): EmailAddress = EmailAddress(name = Option(udtValue.getString(CassandraCustomIdentityTable.EmailAddress.NAME)).map(string => EmailerName(string)), email = new MailAddress(udtValue.getString(CassandraCustomIdentityTable.EmailAddress.EMAIL))) -} \ No newline at end of file +} diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/identity/CassandraCustomIdentityTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/identity/CassandraCustomIdentityTest.java index d8802bf99e..2697df8250 100644 
--- a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/identity/CassandraCustomIdentityTest.java +++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/identity/CassandraCustomIdentityTest.java @@ -23,13 +23,20 @@ import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraDataDefinition; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDataDefinition; +import org.apache.james.events.EventBus; +import org.apache.james.events.InVMEventBus; +import org.apache.james.events.MemoryEventDeadLetters; +import org.apache.james.events.RetryBackoffConfiguration; +import org.apache.james.events.delivery.InVmEventDelivery; import org.apache.james.jmap.api.identity.CustomIdentityDAO; import org.apache.james.jmap.api.identity.CustomIdentityDAOContract; +import org.apache.james.metrics.tests.RecordingMetricFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; public class CassandraCustomIdentityTest implements CustomIdentityDAOContract { private CassandraCustomIdentityDAO testee; + private InVMEventBus inVMEventBus; @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraDataDefinition.aggregateModules( @@ -38,12 +45,21 @@ public class CassandraCustomIdentityTest implements CustomIdentityDAOContract { @BeforeEach void setup(CassandraCluster cassandra) { + inVMEventBus = new InVMEventBus( + new InVmEventDelivery(new RecordingMetricFactory()), + RetryBackoffConfiguration.FAST, + new MemoryEventDeadLetters()); testee = new CassandraCustomIdentityDAO(cassandra.getConf(), - cassandra.getTypesProvider()); + cassandra.getTypesProvider(), inVMEventBus); } @Override public CustomIdentityDAO testee() { return testee; } -} \ No newline at end of file + + @Override + public 
EventBus eventBus() { + return inVMEventBus; + } +} diff --git a/server/data/data-jmap-postgres/pom.xml b/server/data/data-jmap-postgres/pom.xml index 88def77379..7444ba67d0 100644 --- a/server/data/data-jmap-postgres/pom.xml +++ b/server/data/data-jmap-postgres/pom.xml @@ -68,6 +68,11 @@ <artifactId>blob-storage-strategy</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>event-bus-in-vm</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>event-sourcing-event-store-postgres</artifactId> diff --git a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/identity/PostgresCustomIdentityDAO.java b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/identity/PostgresCustomIdentityDAO.java index fd42dab139..f11df4f991 100644 --- a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/identity/PostgresCustomIdentityDAO.java +++ b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/identity/PostgresCustomIdentityDAO.java @@ -35,17 +35,26 @@ import java.util.List; import java.util.Optional; import jakarta.inject.Inject; +import jakarta.inject.Named; import org.apache.james.backends.postgres.utils.PostgresExecutor; import org.apache.james.core.MailAddress; import org.apache.james.core.Username; +import org.apache.james.events.Event; +import org.apache.james.events.EventBus; +import org.apache.james.jmap.api.identity.AllCustomIdentitiesDeleted; +import org.apache.james.jmap.api.identity.CustomIdentityCreated; import org.apache.james.jmap.api.identity.CustomIdentityDAO; +import org.apache.james.jmap.api.identity.CustomIdentityDeleted; +import org.apache.james.jmap.api.identity.CustomIdentityUpdated; import org.apache.james.jmap.api.identity.IdentityCreationRequest; import org.apache.james.jmap.api.identity.IdentityNotFoundException; import 
org.apache.james.jmap.api.identity.IdentityUpdate; +import org.apache.james.jmap.api.model.AccountId; import org.apache.james.jmap.api.model.EmailAddress; import org.apache.james.jmap.api.model.Identity; import org.apache.james.jmap.api.model.IdentityId; +import org.apache.james.jmap.change.AccountIdRegistrationKey; import org.jooq.JSON; import org.jooq.Record; import org.reactivestreams.Publisher; @@ -57,6 +66,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -89,11 +99,14 @@ public class PostgresCustomIdentityDAO implements CustomIdentityDAO { } private final PostgresExecutor.Factory executorFactory; + private final EventBus eventBus; private final ObjectMapper objectMapper = new ObjectMapper(); @Inject - public PostgresCustomIdentityDAO(PostgresExecutor.Factory executorFactory) { + public PostgresCustomIdentityDAO(PostgresExecutor.Factory executorFactory, + @Named("JMAP") EventBus eventBus) { this.executorFactory = executorFactory; + this.eventBus = eventBus; } @Override @@ -104,7 +117,11 @@ public class PostgresCustomIdentityDAO implements CustomIdentityDAO { @Override public Publisher<Identity> save(Username user, IdentityId identityId, IdentityCreationRequest creationRequest) { final Identity identity = creationRequest.asIdentity(identityId); - return upsertReturnMono(user, identity); + return upsertReturnMono(user, identity) + .flatMap(saved -> eventBus.dispatch( + new CustomIdentityCreated(Event.EventId.random(), user, saved), + new AccountIdRegistrationKey(AccountId.fromUsername(user))) + .thenReturn(saved)); } @Override @@ -129,14 +146,20 @@ public class PostgresCustomIdentityDAO implements CustomIdentityDAO { return Mono.from(findByIdentityId(user, identityId)) 
.switchIfEmpty(Mono.error(new IdentityNotFoundException(identityId))) .map(identityUpdate::update) - .flatMap(identity -> upsertReturnMono(user, identity)) - .thenReturn(BoxedUnit.UNIT); + .flatMap(updatedIdentity -> upsertReturnMono(user, updatedIdentity) + .flatMap(saved -> eventBus.dispatch( + new CustomIdentityUpdated(Event.EventId.random(), user, saved), + new AccountIdRegistrationKey(AccountId.fromUsername(user))) + .thenReturn(BoxedUnit.UNIT))); } @Override public SMono<BoxedUnit> upsert(Username user, Identity patch) { return SMono.fromPublisher(upsertReturnMono(user, patch) - .thenReturn(BoxedUnit.UNIT)); + .flatMap(saved -> eventBus.dispatch( + new CustomIdentityUpdated(Event.EventId.random(), user, saved), + new AccountIdRegistrationKey(AccountId.fromUsername(user))) + .thenReturn(BoxedUnit.UNIT))); } private Mono<Identity> upsertReturnMono(Username user, Identity identity) { @@ -168,21 +191,36 @@ public class PostgresCustomIdentityDAO implements CustomIdentityDAO { @Override public Publisher<BoxedUnit> delete(Username username, Set<IdentityId> ids) { if (ids.isEmpty()) { - return Mono.empty(); + return eventBus.dispatch( + new CustomIdentityDeleted(Event.EventId.random(), username, ids), + new AccountIdRegistrationKey(AccountId.fromUsername(username))) + .thenReturn(BoxedUnit.UNIT); } return executorFactory.create(username.getDomainPart()) .executeVoid(dslContext -> Mono.from(dslContext.deleteFrom(TABLE_NAME) .where(USERNAME.eq(username.asString())) .and(ID.in(CollectionConverters.asJavaCollection(ids).stream().map(IdentityId::id).collect(ImmutableList.toImmutableList()))))) + .then(eventBus.dispatch( + new CustomIdentityDeleted(Event.EventId.random(), username, ids), + new AccountIdRegistrationKey(AccountId.fromUsername(username)))) .thenReturn(BoxedUnit.UNIT); } @Override public Publisher<BoxedUnit> delete(Username username) { - return executorFactory.create(username.getDomainPart()) - .executeVoid(dslContext -> 
Mono.from(dslContext.deleteFrom(TABLE_NAME) - .where(USERNAME.eq(username.asString())))) - .thenReturn(BoxedUnit.UNIT); + return Flux.from(list(username)) + .map(Identity::id) + .collectList() + .flatMap(idList -> { + Set<IdentityId> idSet = CollectionConverters.asScala(ImmutableSet.copyOf(idList)).toSet(); + return executorFactory.create(username.getDomainPart()) + .executeVoid(dslContext -> Mono.from(dslContext.deleteFrom(TABLE_NAME) + .where(USERNAME.eq(username.asString())))) + .then(eventBus.dispatch( + new AllCustomIdentitiesDeleted(Event.EventId.random(), username, idSet), + new AccountIdRegistrationKey(AccountId.fromUsername(username)))) + .thenReturn(BoxedUnit.UNIT); + }); } private Identity readRecord(Record record) throws Exception { diff --git a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/identity/PostgresCustomIdentityDAOTest.java b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/identity/PostgresCustomIdentityDAOTest.java index c3397172d8..53a1491034 100644 --- a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/identity/PostgresCustomIdentityDAOTest.java +++ b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/identity/PostgresCustomIdentityDAOTest.java @@ -20,16 +20,32 @@ package org.apache.james.jmap.postgres.identity; import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.events.EventBus; +import org.apache.james.events.InVMEventBus; +import org.apache.james.events.MemoryEventDeadLetters; +import org.apache.james.events.RetryBackoffConfiguration; +import org.apache.james.events.delivery.InVmEventDelivery; import org.apache.james.jmap.api.identity.CustomIdentityDAO; import org.apache.james.jmap.api.identity.CustomIdentityDAOContract; +import org.apache.james.metrics.tests.RecordingMetricFactory; import org.junit.jupiter.api.extension.RegisterExtension; public class PostgresCustomIdentityDAOTest implements 
CustomIdentityDAOContract { @RegisterExtension static PostgresExtension postgresExtension = PostgresExtension.withRowLevelSecurity(PostgresCustomIdentityDataDefinition.MODULE); + private final InVMEventBus inVMEventBus = new InVMEventBus( + new InVmEventDelivery(new RecordingMetricFactory()), + RetryBackoffConfiguration.FAST, + new MemoryEventDeadLetters()); + @Override public CustomIdentityDAO testee() { - return new PostgresCustomIdentityDAO(postgresExtension.getExecutorFactory()); + return new PostgresCustomIdentityDAO(postgresExtension.getExecutorFactory(), inVMEventBus); + } + + @Override + public EventBus eventBus() { + return inVMEventBus; } } diff --git a/server/data/data-jmap/pom.xml b/server/data/data-jmap/pom.xml index c7973a8f87..b5d4376e65 100644 --- a/server/data/data-jmap/pom.xml +++ b/server/data/data-jmap/pom.xml @@ -61,6 +61,11 @@ <artifactId>blob-storage-strategy</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>event-bus-in-vm</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>event-sourcing-core</artifactId> diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/IdentityEvents.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/IdentityEvents.scala similarity index 98% rename from server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/IdentityEvents.scala rename to server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/IdentityEvents.scala index fd0a7ecf5c..628fdd6f7d 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/IdentityEvents.scala +++ b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/api/identity/IdentityEvents.scala @@ -17,7 +17,7 @@ * under the License. 
* ****************************************************************/ -package org.apache.james.jmap.mail +package org.apache.james.jmap.api.identity import org.apache.james.core.Username import org.apache.james.events.Event diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala similarity index 100% rename from server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala rename to server/data/data-jmap/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala diff --git a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/memory/identity/MemoryCustomIdentityDAO.scala b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/memory/identity/MemoryCustomIdentityDAO.scala index a73047e174..60eb61c349 100644 --- a/server/data/data-jmap/src/main/scala/org/apache/james/jmap/memory/identity/MemoryCustomIdentityDAO.scala +++ b/server/data/data-jmap/src/main/scala/org/apache/james/jmap/memory/identity/MemoryCustomIdentityDAO.scala @@ -20,15 +20,20 @@ package org.apache.james.jmap.memory.identity import com.google.common.collect.{HashBasedTable, Table} +import com.google.inject.name.Named +import jakarta.inject.Inject import org.apache.james.core.Username -import org.apache.james.jmap.api.identity.{CustomIdentityDAO, IdentityCreationRequest, IdentityNotFoundException, IdentityUpdate} -import org.apache.james.jmap.api.model.{Identity, IdentityId} +import org.apache.james.events.Event.EventId +import org.apache.james.events.{Event, EventBus} +import org.apache.james.jmap.api.identity.{AllCustomIdentitiesDeleted, CustomIdentityCreated, CustomIdentityDAO, CustomIdentityDeleted, CustomIdentityUpdated, IdentityCreationRequest, IdentityNotFoundException, IdentityUpdate} +import org.apache.james.jmap.api.model.{AccountId, Identity, IdentityId} +import 
org.apache.james.jmap.change.AccountIdRegistrationKey import org.reactivestreams.Publisher import reactor.core.scala.publisher.{SFlux, SMono} import scala.jdk.CollectionConverters._ -class MemoryCustomIdentityDAO extends CustomIdentityDAO { +class MemoryCustomIdentityDAO @Inject()(@Named("JMAP") eventBus: EventBus) extends CustomIdentityDAO { private val table: Table[Username, IdentityId, Identity] = HashBasedTable.create override def save(user: Username, creationRequest: IdentityCreationRequest): Publisher[Identity] = @@ -38,6 +43,9 @@ class MemoryCustomIdentityDAO extends CustomIdentityDAO { SMono.just(identityId) .map(creationRequest.asIdentity) .doOnNext(identity => table.put(user, identity.id, identity)) + .flatMap(identity => + SMono.fromPublisher(eventBus.dispatch(CustomIdentityCreated(Event.EventId.random(), user, identity), AccountIdRegistrationKey(AccountId.fromUsername(user)))) + .`then`(SMono.just(identity))) override def list(user: Username): Publisher[Identity] = SFlux.fromIterable(table.row(user).values().asScala) @@ -49,13 +57,27 @@ class MemoryCustomIdentityDAO extends CustomIdentityDAO { override def update(user: Username, identityId: IdentityId, identityUpdate: IdentityUpdate): Publisher[Unit] = Option(table.get(user, identityId)) .map(identityUpdate.update) - .fold(SMono.error[Unit](IdentityNotFoundException(identityId)))(identity => SMono.fromCallable[Unit](() => table.put(user, identityId, identity))) + .fold(SMono.error[Unit](IdentityNotFoundException(identityId)))(updatedIdentity => + SMono.fromCallable[Unit](() => table.put(user, identityId, updatedIdentity)) + .flatMap(_ => SMono.fromPublisher(eventBus.dispatch(CustomIdentityUpdated(Event.EventId.random(), user, updatedIdentity), AccountIdRegistrationKey(AccountId.fromUsername(user)))) + .`then`())) - override def upsert(user: Username, patch: Identity): SMono[Unit] = SMono.fromCallable[Unit](() => table.put(user, patch.id, patch)) + override def upsert(user: Username, patch: Identity): 
SMono[Unit] = + SMono.fromCallable[Unit](() => table.put(user, patch.id, patch)) + .flatMap(_ => SMono.fromPublisher(eventBus.dispatch(CustomIdentityUpdated(Event.EventId.random(), user, patch), AccountIdRegistrationKey(AccountId.fromUsername(user)))) + .`then`()) - override def delete(username: Username, ids: Set[IdentityId]): Publisher[Unit] = SFlux.fromIterable(ids) - .doOnNext(id => table.remove(username, id)) - .`then`() + override def delete(username: Username, ids: Set[IdentityId]): Publisher[Unit] = + SMono.fromCallable[Unit](() => ids.foreach(id => table.remove(username, id))) + .flatMap(_ => SMono.fromPublisher(eventBus.dispatch(CustomIdentityDeleted(Event.EventId.random(), username, ids), AccountIdRegistrationKey(AccountId.fromUsername(username)))) + .`then`()) - override def delete(username: Username): Publisher[Unit] = SMono.fromCallable(() => table.rowMap().remove(username)) -} \ No newline at end of file + override def delete(username: Username): Publisher[Unit] = + SMono.fromCallable(() => { + val ids = table.row(username).keySet().asScala.toSet + table.rowMap().remove(username) + ids + }) + .flatMap(ids => SMono.fromPublisher(eventBus.dispatch(AllCustomIdentitiesDeleted(Event.EventId.random(), username, ids), AccountIdRegistrationKey(AccountId.fromUsername(username)))) + .`then`()) +} diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/CustomIdentityDAOContract.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/CustomIdentityDAOContract.scala index 6468bab542..7039f26173 100644 --- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/CustomIdentityDAOContract.scala +++ b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/CustomIdentityDAOContract.scala @@ -19,14 +19,20 @@ package org.apache.james.jmap.api.identity +import java.util.concurrent.CopyOnWriteArrayList + import org.apache.james.core.{MailAddress, Username} +import 
org.apache.james.events.EventListener.{ExecutionMode, ReactiveGroupEventListener} +import org.apache.james.events.{Event, EventBus, Group} import org.apache.james.jmap.api.identity.CustomIdentityDAOContract.{CREATION_REQUEST, bob} import org.apache.james.jmap.api.identity.IdentityRepositoryTest.{BOB, IDENTITY1} import org.apache.james.jmap.api.model.{EmailAddress, EmailerName, HtmlSignature, Identity, IdentityId, IdentityName, MayDeleteIdentity, TextSignature} import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} import org.junit.jupiter.api.Test +import org.reactivestreams.Publisher import reactor.core.scala.publisher.{SFlux, SMono} +import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ object CustomIdentityDAOContract { @@ -37,11 +43,31 @@ object CustomIdentityDAOContract { bcc = Some(List(EmailAddress(Some(EmailerName("My Boss 2")), new MailAddress("[email protected]")))), textSignature = Some(TextSignature("text signature")), htmlSignature = Some(HtmlSignature("html signature"))) + + class IdentityEventCollectorGroup extends Group {} + + class IdentityEventCollector extends ReactiveGroupEventListener { + val events: CopyOnWriteArrayList[IdentityEvent] = new CopyOnWriteArrayList[IdentityEvent]() + + override def getDefaultGroup: Group = new IdentityEventCollectorGroup() + + override def isHandling(event: Event): Boolean = event.isInstanceOf[IdentityEvent] + + override def getExecutionMode: ExecutionMode = ExecutionMode.SYNCHRONOUS + + override def reactiveEvent(event: Event): Publisher[Void] = { + events.add(event.asInstanceOf[IdentityEvent]) + SMono.empty[Void] + } + } } + trait CustomIdentityDAOContract { def testee(): CustomIdentityDAO + def eventBus: EventBus + @Test def listShouldReturnEmptyWhenNone(): Unit = { assertThat(SFlux(testee().list(bob)).asJava().collectList().block()) @@ -288,4 +314,104 @@ trait CustomIdentityDAOContract { assertThat(SMono(testee().findByIdentityId(bob, identity.id)).block()) 
.isEqualTo(identity) } + + @Test + def saveShouldDispatchCreatedEvent(): Unit = { + val collector = new CustomIdentityDAOContract.IdentityEventCollector() + eventBus.register(collector, collector.getDefaultGroup) + + val identity = SMono(testee().save(bob, CREATION_REQUEST)).block() + + assertThat(collector.events.asScala.map(_.getClass).asJava) + .containsOnly(classOf[CustomIdentityCreated]) + assertThat(collector.events.asScala.map(_.asInstanceOf[CustomIdentityCreated].identity).asJava) + .containsOnly(identity) + assertThat(collector.events.asScala.map(_.getUsername).asJava) + .containsOnly(bob) + } + + @Test + def saveWithIdShouldDispatchCreatedEvent(): Unit = { + val collector = new CustomIdentityDAOContract.IdentityEventCollector() + eventBus.register(collector, collector.getDefaultGroup) + + val id = IdentityId.generate + val identity = SMono(testee().save(bob, id, CREATION_REQUEST)).block() + + assertThat(collector.events.asScala.map(_.getClass).asJava) + .containsOnly(classOf[CustomIdentityCreated]) + assertThat(collector.events.asScala.map(_.asInstanceOf[CustomIdentityCreated].identity).asJava) + .containsOnly(identity) + } + + @Test + def updateShouldDispatchUpdatedEvent(): Unit = { + val collector = new CustomIdentityDAOContract.IdentityEventCollector() + eventBus.register(collector, collector.getDefaultGroup) + + val identity = SMono(testee().save(bob, CREATION_REQUEST)).block() + collector.events.clear() + + SMono(testee().update(bob, identity.id, IdentityUpdateRequest( + name = Some(IdentityNameUpdate(IdentityName("Bob (updated)")))))).block() + + assertThat(collector.events.asScala.map(_.getClass).asJava) + .containsOnly(classOf[CustomIdentityUpdated]) + val updatedEvent = collector.events.get(0).asInstanceOf[CustomIdentityUpdated] + assertThat(updatedEvent.getUsername).isEqualTo(bob) + assertThat(updatedEvent.identity.name).isEqualTo(IdentityName("Bob (updated)")) + assertThat(updatedEvent.identity.id).isEqualTo(identity.id) + } + + @Test + def 
upsertShouldDispatchUpdatedEvent(): Unit = { + val collector = new CustomIdentityDAOContract.IdentityEventCollector() + eventBus.register(collector, collector.getDefaultGroup) + + val identity = SMono(testee().save(bob, CREATION_REQUEST)).block() + collector.events.clear() + + val updatedIdentity = identity.copy(name = IdentityName("Bob (upserted)")) + SMono(testee().upsert(bob, updatedIdentity)).block() + + assertThat(collector.events.asScala.map(_.getClass).asJava) + .containsOnly(classOf[CustomIdentityUpdated]) + val updatedEvent = collector.events.get(0).asInstanceOf[CustomIdentityUpdated] + assertThat(updatedEvent.getUsername).isEqualTo(bob) + assertThat(updatedEvent.identity.name).isEqualTo(IdentityName("Bob (upserted)")) + } + + @Test + def deleteShouldDispatchDeletedEvent(): Unit = { + val collector = new CustomIdentityDAOContract.IdentityEventCollector() + eventBus.register(collector, collector.getDefaultGroup) + + val identity = SMono(testee().save(bob, CREATION_REQUEST)).block() + collector.events.clear() + + SMono(testee().delete(bob, Set(identity.id))).block() + + assertThat(collector.events.asScala.map(_.getClass).asJava) + .containsOnly(classOf[CustomIdentityDeleted]) + val deletedEvent = collector.events.get(0).asInstanceOf[CustomIdentityDeleted] + assertThat(deletedEvent.getUsername).isEqualTo(bob) + assertThat(deletedEvent.identityIds.asJava).containsOnly(identity.id) + } + + @Test + def deleteAllShouldDispatchAllDeletedEvent(): Unit = { + val collector = new CustomIdentityDAOContract.IdentityEventCollector() + eventBus.register(collector, collector.getDefaultGroup) + + val identity = SMono(testee().save(bob, CREATION_REQUEST)).block() + collector.events.clear() + + SMono(testee().delete(bob)).block() + + assertThat(collector.events.asScala.map(_.getClass).asJava) + .containsOnly(classOf[AllCustomIdentitiesDeleted]) + val deletedEvent = collector.events.get(0).asInstanceOf[AllCustomIdentitiesDeleted] + 
assertThat(deletedEvent.getUsername).isEqualTo(bob) + assertThat(deletedEvent.identityIds.asJava).containsOnly(identity.id) + } } diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala index 1bc97851a2..c92a8090a9 100644 --- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala +++ b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/api/identity/IdentityRepositoryTest.scala @@ -22,7 +22,12 @@ package org.apache.james.jmap.api.identity import org.apache.james.core.{MailAddress, Username} import org.apache.james.jmap.api.identity.IdentityRepositoryTest.{BOB, CREATION_REQUEST, IDENTITY1, UPDATE_REQUEST} import org.apache.james.jmap.api.model.{EmailAddress, EmailerName, ForbiddenSendFromException, HtmlSignature, Identity, IdentityId, IdentityName, MayDeleteIdentity, TextSignature} +import org.apache.james.events.InVMEventBus +import org.apache.james.events.MemoryEventDeadLetters +import org.apache.james.events.RetryBackoffConfiguration +import org.apache.james.events.delivery.InVmEventDelivery import org.apache.james.jmap.memory.identity.MemoryCustomIdentityDAO +import org.apache.james.metrics.tests.RecordingMetricFactory import org.assertj.core.api.Assertions.{assertThat, assertThatCode, assertThatThrownBy} import org.junit.jupiter.api.{BeforeEach, Test} import org.mockito.ArgumentMatchers.any @@ -77,7 +82,8 @@ class IdentityRepositoryTest { @BeforeEach def setUp(): Unit = { - customIdentityDAO = new MemoryCustomIdentityDAO() + val eventBus = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.FAST, new MemoryEventDeadLetters()) + customIdentityDAO = new MemoryCustomIdentityDAO(eventBus) identityFactory = mock(classOf[DefaultIdentitySupplier]) testee = new IdentityRepository(customIdentityDAO, 
identityFactory) } diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/memory/identity/IdentityUserDeletionTaskStepTest.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/memory/identity/IdentityUserDeletionTaskStepTest.scala index 23e9db7df9..e4b47b9203 100644 --- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/memory/identity/IdentityUserDeletionTaskStepTest.scala +++ b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/memory/identity/IdentityUserDeletionTaskStepTest.scala @@ -19,8 +19,13 @@ package org.apache.james.jmap.memory.identity +import org.apache.james.events.InVMEventBus +import org.apache.james.events.MemoryEventDeadLetters +import org.apache.james.events.RetryBackoffConfiguration +import org.apache.james.events.delivery.InVmEventDelivery import org.apache.james.jmap.api.identity.CustomIdentityDAOContract.{CREATION_REQUEST, bob} import org.apache.james.jmap.api.identity.IdentityUserDeletionTaskStep +import org.apache.james.metrics.tests.RecordingMetricFactory import org.assertj.core.api.Assertions.{assertThat, assertThatCode} import org.junit.jupiter.api.{BeforeEach, Test} import reactor.core.publisher.Flux @@ -32,7 +37,8 @@ class IdentityUserDeletionTaskStepTest { @BeforeEach def setUp(): Unit = { - identityDAO = new MemoryCustomIdentityDAO() + val eventBus = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.FAST, new MemoryEventDeadLetters()) + identityDAO = new MemoryCustomIdentityDAO(eventBus) testee = new IdentityUserDeletionTaskStep(identityDAO) } diff --git a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/memory/identity/MemoryCustomIdentityTest.scala b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/memory/identity/MemoryCustomIdentityTest.scala index 9ffd3b478b..abd093b9ae 100644 --- a/server/data/data-jmap/src/test/scala/org/apache/james/jmap/memory/identity/MemoryCustomIdentityTest.scala +++ 
b/server/data/data-jmap/src/test/scala/org/apache/james/jmap/memory/identity/MemoryCustomIdentityTest.scala @@ -19,14 +19,20 @@ package org.apache.james.jmap.memory.identity -import org.apache.james.jmap.api.identity.CustomIdentityDAOContract -import org.junit.jupiter.api.BeforeEach +import org.apache.james.events.delivery.InVmEventDelivery +import org.apache.james.events.{EventBus, InVMEventBus, MemoryEventDeadLetters, RetryBackoffConfiguration} +import org.apache.james.jmap.api.identity.{CustomIdentityDAO, CustomIdentityDAOContract} +import org.apache.james.metrics.tests.RecordingMetricFactory class MemoryCustomIdentityTest extends CustomIdentityDAOContract { - var testee: MemoryCustomIdentityDAO = _ + val inVMEventBus: InVMEventBus = new InVMEventBus( + new InVmEventDelivery(new RecordingMetricFactory()), + RetryBackoffConfiguration.FAST, + new MemoryEventDeadLetters()) - @BeforeEach - def setUp(): Unit = { - testee = new MemoryCustomIdentityDAO() - } + val dao: MemoryCustomIdentityDAO = new MemoryCustomIdentityDAO(inVMEventBus) + + override def testee(): CustomIdentityDAO = dao + + override def eventBus: EventBus = inVMEventBus } diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/IdentityEventsSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/IdentityEventsSerializer.scala index 0615e41033..927b4c248e 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/IdentityEventsSerializer.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/IdentityEventsSerializer.scala @@ -25,7 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty import org.apache.james.core.{MailAddress, Username} import org.apache.james.events.Event.EventId import org.apache.james.jmap.api.model.{EmailAddress, EmailerName, HtmlSignature, Identity, IdentityId, IdentityName, MayDeleteIdentity, TextSignature} -import 
org.apache.james.jmap.mail.{AllCustomIdentitiesDeleted, CustomIdentityCreated, CustomIdentityDeleted, CustomIdentityUpdated} +import org.apache.james.jmap.api.identity.{AllCustomIdentitiesDeleted, CustomIdentityCreated, CustomIdentityDeleted, CustomIdentityUpdated} import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/IdentityEventsSerializerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/IdentityEventsSerializerTest.scala index 9f9a53fb1c..e4db4e743f 100644 --- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/IdentityEventsSerializerTest.scala +++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/IdentityEventsSerializerTest.scala @@ -26,7 +26,7 @@ import org.apache.james.core.{MailAddress, Username} import org.apache.james.events.Event.EventId import org.apache.james.jmap.api.model.{EmailAddress, EmailerName, HtmlSignature, Identity, IdentityId, IdentityName, MayDeleteIdentity, TextSignature} import org.apache.james.jmap.change.IdentityEventsSerializerTest._ -import org.apache.james.jmap.mail.{AllCustomIdentitiesDeleted, CustomIdentityCreated, CustomIdentityDeleted, CustomIdentityUpdated} +import org.apache.james.jmap.api.identity.{AllCustomIdentitiesDeleted, CustomIdentityCreated, CustomIdentityDeleted, CustomIdentityUpdated} import org.apache.james.json.JsonGenericSerializer import org.apache.james.json.JsonGenericSerializer.UnknownTypeException import org.assertj.core.api.Assertions.assertThatThrownBy @@ -63,7 +63,7 @@ object IdentityEventsSerializerTest { val CREATED_EVENT: CustomIdentityCreated = CustomIdentityCreated(EVENT_ID, USERNAME, IDENTITY) val CREATED_EVENT_JSON: String = """{ - | "type": "org.apache.james.jmap.mail.CustomIdentityCreated", + | "type": "org.apache.james.jmap.api.identity.CustomIdentityCreated", | "eventId": 
"6e0dd59d-660e-4d9b-b22f-0354479f47b4", | "username": "bob", | "identity": { @@ -82,7 +82,7 @@ object IdentityEventsSerializerTest { val CREATED_EVENT_NO_OPTIONAL: CustomIdentityCreated = CustomIdentityCreated(EVENT_ID, USERNAME, IDENTITY_NO_OPTIONAL) val CREATED_EVENT_NO_OPTIONAL_JSON: String = """{ - | "type": "org.apache.james.jmap.mail.CustomIdentityCreated", + | "type": "org.apache.james.jmap.api.identity.CustomIdentityCreated", | "eventId": "6e0dd59d-660e-4d9b-b22f-0354479f47b4", | "username": "bob", | "identity": { @@ -99,7 +99,7 @@ object IdentityEventsSerializerTest { val UPDATED_EVENT: CustomIdentityUpdated = CustomIdentityUpdated(EVENT_ID, USERNAME, IDENTITY) val UPDATED_EVENT_JSON: String = """{ - | "type": "org.apache.james.jmap.mail.CustomIdentityUpdated", + | "type": "org.apache.james.jmap.api.identity.CustomIdentityUpdated", | "eventId": "6e0dd59d-660e-4d9b-b22f-0354479f47b4", | "username": "bob", | "identity": { @@ -118,7 +118,7 @@ object IdentityEventsSerializerTest { val DELETED_EVENT: CustomIdentityDeleted = CustomIdentityDeleted(EVENT_ID, USERNAME, Set(IDENTITY_ID)) val DELETED_EVENT_JSON: String = """{ - | "type": "org.apache.james.jmap.mail.CustomIdentityDeleted", + | "type": "org.apache.james.jmap.api.identity.CustomIdentityDeleted", | "eventId": "6e0dd59d-660e-4d9b-b22f-0354479f47b4", | "username": "bob", | "identityIds": ["2c9f1b12-b35a-43e6-9af2-0106fb53a943"] @@ -127,7 +127,7 @@ object IdentityEventsSerializerTest { val ALL_DELETED_EVENT: AllCustomIdentitiesDeleted = AllCustomIdentitiesDeleted(EVENT_ID, USERNAME, Set(IDENTITY_ID)) val ALL_DELETED_EVENT_JSON: String = """{ - | "type": "org.apache.james.jmap.mail.AllCustomIdentitiesDeleted", + | "type": "org.apache.james.jmap.api.identity.AllCustomIdentitiesDeleted", | "eventId": "6e0dd59d-660e-4d9b-b22f-0354479f47b4", | "username": "bob", | "identityIds": ["2c9f1b12-b35a-43e6-9af2-0106fb53a943"] @@ -178,7 +178,7 @@ class IdentityEventsSerializerTest { .withoutNestedType() .deserialize( 
"""{ - | "type": "org.apache.james.jmap.mail.UnknownEvent", + | "type": "org.apache.james.jmap.api.identity.UnknownEvent", | "eventId": "6e0dd59d-660e-4d9b-b22f-0354479f47b4", | "username": "bob", | "identityIds": [] diff --git a/server/protocols/webadmin/webadmin-jmap/pom.xml b/server/protocols/webadmin/webadmin-jmap/pom.xml index 6237479a2d..6003d730e2 100644 --- a/server/protocols/webadmin/webadmin-jmap/pom.xml +++ b/server/protocols/webadmin/webadmin-jmap/pom.xml @@ -51,6 +51,11 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>event-bus-in-vm</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>james-json</artifactId> diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java index 628c131eb7..2873767992 100644 --- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/UserIdentitiesRoutesTest.java @@ -32,6 +32,10 @@ import java.util.UUID; import org.apache.james.core.MailAddress; import org.apache.james.core.Username; +import org.apache.james.events.InVMEventBus; +import org.apache.james.events.MemoryEventDeadLetters; +import org.apache.james.events.RetryBackoffConfiguration; +import org.apache.james.events.delivery.InVmEventDelivery; import org.apache.james.jmap.api.identity.DefaultIdentitySupplier; import org.apache.james.jmap.api.identity.IdentityCreationRequest; import org.apache.james.jmap.api.identity.IdentityRepository; @@ -40,6 +44,7 @@ import org.apache.james.jmap.api.model.EmailAddress; import org.apache.james.jmap.api.model.Identity; import 
org.apache.james.jmap.memory.identity.MemoryCustomIdentityDAO; import org.apache.james.json.DTOConverter; +import org.apache.james.metrics.tests.RecordingMetricFactory; import org.apache.james.task.Hostname; import org.apache.james.task.MemoryTaskManager; import org.apache.james.webadmin.WebAdminServer; @@ -73,7 +78,8 @@ class UserIdentitiesRoutesTest { identityFactory = mock(DefaultIdentitySupplier.class); Mockito.when(identityFactory.userCanSendFrom(any(), any())).thenReturn(SMono.just(true).hasElement()); - identityRepository = new IdentityRepository(new MemoryCustomIdentityDAO(), identityFactory); + InVMEventBus eventBus = new InVMEventBus(new InVmEventDelivery(new RecordingMetricFactory()), RetryBackoffConfiguration.FAST, new MemoryEventDeadLetters()); + identityRepository = new IdentityRepository(new MemoryCustomIdentityDAO(eventBus), identityFactory); JsonTransformer jsonTransformer = new JsonTransformer(); TasksRoutes tasksRoutes = new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(UploadCleanupTaskAdditionalInformationDTO.SERIALIZATION_MODULE)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
