This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 63d865dda3819f3a608beeca87557a8686219ac9 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Dec 2 12:54:47 2022 +0700 [PERF] Improve Cassandra rows interpretation in mailbox/cassandra --- .../cassandra/mail/CassandraMessageIdDAO.java | 23 +++++++++++++--------- .../mailbox/cassandra/mail/FlagsExtractor.java | 8 +++++++- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java index 8e4d26823a..edb99eb80c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java @@ -70,6 +70,7 @@ import org.apache.james.mailbox.model.UpdatedFlags; import org.apache.james.util.streams.Limit; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.ProtocolVersion; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.PreparedStatement; @@ -109,10 +110,13 @@ public class CassandraMessageIdDAO { private final PreparedStatement selectUidRangeLimited; private final PreparedStatement update; private final PreparedStatement listStatement; + private final ProtocolVersion protocolVersion; @Inject public CassandraMessageIdDAO(CqlSession session, BlobId.Factory blobIdFactory) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); + this.protocolVersion = session.getContext().getProtocolVersion(); + this.blobIdFactory = blobIdFactory; this.delete = prepareDelete(session); this.insert = prepareInsert(session); @@ -417,7 +421,7 @@ public class CassandraMessageIdDAO { public Flux<MessageUid> listUids(CassandraId mailboxId) { return cassandraAsyncExecutor.executeRows(selectAllUids.bind() .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)) - .map(row -> MessageUid.of(row.getLong(0))); + .map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion))); } public Flux<ComposedMessageIdWithMetaData> listMessagesMetadata(CassandraId mailboxId, MessageRange range) { @@ -426,21 +430,21 @@ public class CassandraMessageIdDAO { .setLong(IMAP_UID_GTE, range.getUidFrom().asLong()) .setLong(IMAP_UID_LTE, range.getUidTo().asLong())) .map(row -> { - CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID)); + CassandraMessageId messageId = CassandraMessageId.Factory.of(row.get(MESSAGE_ID, TypeCodecs.TIMEUUID)); return ComposedMessageIdWithMetaData.builder() - .modSeq(ModSeq.of(row.getLong(MOD_SEQ))) + .modSeq(ModSeq.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(MOD_SEQ), protocolVersion))) .threadId(getThreadIdFromRow(row, messageId)) .flags(FlagsExtractor.getFlags(row)) .composedMessageId(new ComposedMessageId(mailboxId, messageId, - MessageUid.of(row.getLong(IMAP_UID)))) + MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(IMAP_UID), protocolVersion)))) .build(); }); } public Flux<MessageUid> listNotDeletedUids(CassandraId mailboxId, MessageRange range) { return cassandraAsyncExecutor.executeRows(selectNotDeletedRange.bind() - .setUuid(MAILBOX_ID, mailboxId.asUuid()) + .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID) .setLong(IMAP_UID_GTE, range.getUidFrom().asLong()) .setLong(IMAP_UID_LTE, range.getUidTo().asLong())) .filter(row -> !row.getBoolean(org.apache.james.mailbox.cassandra.table.Flag.DELETED)) @@ -449,10 +453,10 @@ public class CassandraMessageIdDAO { private Flux<MessageUid> doListUids(CassandraId mailboxId, MessageRange range) { return cassandraAsyncExecutor.executeRows(selectUidOnlyRange.bind() - .setUuid(MAILBOX_ID, mailboxId.asUuid()) + .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID) .setLong(IMAP_UID_GTE, range.getUidFrom().asLong()) .setLong(IMAP_UID_LTE, range.getUidTo().asLong())) - .map(row -> MessageUid.of(row.getLong(IMAP_UID))); + .map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion))); } public Flux<MessageUid> listUids(CassandraId mailboxId, MessageRange range) { @@ -517,7 +521,8 @@ public class CassandraMessageIdDAO { } private Optional<CassandraMessageMetadata> fromRowToComposedMessageIdWithFlags(Row row) { - if (row.getUuid(MESSAGE_ID) == null) { + UUID rowAsUuid = row.getUuid(MESSAGE_ID); + if (rowAsUuid == null) { // Out of order updates with concurrent deletes can result in the row being partially deleted // We filter out such records, and cleanup them. delete(CassandraId.of(row.getUuid(MAILBOX_ID)), @@ -526,7 +531,7 @@ public class CassandraMessageIdDAO { .subscribe(); return Optional.empty(); } - final CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID)); + CassandraMessageId messageId = CassandraMessageId.Factory.of(rowAsUuid); return Optional.of(CassandraMessageMetadata.builder() .ids(ComposedMessageIdWithMetaData.builder() .composedMessageId(new ComposedMessageId( diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java index e20871266e..10e04eb35a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java @@ -28,17 +28,23 @@ import javax.mail.Flags; import org.apache.james.mailbox.cassandra.table.Flag; import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.ProtocolVersion; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.type.codec.TypeCodec; +import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; public class FlagsExtractor { public static final TypeCodec<Set<String>> SET_OF_STRINGS_CODEC = CodecRegistry.DEFAULT.codecFor(setOf(TEXT)); public static Flags getFlags(Row row) { + return getFlags(row, row.protocolVersion()); + } + + private static Flags getFlags(Row row, ProtocolVersion protocolVersion) { Flags flags = new Flags(); for (CqlIdentifier cqlId : Flag.ALL_LOWERCASE) { - if (row.getBoolean(cqlId)) { + if (TypeCodecs.BOOLEAN.decodePrimitive(row.getBytesUnsafe(cqlId), protocolVersion)) { flags.add(Flag.JAVAX_MAIL_FLAG.get(cqlId)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org