Author: norman
Date: Thu Sep 22 12:40:33 2011
New Revision: 1174095

URL: http://svn.apache.org/viewvc?rev=1174095&view=rev
Log:
Make sure the FileMailQueue also handles messages with delay the right way. See 
JAMES-1316 

Modified:
    
james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java

Modified: 
james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
URL: 
http://svn.apache.org/viewvc/james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java?rev=1174095&r1=1174094&r2=1174095&view=diff
==============================================================================
--- 
james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
 (original)
+++ 
james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
 Thu Sep 22 12:40:33 2011
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.mail.MessagingException;
 import javax.mail.util.SharedFileInputStream;
@@ -61,7 +62,7 @@ public class FileMailQueue implements Ma
     private ConcurrentHashMap<String, FileItem> keyMappings = new 
ConcurrentHashMap<String, FileMailQueue.FileItem>();
     private BlockingQueue<String> inmemoryQueue = new 
LinkedBlockingQueue<String>();
     private ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
-    
+    private final static AtomicLong COUNTER = new AtomicLong();
     private final String queuename;
     private final File parentDir;
     private String queueDirName;
@@ -69,6 +70,7 @@ public class FileMailQueue implements Ma
     private boolean sync;
     private final static String MSG_EXTENSION = ".msg";
     private final static String OBJECT_EXTENSION = ".obj";
+    private final static String NEXT_DELIVERY = "FileQueueNextDelivery";
     
     public FileMailQueue(File parentDir, String queuename, boolean sync, 
Logger log) throws IOException {
         this.queuename = queuename;
@@ -91,26 +93,34 @@ public class FileMailQueue implements Ma
                 
                 @Override
                 public boolean accept(File dir, String name) {
-                    if (name.endsWith(MSG_EXTENSION)) {
-                        if (new File(dir, name.substring(0, name.length() 
-MSG_EXTENSION.length()) + OBJECT_EXTENSION).exists()) {
-                            return true;
-                        }
+                    if (name.endsWith(OBJECT_EXTENSION)) {
+                        return true;
                     }
                     return false;
                 }
             });
             for (int a = 0; a < files.length; a++) {
                 final String name = files[a];
+                ObjectInputStream oin = null;
                 
-                int i = name.indexOf("-");
-                if ( i > -1) {
-                    final String objectFilename = name.substring(0, 
name.length() - MSG_EXTENSION.length()) + OBJECT_EXTENSION;
-                    long next = Long.parseLong(name.substring(0,i));
-                    final String key = name.substring(i +1, name.length() - 
MSG_EXTENSION.length());
-                    FileItem item = new FileItem(queueDirName + "/" + 
objectFilename, queueDirName + "/" + name);
+
+                try {
+                    final String msgFileName = name.substring(0, name.length() 
- OBJECT_EXTENSION.length()) + MSG_EXTENSION;
+
+                    FileItem item = new FileItem(queueDirName + "/" + name, 
queueDirName + "/" + msgFileName);
+
+                    oin = new ObjectInputStream(new 
FileInputStream(item.getObjectFile()));
+                    Mail mail = (Mail) oin.readObject();
+                    Long next = (Long) mail.getAttribute(NEXT_DELIVERY);
+                    if (next == null) {
+                        next = 0L;
+                    }
+
+
+                    final String key = mail.getName();
                     keyMappings.put(key, item);
                     if (next <= System.currentTimeMillis()) {
-                        
+
                         try {
                             inmemoryQueue.put(key);
                         } catch (InterruptedException e) {
@@ -118,10 +128,11 @@ public class FileMailQueue implements Ma
                             throw new RuntimeException("Unable to init", e);
                         }
                     } else {
-                        
-                        // Schedule a task which will put the mail in the 
queue for processing after a given delay
+
+                        // Schedule a task which will put the mail in the queue
+                        // for processing after a given delay
                         scheduler.schedule(new Runnable() {
-                            
+
                             @Override
                             public void run() {
                                 try {
@@ -129,11 +140,25 @@ public class FileMailQueue implements Ma
                                 } catch (InterruptedException e) {
                                     Thread.currentThread().interrupt();
                                     throw new RuntimeException("Unable to 
init", e);
-                                }                                
+                                }
                             }
                         }, next - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
                     }
+
+                } catch (ClassNotFoundException e1) {
+                    log.error("Unable to load Mail", e1);
+                } catch (IOException e) {
+                    log.error("Unable to load Mail", e);
+                } finally {
+                    if (oin != null) {
+                        try {
+                            oin.close();
+                        } catch (Exception e) {
+                            // ignore on close
+                        }
+                    }
                 }
+
             }
         }
     }
@@ -142,13 +167,13 @@ public class FileMailQueue implements Ma
     
     
     @Override
-    public void enQueue(Mail mail, long delay, TimeUnit unit) throws 
MailQueueException {
-        final String key = mail.getName();
+    public void enQueue(final Mail mail, long delay, TimeUnit unit) throws 
MailQueueException {
+        final String key = mail.getName() + "-" + COUNTER.incrementAndGet();
         FileOutputStream out = null;
         FileOutputStream foout = null;
         ObjectOutputStream oout = null;
         try {
-            String name = getFileNameWithoutExtension(queueDirName, key, 
unit.toMillis(delay));
+            String name = queueDirName + "/" + key;
             
             final FileItem item = new FileItem(name + OBJECT_EXTENSION, name + 
MSG_EXTENSION);
 
@@ -158,20 +183,24 @@ public class FileMailQueue implements Ma
             oout.flush();
             if (sync) foout.getFD().sync();
             out = new FileOutputStream(item.getMessageFile());
+           
             mail.getMessage().writeTo(out);
             out.flush();
             if (sync) out.getFD().sync();
             
             keyMappings.put(key, item);
-            
+        
+
             if (delay > 0) {
+                mail.setAttribute(NEXT_DELIVERY, System.currentTimeMillis() + 
unit.toMillis(delay));
                 // The message should get delayed so schedule it for later 
                 scheduler.schedule(new Runnable() {
                     
                     @Override
                     public void run() {
-                        try {
+                        try {                           
                             inmemoryQueue.put(key);
+
                         } catch (InterruptedException e) {
                             Thread.currentThread().interrupt();
                             throw new RuntimeException("Unable to init", e);
@@ -220,9 +249,6 @@ public class FileMailQueue implements Ma
 
     }
 
-    private String getFileNameWithoutExtension(String parentdir, String name, 
long delay) {
-        return parentdir + "/" + System.currentTimeMillis() + delay + "-" + 
name;
-    }
     
     
     @Override
@@ -237,7 +263,9 @@ public class FileMailQueue implements Ma
             String k = null;
             while (item == null) {
                 k = inmemoryQueue.take();
+                
                 item = keyMappings.get(k);
+
             }
             final String key = k;
             ObjectInputStream oin = null;



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to