Author: norman
Date: Mon Oct 18 11:24:34 2010
New Revision: 1023741

URL: http://svn.apache.org/viewvc?rev=1023741&view=rev
Log:
Refactor MailQueue interface to return a MailQueueItem. This is more elegant 
than before and allows better integration with multithreading.

Added:
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
Modified:
    
james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
    
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
    
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java
    
james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java
    
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
    
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java

Modified: 
james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
--- 
james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
 (original)
+++ 
james/server/trunk/mailets/src/main/java/org/apache/james/transport/mailets/RemoteDelivery.java
 Mon Oct 18 11:24:34 2010
@@ -26,8 +26,8 @@ import org.apache.james.dnsservice.api.T
 import org.apache.james.lifecycle.LifecycleUtil;
 import org.apache.james.queue.MailQueue;
 import org.apache.james.queue.MailQueueFactory;
-import org.apache.james.queue.MailQueue.DequeueOperation;
 import org.apache.james.queue.MailQueue.MailQueueException;
+import org.apache.james.queue.MailQueue.MailQueueItem;
 import org.apache.james.services.MailServer;
 import org.apache.james.util.TimeConverter;
 import org.apache.mailet.base.GenericMailet;
@@ -710,65 +710,65 @@ public class RemoteDelivery extends Gene
                     // of time to block is determined by the 'getWaitTime'
                     // method of the
                     // MultipleDelayFilter.
