MAILBOX-265 Cassandra Message Mapper should take the Headers and Metadata FetchType values into account
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7a86fe1a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7a86fe1a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7a86fe1a Branch: refs/heads/master Commit: 7a86fe1a7250fa05c658cf817190c2eff7e83c1f Parents: 6a90560 Author: Benoit Tellier <[email protected]> Authored: Thu Mar 3 11:52:26 2016 +0700 Committer: Benoit Tellier <[email protected]> Committed: Fri Mar 4 19:35:24 2016 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageMapper.java | 103 +++++++++++++------ .../cassandra/table/CassandraMessageTable.java | 3 + 2 files changed, 72 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/7a86fe1a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index e001839..a401b90 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -33,10 +33,12 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BOD import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_START_OCTET; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FIELDS; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FULL_CONTENT_OCTETS; +import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.HEADERS; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.HEADER_CONTENT; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.IMAP_UID; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.INTERNAL_DATE; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.MAILBOX_ID; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.METADATA; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.MOD_SEQ; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.PROPERTIES; import static org.apache.james.mailbox.cassandra.table.CassandraMessageTable.TABLE_NAME; @@ -164,15 +166,15 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { @Override public Iterator<MailboxMessage<CassandraId>> findInMailbox(Mailbox<CassandraId> mailbox, MessageRange set, FetchType ftype, int max) throws MailboxException { - return CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set))) - .map(this::message) + return CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set, ftype))) + .map(row -> message(row, ftype)) .sorted(Comparator.comparingLong(MailboxMessage::getUid)) .iterator(); } @Override public List<Long> findRecentMessageUidsInMailbox(Mailbox<CassandraId> mailbox) throws MailboxException { - return CassandraUtils.convertToStream(session.execute(selectAll(mailbox).and((eq(RECENT, true))))) + return CassandraUtils.convertToStream(session.execute(selectAll(mailbox, FetchType.Metadata).and((eq(RECENT, true))))) .map((row) -> row.getLong(IMAP_UID)) .sorted() .collect(Collectors.toList()); @@ -180,7 +182,7 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { @Override public Long findFirstUnseenMessageUid(Mailbox<CassandraId> mailbox) throws MailboxException { - return 
CassandraUtils.convertToStream(session.execute(selectAll(mailbox).and((eq(SEEN, false))))) + return CassandraUtils.convertToStream(session.execute(selectAll(mailbox, FetchType.Metadata).and((eq(SEEN, false))))) .map((row) -> row.getLong(IMAP_UID)) .sorted() .findFirst() @@ -189,8 +191,8 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { @Override public Map<Long, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox<CassandraId> mailbox, MessageRange set) throws MailboxException { - return CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set).and(eq(DELETED, true)))) - .map(this::message) + return CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set, FetchType.Metadata).and(eq(DELETED, true)))) + .map(row -> message(row, FetchType.Metadata)) .peek((message) -> delete(mailbox, message)) .collect(Collectors.toMap(MailboxMessage::getUid, SimpleMessageMetaData::new)); } @@ -226,7 +228,7 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { @Override public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException { - return CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set))) + return CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set, FetchType.Metadata))) .map((row) -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, row)) .filter(Optional::isPresent) .map(Optional::get) @@ -271,13 +273,13 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { session.execute(update(CassandraMailboxCountersTable.TABLE_NAME).with(operation).where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailboxId.asUuid()))); } - private MailboxMessage<CassandraId> message(Row row) { + private MailboxMessage<CassandraId> message(Row row, FetchType fetchType) { SimpleMailboxMessage<CassandraId> message = new SimpleMailboxMessage<>( 
row.getDate(INTERNAL_DATE), row.getInt(FULL_CONTENT_OCTETS), row.getInt(BODY_START_OCTET), - new SharedByteArrayInputStream(getFullContent(row)), + buildContent(row, fetchType), getFlags(row), getPropertyBuilder(row), CassandraId.of(row.getUUID(MAILBOX_ID))); @@ -286,14 +288,6 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { return message; } - private byte[] getFullContent(Row row) { - byte[] headerContent = new byte[row.getBytes(HEADER_CONTENT).remaining()]; - byte[] bodyContent = new byte[row.getBytes(BODY_CONTENT).remaining()]; - row.getBytes(HEADER_CONTENT).get(headerContent); - row.getBytes(BODY_CONTENT).get(bodyContent); - return Bytes.concat(headerContent, bodyContent); - } - private Flags getFlags(Row row) { Flags flags = new Flags(); for (String flag : CassandraMessageTable.Flag.ALL) { @@ -365,7 +359,7 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { } private Optional<UpdatedFlags> updateFlagsOnMessage(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagUpdateCalculator, Row row) { - return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message(row)) + return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message(row, FetchType.Metadata)) .map(Optional::of) .orElse(handleRetries(mailbox, flagUpdateCalculator, row.getLong(IMAP_UID))); } @@ -401,7 +395,11 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { } private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox<CassandraId> mailbox, long uid, FlagsUpdateCalculator flagUpdateCalculator) { - return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message(Optional.ofNullable(session.execute(selectMessage(mailbox, uid)).one()).orElseThrow(() -> new MessageDeletedDuringFlagsUpdateException(mailbox.getMailboxId(), uid)))); + return tryMessageFlagsUpdate(flagUpdateCalculator, + mailbox, + message(Optional.ofNullable(session.execute(selectMessage(mailbox, uid, FetchType.Metadata)).one()) + 
.orElseThrow(() -> new MessageDeletedDuringFlagsUpdateException(mailbox.getMailboxId(), uid)), + FetchType.Metadata)); } private boolean conditionalSave(MailboxMessage<CassandraId> message, long oldModSeq) { @@ -426,47 +424,84 @@ public class CassandraMessageMapper implements MessageMapper<CassandraId> { return ByteBuffer.wrap(ByteStreams.toByteArray(stream)); } - private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set) { - final MessageRange.Type type = set.getType(); - switch (type) { + private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set, FetchType fetchType) { + switch (set.getType()) { case ALL: - return selectAll(mailbox); + return selectAll(mailbox, fetchType); case FROM: - return selectFrom(mailbox, set.getUidFrom()); + return selectFrom(mailbox, set.getUidFrom(), fetchType); case RANGE: - return selectRange(mailbox, set.getUidFrom(), set.getUidTo()); + return selectRange(mailbox, set.getUidFrom(), set.getUidTo(), fetchType); case ONE: - return selectMessage(mailbox, set.getUidFrom()); + return selectMessage(mailbox, set.getUidFrom(), fetchType); } throw new UnsupportedOperationException(); } - private Where selectAll(Mailbox<CassandraId> mailbox) { - return select(FIELDS) + private Where selectAll(Mailbox<CassandraId> mailbox, FetchType fetchType) { + return select(retrieveFields(fetchType)) .from(TABLE_NAME) .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())); } - private Where selectFrom(Mailbox<CassandraId> mailbox, long uid) { - return select(FIELDS) + private Where selectFrom(Mailbox<CassandraId> mailbox, long uid, FetchType fetchType) { + return select(retrieveFields(fetchType)) .from(TABLE_NAME) .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) .and(gte(IMAP_UID, uid)); } - private Where selectRange(Mailbox<CassandraId> mailbox, long from, long to) { - return select(FIELDS) + private Where selectRange(Mailbox<CassandraId> mailbox, long from, long to, FetchType fetchType) { + return 
select(retrieveFields(fetchType)) .from(TABLE_NAME) .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) .and(gte(IMAP_UID, from)) .and(lte(IMAP_UID, to)); } - private Where selectMessage(Mailbox<CassandraId> mailbox, long uid) { - return select(FIELDS) + private Where selectMessage(Mailbox<CassandraId> mailbox, long uid, FetchType fetchType) { + return select(retrieveFields(fetchType)) .from(TABLE_NAME) .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())) .and(eq(IMAP_UID, uid)); } + private String[] retrieveFields(FetchType fetchType) { + switch (fetchType) { + case Body: + case Full: + return FIELDS; + case Headers: + return HEADERS; + case Metadata: + return METADATA; + default: + throw new RuntimeException("Unknown FetchType " + fetchType); + } + } + + private SharedByteArrayInputStream buildContent(Row row, FetchType fetchType) { + switch (fetchType) { + case Full: + case Body: + return new SharedByteArrayInputStream(getFullContent(row)); + case Headers: + return new SharedByteArrayInputStream(getFieldContent(HEADER_CONTENT, row)); + case Metadata: + return new SharedByteArrayInputStream(new byte[]{}); + default: + throw new RuntimeException("Unknown FetchType " + fetchType); + } + } + + private byte[] getFullContent(Row row) { + return Bytes.concat(getFieldContent(HEADER_CONTENT, row), getFieldContent(BODY_CONTENT, row)); + } + + private byte[] getFieldContent(String field, Row row) { + byte[] headerContent = new byte[row.getBytes(field).remaining()]; + row.getBytes(field).get(headerContent); + return headerContent; + } + } http://git-wip-us.apache.org/repos/asf/james-project/blob/7a86fe1a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java index b1829ba..aaf3692 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java @@ -37,7 +37,10 @@ public interface CassandraMessageTable { String BODY_CONTENT = "bodyContent"; String HEADER_CONTENT = "headerContent"; String PROPERTIES = "properties"; + String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, Flag.USER_FLAGS, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES }; + String[] METADATA = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, Flag.USER_FLAGS, TEXTUAL_LINE_COUNT, PROPERTIES }; + String[] HEADERS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, Flag.USER_FLAGS, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES }; interface Flag { String ANSWERED = "flagAnswered"; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
