MAILBOX-265 Cassandra Message Mapper should take Headers and Metadata fetch Type into account


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7a86fe1a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7a86fe1a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7a86fe1a

Branch: refs/heads/master
Commit: 7a86fe1a7250fa05c658cf817190c2eff7e83c1f
Parents: 6a90560
Author: Benoit Tellier <[email protected]>
Authored: Thu Mar 3 11:52:26 2016 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Fri Mar 4 19:35:24 2016 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageMapper.java  | 103 +++++++++++++------
 .../cassandra/table/CassandraMessageTable.java  |   3 +
 2 files changed, 72 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/7a86fe1a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index e001839..a401b90 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -33,10 +33,12 @@ import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BOD
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.BODY_START_OCTET;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FIELDS;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.FULL_CONTENT_OCTETS;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.HEADERS;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.HEADER_CONTENT;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.IMAP_UID;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.INTERNAL_DATE;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.MAILBOX_ID;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.METADATA;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.MOD_SEQ;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.PROPERTIES;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageTable.TABLE_NAME;
@@ -164,15 +166,15 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
 
     @Override
     public Iterator<MailboxMessage<CassandraId>> 
findInMailbox(Mailbox<CassandraId> mailbox, MessageRange set, FetchType ftype, 
int max) throws MailboxException {
-        return 
CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set)))
-            .map(this::message)
+        return 
CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set, ftype)))
+            .map(row -> message(row, ftype))
             .sorted(Comparator.comparingLong(MailboxMessage::getUid))
             .iterator();
     }
 
     @Override
     public List<Long> findRecentMessageUidsInMailbox(Mailbox<CassandraId> 
mailbox) throws MailboxException {
-        return 
CassandraUtils.convertToStream(session.execute(selectAll(mailbox).and((eq(RECENT,
 true)))))
+        return 
CassandraUtils.convertToStream(session.execute(selectAll(mailbox, 
FetchType.Metadata).and((eq(RECENT, true)))))
             .map((row) -> row.getLong(IMAP_UID))
             .sorted()
             .collect(Collectors.toList());
@@ -180,7 +182,7 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
 
     @Override
     public Long findFirstUnseenMessageUid(Mailbox<CassandraId> mailbox) throws 
MailboxException {
-        return 
CassandraUtils.convertToStream(session.execute(selectAll(mailbox).and((eq(SEEN, 
false)))))
+        return 
CassandraUtils.convertToStream(session.execute(selectAll(mailbox, 
FetchType.Metadata).and((eq(SEEN, false)))))
             .map((row) -> row.getLong(IMAP_UID))
             .sorted()
             .findFirst()
@@ -189,8 +191,8 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
 
     @Override
     public Map<Long, MessageMetaData> 
expungeMarkedForDeletionInMailbox(Mailbox<CassandraId> mailbox, MessageRange 
set) throws MailboxException {
-        return 
CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, 
set).and(eq(DELETED, true))))
-            .map(this::message)
+        return 
CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set, 
FetchType.Metadata).and(eq(DELETED, true))))
+            .map(row -> message(row, FetchType.Metadata))
             .peek((message) -> delete(mailbox, message))
             .collect(Collectors.toMap(MailboxMessage::getUid, 
SimpleMessageMetaData::new));
     }
@@ -226,7 +228,7 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
 
     @Override
     public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws 
MailboxException {
-        return 
CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set)))
+        return 
CassandraUtils.convertToStream(session.execute(buildQuery(mailbox, set, 
FetchType.Metadata)))
             .map((row) -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, 
row))
             .filter(Optional::isPresent)
             .map(Optional::get)
@@ -271,13 +273,13 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
         
session.execute(update(CassandraMailboxCountersTable.TABLE_NAME).with(operation).where(eq(CassandraMailboxCountersTable.MAILBOX_ID,
 mailboxId.asUuid())));
     }
 
-    private MailboxMessage<CassandraId> message(Row row) {
+    private MailboxMessage<CassandraId> message(Row row, FetchType fetchType) {
         SimpleMailboxMessage<CassandraId> message =
             new SimpleMailboxMessage<>(
                 row.getDate(INTERNAL_DATE),
                 row.getInt(FULL_CONTENT_OCTETS),
                 row.getInt(BODY_START_OCTET),
-                new SharedByteArrayInputStream(getFullContent(row)),
+                buildContent(row, fetchType),
                 getFlags(row),
                 getPropertyBuilder(row),
                 CassandraId.of(row.getUUID(MAILBOX_ID)));
@@ -286,14 +288,6 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
         return message;
     }
 
-    private byte[] getFullContent(Row row) {
-        byte[] headerContent = new 
byte[row.getBytes(HEADER_CONTENT).remaining()];
-        byte[] bodyContent = new byte[row.getBytes(BODY_CONTENT).remaining()];
-        row.getBytes(HEADER_CONTENT).get(headerContent);
-        row.getBytes(BODY_CONTENT).get(bodyContent);
-        return Bytes.concat(headerContent, bodyContent);
-    }
-
     private Flags getFlags(Row row) {
         Flags flags = new Flags();
         for (String flag : CassandraMessageTable.Flag.ALL) {
@@ -365,7 +359,7 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
     }
 
     private Optional<UpdatedFlags> updateFlagsOnMessage(Mailbox<CassandraId> 
mailbox, FlagsUpdateCalculator flagUpdateCalculator, Row row) {
-        return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, 
message(row))
+        return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, 
message(row, FetchType.Metadata))
             .map(Optional::of)
             .orElse(handleRetries(mailbox, flagUpdateCalculator, 
row.getLong(IMAP_UID)));
     }
@@ -401,7 +395,11 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
     }
 
     private Optional<UpdatedFlags> 
