This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 812875de7124ef524f7c8e19c0a0c40883244e44 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Wed Apr 19 14:15:57 2023 +0700 JAMES-3777 Snapshots for Filter event sourcing Snapshots are stored as events. The offset of the event is stored in a static column. Non-invasive and easy to use. --- .../org/apache/james/eventsourcing/Event.scala | 2 + .../eventstore/cassandra/CassandraEventStore.scala | 17 +++--- .../cassandra/CassandraEventStoreModule.scala | 1 + .../cassandra/CassandraEventStoreTable.scala | 1 + .../eventstore/cassandra/EventStoreDao.scala | 71 ++++++++++++++++++---- ...CassandraEventStoreExtensionForTestEvents.scala | 2 +- .../cassandra/CassandraEventStoreTest.scala | 37 ++++++++++- .../SnapshotEvent.scala} | 21 ++++--- .../{TestEventDTO.scala => SnapshotEventDTO.scala} | 15 +++-- .../eventstore/cassandra/dto/TestEventDTO.scala | 7 +-- .../cassandra/dto/TestEventDTOModules.scala | 12 ++++ .../sample-configuration/jvm.properties | 5 +- .../sample-configuration/jvm.properties | 3 +- .../api/filtering/impl/FilteringAggregate.java | 16 ++++- .../jmap/api/filtering/impl/RuleSetDefined.java | 5 ++ 15 files changed, 165 insertions(+), 50 deletions(-) diff --git a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala index 5440ab03a2..4fbb7f8215 100644 --- a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala +++ b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala @@ -33,5 +33,7 @@ trait Event extends Comparable[Event] { def getAggregateId: AggregateId + def isASnapshot: Boolean = false + override def compareTo(o: Event): Int = eventId.compareTo(o.eventId) } \ No newline at end of file diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala index bdd3f713f0..f05d54b2d3 100644 --- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala +++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala @@ -20,25 +20,23 @@ package org.apache.james.eventsourcing.eventstore.cassandra import com.google.common.base.Preconditions import javax.inject.Inject - import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException, History} -import org.apache.james.eventsourcing.{AggregateId, Event} +import org.apache.james.eventsourcing.{AggregateId, Event, EventId} import org.reactivestreams.Publisher - import reactor.core.scala.publisher.SMono class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends EventStore { - override def appendAll(events: Iterable[Event]): Publisher[Void] = { + override def appendAll(events: Iterable[Event]): Publisher[Void] = if (events.nonEmpty) { doAppendAll(events) } else { SMono.empty } - } private def doAppendAll(events: Iterable[Event]): SMono[Void] = { Preconditions.checkArgument(Event.belongsToSameAggregate(events)) - eventStoreDao.appendAll(events) + val snapshotId = events.filter(_.isASnapshot).map(_.eventId).headOption + eventStoreDao.appendAll(events, snapshotId) .filter(success => success) .single() .onErrorMap({ @@ -48,9 +46,10 @@ class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends Event .`then`(SMono.empty) } - override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = { - eventStoreDao.getEventsOfAggregate(aggregateId) - } + override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = + eventStoreDao.getSnapshot(aggregateId) + .flatMap(snapshotId => eventStoreDao.getEventsOfAggregate(aggregateId, snapshotId)) + .switchIfEmpty(eventStoreDao.getEventsOfAggregate(aggregateId)) override def remove(aggregateId: AggregateId): Publisher[Void] = eventStoreDao.delete(aggregateId) } diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala index c92e350ba6..ad2a7eb2b5 100644 --- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala +++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala @@ -34,6 +34,7 @@ object CassandraEventStoreModule { SchemaBuilder.RowsPerPartition.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))) .statement(statement => _ => statement.withPartitionKey(CassandraEventStoreTable.AGGREGATE_ID, DataTypes.TEXT) .withClusteringColumn(CassandraEventStoreTable.EVENT_ID, DataTypes.INT) + .withStaticColumn(CassandraEventStoreTable.SNAPSHOT, DataTypes.INT) .withColumn(CassandraEventStoreTable.EVENT, DataTypes.TEXT)) .build } \ No newline at end of file diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala index 57c6754976..f60fce1143 100644 --- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala +++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala @@ -25,4 +25,5 @@ object CassandraEventStoreTable { val AGGREGATE_ID = CqlIdentifier.fromCql("aggregateId") val EVENT = CqlIdentifier.fromCql("event") val EVENT_ID = CqlIdentifier.fromCql("eventId") + val SNAPSHOT = CqlIdentifier.fromCql("snapshot") } \ No newline at end of file diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala index b83bb626b2..39abbbac15 100644 --- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala +++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala @@ -23,21 +23,25 @@ import com.datastax.oss.driver.api.core.CqlSession import com.datastax.oss.driver.api.core.`type`.codec.TypeCodecs import com.datastax.oss.driver.api.core.cql.{BatchStatementBuilder, BatchType, BoundStatement, PreparedStatement, Row, Statement} import com.datastax.oss.driver.api.querybuilder.QueryBuilder -import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, insertInto} +import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, insertInto, update} import javax.inject.Inject import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor import org.apache.james.eventsourcing.eventstore.History -import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.{AGGREGATE_ID, EVENT, EVENTS_TABLE, EVENT_ID} -import org.apache.james.eventsourcing.{AggregateId, Event} +import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.{AGGREGATE_ID, EVENT, EVENTS_TABLE, EVENT_ID, SNAPSHOT} +import org.apache.james.eventsourcing.{AggregateId, Event, EventId} import org.apache.james.util.ReactorUtils +import org.reactivestreams.Publisher import reactor.core.scala.publisher.{SFlux, SMono} class EventStoreDao @Inject() (val session: CqlSession, val jsonEventSerializer: JsonEventSerializer) { private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session) private val insert = prepareInsert(session) + private val insertSnapshot = prepareInsertSnapshot(session) private val select = prepareSelect(session) + private val selectFrom = prepareSelectFrom(session) + private val selectSnapshot = prepareSelectSnapshot(session) private val deleteByAggregateId = prepareDelete(session) private val executionProfile = JamesExecutionProfiles.getLWTProfile(session) @@ -50,6 +54,13 @@ class EventStoreDao @Inject() (val session: CqlSession, .ifNotExists .build()) + private def prepareInsertSnapshot(session: CqlSession): PreparedStatement = + session.prepare( + update(EVENTS_TABLE) + .setColumn(SNAPSHOT, bindMarker(SNAPSHOT)) + .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID)) + .build()) + private def prepareSelect(session: CqlSession): PreparedStatement = session.prepare(QueryBuilder .selectFrom(EVENTS_TABLE) @@ -57,14 +68,34 @@ class EventStoreDao @Inject() (val session: CqlSession, .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID)) .build()) + private def prepareSelectSnapshot(session: CqlSession): PreparedStatement = + session.prepare(QueryBuilder + .selectFrom(EVENTS_TABLE) + .column(SNAPSHOT) + .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID)) + .build()) + + private def prepareSelectFrom(session: CqlSession): PreparedStatement = + session.prepare(QueryBuilder + .selectFrom(EVENTS_TABLE) + .column(EVENT) + .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID)) + .whereColumn(EVENT_ID).isGreaterThanOrEqualTo(bindMarker(EVENT_ID)) + .build()) + private def prepareDelete(session: CqlSession): PreparedStatement = session.prepare(QueryBuilder.deleteFrom(EVENTS_TABLE) .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID)) .build()) - private[cassandra] def appendAll(events: Iterable[Event]): SMono[Boolean] = + private[cassandra] def appendAll(events: Iterable[Event], lastSnapShot: Option[EventId]): SMono[Boolean] = SMono(cassandraAsyncExecutor.executeReturnApplied(appendQuery(events)) .map(_.booleanValue())) + .flatMap((success: Boolean) => lastSnapShot + .filter(_ => success) + .map(id => SMono(cassandraAsyncExecutor.executeVoid(insertSnapshot(events.head.getAggregateId, id)))) + .getOrElse(SMono.empty) + .`then`(SMono.just(success))) private def appendQuery(events: Iterable[Event]): Statement[_] = if (events.size == 1) @@ -82,18 +113,34 @@ class EventStoreDao @Inject() (val session: CqlSession, .setInt(EVENT_ID, event.eventId.serialize) .setString(EVENT, jsonEventSerializer.serialize(event)) - private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = { - val preparedStatement = select.bind() + private def insertSnapshot(aggregateId: AggregateId, snapshotId: EventId): BoundStatement = + insertSnapshot + .bind() + .setString(AGGREGATE_ID, aggregateId.asAggregateKey) + .setInt(SNAPSHOT,snapshotId.serialize) + + private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = + asHistory(cassandraAsyncExecutor.executeRows(select.bind() + .set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT) + .setExecutionProfile(executionProfile))) + + private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId, snapshotId: EventId): SMono[History] = + asHistory(cassandraAsyncExecutor.executeRows(selectFrom.bind() .set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT) - .setExecutionProfile(executionProfile) - val rows: SFlux[Row] = SFlux[Row](cassandraAsyncExecutor.executeRows(preparedStatement)) + .setInt(EVENT_ID, snapshotId.value) + .setExecutionProfile(executionProfile))) - val events: SFlux[Event] = rows.concatMap(toEvent) - val listEvents: SMono[List[Event]] = events.collectSeq() + private def asHistory(rows: Publisher[Row]): SMono[History] = + SFlux[Row](rows) + .concatMap(toEvent) + .collectSeq() .map(_.toList) + .map(History.of(_)) - listEvents.map(History.of(_)) - } + private[cassandra] def getSnapshot(aggregateId: AggregateId): SMono[EventId] = + SMono(cassandraAsyncExecutor.executeSingleRow(selectSnapshot.bind() + .set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT))) + .map(row => EventId.fromSerialized(row.get(0, TypeCodecs.INT))) def delete(aggregateId: AggregateId): SMono[Unit] = SMono(cassandraAsyncExecutor.executeVoid(deleteByAggregateId diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala index 9b1b7a6e32..c7002efd5b 100644 --- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala +++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala @@ -21,4 +21,4 @@ package org.apache.james.eventsourcing.eventstore.cassandra import org.apache.james.eventsourcing.eventstore.cassandra.dto.TestEventDTOModules import org.junit.jupiter.api.extension.Extension -class CassandraEventStoreExtensionForTestEvents extends CassandraEventStoreExtension(JsonEventSerializer.forModules(TestEventDTOModules.TEST_TYPE).withoutNestedType) with Extension +class CassandraEventStoreExtensionForTestEvents extends CassandraEventStoreExtension(JsonEventSerializer.forModules(TestEventDTOModules.TEST_TYPE, TestEventDTOModules.SNAPSHOT_TYPE).withoutNestedType) with Extension diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala index 47c266ef4a..5d20fbeebe 100644 --- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala +++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala @@ -18,8 +18,41 @@ ****************************************************************/ package org.apache.james.eventsourcing.eventstore.cassandra -import org.apache.james.eventsourcing.eventstore.EventStoreContract +import org.apache.james.eventsourcing.eventstore.cassandra.dto.SnapshotEvent +import org.apache.james.eventsourcing.{EventId, TestEvent} +import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreContract, History} +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith +import reactor.core.scala.publisher.SMono @ExtendWith(Array(classOf[CassandraEventStoreExtensionForTestEvents])) -class CassandraEventStoreTest extends EventStoreContract \ No newline at end of file +class CassandraEventStoreTest extends EventStoreContract { + @Test + def getEventsOfAggregateShouldResumeFromSnapshot(testee: EventStore) : Unit = { + val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first") + val event2 = SnapshotEvent(EventId.first.next, EventStoreContract.AGGREGATE_1, "second") + val event3 = TestEvent(EventId.first.next.next, EventStoreContract.AGGREGATE_1, "third") + + SMono(testee.append(event1)).block() + SMono(testee.append(event2)).block() + SMono(testee.append(event3)).block() + + assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block()) + .isEqualTo(History.of(event2, event3)) + } + + @Test + def getEventsOfAggregateShouldResumeFromLatestSnapshot(testee: EventStore) : Unit = { + val event1 = SnapshotEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first") + val event2 = TestEvent(EventId.first.next, EventStoreContract.AGGREGATE_1, "second") + val event3 = SnapshotEvent(EventId.first.next.next, EventStoreContract.AGGREGATE_1, "third") + + SMono(testee.append(event1)).block() + SMono(testee.append(event2)).block() + SMono(testee.append(event3)).block() + + assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block()) + .isEqualTo(History.of(event3)) + } +} \ No newline at end of file diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala similarity index 63% copy from event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala copy to event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala index 47c266ef4a..07f9cfa4bb 100644 --- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala +++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala @@ -1,4 +1,4 @@ - /*************************************************************** +/**************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * * * - * http://www.apache.org/licenses/LICENSE-2.0 * + * http://www.apache.org/licenses/LICENSE-2.0 * * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * @@ -15,11 +15,16 @@ * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - ****************************************************************/ -package org.apache.james.eventsourcing.eventstore.cassandra + * ***************************************************************/ +package org.apache.james.eventsourcing.eventstore.cassandra.dto -import org.apache.james.eventsourcing.eventstore.EventStoreContract -import org.junit.jupiter.api.extension.ExtendWith +import org.apache.james.eventsourcing.{Event, EventId, TestAggregateId} -@ExtendWith(Array(classOf[CassandraEventStoreExtensionForTestEvents])) -class CassandraEventStoreTest extends EventStoreContract \ No newline at end of file +final case class SnapshotEvent(override val eventId: EventId, aggregateId: TestAggregateId, payload: String) extends Event { + + override def getAggregateId = aggregateId + + def getPayload = payload + + override val isASnapshot = true +} \ No newline at end of file diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala similarity index 73% copy from event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala copy to event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala index c3fa627064..dedec58367 100644 --- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala +++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala @@ -21,18 +21,17 @@ package org.apache.james.eventsourcing.eventstore.cassandra.dto import com.fasterxml.jackson.annotation.{JsonCreator, JsonIgnore, JsonProperty} import org.apache.james.eventsourcing.{EventId, TestAggregateId, TestEvent} -final case class TestEventDTO @JsonCreator() ( @JsonProperty("type") `type`: String, - @JsonProperty("data") data: String, - @JsonProperty("eventId") eventId: Int, - @JsonProperty("aggregate") aggregate: Int) extends EventDTO { - override def getType: String = { - `type` -} +final case class SnapshotEventDTO @JsonCreator()(@JsonProperty("type") `type`: String, + @JsonProperty("data") data: String, + @JsonProperty("eventId") eventId: Int, + @JsonProperty("aggregate") aggregate: Int) extends EventDTO { + override def getType: String = `type` + def getData: String = data def getEventId: Long = eventId def getAggregate: Int = aggregate - @JsonIgnore def toEvent: TestEvent = new TestEvent(EventId.fromSerialized (eventId), TestAggregateId(aggregate), data) + @JsonIgnore def toEvent: SnapshotEvent = SnapshotEvent(EventId.fromSerialized(eventId), TestAggregateId(aggregate), data) } \ No newline at end of file diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala index c3fa627064..d676aa8295 100644 --- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala +++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala @@ -25,14 +25,13 @@ final case class TestEventDTO @JsonCreator() ( @JsonProperty("type") `type`: Str @JsonProperty("data") data: String, @JsonProperty("eventId") eventId: Int, @JsonProperty("aggregate") aggregate: Int) extends EventDTO { - override def getType: String = { - `type` -} + override val getType: String = `type` + def getData: String = data def getEventId: Long = eventId def getAggregate: Int = aggregate - @JsonIgnore def toEvent: TestEvent = new TestEvent(EventId.fromSerialized (eventId), TestAggregateId(aggregate), data) + @JsonIgnore def toEvent: TestEvent = TestEvent(EventId.fromSerialized(eventId), TestAggregateId(aggregate), data) } \ No newline at end of file diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala index 2df32a0b57..90d5f27908 100644 --- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala +++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala @@ -43,4 +43,16 @@ object TestEventDTOModules { event.getAggregateId.getId)) .typeName("other-type") .withFactory(EventDTOModule.apply) + + val SNAPSHOT_TYPE: EventDTOModule[SnapshotEvent, SnapshotEventDTO] = EventDTOModule + .forEvent(classOf[SnapshotEvent]) + .convertToDTO(classOf[SnapshotEventDTO]) + .toDomainObjectConverter(_.toEvent) + .toDTOConverter((event: SnapshotEvent, typeName: String) => SnapshotEventDTO( + typeName, + event.getPayload, + event.eventId.serialize, + event.getAggregateId.getId)) + .typeName("snapshot-type") + .withFactory(EventDTOModule.apply) } \ No newline at end of file diff --git a/server/apps/cassandra-app/sample-configuration/jvm.properties b/server/apps/cassandra-app/sample-configuration/jvm.properties index 7055fc8731..c4cf7f2ef3 100644 --- a/server/apps/cassandra-app/sample-configuration/jvm.properties +++ b/server/apps/cassandra-app/sample-configuration/jvm.properties @@ -56,5 +56,6 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false # Disabling JMAP filters event source increments is necessary during rolling adoption of this change. # Defaults to true, meaning James will use JMAP filters event source increments, thus transparently and significantly -# improving JMAP filter storage efficiency. -# james.jmap.filters.eventsource.increments.enabled=true \ No newline at end of file +# improving JMAP filter storage efficiency. Snapshots enable to only build the aggregate from the last few events. +# james.jmap.filters.eventsource.increments.enabled=true +# james.jmap.filters.eventsource.snapshots.enabled=true \ No newline at end of file diff --git a/server/apps/distributed-app/sample-configuration/jvm.properties b/server/apps/distributed-app/sample-configuration/jvm.properties index 2d5c47318e..1a462fc95d 100644 --- a/server/apps/distributed-app/sample-configuration/jvm.properties +++ b/server/apps/distributed-app/sample-configuration/jvm.properties @@ -56,5 +56,6 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false # Disabling JMAP filters event source increments is necessary during rolling adoption of this change. # Defaults to true, meaning James will use JMAP filters event source increments, thus transparently and significantly -# improving JMAP filter storage efficiency. +# improving JMAP filter storage efficiency. Snapshots enable to only build the aggregate from the last few events. # james.jmap.filters.eventsource.increments.enabled=true +# james.jmap.filters.eventsource.snapshots.enabled=true diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java index d5ce85418d..2828613713 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableList; public class FilteringAggregate { private static final boolean ENABLE_INCREMENTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.increments.enabled", "true")); + private static final boolean ENABLE_SNAPSHOTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.snapshots.enabled", "true")); public static FilteringAggregate load(FilteringAggregateId aggregateId, History eventsOfAggregate) { return new FilteringAggregate(aggregateId, eventsOfAggregate); @@ -77,15 +78,24 @@ public class FilteringAggregate { } private ImmutableList<Event> generateEvents(DefineRulesCommand storeCommand) { + EventId nextEventId = history.getNextEventId(); if (ENABLE_INCREMENTS) { - return IncrementalRuleChange.ofDiff(aggregateId, history.getNextEventId(), state.rules, storeCommand.getRules()) + // SNAPSHOT periodically + if (ENABLE_SNAPSHOTS && history.getEvents().size() >= 100) { + return resetRules(storeCommand, nextEventId); + } + return IncrementalRuleChange.ofDiff(aggregateId, nextEventId, state.rules, storeCommand.getRules()) .map(ImmutableList::<Event>of) - .orElseGet(() -> ImmutableList.of(new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules())))); + .orElseGet(() -> resetRules(storeCommand, nextEventId)); } else { - return ImmutableList.of(new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules()))); + return resetRules(storeCommand, nextEventId); } } + private ImmutableList<Event> resetRules(DefineRulesCommand storeCommand, EventId nextEventId) { + return ImmutableList.of(new RuleSetDefined(aggregateId, nextEventId, ImmutableList.copyOf(storeCommand.getRules()))); + } + private boolean shouldNotContainDuplicates(List<Rule> rules) { long uniqueIdCount = rules.stream() .map(Rule::getId) diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java index a60736910e..1aa513d0e8 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java @@ -55,6 +55,11 @@ public class RuleSetDefined implements Event { return rules; } + @Override + public boolean isASnapshot() { + return true; + } + @Override public final boolean equals(Object o) { if (this == o) { --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org