JAMES-2630 Migrate CassandraAsyncExecutor consumers to Reactor for event-sourcing-event-store-cassandra
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/2133f0b1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/2133f0b1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/2133f0b1

Branch: refs/heads/master
Commit: 2133f0b1cd4362fda2f9541ccc02d4703cc3ddec
Parents: 9c96e60
Author: Gautier DI FOLCO <gdifo...@linagora.com>
Authored: Wed Dec 12 14:28:21 2018 +0100
Committer: Matthieu Baechler <matth...@apache.org>
Committed: Mon Jan 28 15:30:53 2019 +0100

----------------------------------------------------------------------
 .../eventstore/cassandra/CassandraEventStore.java | 2 +-
 .../eventsourcing/eventstore/cassandra/EventStoreDao.java | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/james-project/blob/2133f0b1/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
----------------------------------------------------------------------
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
index 7804de1..79f6646 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java
@@ -51,7 +51,7 @@ public class CassandraEventStore implements EventStore {
     public void doAppendAll(List<Event> events) {
         Preconditions.checkArgument(Event.belongsToSameAggregate(events));
 
-        boolean success = eventStoreDao.appendAll(events).join();
+        boolean success = eventStoreDao.appendAll(events).block();
         if (!success) {
             throw new EventStoreFailedException("Concurrent update to the EventStore detected");
         }

http://git-wip-us.apache.org/repos/asf/james-project/blob/2133f0b1/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
----------------------------------------------------------------------
diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
index feb6ef7..e1e55c7 100644
--- a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
+++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java
@@ -30,7 +30,6 @@ import static org.apache.james.eventsourcing.eventstore.cassandra.CassandraEvent
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 
 import javax.inject.Inject;
 
@@ -48,6 +47,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.steveash.guavate.Guavate;
+import reactor.core.publisher.Mono;
 
 public class EventStoreDao {
     private final CassandraUtils cassandraUtils;
@@ -79,10 +79,10 @@ public class EventStoreDao {
             .where(eq(AGGREGATE_ID, bindMarker(AGGREGATE_ID))));
     }
 
-    public CompletableFuture<Boolean> appendAll(List<Event> events) {
+    public Mono<Boolean> appendAll(List<Event> events) {
         BatchStatement batch = new BatchStatement();
         events.forEach(event -> batch.add(insertEvent(event)));
-        return cassandraAsyncExecutor.executeReturnApplied(batch);
+        return cassandraAsyncExecutor.executeReturnAppliedReactor(batch);
     }
 
     private BoundStatement insertEvent(Event event) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org