This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7dcebd095bf65759bffb9c0d9a83893d6b007eee Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Mon Aug 16 14:20:33 2021 +0700 JAMES-3774 Migrate event-bus/cassandra to Cassandra driver 4 --- .../james/events/CassandraEventDeadLettersDAO.java | 67 +++++++++++----------- .../events/CassandraEventDeadLettersGroupDAO.java | 25 ++++---- .../events/CassandraEventDeadLettersModule.java | 20 +++---- 3 files changed, 59 insertions(+), 53 deletions(-) diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java index 096cce625b..9af07010c2 100644 --- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java +++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersDAO.java @@ -19,11 +19,10 @@ package org.apache.james.events; -import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; -import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; -import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; -import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; -import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static org.apache.james.events.tables.CassandraEventDeadLettersTable.EVENT; import static org.apache.james.events.tables.CassandraEventDeadLettersTable.GROUP; import static org.apache.james.events.tables.CassandraEventDeadLettersTable.INSERTION_ID; @@ -33,8 +32,8 @@ import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -49,7 +48,7 @@ public class CassandraEventDeadLettersDAO { private final PreparedStatement containEventsStatement; @Inject - CassandraEventDeadLettersDAO(Session session, EventSerializer eventSerializer) { + CassandraEventDeadLettersDAO(CqlSession session, EventSerializer eventSerializer) { this.executor = new CassandraAsyncExecutor(session); this.eventSerializer = eventSerializer; this.insertStatement = prepareInsertStatement(session); @@ -59,63 +58,67 @@ public class CassandraEventDeadLettersDAO { this.containEventsStatement = prepareContainEventStatement(session); } - private PreparedStatement prepareInsertStatement(Session session) { + private PreparedStatement prepareInsertStatement(CqlSession session) { return session.prepare(insertInto(TABLE_NAME) .value(GROUP, bindMarker(GROUP)) .value(INSERTION_ID, bindMarker(INSERTION_ID)) - .value(EVENT, bindMarker(EVENT))); + .value(EVENT, bindMarker(EVENT)) + .build()); } - private PreparedStatement prepareDeleteStatement(Session session) { - return session.prepare(delete() - .from(TABLE_NAME) - .where(eq(GROUP, bindMarker(GROUP))) - .and(eq(INSERTION_ID, bindMarker(INSERTION_ID)))); + private PreparedStatement prepareDeleteStatement(CqlSession session) { + return session.prepare(deleteFrom(TABLE_NAME) + .whereColumn(GROUP).isEqualTo(bindMarker(GROUP)) + .whereColumn(INSERTION_ID).isEqualTo(bindMarker(INSERTION_ID)) + .build()); } - private PreparedStatement prepareSelectEventStatement(Session session) { - return session.prepare(select(EVENT) - .from(TABLE_NAME) - .where(eq(GROUP, bindMarker(GROUP))) - .and(eq(INSERTION_ID, bindMarker(INSERTION_ID)))); + private PreparedStatement prepareSelectEventStatement(CqlSession session) { + return session.prepare(selectFrom(TABLE_NAME) + .column(EVENT) + .whereColumn(GROUP).isEqualTo(bindMarker(GROUP)) + .whereColumn(INSERTION_ID).isEqualTo(bindMarker(INSERTION_ID)) + .build()); } - private PreparedStatement prepareSelectInsertionIdsWithGroupStatement(Session session) { - return session.prepare(select(INSERTION_ID) - .from(TABLE_NAME) - .where(eq(GROUP, bindMarker(GROUP)))); + private PreparedStatement prepareSelectInsertionIdsWithGroupStatement(CqlSession session) { + return session.prepare(selectFrom(TABLE_NAME) + .column(INSERTION_ID) + .whereColumn(GROUP).isEqualTo(bindMarker(GROUP)) + .build()); } - private PreparedStatement prepareContainEventStatement(Session session) { - return session.prepare(select(EVENT) - .from(TABLE_NAME) - .limit(1)); + private PreparedStatement prepareContainEventStatement(CqlSession session) { + return session.prepare(selectFrom(TABLE_NAME) + .column(EVENT) + .limit(1) + .build()); } Mono<Void> store(Group group, Event failedEvent, EventDeadLetters.InsertionId insertionId) { return executor.executeVoid(insertStatement.bind() .setString(GROUP, group.asString()) - .setUUID(INSERTION_ID, insertionId.getId()) + .setUuid(INSERTION_ID, insertionId.getId()) .setString(EVENT, eventSerializer.toJson(failedEvent))); } Mono<Void> removeEvent(Group group, EventDeadLetters.InsertionId failedInsertionId) { return executor.executeVoid(deleteStatement.bind() .setString(GROUP, group.asString()) - .setUUID(INSERTION_ID, failedInsertionId.getId())); + .setUuid(INSERTION_ID, failedInsertionId.getId())); } Mono<Event> retrieveFailedEvent(Group group, EventDeadLetters.InsertionId insertionId) { return executor.executeSingleRow(selectEventStatement.bind() .setString(GROUP, group.asString()) - .setUUID(INSERTION_ID, insertionId.getId())) + .setUuid(INSERTION_ID, insertionId.getId())) .map(row -> deserializeEvent(row.getString(EVENT))); } Flux<EventDeadLetters.InsertionId> retrieveInsertionIdsWithGroup(Group group) { return executor.executeRows(selectEventIdsWithGroupStatement.bind() .setString(GROUP, group.asString())) - .map(row -> EventDeadLetters.InsertionId.of(row.getUUID(INSERTION_ID))); + .map(row -> EventDeadLetters.InsertionId.of(row.getUuid(INSERTION_ID))); } Mono<Boolean> containEvents() { diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java index bd0c27e195..80747f7dd0 100644 --- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java +++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersGroupDAO.java @@ -19,7 +19,9 @@ package org.apache.james.events; -import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static org.apache.james.events.tables.CassandraEventDeadLettersGroupTable.GROUP; import static org.apache.james.events.tables.CassandraEventDeadLettersGroupTable.TABLE_NAME; @@ -27,9 +29,8 @@ import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.github.fge.lambdas.Throwing; import reactor.core.publisher.Flux; @@ -41,20 +42,22 @@ public class CassandraEventDeadLettersGroupDAO { private final PreparedStatement selectAllStatement; @Inject - CassandraEventDeadLettersGroupDAO(Session session) { + CassandraEventDeadLettersGroupDAO(CqlSession session) { this.executor = new CassandraAsyncExecutor(session); this.insertStatement = prepareInsertStatement(session); this.selectAllStatement = prepareSelectStatement(session); } - private PreparedStatement prepareInsertStatement(Session session) { - return session.prepare(QueryBuilder.insertInto(TABLE_NAME) - .value(GROUP, bindMarker(GROUP))); + private PreparedStatement prepareInsertStatement(CqlSession session) { + return session.prepare(insertInto(TABLE_NAME) + .value(GROUP, bindMarker(GROUP)) + .build()); } - private PreparedStatement prepareSelectStatement(Session session) { - return session.prepare(QueryBuilder.select(GROUP) - .from(TABLE_NAME)); + private PreparedStatement prepareSelectStatement(CqlSession session) { + return session.prepare(selectFrom(TABLE_NAME) + .column(GROUP) + .build()); } Mono<Void> storeGroup(Group group) { diff --git a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersModule.java b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersModule.java index 53053e8eb1..4a5becd5f4 100644 --- a/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersModule.java +++ b/event-bus/cassandra/src/main/java/org/apache/james/events/CassandraEventDeadLettersModule.java @@ -19,28 +19,28 @@ package org.apache.james.events; +import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.RowsPerPartition.rows; + import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.utils.CassandraConstants; import org.apache.james.events.tables.CassandraEventDeadLettersGroupTable; import org.apache.james.events.tables.CassandraEventDeadLettersTable; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.datastax.oss.driver.api.core.type.DataTypes; public interface CassandraEventDeadLettersModule { CassandraModule MODULE = CassandraModule.builder() .table(CassandraEventDeadLettersTable.TABLE_NAME) .comment("Holds event dead letter") .options(options -> options - .caching(SchemaBuilder.KeyCaching.ALL, - SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))) - .statement(statement -> statement - .addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text()) - .addClusteringColumn(CassandraEventDeadLettersTable.INSERTION_ID, DataType.uuid()) - .addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text())) + .withCaching(true, rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))) + .statement(statement -> types -> statement + .withPartitionKey(CassandraEventDeadLettersTable.GROUP, DataTypes.TEXT) + .withClusteringColumn(CassandraEventDeadLettersTable.INSERTION_ID, DataTypes.UUID) + .withColumn(CassandraEventDeadLettersTable.EVENT, DataTypes.TEXT)) .table(CassandraEventDeadLettersGroupTable.TABLE_NAME) .comment("Projection table for retrieving groups for all failed events") - .statement(statement -> statement - .addPartitionKey(CassandraEventDeadLettersGroupTable.GROUP, DataType.text())) + .statement(statement -> types -> statement + .withPartitionKey(CassandraEventDeadLettersGroupTable.GROUP, DataTypes.TEXT)) .build(); } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org