Author: asankha
Date: Fri Mar 27 11:37:35 2009
New Revision: 759107

URL: http://svn.apache.org/viewvc?rev=759107&view=rev
Log:
Fix WSCOMMONS-454 (part of SYNAPSE-434 - for Mail transport)

Modified:
    
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java

Modified: 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java?rev=759107&r1=759106&r2=759107&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
 Fri Mar 27 11:37:35 2009
@@ -40,15 +40,7 @@
 import org.apache.axis2.transport.base.event.TransportErrorSource;
 import org.apache.axis2.transport.base.event.TransportErrorSourceSupport;
 
-import javax.mail.Flags;
-import javax.mail.Folder;
-import javax.mail.Header;
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Multipart;
-import javax.mail.Part;
-import javax.mail.Session;
-import javax.mail.Store;
+import javax.mail.*;
 import javax.mail.internet.AddressException;
 import javax.mail.internet.ContentType;
 import javax.mail.internet.InternetAddress;
@@ -62,6 +54,8 @@
 import java.io.InputStream;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.lang.reflect.Method;
 
 /**
  * This mail transport lister implementation uses the base transport framework 
and is a polling
@@ -163,6 +157,8 @@
 
         if (connected) {
             Folder folder = null;
+            CountDownLatch latch = null;
+
             try {
 
                 if (entry.getFolder() != null) {
@@ -191,40 +187,42 @@
                         log.debug(messages.length + " messgaes in folder : " + 
folder);
                     }
 
+                    latch = new CountDownLatch(total);
                     for (int i = 0; i < total; i++) {
-                        String[] status = messages[i].getHeader("Status");
-                        if (status != null && status.length == 1 && 
status[0].equals("RO")) {
-                            // some times the mail server sends a special mail 
message which is not relavent
-                            // in processing. ignore this message.
-                            if (log.isDebugEnabled()) {
-                                log.debug("Skipping message # : " + i + " : " +
-                                        messages[i].getSubject() + " - Status: 
RO");
-                            }
-                        } else if (messages[i].isSet(Flags.Flag.SEEN)) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Skipping message # : " + i + " : " +
-                                    messages[i].getSubject() + " - already 
marked SEEN");
+
+                        try {
+                            String[] status = messages[i].getHeader("Status");
+                            if (status != null && status.length == 1 && 
status[0].equals("RO")) {
+                                // sometimes the mail server sends a special 
mail message which is
+                                // not relevant in processing. ignore this 
message.
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
+                                        + " : " + messages[i].getSubject() + " 
- Status: RO");
+                                }
+                                latch.countDown();
+                            } else if (messages[i].isSet(Flags.Flag.SEEN)) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
+                                        + " : " + messages[i].getSubject() + " 
- already marked SEEN");
+                                }
+                                latch.countDown();
+                            } else if (messages[i].isSet(Flags.Flag.DELETED)) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Skipping message # : " + 
messages[i].getMessageNumber()
+                                        + " : " +  messages[i].getSubject() + 
" - already marked DELETED");
+                                }
+                                latch.countDown();
+
+                            } else {
+                                processMail(entry, folder, store, messages[i], 
latch);
                             }
-                        } else if (messages[i].isSet(Flags.Flag.DELETED)) {
+                        } catch (MessageRemovedException ignore) {
+                            // while reading the meta information, this mail 
was deleted, that's ok
                             if (log.isDebugEnabled()) {
-                                log.debug("Skipping message # : " + i + " : " +
-                                    messages[i].getSubject() + " - already 
marked DELETED");
+                                log.debug("Skipping message # : " + 
messages[i].getMessageNumber() +
+                                    " as it has been DELETED by another thread 
after processing");
                             }
-
-                        } else {
-                            entry.setLastPollState(PollTableEntry.NONE);
-                            try {
-                                processMail(messages[i], entry);
-                                
entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
-                                metrics.incrementMessagesReceived();
-                            } catch (Exception e) {
-                                log.error("Failed to process message", e);
-                                entry.setLastPollState(PollTableEntry.FAILED);
-                                metrics.incrementFaultsReceiving();
-                                tess.error(entry.getService(), e);
-                            }
-
-                            moveOrDeleteAfterProcessing(entry, store, folder, 
messages[i]);
+                            latch.countDown();
                         }
                     }
                 }
@@ -235,8 +233,22 @@
 
             } finally {
 
+                // wait till all parallel message processing tasks complete
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Awaiting completion of " + latch.getCount() 
+
+                            " concurrent message processing threads");
+                    }
+                    latch.await();
+                } catch (InterruptedException e) {
+                    log.warn("Mail transport listner polling thread 
interrupted before completion");
+                }
+
                 try {
                     folder.close(true /** expunge messages flagged as 
DELETED*/);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Mail folder closed, and deleted mail 
expunged");
+                    }
                 } catch (MessagingException e) {
                     processFailure("Error closing mail folder : " +
                         folder + " for account : " + emailAddress, e, entry);
@@ -245,6 +257,9 @@
                 if (store != null) {
                     try {
                         store.close();
+                        if (log.isDebugEnabled()) {
+                            log.debug("Mail store closed for : " + 
emailAddress);
+                        }
                     } catch (MessagingException e) {
                         log.warn("Error closing mail store for account : " +
                             emailAddress + " :: " + e.getMessage(), e);
@@ -255,6 +270,131 @@
     }
 
     /**
+     * Invoke the actual message processor in the current thread or another 
worker thread
+     * @param entry PollTableEntry
+     * @param folder mail folder
+     * @param store mail store, to move or delete after processing
+     * @param message message to process
+     * @param latch the completion latch to notify; it is counted down here when
+     *              the message is skipped, and otherwise by the MailProcessor
+     *              at the end of its run
+     */
+    private void processMail(PollTableEntry entry, Folder folder, Store store, 
Message message,
+                             CountDownLatch latch) {
+
+        MailProcessor mp = new MailProcessor(entry, message, store, folder, 
latch);
+
+        // should messages be processed in parallel?
+        if (entry.isConcurrentPollingAllowed()) {
+
+            // try to locate the UID of the message
+            String uid = getMessageUID(folder, message);
+
+            if (uid != null) {
+                if (entry.isProcessingUID(uid)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Skipping message # : " + 
message.getMessageNumber() + " : UIDL " +
+                            uid + " - already being processed by another 
thread");
+                    }
+                    latch.countDown();
+
+                } else {
+                    entry.processingUID(uid);
+                    mp.setUID(uid);
+                    
+                    if (entry.isProcessingMailInParallel()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Processing message # : " + 
message.getMessageNumber() +
+                                " with UID : " + uid + " with a worker 
thread");
+                        }
+                        workerPool.execute(mp);
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Processing message # : " + 
message.getMessageNumber() +
+                                " with UID : " + uid + " in same thread");
+                        }
+                        mp.run();
+                    }
+                }
+            } else {
+                log.warn("Cannot process mail in parallel as the " +
+                    "folder does not support UIDs. Processing message # : " +
+                    message.getMessageNumber() + " in the same thread");
+                entry.setConcurrentPollingAllowed(false);
+                mp.run();
+            }
+
+        } else {
+            if (entry.isProcessingMailInParallel()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Processing message # : " + 
message.getMessageNumber() +
+                        " with a worker thread");
+                }
+                workerPool.execute(mp);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Processing message # : " + 
message.getMessageNumber() + " in same thread");
+                }
+                mp.run();
+            }
+        }
+    }
+
+    /**
+     * Handle processing of a message, possibly in a new thread
+     */
+    private class MailProcessor implements Runnable {
+
+        private PollTableEntry entry = null;
+        private Message message = null;
+        private Store store = null;
+        private Folder folder = null;
+        private String uid = null;
+        private CountDownLatch doneSignal = null;
+
+        MailProcessor(PollTableEntry entry, Message message, Store store, 
Folder folder, CountDownLatch doneSignal) {
+            this.entry = entry;
+            this.message = message;
+            this.store = store;
+            this.folder = folder;
+            this.doneSignal = doneSignal;
+        }
+
+        public void setUID(String uid) {
+            this.uid = uid;
+        }
+
+        public void run() {
+
+            entry.setLastPollState(PollTableEntry.NONE);
+            try {
+                processMail(message, entry);
+                entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
+                metrics.incrementMessagesReceived();
+
+            } catch (Exception e) {
+                log.error("Failed to process message", e);
+                entry.setLastPollState(PollTableEntry.FAILED);
+                metrics.incrementFaultsReceiving();
+                tess.error(entry.getService(), e);
+
+            } finally {
+                if (uid != null) {
+                    entry.removeUID(uid);
+                }
+            }
+            try {
+                moveOrDeleteAfterProcessing(entry, store, folder, message);
+            } catch (Exception e) {
+                log.error("Failed to move or delete email message", e);
+                tess.error(entry.getService(), e);
+            }
+
+            doneSignal.countDown();
+        }
+    }
+
+    /**
      * Process a mail message through Axis2
      *
      * @param message the email message
@@ -622,6 +762,18 @@
                 paramIncl, MailConstants.TRANSPORT_MAIL_MOVE_AFTER_FAILURE);
             entry.setMoveAfterFailure(modeFolderAfterFailure);
 
+            String processInParallel = ParamUtils.getOptionalParam(
+                paramIncl, MailConstants.TRANSPORT_MAIL_PROCESS_IN_PARALLEL);
+            if (processInParallel != null) {
+                
entry.setProcessingMailInParallel(Boolean.parseBoolean(processInParallel));
+            }
+
+            String pollInParallel = ParamUtils.getOptionalParam(
+                paramIncl, BaseConstants.TRANSPORT_POLL_IN_PARALLEL);
+            if (pollInParallel != null) {
+                
entry.setConcurrentPollingAllowed(Boolean.parseBoolean(pollInParallel));
+            }
+
             String strMaxRetryCount = ParamUtils.getOptionalParam(
                 paramIncl, MailConstants.MAX_RETRY_COUNT);
             if (strMaxRetryCount != null)
@@ -643,4 +795,31 @@
     public void removeErrorListener(TransportErrorListener listener) {
         tess.removeErrorListener(listener);
     }
+
+    /**
+     * Return the UID of a message from the given folder
+     * @param folder the POP3 or IMAP folder
+     * @param message the message
+     * @return UID as a String (long is converted to a String) or null
+     */
+    private String getMessageUID(Folder folder, Message message) {
+        String uid = null;
+        if (folder instanceof UIDFolder) {
+            try {
+                uid = Long.toString(((UIDFolder) folder).getUID(message));
+            } catch (MessagingException ignore) {}
+        } else {
+            try {
+                Method m = folder.getClass().getMethod(
+                    "getUID", Message.class);
+                Object o = m.invoke(folder, new Object[]{message});
+                if (o != null && o instanceof Long) {
+                    uid = Long.toString((Long) o);
+                } else if (o != null && o instanceof String) {
+                    uid = (String) o;
+                }
+            } catch (Exception ignore) {}
+        }
+        return uid;
+    }
 }


Reply via email to