-                    queue.deQueue(new DequeueOperation() {
+                    MailQueueItem queueItem = queue.deQueue();
+                    Mail mail = queueItem.getMail();
+                    
+                    String key = mail.getName();
+                    try {
+                        if (isDebug) {
+                            String message = Thread.currentThread().getName()
+                                    + " will process mail " + key;
+                            log(message);
+                        }
                         
-                        public void process(Mail mail) throws 
MailQueueException {
-                            String key = mail.getName();
+                        // Deliver message
+                        if (deliver(mail, session)) {
+                            // Message was successfully delivered/fully 
failed... 
+                            // delete it
+                            LifecycleUtil.dispose(mail);
+                            //workRepository.remove(key);
+                        } else {
+                            // Something happened that will delay delivery.
+                            // Store it back in the retry repository.
+                            //workRepository.store(mail);
+                            int retries = 0;
                             try {
-                                if (isDebug) {
-                                    String message = 
Thread.currentThread().getName()
-                                            + " will process mail " + key;
-                                    log(message);
-                                }
-                                
-                                // Deliver message
-                                if (deliver(mail, session)) {
-                                    // Message was successfully 
delivered/fully failed... 
-                                    // delete it
-                                    LifecycleUtil.dispose(mail);
-                                    //workRepository.remove(key);
-                                } else {
-                                    // Something happened that will delay 
delivery.
-                                    // Store it back in the retry repository.
-                                    //workRepository.store(mail);
-                                    int retries = 0;
-                                    try {
-                                        retries = 
Integer.parseInt(mail.getErrorMessage());
-                                    } catch (NumberFormatException e) {
-                                        // Something strange was happen with 
the errorMessage.. 
-                                    }
-                                    
-                                    long delay =  getNextDelay (retries);
-                                    queue.enQueue(mail, delay, 
TimeUnit.MILLISECONDS);
-                                    LifecycleUtil.dispose(mail);
-
-                                    // This is an update, so we have to unlock 
and
-                                    // notify or this mail is kept locked by 
this thread.
-                                    //workRepository.unlock(key);
-                                    
-                                    // Note: We do not notify because we 
updated an
-                                    // already existing mail and we are now 
free to handle 
-                                    // more mails.
-                                    // Furthermore this mail should not be 
processed now
-                                    // because we have a retry time scheduling.
-                                }
-                                
-                                // Clear the object handle to make sure it 
recycles
-                                // this object.
-                                mail = null;
-                            } catch (Exception e) {
-                                // Prevent unexpected exceptions from causing 
looping by
-                                // removing message from outgoing.
-                                // DO NOT CHANGE THIS to catch Error! For 
example, if
-                                // there were an OutOfMemory condition caused 
because 
-                                // something else in the server was abusing 
memory, we would 
-                                // not want to start purging the retrying 
spool!
-                                LifecycleUtil.dispose(mail);
-                                //workRepository.remove(key);
-                                throw new MailQueueException("Unable to 
perform dequeue", e);
+                                retries = 
Integer.parseInt(mail.getErrorMessage());
+                            } catch (NumberFormatException e) {
+                                // Something strange was happen with the 
errorMessage.. 
                             }
+                            
+                            long delay =  getNextDelay (retries);
+                            queue.enQueue(mail, delay, TimeUnit.MILLISECONDS);
+                            LifecycleUtil.dispose(mail);
+
+                            // This is an update, so we have to unlock and
+                            // notify or this mail is kept locked by this 
thread.
+                            //workRepository.unlock(key);
+                            
+                            // Note: We do not notify because we updated an
+                            // already existing mail and we are now free to 
handle 
+                            // more mails.
+                            // Furthermore this mail should not be processed 
now
+                            // because we have a retry time scheduling.
                         }
-                    });
+                        
+                        // Clear the object handle to make sure it recycles
+                        // this object.
+                        mail = null;
+                        queueItem.done(true);
+                    } catch (Exception e) {
+                        // Prevent unexpected exceptions from causing looping 
by
+                        // removing message from outgoing.
+                        // DO NOT CHANGE THIS to catch Error! For example, if
+                        // there were an OutOfMemory condition caused because 
+                        // something else in the server was abusing memory, we 
would 
+                        // not want to start purging the retrying spool!
+                        LifecycleUtil.dispose(mail);
+                        //workRepository.remove(key);
+                        queueItem.done(false);
+                        throw new MailQueueException("Unable to perform 
dequeue", e);
+                    }
                     
                    
                 } catch (Throwable e) {

Modified: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 (original)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
 Mon Oct 18 11:24:34 2010
@@ -27,20 +27,17 @@ import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
-import javax.jms.Queue;
 import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
-import org.apache.activemq.command.ActiveMQBlobMessage;
 import org.apache.activemq.pool.PooledSession;
 import org.apache.commons.logging.Log;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
 import org.apache.james.core.MimeMessageInputStream;
 import org.apache.james.core.MimeMessageInputStreamSource;
-import org.apache.james.core.MimeMessageWrapper;
 import org.apache.james.queue.MailQueue;
 import org.apache.james.queue.jms.JMSMailQueue;
 import org.apache.mailet.Mail;
@@ -108,87 +105,15 @@ public class ActiveMQMailQueue extends J
     public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final 
String queuename, final Log logger) {
         this(connectionFactory, queuename, DISABLE_TRESHOLD, logger);
     }
-    
-    /*
-     * (non-Javadoc)
-     * @see org.apache.james.queue.activemq.MailQueue#deQueue()
-     */
-    public void deQueue(DequeueOperation operation) throws MailQueueException, 
MessagingException {   
-        Connection connection = null;
-        Session session = null;
-        Message message = null;
-        MessageConsumer consumer = null;
-        try {
-            connection = connectionFactory.createConnection();
-            connection.start();
-            
-            session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
-            Queue queue = session.createQueue(queuename);
-            consumer = session.createConsumer(queue);
-            message = consumer.receive();
-            
-            if (message == null){
-               return;
-            }
-            
-            Mail mail = createMail(message);
-            operation.process(mail);
-            session.commit();
-            if (message instanceof ActiveMQBlobMessage) {
-                // delete the file
-               // This should get removed once this jira issue was fixed
-               // https://issues.apache.org/activemq/browse/AMQ-1529
-                try {
-                    ((ActiveMQBlobMessage) message).deleteFile();
-                } catch (IOException e) {
-                    logger.info("Unable to delete blob message file for mail " 
+ mail.getName());
-                }
-            }
-        } catch (JMSException e) {
-            throw new MailQueueException("Unable to dequeue next message", e);
-        } catch (MessagingException e) {
-               
-            if (session != null) {
-                try {
-                    session.rollback();
-                } catch (JMSException e1) {
-                    // ignore on rollback
-                }
-            }
-        } finally {
-               if (consumer != null) {
-                       
-                       try {
-                                       consumer.close();
-                               } catch (JMSException e1) {
-                    // ignore on rollback
-                               }
-               }
-            try {
-                if (session != null) session.close();
-            } catch (JMSException e) {
-                // ignore here
-            }
-            
-            try {
-                if (connection != null)  connection.close();
-            } catch (JMSException e) {
-                // ignore here
-            }
-        }
-      
-       
-    }
 
     /*
      * (non-Javadoc)
      * @see 
org.apache.james.queue.jms.JMSMailQueue#populateMailMimeMessage(javax.jms.Message,
 org.apache.mailet.Mail)
      */
-       protected void populateMailMimeMessage(Message message, Mail mail)
-                       throws MessagingException {
-                if (message instanceof BlobMessage) {
-                        try {
-                                BlobMessage blobMessage = (BlobMessage) 
message;
+    protected void populateMailMimeMessage(Message message, Mail mail) throws 
MessagingException {
+        if (message instanceof BlobMessage) {
+            try {
+                BlobMessage blobMessage = (BlobMessage) message;
                  try {
                      // store url for later usage. Maybe we can do something 
smart for RemoteDelivery here
                      // TODO: Check if this makes sense at all
@@ -199,15 +124,15 @@ public class ActiveMQMailQueue extends J
                  }
                  mail.setMessage(new MimeMessageCopyOnWriteProxy(new 
MimeMessageInputStreamSource(mail.getName(), blobMessage.getInputStream())));
 
-                        } catch (IOException e) {
-                                throw new MailQueueException("Unable to 
populate MimeMessage for mail " + mail.getName(), e);
-                        } catch (JMSException e) {
-                                throw new MailQueueException("Unable to 
populate MimeMessage for mail " + mail.getName(), e);
-                        }
-                } else {
-             super.populateMailMimeMessage(message, mail);
-                }
-       }
+            } catch (IOException e) {
+                throw new MailQueueException("Unable to populate MimeMessage 
for mail " + mail.getName(), e);
+            } catch (JMSException e) {
+                throw new MailQueueException("Unable to populate MimeMessage 
for mail " + mail.getName(), e);
+            }
+        } else {
+            super.populateMailMimeMessage(message, mail);
+        }
+    }
 
        /*
         * (non-Javadoc)
@@ -242,20 +167,10 @@ public class ActiveMQMailQueue extends J
 
        }
 
-       /*
-        * (non-Javadoc)
-        * @see 
org.apache.james.queue.jms.JMSMailQueue#populateJMSProperties(javax.jms.Message,
 org.apache.mailet.Mail, long)
-        */
-       protected void populateJMSProperties(Message message, Mail mail,
-                       long delayInMillis) throws JMSException, 
MessagingException {
-               if (delayInMillis > 0) {
-            // This will get picked up by activemq for delay message
-            
message.setLongProperty(org.apache.activemq.ScheduledMessage.AMQ_SCHEDULED_DELAY,
 delayInMillis);
-        }
-               
-               super.populateJMSProperties(message, mail, delayInMillis);
-       }
-       
-       
+    @Override
+    protected MailQueueItem createMailQueueItem(Connection connection, Session 
session, MessageConsumer consumer, Message message) throws JMSException, 
MessagingException {
+        Mail mail = createMail(message);
+        return new ActiveMQMailQueueItem(mail, connection, session, consumer, 
message, logger);
+    }
 
 }

Added: 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java?rev=1023741&view=auto
==============================================================================
--- 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
 (added)
+++ 
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueItem.java
 Mon Oct 18 11:24:34 2010
@@ -0,0 +1,75 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq;
+
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.commons.logging.Log;
+import org.apache.james.queue.MailQueue.MailQueueException;
+import org.apache.james.queue.MailQueue.MailQueueItem;
+import org.apache.james.queue.jms.JMSMailQueueItem;
+import org.apache.mailet.Mail;
+
+/**
+ * ActiveMQ {@link MailQueueItem} implementation which handles 
Blob-Messages as
+ * well
+ * 
+ */
+public class ActiveMQMailQueueItem extends JMSMailQueueItem {
+
+    private final Message message;
+    private final Log logger;
+
+    public ActiveMQMailQueueItem(Mail mail, Connection connection, Session 
session, MessageConsumer consumer, Message message, Log logger) {
+        super(mail, connection, session, consumer);
+        this.message = message;
+        this.logger = logger;
+    }
+
+    @Override
+    public void done(boolean success) throws MailQueueException {
+        super.done(success);
+        if (success) {
+            if (message instanceof ActiveMQBlobMessage) {
+                /*
+                 * TODO: Enable this once activemq 5.4.2 was released
+                // delete the file
+                // This should get removed once this jira issue was fixed
+                // https://issues.apache.org/activemq/browse/AMQ-1529
+                try {
+                    ((ActiveMQBlobMessage) message).deleteFile();
+                } catch (IOException e) {
+                    logger.info("Unable to delete blob message file for mail " 
+ getMail().getName());
+                } catch (JMSException e) {
+                    logger.info("Unable to delete blob message file for mail " 
+ getMail().getName());
+                }
+                */
+            }
+        }
+    }
+
+}

Modified: 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
--- 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java
 (original)
+++ 
james/server/trunk/queue-api/src/main/java/org/apache/james/queue/MailQueue.java
 Mon Oct 18 11:24:34 2010
@@ -54,7 +54,7 @@ public interface MailQueue {
      * @param unit
      * @throws MailQueueException
      */
-    public void enQueue(Mail mail, long delay, TimeUnit unit) throws 
MailQueueException, MessagingException;
+    public void enQueue(Mail mail, long delay, TimeUnit unit) throws 
MailQueueException;
     
     
     /**
@@ -63,7 +63,7 @@ public interface MailQueue {
      * @param mail
      * @throws MailQueueException
      */
-    public void enQueue(Mail mail) throws MailQueueException, 
MessagingException;
+    public void enQueue(Mail mail) throws MailQueueException;
     
     
     /**
@@ -73,7 +73,7 @@ public interface MailQueue {
      * @param dequeueOperation
      * @throws MailQueueException
      */
-    public void deQueue(DequeueOperation operation) throws MailQueueException, 
MessagingException;
+    public MailQueueItem deQueue() throws MailQueueException;
     
     
     /**
@@ -92,19 +92,28 @@ public interface MailQueue {
         }
     }
     
-    
+
     /**
      * 
-     * Operation which will get executed once a new Mail is ready to process
+     *
      */
-    public interface DequeueOperation {
+    public interface MailQueueItem {
+
+        /**
+         * Return the dequeued {@link Mail}
+         * 
+         * @return mail
+         */
+        public Mail getMail();
         
         /**
-         * Process some action on the mail
-         * @param mail
+         * Callback which MUST get called after the operation on the dequeued 
{@link Mail} was complete. 
+         * 
+         * This is mostly used to either commit a transaction or rollback. 
+         *  
+         * @param success 
          * @throws MailQueueException
-         * @throws MessagingException
          */
-        public void process(Mail mail) throws MailQueueException, 
MessagingException;
+        public void done(boolean success) throws MailQueueException;
     }
 }

Modified: 
james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
--- 
james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java
 (original)
+++ 
james/server/trunk/queue-api/src/test/java/org/apache/james/queue/MockMailQueue.java
 Mon Oct 18 11:24:34 2010
@@ -25,8 +25,7 @@ import java.util.concurrent.LinkedBlocki
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import javax.mail.MessagingException;
-
+import org.apache.james.queue.MailQueue.MailQueueException;
 import org.apache.mailet.Mail;
 
 public class MockMailQueue implements MailQueue{
@@ -42,19 +41,30 @@ public class MockMailQueue implements Ma
         this.throwException = true;
     }
     
-    public void deQueue(DequeueOperation operation) throws MailQueueException, 
MessagingException {
+    public MailQueueItem deQueue() throws MailQueueException {
         if (throwException) {
             throwException = false;
             throw new MailQueueException("Mock");
         }
         try {
-            operation.process(queue.take());
+               final Mail mail = queue.take();
+            return new MailQueueItem() {
+                               
+                               public Mail getMail() {
+                                       return mail;
+                               }
+                               
+                               public void done(boolean success) throws 
MailQueueException {
+                                       // do nothing here
+                                       
+                               }
+                       };
         } catch (InterruptedException e) {
             throw new MailQueueException("Mock",e);
         }
     }
 
-    public void enQueue(final Mail mail, long delay, TimeUnit unit) throws 
MailQueueException, MessagingException {
+    public void enQueue(final Mail mail, long delay, TimeUnit unit) throws 
MailQueueException {
         if (throwException) {
             throwException = false;
             throw new MailQueueException("Mock");
@@ -72,7 +82,7 @@ public class MockMailQueue implements Ma
         }, delay, unit);
     }
 
-    public void enQueue(Mail mail) throws MailQueueException, 
MessagingException {
+    public void enQueue(Mail mail) throws MailQueueException {
         if (throwException) {
             throwException = false;
             throw new MailQueueException("Mock");

Modified: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
--- 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
 (original)
+++ 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
 Mon Oct 18 11:24:34 2010
@@ -108,14 +108,13 @@ public class JMSMailQueue implements Mai
      * 
      * @see 
org.apache.james.queue.MailQueue#deQueue(org.apache.james.queue.MailQueue.DequeueOperation)
      */
-    public void deQueue(DequeueOperation operation) throws MailQueueException, 
MessagingException {   
+    public MailQueueItem deQueue() throws MailQueueException {   
         Connection connection = null;
         Session session = null;
         Message message = null;
         MessageConsumer consumer = null;
-        boolean received = false;
         
-        while(received == false) {
+        while(true) {
             try {
                 connection = connectionFactory.createConnection();
                 connection.start();
@@ -127,48 +126,65 @@ public class JMSMailQueue implements Mai
                 message = consumer.receive(10000);
            
                 if (message != null) {
-                       received = true;
-                    Mail mail = createMail(message);
-                    operation.process(mail);
+                    return createMailQueueItem(connection, session, consumer, 
message);
+                } else {
+                    session.commit();
+                    
+                    if (consumer != null) {
+
+                        try {
+                            consumer.close();
+                        } catch (JMSException e1) {
+                            // ignore on rollback
+                        }
+                    }
+                    try {
+                        if (session != null)
+                            session.close();
+                    } catch (JMSException e1) {
+                        // ignore here
+                    }
+
+                    try {
+                        if (connection != null)
+                            connection.close();
+                    } catch (JMSException e1) {
+                        // ignore here
+                    }
                 }
-                session.commit();
          
-            } catch (JMSException e) {
-                throw new MailQueueException("Unable to dequeue next message", 
e);
-            } catch (MessagingException e) {
-               
-                if (session != null) {
+            } catch (Exception e) {
+                try {
+                    session.rollback();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+
+                if (consumer != null) {
+
                     try {
-                        session.rollback();
+                        consumer.close();
                     } catch (JMSException e1) {
-                    // ignore on rollback
+                        // ignore on rollback
                     }
                 }
-            } finally {
-               if (consumer != null) {
-                       
-                           try {
-                                       consumer.close();
-                               } catch (JMSException e1) {
-                        // ignore on rollback
-                                   }
-                               }
-                               try {
-                                       if (session != null)
-                                               session.close();
-                               } catch (JMSException e) {
-                                       // ignore here
-                               }
+                try {
+                    if (session != null)
+                        session.close();
+                } catch (JMSException e1) {
+                    // ignore here
+                }
 
-                               try {
-                                       if (connection != null)
-                                               connection.close();
-                               } catch (JMSException e) {
-                                       // ignore here
-                               }
+                try {
+                    if (connection != null)
+                        connection.close();
+                } catch (JMSException e1) {
+                    // ignore here
+                }
+                throw new MailQueueException("Unable to dequeue next message", 
e);
             }
         }
-       
+
     }
     
     /*
@@ -176,7 +192,7 @@ public class JMSMailQueue implements Mai
      * @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail, 
long, java.util.concurrent.TimeUnit)
      */
        public void enQueue(Mail mail, long delay, TimeUnit unit)
-                       throws MailQueueException, MessagingException {
+                       throws MailQueueException {
                Connection connection = null;
                Session session = null;
                MessageProducer producer = null;
@@ -238,8 +254,7 @@ public class JMSMailQueue implements Mai
         * (non-Javadoc)
         * @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail)
         */
-       public void enQueue(Mail mail) throws MailQueueException,
-                       MessagingException {
+       public void enQueue(Mail mail) throws MailQueueException{
                enQueue(mail, 0, TimeUnit.MILLISECONDS);
        }
        
@@ -442,5 +457,10 @@ public class JMSMailQueue implements Mai
     public String toString() {
         return "MailQueue:" + queuename;
     }
+    
+    protected MailQueueItem createMailQueueItem(Connection connection, Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException{
+       final Mail mail = createMail(message);             
+        return new JMSMailQueueItem(mail, connection, session, consumer);
+    }
 
 }

Added: 
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java?rev=1023741&view=auto
==============================================================================
--- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java (added)
+++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueueItem.java Mon Oct 18 11:24:34 2010
@@ -0,0 +1,101 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.jms;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.james.queue.MailQueue.MailQueueException;
+import org.apache.james.queue.MailQueue.MailQueueItem;
+import org.apache.mailet.Mail;
+
+/**
+ * JMS {@link MailQueueItem} implementation
+ * 
+ */
+public class JMSMailQueueItem implements MailQueueItem {
+
+    private final Mail mail;
+    private final Connection connection;
+    private final Session session;
+    private final MessageConsumer consumer;
+
+    public JMSMailQueueItem(Mail mail, Connection connection, Session session, MessageConsumer consumer) {
+        this.mail = mail;
+        this.connection = connection;
+        this.session = session;
+        this.consumer = consumer;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.james.queue.MailQueue.MailQueueItem#done(boolean)
+     */
+    public void done(boolean success) throws MailQueueException {
+        try {
+            if (success) {
+                session.commit();
+            } else {
+                try {
+                    session.rollback();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+            }
+        } catch (JMSException ex) {
+            throw new MailQueueException("Unable to commit dequeue operation for mail " + mail.getName(), ex);
+        } finally {
+            if (consumer != null) {
+
+                try {
+                    consumer.close();
+                } catch (JMSException e1) {
+                    // ignore on rollback
+                }
+            }
+            try {
+                if (session != null)
+                    session.close();
+            } catch (JMSException e) {
+                // ignore here
+            }
+
+            try {
+                if (connection != null)
+                    connection.close();
+            } catch (JMSException e) {
+                // ignore here
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.james.queue.MailQueue.MailQueueItem#getMail()
+     */
+    public Mail getMail() {
+        return mail;
+    }
+
+}

Modified: 
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java?rev=1023741&r1=1023740&r2=1023741&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java Mon Oct 18 11:24:34 2010
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.Atomi
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
-import javax.mail.MessagingException;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.HierarchicalConfiguration;
@@ -43,8 +42,7 @@ import org.apache.james.mailetcontainer.
 import org.apache.james.mailetcontainer.MailetContainer;
 import org.apache.james.queue.MailQueue;
 import org.apache.james.queue.MailQueueFactory;
-import org.apache.james.queue.MailQueue.DequeueOperation;
-import org.apache.james.queue.MailQueue.MailQueueException;
+import org.apache.james.queue.MailQueue.MailQueueItem;
 import org.apache.james.services.SpoolManager;
 import org.apache.mailet.Mail;
 import org.apache.mailet.Mailet;
@@ -157,31 +155,30 @@ public class JamesSpoolManager implement
             numActive.incrementAndGet();
 
             try {
-                queue.deQueue(new DequeueOperation() {
-                    
-                    /*
-                     * (non-Javadoc)
-                     * @see org.apache.james.queue.activemq.MailQueue.DequeueOperation#process(org.apache.mailet.Mail)
-                     */
-                    public void process(Mail mail) throws MailQueueException, MessagingException {
-                        if (logger.isDebugEnabled()) {
-                            StringBuffer debugBuffer =
-                                new StringBuffer(64)
-                                        .append("==== Begin processing mail ")
-                                        .append(mail.getName())
-                                        .append("====");
-                            logger.debug(debugBuffer.toString());
-                        }
-
-                        try {
-                            mailProcessor.service(mail);             
-                        } finally {
-                            LifecycleUtil.dispose(mail);
-                            mail = null;
-                        }
+                MailQueueItem queueItem = queue.deQueue();
+                Mail mail = queueItem.getMail();
+                if (logger.isDebugEnabled()) {
+                    StringBuffer debugBuffer =
+                        new StringBuffer(64)
+                                .append("==== Begin processing mail ")
+                                .append(mail.getName())
+                                .append("====");
+                    logger.debug(debugBuffer.toString());
+                }
+
+                try {
+                    mailProcessor.service(mail);
+                    queueItem.done(true);
+                } catch (Exception e) {
+                    if (active.get() && logger.isErrorEnabled()) {
+                        logger.error("Exception processing mail in JamesSpoolManager.run " + e.getMessage(), e);
                     }
-                });
-                
+                    queueItem.done(false);
+
+                } finally {
+                    LifecycleUtil.dispose(mail);
+                    mail = null;
+                }
                
             } catch (Throwable e) {
                 if (active.get() && logger.isErrorEnabled()) {



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to