This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5967f8fed6e22de9bd7df6b88498c5777098cbc3 Author: datph <[email protected]> AuthorDate: Fri Feb 22 09:41:46 2019 +0700 MAILBOX-380 Create CassandraEventDeadLettersModule With DAO and Table --- mailbox/event/event-cassandra/pom.xml | 51 ++++++++ .../events/CassandraEventDeadLettersDAO.java | 131 +++++++++++++++++++++ .../events/CassandraEventDeadLettersModule.java | 41 +++++++ .../tables/CassandraEventDeadLettersTable.java | 29 +++++ .../events/CassandraEventDeadLettersDAOTest.java | 124 +++++++++++++++++++ mailbox/pom.xml | 1 + 6 files changed, 377 insertions(+) diff --git a/mailbox/event/event-cassandra/pom.xml b/mailbox/event/event-cassandra/pom.xml new file mode 100644 index 0000000..1bc6222 --- /dev/null +++ b/mailbox/event/event-cassandra/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>apache-james-mailbox</artifactId> + <groupId>org.apache.james</groupId> + <version>3.4.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>apache-james-mailbox-event-cassandra</artifactId> + <name>Apache James :: Mailbox :: Event :: In Cassandra implementation</name> + <description>In Cassandra implementation for the eventDeadLetter API</description> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-backends-cassandra</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-api</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-event-json</artifactId> + </dependency> + </dependencies> + + +</project> \ No newline at end of file diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java new file mode 100644 index 0000000..0eab33b --- /dev/null +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java @@ -0,0 +1,131 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT; +import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT_ID; +import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.GROUP; +import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.TABLE_NAME; + +import javax.inject.Inject; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.event.json.EventSerializer; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.github.fge.lambdas.Throwing; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class CassandraEventDeadLettersDAO { + private final CassandraAsyncExecutor executor; + private final EventSerializer eventSerializer; + private final PreparedStatement insertStatement; + private final PreparedStatement deleteStatement; + private final PreparedStatement selectAllGroupStatement; + private final PreparedStatement selectEventStatement; + private final PreparedStatement selectEventIdsWithGroupStatement; + + @Inject + CassandraEventDeadLettersDAO(Session session, EventSerializer eventSerializer) { + this.executor = new CassandraAsyncExecutor(session); + this.eventSerializer = eventSerializer; + this.insertStatement = prepareInsertStatement(session); + this.deleteStatement = prepareDeleteStatement(session); + this.selectAllGroupStatement = prepareSelectAllGroupStatement(session); + this.selectEventStatement = prepareSelectEventStatement(session); + this.selectEventIdsWithGroupStatement = prepareSelectEventIdsWithGroupStatement(session); + } + + private PreparedStatement prepareInsertStatement(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .value(GROUP, bindMarker(GROUP)) + .value(EVENT_ID, bindMarker(EVENT_ID)) + .value(EVENT, bindMarker(EVENT))); + } + + private PreparedStatement prepareDeleteStatement(Session session) { + return session.prepare(delete() + .from(TABLE_NAME) + .where(eq(GROUP, bindMarker(GROUP))) + .and(eq(EVENT_ID, bindMarker(EVENT_ID)))); + } + + private PreparedStatement prepareSelectAllGroupStatement(Session session) { + return session.prepare(select(GROUP) + .from(TABLE_NAME)); + } + + private PreparedStatement prepareSelectEventStatement(Session session) { + return session.prepare(select(EVENT) + .from(TABLE_NAME) + .where(eq(GROUP, bindMarker(GROUP))) + .and(eq(EVENT_ID, bindMarker(EVENT_ID)))); + } + + private PreparedStatement prepareSelectEventIdsWithGroupStatement(Session session) { + return session.prepare(select(EVENT_ID) + .from(TABLE_NAME) + .where(eq(GROUP, bindMarker(GROUP)))); + } + + Mono<Void> store(Group group, Event failedEvent) { + return executor.executeVoid(insertStatement.bind() + .setString(GROUP, group.asString()) + .setUUID(EVENT_ID, failedEvent.getEventId().getId()) + .setString(EVENT, eventSerializer.toJson(failedEvent))); + } + + Mono<Void> removeEvent(Group group, Event.EventId failedEventId) { + return executor.executeVoid(deleteStatement.bind() + .setString(GROUP, group.asString()) + .setUUID(EVENT_ID, failedEventId.getId())); + } + + Mono<Event> retrieveFailedEvent(Group group, Event.EventId failedEventId) { + return executor.executeSingleRow(selectEventStatement.bind() + .setString(GROUP, group.asString()) + .setUUID(EVENT_ID, failedEventId.getId())) + .map(row -> deserializeEvent(row.getString(EVENT))); + } + + Flux<Event.EventId> retrieveEventIdsWithGroup(Group group) { + return executor.executeRows(selectEventIdsWithGroupStatement.bind() + .setString(GROUP, group.asString())) + .map(row -> Event.EventId.of(row.getUUID(EVENT_ID))); + } + + Flux<Group> retrieveAllGroups() { + return executor.executeRows(selectAllGroupStatement.bind()) + .map(Throwing.function(row -> Group.deserialize(row.getString(GROUP)))) + .distinct(); + } + + private Event deserializeEvent(String serializedEvent) { + return eventSerializer.fromJson(serializedEvent).get(); + } +} diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java new file mode 100644 index 0000000..f8b1e30 --- /dev/null +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersModule.java @@ -0,0 +1,41 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import org.apache.james.backends.cassandra.components.CassandraModule; +import org.apache.james.backends.cassandra.utils.CassandraConstants; +import org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable; + +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; + +public interface CassandraEventDeadLettersModule { + CassandraModule MODULE = CassandraModule.builder() + .table(CassandraEventDeadLettersTable.TABLE_NAME) + .comment("Holds event dead letter") + .options(options -> options + .caching(SchemaBuilder.KeyCaching.ALL, + SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))) + .statement(statement -> statement + .addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text()) + .addClusteringColumn(CassandraEventDeadLettersTable.EVENT_ID, DataType.uuid()) + .addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text())) + .build(); +} diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java new file mode 100644 index 0000000..2418ffe --- /dev/null +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/tables/CassandraEventDeadLettersTable.java @@ -0,0 +1,29 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events.tables; + +public interface CassandraEventDeadLettersTable { + + String TABLE_NAME = "event_dead_letters"; + + String GROUP = "group"; + String EVENT_ID = "eventId"; + String EVENT = "event"; +} diff --git a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java new file mode 100644 index 0000000..7bc66a7 --- /dev/null +++ b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAOTest.java @@ -0,0 +1,124 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_1; +import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_2; +import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_3; +import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_1; +import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_2; +import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_3; +import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A; +import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_B; +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.event.json.EventSerializer; +import org.apache.james.mailbox.model.TestId; +import org.apache.james.mailbox.model.TestMessageId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class CassandraEventDeadLettersDAOTest { + + @RegisterExtension + static CassandraClusterExtension cassandraClusterExtension = new CassandraClusterExtension(CassandraEventDeadLettersModule.MODULE); + + private CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO; + + @BeforeEach + void setUp(CassandraCluster cassandraCluster) { + EventSerializer eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory()); + cassandraEventDeadLettersDAO = new CassandraEventDeadLettersDAO(cassandraCluster.getConf(), eventSerializer); + } + + @Test + void removeEventShouldSucceededWhenRemoveStoredEvent() { + cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block(); + + cassandraEventDeadLettersDAO.removeEvent(GROUP_A, EVENT_ID_1).block(); + + assertThat(cassandraEventDeadLettersDAO + .retrieveAllGroups() + .collectList().block()) + .isEmpty(); + } + + @Test + void retrieveFailedEventShouldReturnEmptyWhenDefault() { + assertThat(cassandraEventDeadLettersDAO + .retrieveFailedEvent(GROUP_A, EVENT_ID_1) + .blockOptional().isPresent()) + .isFalse(); + } + + @Test + void retrieveFailedEventShouldReturnStoredEvent() { + cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block(); + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block(); + + assertThat(cassandraEventDeadLettersDAO + .retrieveFailedEvent(GROUP_B, EVENT_ID_2) + .blockOptional().get()) + .isEqualTo(EVENT_2); + } + + @Test + void retrieveEventIdsWithGroupShouldReturnEmptyWhenDefault() { + assertThat(cassandraEventDeadLettersDAO + .retrieveEventIdsWithGroup(GROUP_A) + .collectList().block()) + .isEmpty(); + } + + @Test + void retrieveEventIdsWithGroupShouldReturnStoredEventId() { + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1).block(); + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block(); + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block(); + + assertThat(cassandraEventDeadLettersDAO + .retrieveEventIdsWithGroup(GROUP_B) + .collectList().block()) + .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3); + } + + @Test + void retrieveAllGroupsShouldReturnEmptyWhenDefault() { + assertThat(cassandraEventDeadLettersDAO + .retrieveAllGroups() + .collectList().block()) + .isEmpty(); + } + + @Test + void retrieveAllGroupsShouldReturnStoredGroups() { + cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block(); + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block(); + cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block(); + + assertThat(cassandraEventDeadLettersDAO + .retrieveAllGroups() + .collectList().block()) + .containsOnly(GROUP_A, GROUP_B); + } +} diff --git a/mailbox/pom.xml b/mailbox/pom.xml index 08102cc..55fa3ab 100644 --- a/mailbox/pom.xml +++ b/mailbox/pom.xml @@ -41,6 +41,7 @@ <module>cassandra</module> <module>elasticsearch</module> + <module>event/event-cassandra</module> <module>event/event-memory</module> <module>event/event-rabbitmq</module> <module>event/json</module> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
