Author: asankha
Date: Thu Apr 2 04:58:21 2009
New Revision: 761168
URL: http://svn.apache.org/viewvc?rev=761168&view=rev
Log:
Fix WSCOMMONS-454 fully as per comment by Andreas, that waiting for sub-task
completion could lead to a deadlock. Now the main task threads do not wait for
sub tasks that process messages to complete, but the last sub task that
completes will perform completion tasks, and also request the
AbstractPollingTransportListener to schedule the next run - if appropriate
Modified:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
Modified:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java?rev=761168&r1=761167&r2=761168&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractPollingTransportListener.java
Thu Apr 2 04:58:21 2009
@@ -47,7 +47,8 @@
super.init(cfgCtx, transportIn);
T entry = createPollTableEntry(transportIn);
if (entry != null) {
- schedulePoll(entry, getPollInterval(transportIn));
+ entry.setPollInterval(getPollInterval(transportIn));
+ schedulePoll(entry);
pollTable.add(entry);
}
}
@@ -82,7 +83,8 @@
* @param entry the poll table entry with the configuration for the service
* @param pollInterval the interval between successive polls in
milliseconds
*/
- void schedulePoll(final T entry, final long pollInterval) {
+ void schedulePoll(final T entry) {
+ final long pollInterval = entry.getPollInterval();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
@@ -96,14 +98,6 @@
} else {
poll(entry);
}
-
- if (!entry.isConcurrentPollingAllowed()) {
- synchronized (entry) {
- if (!entry.canceled) {
- schedulePoll(entry, pollInterval);
- }
- }
- }
}
});
}
@@ -123,9 +117,19 @@
}
pollTable.remove(entry);
}
-
+
protected abstract void poll(T entry);
+ protected void onPollCompletion(T entry) {
+ if (!entry.isConcurrentPollingAllowed()) {
+ synchronized (entry) {
+ if (!entry.canceled) {
+ schedulePoll(entry);
+ }
+ }
+ }
+ }
+
/**
* method to log a failure to the log file and to update the last poll
status and time
* @param msg text for the log message
@@ -142,6 +146,7 @@
entry.setLastPollState(AbstractPollTableEntry.FAILED);
entry.setLastPollTime(now);
entry.setNextPollTime(now + entry.getPollInterval());
+ onPollCompletion(entry);
}
private long getPollInterval(ParameterInclude params) {
@@ -173,7 +178,8 @@
throw new AxisFault("The service has no configuration for the
transport");
}
entry.setService(service);
- schedulePoll(entry, getPollInterval(service));
+ entry.setPollInterval(getPollInterval(service));
+ schedulePoll(entry);
pollTable.add(entry);
}
Modified:
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java?rev=761168&r1=761167&r2=761168&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
Thu Apr 2 04:58:21 2009
@@ -117,6 +117,8 @@
long reconnectionTimeout = entry.getReconnectTimeout();
Session session = entry.getSession();
Store store = null;
+ Folder folder = null;
+ boolean mailProcessingStarted = false;
while (!connected) {
try {
@@ -137,6 +139,17 @@
// were we able to connect?
connected = store.isConnected();
+ if (connected) {
+ if (entry.getFolder() != null) {
+ folder = store.getFolder(entry.getFolder());
+ } else {
+ folder = store.getFolder(MailConstants.DEFAULT_FOLDER);
+ }
+ if (folder == null) {
+ folder = store.getDefaultFolder();
+ }
+ }
+
} catch (Exception e) {
log.error("Error connecting to mail server for address : " +
emailAddress, e);
if (maxRetryCount <= retryCount) {
@@ -156,117 +169,78 @@
}
}
- if (connected) {
- Folder folder = null;
+ if (connected && folder != null) {
+
CountDownLatch latch = null;
+ Runnable onCompletion = new MailCheckCompletionTask(folder, store,
emailAddress, entry);
try {
-
- if (entry.getFolder() != null) {
- folder = store.getFolder(entry.getFolder());
- } else {
- folder = store.getFolder(MailConstants.DEFAULT_FOLDER);
- }
- if (folder == null) {
- folder = store.getDefaultFolder();
+ if (log.isDebugEnabled()) {
+ log.debug("Connecting to folder : " + folder.getName() +
+ " of email account : " + emailAddress);
}
- if (folder == null) {
- processFailure("Unable to access mail folder", null,
entry);
-
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Connecting to folder : " + folder.getName()
+
- " of email account : " + emailAddress);
- }
-
- folder.open(Folder.READ_WRITE);
- int total = folder.getMessageCount();
- Message[] messages = folder.getMessages();
-
- if (log.isDebugEnabled()) {
- log.debug(messages.length + " messgaes in folder : " +
folder);
- }
+ folder.open(Folder.READ_WRITE);
+ int total = folder.getMessageCount();
+ Message[] messages = folder.getMessages();
- latch = new CountDownLatch(total);
- for (int i = 0; i < total; i++) {
+ if (log.isDebugEnabled()) {
+ log.debug(messages.length + " messgaes in folder : " +
folder);
+ }
- try {
- String[] status = messages[i].getHeader("Status");
- if (status != null && status.length == 1 &&
status[0].equals("RO")) {
- // some times the mail server sends a special
mail message which is
- // not relavent in processing. ignore this
message.
- if (log.isDebugEnabled()) {
- log.debug("Skipping message # : " +
messages[i].getMessageNumber()
- + " : " + messages[i].getSubject() + "
- Status: RO");
- }
- latch.countDown();
- } else if (messages[i].isSet(Flags.Flag.SEEN)) {
- if (log.isDebugEnabled()) {
- log.debug("Skipping message # : " +
messages[i].getMessageNumber()
- + " : " + messages[i].getSubject() + "
- already marked SEEN");
- }
- latch.countDown();
- } else if (messages[i].isSet(Flags.Flag.DELETED)) {
- if (log.isDebugEnabled()) {
- log.debug("Skipping message # : " +
messages[i].getMessageNumber()
- + " : " + messages[i].getSubject() +
" - already marked DELETED");
- }
- latch.countDown();
+ latch = new CountDownLatch(total);
+ for (int i = 0; i < total; i++) {
- } else {
- processMail(entry, folder, store, messages[i],
latch);
+ try {
+ String[] status = messages[i].getHeader("Status");
+ if (status != null && status.length == 1 &&
status[0].equals("RO")) {
+ // some times the mail server sends a special mail
message which is
+ // not relavent in processing. ignore this message.
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping message # : " +
messages[i].getMessageNumber()
+ + " : " + messages[i].getSubject() + " -
Status: RO");
+ }
+ latch.countDown();
+ } else if (messages[i].isSet(Flags.Flag.SEEN)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping message # : " +
messages[i].getMessageNumber()
+ + " : " + messages[i].getSubject() + " -
already marked SEEN");
}
- } catch (MessageRemovedException ignore) {
- // while reading the meta information, this mail
was deleted, thats ok
+ latch.countDown();
+ } else if (messages[i].isSet(Flags.Flag.DELETED)) {
if (log.isDebugEnabled()) {
- log.debug("Skipping message # : " +
messages[i].getMessageNumber() +
- " as it has been DELETED by another thread
after processing");
+ log.debug("Skipping message # : " +
messages[i].getMessageNumber()
+ + " : " + messages[i].getSubject() + " -
already marked DELETED");
}
latch.countDown();
+
+ } else {
+ processMail(entry, folder, store, messages[i],
latch, onCompletion);
+ mailProcessingStarted = true;
}
+ } catch (MessageRemovedException ignore) {
+ // while reading the meta information, this mail was
deleted, thats ok
+ if (log.isDebugEnabled()) {
+ log.debug("Skipping message # : " +
messages[i].getMessageNumber() +
+ " as it has been DELETED by another thread
after processing");
+ }
+ latch.countDown();
}
}
+ if (!mailProcessingStarted) {
+ // if we didnt process any mail in this run, the
onCompletion will not
+ // run from the mail processor by default
+ onCompletion.run();
+ }
+
} catch (MessagingException me) {
processFailure("Error checking mail for account : " +
emailAddress + " :: " + me.getMessage(), me, entry);
-
- } finally {
-
- // wait till all parallel message processing tasks complete
- try {
- if (log.isDebugEnabled()) {
- log.debug("Awaiting completion of " + latch.getCount()
+
- " concurrent message processing threads");
- }
- latch.await();
- } catch (InterruptedException e) {
- log.warn("Mail transport listner polling thread
interrupted before completion");
- }
-
- try {
- folder.close(true /** expunge messages flagged as
DELETED*/);
- if (log.isDebugEnabled()) {
- log.debug("Mail folder closed, and deleted mail
expunged");
- }
- } catch (MessagingException e) {
- processFailure("Error closing mail folder : " +
- folder + " for account : " + emailAddress, e, entry);
- }
-
- if (store != null) {
- try {
- store.close();
- if (log.isDebugEnabled()) {
- log.debug("Mail store closed for : " +
emailAddress);
- }
- } catch (MessagingException e) {
- log.warn("Error closing mail store for account : " +
- emailAddress + " :: " + e.getMessage(), e);
- }
- }
}
+
+ } else {
+ processFailure("Unable to access mail folder", null, entry);
}
}
@@ -279,11 +253,12 @@
* @param pos the message position seen initially
* @param mp the MailProcessor object
* @param latch the completion latch to notify
+ * @param onCompletion the tasks to run on the completion of mail
processing
*/
private void processMail(PollTableEntry entry, Folder folder, Store store,
Message message,
- CountDownLatch latch) {
+ CountDownLatch latch, Runnable onCompletion) {
- MailProcessor mp = new MailProcessor(entry, message, store, folder,
latch);
+ MailProcessor mp = new MailProcessor(entry, message, store, folder,
latch, onCompletion);
// should messages be processed in parallel?
if (entry.isConcurrentPollingAllowed()) {
@@ -352,13 +327,16 @@
private Folder folder = null;
private String uid = null;
private CountDownLatch doneSignal = null;
+ private Runnable onCompletion = null;
- MailProcessor(PollTableEntry entry, Message message, Store store,
Folder folder, CountDownLatch doneSignal) {
+ MailProcessor(PollTableEntry entry, Message message, Store store,
Folder folder,
+ CountDownLatch doneSignal, Runnable onCompletion) {
this.entry = entry;
this.message = message;
this.store = store;
this.folder = folder;
this.doneSignal = doneSignal;
+ this.onCompletion = onCompletion;
}
public void setUID(String uid) {
@@ -392,6 +370,73 @@
}
doneSignal.countDown();
+
+ if (doneSignal.getCount() == 0) {
+ onCompletion.run();
+ }
+ }
+ }
+
+ /**
+ * Handle optional logic of the mail transport, that needs to happen once
all messages in
+ * a check mail cycle has ended.
+ */
+ private class MailCheckCompletionTask implements Runnable {
+ private final Folder folder;
+ private final Store store;
+ private final InternetAddress emailAddress;
+ private final PollTableEntry entry;
+ private boolean taskStarted = false;
+
+ public MailCheckCompletionTask(Folder folder, Store store,
+ InternetAddress emailAddress,
PollTableEntry entry) {
+ this.folder = folder;
+ this.store = store;
+ this.emailAddress = emailAddress;
+ this.entry = entry;
+ }
+
+ public void run() {
+ synchronized(this) {
+ if (taskStarted) {
+ return;
+ } else {
+ taskStarted = true;
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Executing onCompletion task for the mail download
of : " + emailAddress);
+ }
+
+ if (folder != null) {
+ try {
+ folder.close(true /** expunge messages flagged as
DELETED*/);
+ if (log.isDebugEnabled()) {
+ log.debug("Mail folder closed, and deleted mail
expunged");
+ }
+ } catch (MessagingException e) {
+ log.warn("Error closing mail folder : " +
+ folder + " for account : " + emailAddress + " :: "+
e.getMessage());
+ }
+ }
+
+ if (store != null) {
+ try {
+ store.close();
+ if (log.isDebugEnabled()) {
+ log.debug("Mail store closed for : " + emailAddress);
+ }
+ } catch (MessagingException e) {
+ log.warn("Error closing mail store for account : " +
+ emailAddress + " :: " + e.getMessage(), e);
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Scheduling next poll for : " + emailAddress);
+ }
+ onPollCompletion(entry);
}
}
@@ -769,12 +814,18 @@
paramIncl, MailConstants.TRANSPORT_MAIL_PROCESS_IN_PARALLEL);
if (processInParallel != null) {
entry.setProcessingMailInParallel(Boolean.parseBoolean(processInParallel));
+ if (log.isDebugEnabled() &&
entry.isProcessingMailInParallel()) {
+ log.debug("Parallel mail processing enabled for : " +
address);
+ }
}
String pollInParallel = ParamUtils.getOptionalParam(
paramIncl, BaseConstants.TRANSPORT_POLL_IN_PARALLEL);
if (pollInParallel != null) {
entry.setConcurrentPollingAllowed(Boolean.parseBoolean(pollInParallel));
+ if (log.isDebugEnabled() &&
entry.isConcurrentPollingAllowed()) {
+ log.debug("Concurrent mail polling enabled for : " +
address);
+ }
}
String strMaxRetryCount = ParamUtils.getOptionalParam(