Author: asankha
Date: Fri Mar 27 11:37:35 2009
New Revision: 759107
URL: http://svn.apache.org/viewvc?rev=759107&view=rev
Log:
Fix WSCOMMONS-454 (part of SYNAPSE-434 - for Mail transport)
Modified:
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
Modified:
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java?rev=759107&r1=759106&r2=759107&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
Fri Mar 27 11:37:35 2009
@@ -40,15 +40,7 @@
import org.apache.axis2.transport.base.event.TransportErrorSource;
import org.apache.axis2.transport.base.event.TransportErrorSourceSupport;
-import javax.mail.Flags;
-import javax.mail.Folder;
-import javax.mail.Header;
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Multipart;
-import javax.mail.Part;
-import javax.mail.Session;
-import javax.mail.Store;
+import javax.mail.*;
import javax.mail.internet.AddressException;
import javax.mail.internet.ContentType;
import javax.mail.internet.InternetAddress;
@@ -62,6 +54,8 @@
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.lang.reflect.Method;
/**
* This mail transport lister implementation uses the base transport framework
and is a polling
@@ -163,6 +157,8 @@
if (connected) {
Folder folder = null;
+ CountDownLatch latch = null;
+
try {
if (entry.getFolder() != null) {
@@ -191,40 +187,42 @@
log.debug(messages.length + " messgaes in folder : " +
folder);
}
+ latch = new CountDownLatch(total);
for (int i = 0; i < total; i++) {
- String[] status = messages[i].getHeader("Status");
- if (status != null && status.length == 1 &&
status[0].equals("RO")) {
- // some times the mail server sends a special mail
message which is not relavent
- // in processing. ignore this message.
- if (log.isDebugEnabled()) {
- log.debug("Skipping message # : " + i + " : " +
- messages[i].getSubject() + " - Status:
RO");
- }
- } else if (messages[i].isSet(Flags.Flag.SEEN)) {
- if (log.isDebugEnabled()) {
- log.debug("Skipping message # : " + i + " : " +
- messages[i].getSubject() + " - already
marked SEEN");
+
+ try {
+ String[] status = messages[i].getHeader("Status");
+ if (status != null && status.length == 1 &&
status[0].equals("RO")) {
+ // some times the mail server sends a special
mail message which is
+ // not relavent in processing. ignore this
message.
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping message # : " +
messages[i].getMessageNumber()
+ + " : " + messages[i].getSubject() + "
- Status: RO");
+ }
+ latch.countDown();
+ } else if (messages[i].isSet(Flags.Flag.SEEN)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping message # : " +
messages[i].getMessageNumber()
+ + " : " + messages[i].getSubject() + "
- already marked SEEN");
+ }
+ latch.countDown();
+ } else if (messages[i].isSet(Flags.Flag.DELETED)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping message # : " +
messages[i].getMessageNumber()
+ + " : " + messages[i].getSubject() +
" - already marked DELETED");
+ }
+ latch.countDown();
+
+ } else {
+ processMail(entry, folder, store, messages[i],
latch);
}
- } else if (messages[i].isSet(Flags.Flag.DELETED)) {
+ } catch (MessageRemovedException ignore) {
+ // while reading the meta information, this mail
was deleted, thats ok
if (log.isDebugEnabled()) {
- log.debug("Skipping message # : " + i + " : " +
- messages[i].getSubject() + " - already
marked DELETED");
+ log.debug("Skipping message # : " +
messages[i].getMessageNumber() +
+ " as it has been DELETED by another thread
after processing");
}
-
- } else {
- entry.setLastPollState(PollTableEntry.NONE);
- try {
- processMail(messages[i], entry);
-
entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
- metrics.incrementMessagesReceived();
- } catch (Exception e) {
- log.error("Failed to process message", e);
- entry.setLastPollState(PollTableEntry.FAILED);
- metrics.incrementFaultsReceiving();
- tess.error(entry.getService(), e);
- }
-
- moveOrDeleteAfterProcessing(entry, store, folder,
messages[i]);
+ latch.countDown();
}
}
}
@@ -235,8 +233,22 @@
} finally {
+ // wait till all parallel message processing tasks complete
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("Awaiting completion of " + latch.getCount()
+
+ " concurrent message processing threads");
+ }
+ latch.await();
+ } catch (InterruptedException e) {
+ log.warn("Mail transport listner polling thread
interrupted before completion");
+ }
+
try {
folder.close(true /** expunge messages flagged as
DELETED*/);
+ if (log.isDebugEnabled()) {
+ log.debug("Mail folder closed, and deleted mail
expunged");
+ }
} catch (MessagingException e) {
processFailure("Error closing mail folder : " +
folder + " for account : " + emailAddress, e, entry);
@@ -245,6 +257,9 @@
if (store != null) {
try {
store.close();
+ if (log.isDebugEnabled()) {
+ log.debug("Mail store closed for : " +
emailAddress);
+ }
} catch (MessagingException e) {
log.warn("Error closing mail store for account : " +
emailAddress + " :: " + e.getMessage(), e);
@@ -255,6 +270,131 @@
}
/**
+ * Invoke the actual message processor in the current thread or another
worker thread
+ * @param entry PolltableEntry
+ * @param folder mail folder
+ * @param store mail store, to move or delete after processing
+ * @param message message to process
+ * @param pos the message position seen initially
+ * @param mp the MailProcessor object
+ * @param latch the completion latch to notify
+ */
+ private void processMail(PollTableEntry entry, Folder folder, Store store,
Message message,
+ CountDownLatch latch) {
+
+ MailProcessor mp = new MailProcessor(entry, message, store, folder,
latch);
+
+ // should messages be processed in parallel?
+ if (entry.isConcurrentPollingAllowed()) {
+
+ // try to locate the UID of the message
+ String uid = getMessageUID(folder, message);
+
+ if (uid != null) {
+ if (entry.isProcessingUID(uid)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping message # : " +
message.getMessageNumber() + " : UIDL " +
+ uid + " - already being processed by another
thread");
+ }
+ latch.countDown();
+
+ } else {
+ entry.processingUID(uid);
+ mp.setUID(uid);
+
+ if (entry.isProcessingMailInParallel()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Processing message # : " +
message.getMessageNumber() +
+ " with UID : " + uid + " with a worker
thread");
+ }
+ workerPool.execute(mp);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Processing message # : " +
message.getMessageNumber() +
+ " with UID : " + uid + " in same thread");
+ }
+ mp.run();
+ }
+ }
+ } else {
+ log.warn("Cannot process mail in parallel as the " +
+ "folder does not support UIDs. Processing message # : " +
+ message.getMessageNumber() + " in the same thread");
+ entry.setConcurrentPollingAllowed(false);
+ mp.run();
+ }
+
+ } else {
+ if (entry.isProcessingMailInParallel()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Processing message # : " +
message.getMessageNumber() +
+ " with a worker thread");
+ }
+ workerPool.execute(mp);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Processing message # : " +
message.getMessageNumber() + " in same thread");
+ }
+ mp.run();
+ }
+ }
+ }
+
+ /**
+ * Handle processing of a message, possibly in a new thread
+ */
+ private class MailProcessor implements Runnable {
+
+ private PollTableEntry entry = null;
+ private Message message = null;
+ private Store store = null;
+ private Folder folder = null;
+ private String uid = null;
+ private CountDownLatch doneSignal = null;
+
+ MailProcessor(PollTableEntry entry, Message message, Store store,
Folder folder, CountDownLatch doneSignal) {
+ this.entry = entry;
+ this.message = message;
+ this.store = store;
+ this.folder = folder;
+ this.doneSignal = doneSignal;
+ }
+
+ public void setUID(String uid) {
+ this.uid = uid;
+ }
+
+ public void run() {
+
+ entry.setLastPollState(PollTableEntry.NONE);
+ try {
+ processMail(message, entry);
+ entry.setLastPollState(PollTableEntry.SUCCSESSFUL);
+ metrics.incrementMessagesReceived();
+
+ } catch (Exception e) {
+ log.error("Failed to process message", e);
+ entry.setLastPollState(PollTableEntry.FAILED);
+ metrics.incrementFaultsReceiving();
+ tess.error(entry.getService(), e);
+
+ } finally {
+ if (uid != null) {
+ entry.removeUID(uid);
+ }
+ }
+ try {
+ moveOrDeleteAfterProcessing(entry, store, folder, message);
+ } catch (Exception e) {
+ log.error("Failed to move or delete email message", e);
+ tess.error(entry.getService(), e);
+ }
+
+ doneSignal.countDown();
+ }
+ }
+
+ /**
* Process a mail message through Axis2
*
* @param message the email message
@@ -622,6 +762,18 @@
paramIncl, MailConstants.TRANSPORT_MAIL_MOVE_AFTER_FAILURE);
entry.setMoveAfterFailure(modeFolderAfterFailure);
+ String processInParallel = ParamUtils.getOptionalParam(
+ paramIncl, MailConstants.TRANSPORT_MAIL_PROCESS_IN_PARALLEL);
+ if (processInParallel != null) {
+
entry.setProcessingMailInParallel(Boolean.parseBoolean(processInParallel));
+ }
+
+ String pollInParallel = ParamUtils.getOptionalParam(
+ paramIncl, BaseConstants.TRANSPORT_POLL_IN_PARALLEL);
+ if (pollInParallel != null) {
+
entry.setConcurrentPollingAllowed(Boolean.parseBoolean(pollInParallel));
+ }
+
String strMaxRetryCount = ParamUtils.getOptionalParam(
paramIncl, MailConstants.MAX_RETRY_COUNT);
if (strMaxRetryCount != null)
@@ -643,4 +795,31 @@
public void removeErrorListener(TransportErrorListener listener) {
tess.removeErrorListener(listener);
}
+
+ /**
+ * Return the UID of a message from the given folder
+ * @param folder the POP3 or IMAP folder
+ * @param message the message
+ * @return UID as a String (long is converted to a String) or null
+ */
+ private String getMessageUID(Folder folder, Message message) {
+ String uid = null;
+ if (folder instanceof UIDFolder) {
+ try {
+ uid = Long.toString(((UIDFolder) folder).getUID(message));
+ } catch (MessagingException ignore) {}
+ } else {
+ try {
+ Method m = folder.getClass().getMethod(
+ "getUID", Message.class);
+ Object o = m.invoke(folder, new Object[]{message});
+ if (o != null && o instanceof Long) {
+ uid = Long.toString((Long) o);
+ } else if (o != null && o instanceof String) {
+ uid = (String) o;
+ }
+ } catch (Exception ignore) {}
+ }
+ return uid;
+ }
}