Author: norman Date: Thu Sep 22 12:40:33 2011 New Revision: 1174095 URL: http://svn.apache.org/viewvc?rev=1174095&view=rev Log: Make sure the FileMailQueue also handles messages with delay the right way. See JAMES-1316
Modified: james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java Modified: james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java?rev=1174095&r1=1174094&r2=1174095&view=diff ============================================================================== --- james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java (original) +++ james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java Thu Sep 22 12:40:33 2011 @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.mail.MessagingException; import javax.mail.util.SharedFileInputStream; @@ -61,7 +62,7 @@ public class FileMailQueue implements Ma private ConcurrentHashMap<String, FileItem> keyMappings = new ConcurrentHashMap<String, FileMailQueue.FileItem>(); private BlockingQueue<String> inmemoryQueue = new LinkedBlockingQueue<String>(); private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - + private final static AtomicLong COUNTER = new AtomicLong(); private final String queuename; private final File parentDir; private String queueDirName; @@ -69,6 +70,7 @@ public class FileMailQueue implements Ma private boolean sync; private final static String MSG_EXTENSION = ".msg"; private final static String OBJECT_EXTENSION = ".obj"; + private final static String NEXT_DELIVERY = "FileQueueNextDelivery"; public FileMailQueue(File parentDir, String queuename, boolean sync, Logger log) throws IOException { this.queuename = queuename; @@ -91,26 +93,34 @@ public class FileMailQueue implements Ma @Override public boolean accept(File dir, String name) { - if (name.endsWith(MSG_EXTENSION)) { - if (new File(dir, name.substring(0, name.length() -MSG_EXTENSION.length()) + OBJECT_EXTENSION).exists()) { - return true; - } + if (name.endsWith(OBJECT_EXTENSION)) { + return true; } return false; } }); for (int a = 0; a < files.length; a++) { final String name = files[a]; + ObjectInputStream oin = null; - int i = name.indexOf("-"); - if ( i > -1) { - final String objectFilename = name.substring(0, name.length() - MSG_EXTENSION.length()) + OBJECT_EXTENSION; - long next = Long.parseLong(name.substring(0,i)); - final String key = name.substring(i +1, name.length() - MSG_EXTENSION.length()); - FileItem item = new FileItem(queueDirName + "/" + objectFilename, queueDirName + "/" + name); + + try { + final String msgFileName = name.substring(0, name.length() - OBJECT_EXTENSION.length()) + MSG_EXTENSION; + + FileItem item = new FileItem(queueDirName + "/" + name, queueDirName + "/" + msgFileName); + + oin = new ObjectInputStream(new FileInputStream(item.getObjectFile())); + Mail mail = (Mail) oin.readObject(); + Long next = (Long) mail.getAttribute(NEXT_DELIVERY); + if (next == null) { + next = 0L; + } + + + final String key = mail.getName(); keyMappings.put(key, item); if (next <= System.currentTimeMillis()) { - + try { inmemoryQueue.put(key); } catch (InterruptedException e) { @@ -118,10 +128,11 @@ public class FileMailQueue implements Ma throw new RuntimeException("Unable to init", e); } } else { - - // Schedule a task which will put the mail in the queue for processing after a given delay + + // Schedule a task which will put the mail in the queue + // for processing after a given delay scheduler.schedule(new Runnable() { - + @Override public void run() { try { @@ -129,11 +140,25 @@ public class FileMailQueue implements Ma } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Unable to init", e); - } + } } }, next - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } + + } catch (ClassNotFoundException e1) { + log.error("Unable to load Mail", e1); + } catch (IOException e) { + log.error("Unable to load Mail", e); + } finally { + if (oin != null) { + try { + oin.close(); + } catch (Exception e) { + // ignore on close + } + } } + } } } @@ -142,13 +167,13 @@ public class FileMailQueue implements Ma @Override - public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException { - final String key = mail.getName(); + public void enQueue(final Mail mail, long delay, TimeUnit unit) throws MailQueueException { + final String key = mail.getName() + "-" + COUNTER.incrementAndGet(); FileOutputStream out = null; FileOutputStream foout = null; ObjectOutputStream oout = null; try { - String name = getFileNameWithoutExtension(queueDirName, key, unit.toMillis(delay)); + String name = queueDirName + "/" + key; final FileItem item = new FileItem(name + OBJECT_EXTENSION, name + MSG_EXTENSION); @@ -158,20 +183,24 @@ public class FileMailQueue implements Ma oout.flush(); if (sync) foout.getFD().sync(); out = new FileOutputStream(item.getMessageFile()); + mail.getMessage().writeTo(out); out.flush(); if (sync) out.getFD().sync(); keyMappings.put(key, item); - + + if (delay > 0) { + mail.setAttribute(NEXT_DELIVERY, System.currentTimeMillis() + unit.toMillis(delay)); // The message should get delayed so schedule it for later scheduler.schedule(new Runnable() { @Override public void run() { - try { + try { inmemoryQueue.put(key); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Unable to init", e); @@ -220,9 +249,6 @@ public class FileMailQueue implements Ma } - private String getFileNameWithoutExtension(String parentdir, String name, long delay) { - return parentdir + "/" + System.currentTimeMillis() + delay + "-" + name; - } @Override @@ -237,7 +263,9 @@ public class FileMailQueue implements Ma String k = null; while (item == null) { k = inmemoryQueue.take(); + item = keyMappings.get(k); + } final String key = k; ObjectInputStream oin = null; --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org