retryMessageFlagsUpdate(Mailbox<CassandraId> mailbox, long uid, 
FlagsUpdateCalculator flagUpdateCalculator) {
-        return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, 
message(Optional.ofNullable(session.execute(selectMessage(mailbox, 
uid)).one()).orElseThrow(() -> new 
MessageDeletedDuringFlagsUpdateException(mailbox.getMailboxId(), uid))));
+        return tryMessageFlagsUpdate(flagUpdateCalculator,
+            mailbox,
+            message(Optional.ofNullable(session.execute(selectMessage(mailbox, 
uid, FetchType.Metadata)).one())
+                .orElseThrow(() -> new 
MessageDeletedDuringFlagsUpdateException(mailbox.getMailboxId(), uid)),
+                FetchType.Metadata));
     }
 
     private boolean conditionalSave(MailboxMessage<CassandraId> message, long 
oldModSeq) {
@@ -426,47 +424,84 @@ public class CassandraMessageMapper implements 
MessageMapper<CassandraId> {
         return ByteBuffer.wrap(ByteStreams.toByteArray(stream));
     }
 
-    private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set) {
-        final MessageRange.Type type = set.getType();
-        switch (type) {
+    private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set, 
FetchType fetchType) {
+        switch (set.getType()) {
         case ALL:
-            return selectAll(mailbox);
+            return selectAll(mailbox, fetchType);
         case FROM:
-            return selectFrom(mailbox, set.getUidFrom());
+            return selectFrom(mailbox, set.getUidFrom(), fetchType);
         case RANGE:
-            return selectRange(mailbox, set.getUidFrom(), set.getUidTo());
+            return selectRange(mailbox, set.getUidFrom(), set.getUidTo(), 
fetchType);
         case ONE:
-            return selectMessage(mailbox, set.getUidFrom());
+            return selectMessage(mailbox, set.getUidFrom(), fetchType);
         }
         throw new UnsupportedOperationException();
     }
 
-    private Where selectAll(Mailbox<CassandraId> mailbox) {
-        return select(FIELDS)
+    private Where selectAll(Mailbox<CassandraId> mailbox, FetchType fetchType) 
{
+        return select(retrieveFields(fetchType))
             .from(TABLE_NAME)
             .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()));
     }
 
-    private Where selectFrom(Mailbox<CassandraId> mailbox, long uid) {
-        return select(FIELDS)
+    private Where selectFrom(Mailbox<CassandraId> mailbox, long uid, FetchType 
fetchType) {
+        return select(retrieveFields(fetchType))
             .from(TABLE_NAME)
             .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
             .and(gte(IMAP_UID, uid));
     }
 
-    private Where selectRange(Mailbox<CassandraId> mailbox, long from, long 
to) {
-        return select(FIELDS)
+    private Where selectRange(Mailbox<CassandraId> mailbox, long from, long 
to, FetchType fetchType) {
+        return select(retrieveFields(fetchType))
             .from(TABLE_NAME)
             .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
             .and(gte(IMAP_UID, from))
             .and(lte(IMAP_UID, to));
     }
 
-    private Where selectMessage(Mailbox<CassandraId> mailbox, long uid) {
-        return select(FIELDS)
+    private Where selectMessage(Mailbox<CassandraId> mailbox, long uid, 
FetchType fetchType) {
+        return select(retrieveFields(fetchType))
             .from(TABLE_NAME)
             .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
             .and(eq(IMAP_UID, uid));
     }
 
+    private String[] retrieveFields(FetchType fetchType) {
+        switch (fetchType) {
+            case Body:
+            case Full:
+                return FIELDS;
+            case Headers:
+                return HEADERS;
+            case Metadata:
+                return METADATA;
+            default:
+                throw new RuntimeException("Unknown FetchType " + fetchType);
+        }
+    }
+
+    private SharedByteArrayInputStream buildContent(Row row, FetchType 
fetchType) {
+        switch (fetchType) {
+            case Full:
+            case Body:
+                return new SharedByteArrayInputStream(getFullContent(row));
+            case Headers:
+                return new 
SharedByteArrayInputStream(getFieldContent(HEADER_CONTENT, row));
+            case Metadata:
+                return new SharedByteArrayInputStream(new byte[]{});
+            default:
+                throw new RuntimeException("Unknown FetchType " + fetchType);
+        }
+    }
+
+    private byte[] getFullContent(Row row) {
+        return Bytes.concat(getFieldContent(HEADER_CONTENT, row), 
getFieldContent(BODY_CONTENT, row));
+    }
+
+    private byte[] getFieldContent(String field, Row row) {
+        byte[] headerContent = new byte[row.getBytes(field).remaining()];
+        row.getBytes(field).get(headerContent);
+        return headerContent;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7a86fe1a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
index b1829ba..aaf3692 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageTable.java
@@ -37,7 +37,10 @@ public interface CassandraMessageTable {
     String BODY_CONTENT = "bodyContent";
     String HEADER_CONTENT = "headerContent";
     String PROPERTIES = "properties";
+
     String[] FIELDS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, 
BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, 
Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, 
Flag.USER_FLAGS, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES };
+    String[] METADATA = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, 
BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, 
Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, 
Flag.USER_FLAGS, TEXTUAL_LINE_COUNT, PROPERTIES };
+    String[] HEADERS = { MAILBOX_ID, IMAP_UID, INTERNAL_DATE, MOD_SEQ, 
BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, Flag.ANSWERED, 
Flag.DELETED, Flag.DRAFT, Flag.FLAGGED, Flag.RECENT, Flag.SEEN, Flag.USER, 
Flag.USER_FLAGS, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES };
 
     interface Flag {
         String ANSWERED = "flagAnswered";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to