This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a133d0771ab48f36171294a8640877790fc00969 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Apr 21 14:39:13 2023 +0700 JAMES-3777 Populate filtering projection upon Incremental change --- .../filtering/CassandraFilteringProjection.java | 59 ++++++++++++++-------- .../impl/EventSourcingFilteringManagement.java | 8 +-- .../api/filtering/impl/FilteringAggregateId.java | 4 ++ .../data/jmap/PopulateFilteringProjectionTask.java | 2 +- ...pulateFilteringProjectionRequestToTaskTest.java | 2 +- 5 files changed, 50 insertions(+), 25 deletions(-) diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java index 2841a99878..486dddf655 100644 --- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java @@ -10,17 +10,22 @@ import static org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjec import java.util.List; import java.util.Optional; +import java.util.function.Function; import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.core.Username; +import org.apache.james.eventsourcing.AggregateId; import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; import org.apache.james.eventsourcing.ReactiveSubscriber; +import org.apache.james.jmap.api.filtering.Rule; import org.apache.james.jmap.api.filtering.Rules; import org.apache.james.jmap.api.filtering.Version; import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement; import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId; +import org.apache.james.jmap.api.filtering.impl.IncrementalRuleChange; import org.apache.james.jmap.api.filtering.impl.RuleSetDefined; import org.reactivestreams.Publisher; @@ -30,10 +35,11 @@ import com.datastax.oss.driver.api.core.cql.Row; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import reactor.core.publisher.Mono; -public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection, ReactiveSubscriber { +public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection { private final CassandraAsyncExecutor executor; private final PreparedStatement insertStatement; @@ -82,27 +88,40 @@ public class CassandraFilteringProjection implements EventSourcingFilteringManag } @Override - public Publisher<Void> handleReactive(Event event) { - if (event instanceof RuleSetDefined) { - return persist((RuleSetDefined) event); - } - throw new RuntimeException("Unsupported event"); - } + public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) { + return Optional.of(new ReactiveSubscriber() { + @Override + public Publisher<Void> handleReactive(Event event) { + if (event instanceof RuleSetDefined) { + return persist((RuleSetDefined) event); + } + if (event instanceof IncrementalRuleChange) { + return persist((IncrementalRuleChange) event); + } + throw new RuntimeException("Unsupported event"); + } - @Override - public Optional<ReactiveSubscriber> subscriber() { - return Optional.of(this); - } + private Mono<Void> persist(RuleSetDefined ruleSetDefined) { + return persistRules(ruleSetDefined.getAggregateId(), ruleSetDefined.eventId(), ruleSetDefined.getRules()); + } - private Mono<Void> persist(RuleSetDefined ruleSetDefined) { - try { - return executor.executeVoid(insertStatement.bind() - .setString(AGGREGATE_ID, ruleSetDefined.getAggregateId().asAggregateKey()) - .setInt(EVENT_ID, ruleSetDefined.eventId().value()) - .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(ruleSetDefined.getRules())))); - } catch (JsonProcessingException e) { - return Mono.error(e); - } + private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) { + try { + return executor.executeVoid(insertStatement.bind() + .setString(AGGREGATE_ID, aggregateId.asAggregateKey()) + .setInt(EVENT_ID, eventId.value()) + .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(rules)))); + } catch (JsonProcessingException e) { + return Mono.error(e); + } + } + + private Mono<Void> persist(IncrementalRuleChange incrementalRuleChange) { + FilteringAggregateId filteringAggregateId = (FilteringAggregateId) incrementalRuleChange.getAggregateId(); + return Mono.from(ruleLoader.apply(filteringAggregateId.getUsername())) + .flatMap(rules -> persistRules(filteringAggregateId, incrementalRuleChange.eventId(), ImmutableList.copyOf(rules.getRules()))); + } + }); } private Version parseVersion(Row row) { diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java index 402c532c6c..08ac0d372f 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java @@ -22,6 +22,7 @@ package org.apache.james.jmap.api.filtering.impl; import java.util.Comparator; import java.util.List; import java.util.Optional; +import java.util.function.Function; import javax.inject.Inject; @@ -50,7 +51,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement { Publisher<Version> getLatestVersion(Username username); - Optional<ReactiveSubscriber> subscriber(); + Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader); } public static class NoReadProjection implements ReadProjection { @@ -85,7 +86,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement { } @Override - public Optional<ReactiveSubscriber> subscriber() { + public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) { return Optional.empty(); } } @@ -104,7 +105,8 @@ public class EventSourcingFilteringManagement implements FilteringManagement { this.readProjection = new NoReadProjection(eventStore); this.eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(new DefineRulesCommandHandler(eventStore)), - readProjection.subscriber().map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER), + readProjection.subscriber(aggregateId -> new NoReadProjection(eventStore).listRulesForUser(aggregateId)) + .map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER), eventStore); } diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java index 3d20912b20..a43c10ecfa 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java @@ -49,6 +49,10 @@ public class FilteringAggregateId implements AggregateId { return PREFIX + SEPARATOR + username.asString(); } + public Username getUsername() { + return username; + } + @Override public final boolean equals(Object o) { if (o instanceof FilteringAggregateId) { diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java index fb408500fe..3d0bc6d2c5 100644 --- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java +++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java @@ -133,7 +133,7 @@ public class PopulateFilteringProjectionTask implements Task { .concatMap(user -> Mono.from(noReadProjection.listRulesForUser(user)) .flatMap(rules -> rules.getVersion().asEventId() - .flatMap(eventId -> readProjection.subscriber() + .flatMap(eventId -> readProjection.subscriber(any -> Mono.empty()) .map(s -> Mono.from(s.handleReactive(asEvent(user, rules, eventId))))) .orElse(Mono.empty())) .thenReturn(Result.COMPLETED) diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java index 58b5fc547d..337d63f965 100644 --- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java +++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java @@ -225,7 +225,7 @@ class PopulateFilteringProjectionRequestToTaskTest { Mockito.when(noReadProjection.listRulesForUser(any())) .thenReturn(Mono.just(new Rules(ImmutableList.of(rule), new Version(4)))); ReactiveSubscriber subscriber = mock(ReactiveSubscriber.class); - Mockito.when(readProjection.subscriber()).thenReturn(Optional.of(subscriber)); + Mockito.when(readProjection.subscriber(any())).thenReturn(Optional.of(subscriber)); Mockito.when(subscriber.handleReactive(any())).thenReturn(Mono.empty()); String taskId = with() --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org