Author: norman Date: Mon Oct 18 11:24:34 2010 New Revision: 1023741 URL: http://svn.apache.org/viewvc?rev=1023741&view=rev Log: Refactor MailQueue interface to return a MailQueueItem. This is more elegant than before and allows better integration with multithreading
Added: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java Modified: james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java Modified: james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java URL: http://svn.apache.org/viewvc/james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java?rev=1023741&r1=1023740&r2=1023741&view=diff ============================================================================== --- james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java (original) +++ james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java Mon Oct 18 11:24:34 2010 @@ -26,8 +26,8 @@ import org.apache.james.dnsservice.api.T import org.apache.james.lifecycle.LifecycleUtil; import org.apache.james.queue.MailQueue; import org.apache.james.queue.MailQueueFactory; -import org.apache.james.queue.MailQueue.DequeueOperation; import org.apache.james.queue.MailQueue.MailQueueException; +import org.apache.james.queue.MailQueue.MailQueueItem; import org.apache.james.services.MailServer; import org.apache.james.util.TimeConverter; import org.apache.mailet.base.GenericMailet; @@ -710,65 +710,65 @@ public class RemoteDelivery extends Gene // of time to block is determined by the 'getWaitTime' // method of 
the // MultipleDelayFilter. - queue.deQueue(new DequeueOperation() { + MailQueueItem queueItem = queue.deQueue(); + Mail mail = queueItem.getMail(); + + String key = mail.getName(); + try { + if (isDebug) { + String message = Thread.currentThread().getName() + + " will process mail " + key; + log(message); + } - public void process(Mail mail) throws MailQueueException { - String key = mail.getName(); + // Deliver message + if (deliver(mail, session)) { + // Message was successfully delivered/fully failed... + // delete it + LifecycleUtil.dispose(mail); + //workRepository.remove(key); + } else { + // Something happened that will delay delivery. + // Store it back in the retry repository. + //workRepository.store(mail); + int retries = 0; try { - if (isDebug) { - String message = Thread.currentThread().getName() - + " will process mail " + key; - log(message); - } - - // Deliver message - if (deliver(mail, session)) { - // Message was successfully delivered/fully failed... - // delete it - LifecycleUtil.dispose(mail); - //workRepository.remove(key); - } else { - // Something happened that will delay delivery. - // Store it back in the retry repository. - //workRepository.store(mail); - int retries = 0; - try { - retries = Integer.parseInt(mail.getErrorMessage()); - } catch (NumberFormatException e) { - // Something strange was happen with the errorMessage.. - } - - long delay = getNextDelay (retries); - queue.enQueue(mail, delay, TimeUnit.MILLISECONDS); - LifecycleUtil.dispose(mail); - - // This is an update, so we have to unlock and - // notify or this mail is kept locked by this thread. - //workRepository.unlock(key); - - // Note: We do not notify because we updated an - // already existing mail and we are now free to handle - // more mails. - // Furthermore this mail should not be processed now - // because we have a retry time scheduling. - } - - // Clear the object handle to make sure it recycles - // this object. 
- mail = null; - } catch (Exception e) { - // Prevent unexpected exceptions from causing looping by - // removing message from outgoing. - // DO NOT CHANGE THIS to catch Error! For example, if - // there were an OutOfMemory condition caused because - // something else in the server was abusing memory, we would - // not want to start purging the retrying spool! - LifecycleUtil.dispose(mail); - //workRepository.remove(key); - throw new MailQueueException("Unable to perform dequeue", e); + retries = Integer.parseInt(mail.getErrorMessage()); + } catch (NumberFormatException e) { + // Something strange was happen with the errorMessage.. } + + long delay = getNextDelay (retries); + queue.enQueue(mail, delay, TimeUnit.MILLISECONDS); + LifecycleUtil.dispose(mail); + + // This is an update, so we have to unlock and + // notify or this mail is kept locked by this thread. + //workRepository.unlock(key); + + // Note: We do not notify because we updated an + // already existing mail and we are now free to handle + // more mails. + // Furthermore this mail should not be processed now + // because we have a retry time scheduling. } - }); + + // Clear the object handle to make sure it recycles + // this object. + mail = null; + queueItem.done(true); + } catch (Exception e) { + // Prevent unexpected exceptions from causing looping by + // removing message from outgoing. + // DO NOT CHANGE THIS to catch Error! For example, if + // there were an OutOfMemory condition caused because + // something else in the server was abusing memory, we would + // not want to start purging the retrying spool! 
+ LifecycleUtil.dispose(mail); + //workRepository.remove(key); + queueItem.done(false); + throw new MailQueueException("Unable to perform dequeue", e); + } } catch (Throwable e) { Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff ============================================================================== --- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java (original) +++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java Mon Oct 18 11:24:34 2010 @@ -27,20 +27,17 @@ import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.Queue; import javax.jms.Session; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; -import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.pool.PooledSession; import org.apache.commons.logging.Log; import org.apache.james.core.MimeMessageCopyOnWriteProxy; import org.apache.james.core.MimeMessageInputStream; import org.apache.james.core.MimeMessageInputStreamSource; -import org.apache.james.core.MimeMessageWrapper; import org.apache.james.queue.MailQueue; import org.apache.james.queue.jms.JMSMailQueue; import org.apache.mailet.Mail; @@ -108,87 +105,15 @@ public class ActiveMQMailQueue extends J public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, final Log logger) { this(connectionFactory, queuename, DISABLE_TRESHOLD, logger); } - - /* - * (non-Javadoc) - * @see org.apache.james.queue.activemq.MailQueue#deQueue() - */ - public void 
deQueue(DequeueOperation operation) throws MailQueueException, MessagingException { - Connection connection = null; - Session session = null; - Message message = null; - MessageConsumer consumer = null; - try { - connection = connectionFactory.createConnection(); - connection.start(); - - session = connection.createSession(true, Session.SESSION_TRANSACTED); - Queue queue = session.createQueue(queuename); - consumer = session.createConsumer(queue); - message = consumer.receive(); - - if (message == null){ - return; - } - - Mail mail = createMail(message); - operation.process(mail); - session.commit(); - if (message instanceof ActiveMQBlobMessage) { - // delete the file - // This should get removed once this jira issue was fixed - // https://issues.apache.org/activemq/browse/AMQ-1529 - try { - ((ActiveMQBlobMessage) message).deleteFile(); - } catch (IOException e) { - logger.info("Unable to delete blob message file for mail " + mail.getName()); - } - } - } catch (JMSException e) { - throw new MailQueueException("Unable to dequeue next message", e); - } catch (MessagingException e) { - - if (session != null) { - try { - session.rollback(); - } catch (JMSException e1) { - // ignore on rollback - } - } - } finally { - if (consumer != null) { - - try { - consumer.close(); - } catch (JMSException e1) { - // ignore on rollback - } - } - try { - if (session != null) session.close(); - } catch (JMSException e) { - // ignore here - } - - try { - if (connection != null) connection.close(); - } catch (JMSException e) { - // ignore here - } - } - - - } /* * (non-Javadoc) * @see org.apache.james.queue.jms.JMSMailQueue#populateMailMimeMessage(javax.jms.Message, org.apache.mailet.Mail) */ - protected void populateMailMimeMessage(Message message, Mail mail) - throws MessagingException { - if (message instanceof BlobMessage) { - try { - BlobMessage blobMessage = (BlobMessage) message; + protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException { + 
if (message instanceof BlobMessage) { + try { + BlobMessage blobMessage = (BlobMessage) message; try { // store url for later usage. Maybe we can do something smart for RemoteDelivery here // TODO: Check if this makes sense at all @@ -199,15 +124,15 @@ public class ActiveMQMailQueue extends J } mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageInputStreamSource(mail.getName(), blobMessage.getInputStream()))); - } catch (IOException e) { - throw new MailQueueException("Unable to populate MimeMessage for mail " + mail.getName(), e); - } catch (JMSException e) { - throw new MailQueueException("Unable to populate MimeMessage for mail " + mail.getName(), e); - } - } else { - super.populateMailMimeMessage(message, mail); - } - } + } catch (IOException e) { + throw new MailQueueException("Unable to populate MimeMessage for mail " + mail.getName(), e); + } catch (JMSException e) { + throw new MailQueueException("Unable to populate MimeMessage for mail " + mail.getName(), e); + } + } else { + super.populateMailMimeMessage(message, mail); + } + } /* * (non-Javadoc) @@ -242,20 +167,10 @@ public class ActiveMQMailQueue extends J } - /* - * (non-Javadoc) - * @see org.apache.james.queue.jms.JMSMailQueue#populateJMSProperties(javax.jms.Message, org.apache.mailet.Mail, long) - */ - protected void populateJMSProperties(Message message, Mail mail, - long delayInMillis) throws JMSException, MessagingException { - if (delayInMillis > 0) { - // This will get picked up by activemq for delay message - message.setLongProperty(org.apache.activemq.ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis); - } - - super.populateJMSProperties(message, mail, delayInMillis); - } - - + @Override + protected MailQueueItem createMailQueueItem(Connection connection, Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException { + Mail mail = createMail(message); + return new ActiveMQMailQueueItem(mail, connection, session, consumer, message, logger); + 
} } Added: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java?rev=1023741&view=auto ============================================================================== --- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java (added) +++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java Mon Oct 18 11:24:34 2010 @@ -0,0 +1,75 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.queue.activemq; + +import java.io.IOException; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.command.ActiveMQBlobMessage; +import org.apache.commons.logging.Log; +import org.apache.james.queue.MailQueue.MailQueueException; +import org.apache.james.queue.MailQueue.MailQueueItem; +import org.apache.james.queue.jms.JMSMailQueueItem; +import org.apache.mailet.Mail; + +/** + * ActiveMQ {...@link MailQueueItem} implementation which handles Blob-Messages as + * well + * + */ +public class ActiveMQMailQueueItem extends JMSMailQueueItem { + + private final Message message; + private final Log logger; + + public ActiveMQMailQueueItem(Mail mail, Connection connection, Session session, MessageConsumer consumer, Message message, Log logger) { + super(mail, connection, session, consumer); + this.message = message; + this.logger = logger; + } + + @Override + public void done(boolean success) throws MailQueueException { + super.done(success); + if (success) { + if (message instanceof ActiveMQBlobMessage) { + /* + * TODO: Enable this once activemq 5.4.2 was released + // delete the file + // This should get removed once this jira issue was fixed + // https://issues.apache.org/activemq/browse/AMQ-1529 + try { + ((ActiveMQBlobMessage) message).deleteFile(); + } catch (IOException e) { + logger.info("Unable to delete blob message file for mail " + getMail().getName()); + } catch (JMSException e) { + logger.info("Unable to delete blob message file for mail " + getMail().getName()); + } + */ + } + } + } + +} Modified: james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff 
============================================================================== --- james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java (original) +++ james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java Mon Oct 18 11:24:34 2010 @@ -54,7 +54,7 @@ public interface MailQueue { * @param unit * @throws MailQueueException */ - public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException, MessagingException; + public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException; /** @@ -63,7 +63,7 @@ public interface MailQueue { * @param mail * @throws MailQueueException */ - public void enQueue(Mail mail) throws MailQueueException, MessagingException; + public void enQueue(Mail mail) throws MailQueueException; /** @@ -73,7 +73,7 @@ public interface MailQueue { * @param dequeueOperation * @throws MailQueueException */ - public void deQueue(DequeueOperation operation) throws MailQueueException, MessagingException; + public MailQueueItem deQueue() throws MailQueueException; /** @@ -92,19 +92,28 @@ public interface MailQueue { } } - + /** * - * Operation which will get executed once a new Mail is ready to process + * */ - public interface DequeueOperation { + public interface MailQueueItem { + + /** + * Return the dequeued {...@link Mail} + * + * @return mail + */ + public Mail getMail(); /** - * Process some action on the mail - * @param mail + * Callback which MUST get called after the operation on the dequeued {...@link Mail} was complete. + * + * This is mostly used to either commit a transaction or rollback. 
+ * + * @param success * @throws MailQueueException - * @throws MessagingException */ - public void process(Mail mail) throws MailQueueException, MessagingException; + public void done(boolean success) throws MailQueueException; } } Modified: james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff ============================================================================== --- james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java (original) +++ james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java Mon Oct 18 11:24:34 2010 @@ -25,8 +25,7 @@ import java.util.concurrent.LinkedBlocki import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import javax.mail.MessagingException; - +import org.apache.james.queue.MailQueue.MailQueueException; import org.apache.mailet.Mail; public class MockMailQueue implements MailQueue{ @@ -42,19 +41,30 @@ public class MockMailQueue implements Ma this.throwException = true; } - public void deQueue(DequeueOperation operation) throws MailQueueException, MessagingException { + public MailQueueItem deQueue() throws MailQueueException { if (throwException) { throwException = false; throw new MailQueueException("Mock"); } try { - operation.process(queue.take()); + final Mail mail = queue.take(); + return new MailQueueItem() { + + public Mail getMail() { + return mail; + } + + public void done(boolean success) throws MailQueueException { + // do nothing here + + } + }; } catch (InterruptedException e) { throw new MailQueueException("Mock",e); } } - public void enQueue(final Mail mail, long delay, TimeUnit unit) throws MailQueueException, MessagingException { + public void enQueue(final Mail mail, long delay, TimeUnit unit) throws MailQueueException { if 
(throwException) { throwException = false; throw new MailQueueException("Mock"); @@ -72,7 +82,7 @@ public class MockMailQueue implements Ma }, delay, unit); } - public void enQueue(Mail mail) throws MailQueueException, MessagingException { + public void enQueue(Mail mail) throws MailQueueException { if (throwException) { throwException = false; throw new MailQueueException("Mock"); Modified: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff ============================================================================== --- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java (original) +++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java Mon Oct 18 11:24:34 2010 @@ -108,14 +108,13 @@ public class JMSMailQueue implements Mai * * @see org.apache.james.queue.MailQueue#deQueue(org.apache.james.queue.MailQueue.DequeueOperation) */ - public void deQueue(DequeueOperation operation) throws MailQueueException, MessagingException { + public MailQueueItem deQueue() throws MailQueueException { Connection connection = null; Session session = null; Message message = null; MessageConsumer consumer = null; - boolean received = false; - while(received == false) { + while(true) { try { connection = connectionFactory.createConnection(); connection.start(); @@ -127,48 +126,65 @@ public class JMSMailQueue implements Mai message = consumer.receive(10000); if (message != null) { - received = true; - Mail mail = createMail(message); - operation.process(mail); + return createMailQueueItem(connection, session, consumer, message); + } else { + session.commit(); + + if (consumer != null) { + + try { + consumer.close(); + } catch (JMSException e1) { + // ignore on rollback + } + } + try { + if (session != null) + 
session.close(); + } catch (JMSException e1) { + // ignore here + } + + try { + if (connection != null) + connection.close(); + } catch (JMSException e1) { + // ignore here + } } - session.commit(); - } catch (JMSException e) { - throw new MailQueueException("Unable to dequeue next message", e); - } catch (MessagingException e) { - - if (session != null) { + } catch (Exception e) { + try { + session.rollback(); + } catch (JMSException e1) { + // ignore on rollback + } + + if (consumer != null) { + try { - session.rollback(); + consumer.close(); } catch (JMSException e1) { - // ignore on rollback + // ignore on rollback } } - } finally { - if (consumer != null) { - - try { - consumer.close(); - } catch (JMSException e1) { - // ignore on rollback - } - } - try { - if (session != null) - session.close(); - } catch (JMSException e) { - // ignore here - } + try { + if (session != null) + session.close(); + } catch (JMSException e1) { + // ignore here + } - try { - if (connection != null) - connection.close(); - } catch (JMSException e) { - // ignore here - } + try { + if (connection != null) + connection.close(); + } catch (JMSException e1) { + // ignore here + } + throw new MailQueueException("Unable to dequeue next message", e); } } - + } /* @@ -176,7 +192,7 @@ public class JMSMailQueue implements Mai * @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail, long, java.util.concurrent.TimeUnit) */ public void enQueue(Mail mail, long delay, TimeUnit unit) - throws MailQueueException, MessagingException { + throws MailQueueException { Connection connection = null; Session session = null; MessageProducer producer = null; @@ -238,8 +254,7 @@ public class JMSMailQueue implements Mai * (non-Javadoc) * @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail) */ - public void enQueue(Mail mail) throws MailQueueException, - MessagingException { + public void enQueue(Mail mail) throws MailQueueException{ enQueue(mail, 0, TimeUnit.MILLISECONDS); } @@ 
-442,5 +457,10 @@ public class JMSMailQueue implements Mai public String toString() { return "MailQueue:" + queuename; } + + protected MailQueueItem createMailQueueItem(Connection connection, Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException{ + final Mail mail = createMail(message); + return new JMSMailQueueItem(mail, connection, session, consumer); + } } Added: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java?rev=1023741&view=auto ============================================================================== --- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java (added) +++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java Mon Oct 18 11:24:34 2010 @@ -0,0 +1,101 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.queue.jms; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.james.queue.MailQueue.MailQueueException; +import org.apache.james.queue.MailQueue.MailQueueItem; +import org.apache.mailet.Mail; + +/** + * JMS {...@link MailQueueItem} implementation + * + */ +public class JMSMailQueueItem implements MailQueueItem { + + private final Mail mail; + private final Connection connection; + private final Session session; + private final MessageConsumer consumer; + + public JMSMailQueueItem(Mail mail, Connection connection, Session session, MessageConsumer consumer) { + this.mail = mail; + this.connection = connection; + this.session = session; + this.consumer = consumer; + } + + /* + * (non-Javadoc) + * + * @see org.apache.james.queue.MailQueue.MailQueueItem#done(boolean) + */ + public void done(boolean success) throws MailQueueException { + try { + if (success) { + session.commit(); + } else { + try { + session.rollback(); + } catch (JMSException e1) { + // ignore on rollback + } + } + } catch (JMSException ex) { + throw new MailQueueException("Unable to commit dequeue operation for mail " + mail.getName(), ex); + } finally { + if (consumer != null) { + + try { + consumer.close(); + } catch (JMSException e1) { + // ignore on rollback + } + } + try { + if (session != null) + session.close(); + } catch (JMSException e) { + // ignore here + } + + try { + if (connection != null) + connection.close(); + } catch (JMSException e) { + // ignore here + } + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.james.queue.MailQueue.MailQueueItem#getMail() + */ + public Mail getMail() { + return mail; + } + +} Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java URL: 
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java?rev=1023741&r1=1023740&r2=1023741&view=diff ============================================================================== --- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java (original) +++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java Mon Oct 18 11:24:34 2010 @@ -30,7 +30,6 @@ import java.util.concurrent.atomic.Atomi import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; -import javax.mail.MessagingException; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.HierarchicalConfiguration; @@ -43,8 +42,7 @@ import org.apache.james.mailetcontainer. import org.apache.james.mailetcontainer.MailetContainer; import org.apache.james.queue.MailQueue; import org.apache.james.queue.MailQueueFactory; -import org.apache.james.queue.MailQueue.DequeueOperation; -import org.apache.james.queue.MailQueue.MailQueueException; +import org.apache.james.queue.MailQueue.MailQueueItem; import org.apache.james.services.SpoolManager; import org.apache.mailet.Mail; import org.apache.mailet.Mailet; @@ -157,31 +155,30 @@ public class JamesSpoolManager implement numActive.incrementAndGet(); try { - queue.deQueue(new DequeueOperation() { - - /* - * (non-Javadoc) - * @see org.apache.james.queue.activemq.MailQueue.DequeueOperation#process(org.apache.mailet.Mail) - */ - public void process(Mail mail) throws MailQueueException, MessagingException { - if (logger.isDebugEnabled()) { - StringBuffer debugBuffer = - new StringBuffer(64) - .append("==== Begin processing mail ") - .append(mail.getName()) - .append("===="); - logger.debug(debugBuffer.toString()); - } - - try { - mailProcessor.service(mail); - } finally { - LifecycleUtil.dispose(mail); - mail = null; - } + MailQueueItem 
queueItem = queue.deQueue(); + Mail mail = queueItem.getMail(); + if (logger.isDebugEnabled()) { + StringBuffer debugBuffer = + new StringBuffer(64) + .append("==== Begin processing mail ") + .append(mail.getName()) + .append("===="); + logger.debug(debugBuffer.toString()); + } + + try { + mailProcessor.service(mail); + queueItem.done(true); + } catch (Exception e) { + if (active.get() && logger.isErrorEnabled()) { + logger.error("Exception processing mail in JamesSpoolManager.run " + e.getMessage(), e); } - }); - + queueItem.done(false); + + } finally { + LifecycleUtil.dispose(mail); + mail = null; + } } catch (Throwable e) { if (active.get() && logger.isErrorEnabled()) { --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org