This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3037a2b2c348dbfafdd49f47566c654970acc13d Author: Tung Tran <[email protected]> AuthorDate: Thu Apr 6 08:08:21 2023 +0700 AnnotationMapper support more reactive api & Cassandra implement --- .../cassandra/mail/CassandraAnnotationMapper.java | 156 +++++++++++++-------- .../james/mailbox/store/mail/AnnotationMapper.java | 36 +++++ 2 files changed, 132 insertions(+), 60 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java index cce1d2b520..179cc060ac 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java @@ -26,11 +26,11 @@ import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column; import java.util.List; -import java.util.Optional; import java.util.Set; import javax.inject.Inject; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.table.CassandraAnnotationTable; import org.apache.james.mailbox.model.MailboxAnnotation; @@ -46,21 +46,25 @@ import com.datastax.oss.driver.api.querybuilder.select.Select; import com.google.common.base.Ascii; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class CassandraAnnotationMapper extends NonTransactionalMapper implements AnnotationMapper { - private final CqlSession session; + private final CassandraAsyncExecutor asyncExecutor; private final PreparedStatement delete; private final PreparedStatement insert; + private final PreparedStatement getStoredAnnotationsQuery; + private final PreparedStatement countStoredAnnotationsQuery; private final PreparedStatement getStoredAnnotationsQueryForKeys; private final PreparedStatement getStoredAnnotationsQueryLikeKey; private final PreparedStatement getStoredAnnotationsQueryByKey; @Inject public CassandraAnnotationMapper(CqlSession session) { - this.session = session; + this.asyncExecutor = new CassandraAsyncExecutor(session); this.delete = session.prepare(deleteFrom(CassandraAnnotationTable.TABLE_NAME) .where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID)), column(CassandraAnnotationTable.KEY).isEqualTo(bindMarker(CassandraAnnotationTable.KEY))) @@ -72,63 +76,109 @@ public class CassandraAnnotationMapper extends NonTransactionalMapper implements .value(CassandraAnnotationTable.VALUE, bindMarker(CassandraAnnotationTable.VALUE)) .build()); - this.getStoredAnnotationsQueryForKeys = getStoredAnnotationsQueryForKeys(); - this.getStoredAnnotationsQueryLikeKey = getStoredAnnotationsQueryLikeKey(); - this.getStoredAnnotationsQueryByKey = getStoredAnnotationsQueryByKey(); + this.getStoredAnnotationsQuery = session.prepare(getStoredAnnotationsQuery().build()); + + this.countStoredAnnotationsQuery = session.prepare(selectFrom(CassandraAnnotationTable.TABLE_NAME) + .countAll() + .where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID))) + .build()); + + this.getStoredAnnotationsQueryForKeys = + session.prepare(getStoredAnnotationsQuery() + .where(column(CassandraAnnotationTable.KEY).in(bindMarker(CassandraAnnotationTable.KEY))) + .build()); + + this.getStoredAnnotationsQueryLikeKey = + session.prepare(getStoredAnnotationsQuery() + .where(column(CassandraAnnotationTable.KEY) + .isGreaterThanOrEqualTo(bindMarker(CassandraAnnotationTable.GREATER_BIND_KEY)), + column(CassandraAnnotationTable.KEY) + .isLessThanOrEqualTo(bindMarker(CassandraAnnotationTable.LESSER_BIND_KEY))) + .build()); + + this.getStoredAnnotationsQueryByKey = + session.prepare(getStoredAnnotationsQuery() + .where(column(CassandraAnnotationTable.KEY) + .isEqualTo(bindMarker(CassandraAnnotationTable.KEY))) + .build()); } @Override public List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId) { - CassandraId cassandraId = (CassandraId) mailboxId; - return Flux.from(session.executeReactive(session.prepare(getStoredAnnotationsQuery().build()).bind() - .setUuid(CassandraAnnotationTable.MAILBOX_ID, cassandraId.asUuid()))) - .map(this::toAnnotation) + return getAllAnnotationsReactive(mailboxId) .collectList() .block(); } + @Override + public Flux<MailboxAnnotation> getAllAnnotationsReactive(MailboxId mailboxId) { + return asyncExecutor.executeRows(getStoredAnnotationsQuery.bind() + .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid())) + .map(this::toAnnotation); + } + @Override public List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { - CassandraId cassandraId = (CassandraId) mailboxId; - return Flux.from(session.executeReactive(getStoredAnnotationsQueryForKeys.bind() - .setUuid(CassandraAnnotationTable.MAILBOX_ID, cassandraId.asUuid()) - .setList(CassandraAnnotationTable.KEY, keys.stream() - .map(MailboxAnnotationKey::asString) - .collect(ImmutableList.toImmutableList()), String.class))) - .map(this::toAnnotation) + return getAnnotationsByKeysReactive(mailboxId, keys) .collectList() .block(); } + @Override + public Flux<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { + return asyncExecutor.executeRows(getStoredAnnotationsQueryForKeys.bind() + .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid()) + .setList(CassandraAnnotationTable.KEY, keys.stream() + .map(MailboxAnnotationKey::asString) + .collect(ImmutableList.toImmutableList()), String.class)) + .map(this::toAnnotation); + } + @Override public List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { - CassandraId cassandraId = (CassandraId) mailboxId; - return Flux.fromIterable(keys) - .flatMap(annotation -> getAnnotationsByKeyWithOneDepth(cassandraId, annotation)) - .collectList() + return getAnnotationsByKeysWithOneDepthReactive(mailboxId, keys).collectList() .block(); } @Override - public List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { - CassandraId cassandraId = (CassandraId) mailboxId; + public Flux<MailboxAnnotation> getAnnotationsByKeysWithOneDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { return Flux.fromIterable(keys) - .flatMap(annotation -> getAnnotationsByKeyWithAllDepth(cassandraId, annotation)) - .collectList() + .flatMap(annotation -> getAnnotationsByKeyWithOneDepth((CassandraId) mailboxId, annotation)); + } + + @Override + public List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { + return getAnnotationsByKeysWithAllDepthReactive(mailboxId, keys).collectList() .block(); } + @Override + public Flux<MailboxAnnotation> getAnnotationsByKeysWithAllDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { + return Flux.fromIterable(keys) + .flatMap(annotation -> getAnnotationsByKeyWithAllDepth((CassandraId) mailboxId, annotation)); + } + @Override public void deleteAnnotation(MailboxId mailboxId, MailboxAnnotationKey key) { - session.execute(delete.bind() + deleteAnnotationReactive(mailboxId, key).block(); + } + + @Override + public Mono<Void> deleteAnnotationReactive(MailboxId mailboxId, MailboxAnnotationKey key) { + return asyncExecutor.executeVoid(delete.bind() .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid()) .setString(CassandraAnnotationTable.KEY, key.asString())); } @Override public void insertAnnotation(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) { + insertAnnotationReactive(mailboxId, mailboxAnnotation).block(); + } + + @Override + public Mono<Void> insertAnnotationReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) { Preconditions.checkArgument(!mailboxAnnotation.isNil()); - session.execute(insert.bind() + return asyncExecutor.executeVoid(insert.bind() .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid()) .setString(CassandraAnnotationTable.KEY, mailboxAnnotation.getKey().asString()) .setString(CassandraAnnotationTable.VALUE, mailboxAnnotation.getValue().get())); @@ -136,20 +186,25 @@ public class CassandraAnnotationMapper extends NonTransactionalMapper implements @Override public boolean exist(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) { - CassandraId cassandraId = (CassandraId) mailboxId; - Optional<Row> row = Optional.ofNullable( - session.execute(getStoredAnnotationsQueryByKey.bind() - .setUuid(CassandraAnnotationTable.MAILBOX_ID, cassandraId.asUuid()) - .setString(CassandraAnnotationTable.KEY, mailboxAnnotation.getKey().asString())) - .one()); - return row.isPresent(); + return existReactive(mailboxId, mailboxAnnotation).block(); + } + + @Override + public Mono<Boolean> existReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) { + return asyncExecutor.executeReturnExists(getStoredAnnotationsQueryByKey.bind() + .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid()) + .setString(CassandraAnnotationTable.KEY, mailboxAnnotation.getKey().asString())); } @Override public int countAnnotations(MailboxId mailboxId) { - CassandraId cassandraId = (CassandraId) mailboxId; - return session.execute(session.prepare(getStoredAnnotationsQuery().build()).bind() - .setUuid(CassandraAnnotationTable.MAILBOX_ID, cassandraId.asUuid())).getAvailableWithoutFetching(); + return countAnnotationsReactive(mailboxId).block(); + } + + public Mono<Integer> countAnnotationsReactive(MailboxId mailboxId) { + return asyncExecutor.executeSingleRow(countStoredAnnotationsQuery.bind() + .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid())) + .map(row -> Ints.checkedCast(row.getLong(0))); } private MailboxAnnotation toAnnotation(Row row) { @@ -163,43 +218,24 @@ public class CassandraAnnotationMapper extends NonTransactionalMapper implements .where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID))); } - private PreparedStatement getStoredAnnotationsQueryForKeys() { - return session.prepare(getStoredAnnotationsQuery() - .where(column(CassandraAnnotationTable.KEY).in(bindMarker(CassandraAnnotationTable.KEY))) - .build()); - } - - private PreparedStatement getStoredAnnotationsQueryLikeKey() { - return session.prepare(getStoredAnnotationsQuery() - .where(column(CassandraAnnotationTable.KEY).isGreaterThanOrEqualTo(bindMarker(CassandraAnnotationTable.GREATER_BIND_KEY)), - column(CassandraAnnotationTable.KEY).isLessThanOrEqualTo(bindMarker(CassandraAnnotationTable.LESSER_BIND_KEY))) - .build()); - } - - private PreparedStatement getStoredAnnotationsQueryByKey() { - return session.prepare(getStoredAnnotationsQuery() - .where(column(CassandraAnnotationTable.KEY).isEqualTo(bindMarker(CassandraAnnotationTable.KEY))) - .build()); - } - private String buildNextKey(String key) { return key + MailboxAnnotationKey.SLASH_CHARACTER + Ascii.MAX; } private Flux<MailboxAnnotation> getAnnotationsByKeyWithAllDepth(CassandraId mailboxId, MailboxAnnotationKey key) { - return Flux.from(session.executeReactive(getStoredAnnotationsQueryLikeKey.bind() + return asyncExecutor.executeRows(getStoredAnnotationsQueryLikeKey.bind() .setUuid(CassandraAnnotationTable.MAILBOX_ID, mailboxId.asUuid()) .setString(CassandraAnnotationTable.GREATER_BIND_KEY, key.asString()) - .setString(CassandraAnnotationTable.LESSER_BIND_KEY, buildNextKey(key.asString())))) + .setString(CassandraAnnotationTable.LESSER_BIND_KEY, buildNextKey(key.asString()))) .map(this::toAnnotation) .filter(annotation -> key.isAncestorOrIsEqual(annotation.getKey())); } private Flux<MailboxAnnotation> getAnnotationsByKeyWithOneDepth(CassandraId mailboxId, MailboxAnnotationKey key) { - return Flux.from(session.executeReactive(getStoredAnnotationsQueryLikeKey.bind() + return asyncExecutor.executeRows(getStoredAnnotationsQueryLikeKey.bind() .setUuid(CassandraAnnotationTable.MAILBOX_ID, mailboxId.asUuid()) .setString(CassandraAnnotationTable.GREATER_BIND_KEY, key.asString()) - .setString(CassandraAnnotationTable.LESSER_BIND_KEY, buildNextKey(key.asString())))) + .setString(CassandraAnnotationTable.LESSER_BIND_KEY, buildNextKey(key.asString()))) .map(this::toAnnotation) .filter(annotation -> key.isParentOrIsEqual(annotation.getKey())); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java index ec775de6c6..da3146fcc9 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java @@ -26,6 +26,10 @@ import org.apache.james.mailbox.model.MailboxAnnotation; import org.apache.james.mailbox.model.MailboxAnnotationKey; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.store.transaction.Mapper; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public interface AnnotationMapper extends Mapper { /** @@ -37,6 +41,10 @@ public interface AnnotationMapper extends Mapper { */ List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId); + default Publisher<MailboxAnnotation> getAllAnnotationsReactive(MailboxId mailboxId) { + return Flux.fromIterable(getAllAnnotations(mailboxId)); + } + /** * Search all the <code>MailboxAnnotation</code> of selected mailbox by the set of annotation's keys. The result is not ordered and should not * contain duplicate by key @@ -47,6 +55,10 @@ public interface AnnotationMapper extends Mapper { */ List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, Set<MailboxAnnotationKey> keys); + default Publisher<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { + return Flux.fromIterable(getAnnotationsByKeys(mailboxId, keys)); + } + /** * Search all the <code>MailboxAnnotation</code> of selected mailbox by the set of annotation's keys as well as its children entries * The result is not ordered and should not contain duplicate by key @@ -57,6 +69,10 @@ public interface AnnotationMapper extends Mapper { */ List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys); + default Publisher<MailboxAnnotation> getAnnotationsByKeysWithOneDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { + return Flux.fromIterable(getAnnotationsByKeysWithOneDepth(mailboxId, keys)); + } + /** * Search all the <code>MailboxAnnotation</code> of selected mailbox by the set of annotation's keys and entries below the keys * The result is not ordered and should not contain duplicate by key @@ -67,6 +83,10 @@ public interface AnnotationMapper extends Mapper { */ List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys); + default Publisher<MailboxAnnotation> getAnnotationsByKeysWithAllDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) { + return Flux.fromIterable(getAnnotationsByKeysWithAllDepth(mailboxId, keys)); + } + /** * Delete the annotation of selected mailbox by its key. * @@ -75,6 +95,10 @@ public interface AnnotationMapper extends Mapper { */ void deleteAnnotation(MailboxId mailboxId, MailboxAnnotationKey key); + default Publisher<Void> deleteAnnotationReactive(MailboxId mailboxId, MailboxAnnotationKey key) { + return Mono.fromRunnable(() -> deleteAnnotation(mailboxId, key)); + } + /** * - Insert new annotation if it does not exist on store * - Update the new value for existed annotation @@ -84,6 +108,10 @@ public interface AnnotationMapper extends Mapper { */ void insertAnnotation(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation); + default Publisher<Void> insertAnnotationReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) { + return Mono.fromRunnable(() -> insertAnnotation(mailboxId, mailboxAnnotation)); + } + /** * Checking the current annotation of selected mailbox exists on store or not. It's checked by annotation key, not by its value. * @@ -93,9 +121,17 @@ public interface AnnotationMapper extends Mapper { */ boolean exist(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation); + default Publisher<Boolean> existReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) { + return Mono.fromCallable(() -> exist(mailboxId, mailboxAnnotation)); + } + /** * Getting total number of current annotation on mailbox * */ int countAnnotations(MailboxId mailboxId); + + default Publisher<Integer> countAnnotationsReactive(MailboxId mailboxId) { + return Mono.fromCallable(() -> countAnnotations(mailboxId)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
