Author: norman
Date: Thu Sep 22 12:40:33 2011
New Revision: 1174095
URL: http://svn.apache.org/viewvc?rev=1174095&view=rev
Log:
Make sure the FileMailQueue also handles delayed messages correctly. See
JAMES-1316
Modified:
james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
Modified:
james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
URL:
http://svn.apache.org/viewvc/james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java?rev=1174095&r1=1174094&r2=1174095&view=diff
==============================================================================
---
james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
(original)
+++
james/server/trunk/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueue.java
Thu Sep 22 12:40:33 2011
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.mail.MessagingException;
import javax.mail.util.SharedFileInputStream;
@@ -61,7 +62,7 @@ public class FileMailQueue implements Ma
private ConcurrentHashMap<String, FileItem> keyMappings = new
ConcurrentHashMap<String, FileMailQueue.FileItem>();
private BlockingQueue<String> inmemoryQueue = new
LinkedBlockingQueue<String>();
private ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
-
+ private final static AtomicLong COUNTER = new AtomicLong();
private final String queuename;
private final File parentDir;
private String queueDirName;
@@ -69,6 +70,7 @@ public class FileMailQueue implements Ma
private boolean sync;
private final static String MSG_EXTENSION = ".msg";
private final static String OBJECT_EXTENSION = ".obj";
+ private final static String NEXT_DELIVERY = "FileQueueNextDelivery";
public FileMailQueue(File parentDir, String queuename, boolean sync,
Logger log) throws IOException {
this.queuename = queuename;
@@ -91,26 +93,34 @@ public class FileMailQueue implements Ma
@Override
public boolean accept(File dir, String name) {
- if (name.endsWith(MSG_EXTENSION)) {
- if (new File(dir, name.substring(0, name.length()
-MSG_EXTENSION.length()) + OBJECT_EXTENSION).exists()) {
- return true;
- }
+ if (name.endsWith(OBJECT_EXTENSION)) {
+ return true;
}
return false;
}
});
for (int a = 0; a < files.length; a++) {
final String name = files[a];
+ ObjectInputStream oin = null;
- int i = name.indexOf("-");
- if ( i > -1) {
- final String objectFilename = name.substring(0,
name.length() - MSG_EXTENSION.length()) + OBJECT_EXTENSION;
- long next = Long.parseLong(name.substring(0,i));
- final String key = name.substring(i +1, name.length() -
MSG_EXTENSION.length());
- FileItem item = new FileItem(queueDirName + "/" +
objectFilename, queueDirName + "/" + name);
+
+ try {
+ final String msgFileName = name.substring(0, name.length()
- OBJECT_EXTENSION.length()) + MSG_EXTENSION;
+
+ FileItem item = new FileItem(queueDirName + "/" + name,
queueDirName + "/" + msgFileName);
+
+ oin = new ObjectInputStream(new
FileInputStream(item.getObjectFile()));
+ Mail mail = (Mail) oin.readObject();
+ Long next = (Long) mail.getAttribute(NEXT_DELIVERY);
+ if (next == null) {
+ next = 0L;
+ }
+
+
+ final String key = mail.getName();
keyMappings.put(key, item);
if (next <= System.currentTimeMillis()) {
-
+
try {
inmemoryQueue.put(key);
} catch (InterruptedException e) {
@@ -118,10 +128,11 @@ public class FileMailQueue implements Ma
throw new RuntimeException("Unable to init", e);
}
} else {
-
- // Schedule a task which will put the mail in the
queue for processing after a given delay
+
+ // Schedule a task which will put the mail in the queue
+ // for processing after a given delay
scheduler.schedule(new Runnable() {
-
+
@Override
public void run() {
try {
@@ -129,11 +140,25 @@ public class FileMailQueue implements Ma
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to
init", e);
- }
+ }
}
}, next - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
+
+ } catch (ClassNotFoundException e1) {
+ log.error("Unable to load Mail", e1);
+ } catch (IOException e) {
+ log.error("Unable to load Mail", e);
+ } finally {
+ if (oin != null) {
+ try {
+ oin.close();
+ } catch (Exception e) {
+ // ignore on close
+ }
+ }
}
+
}
}
}
@@ -142,13 +167,13 @@ public class FileMailQueue implements Ma
@Override
- public void enQueue(Mail mail, long delay, TimeUnit unit) throws
MailQueueException {
- final String key = mail.getName();
+ public void enQueue(final Mail mail, long delay, TimeUnit unit) throws
MailQueueException {
+ final String key = mail.getName() + "-" + COUNTER.incrementAndGet();
FileOutputStream out = null;
FileOutputStream foout = null;
ObjectOutputStream oout = null;
try {
- String name = getFileNameWithoutExtension(queueDirName, key,
unit.toMillis(delay));
+ String name = queueDirName + "/" + key;
final FileItem item = new FileItem(name + OBJECT_EXTENSION, name +
MSG_EXTENSION);
@@ -158,20 +183,24 @@ public class FileMailQueue implements Ma
oout.flush();
if (sync) foout.getFD().sync();
out = new FileOutputStream(item.getMessageFile());
+
mail.getMessage().writeTo(out);
out.flush();
if (sync) out.getFD().sync();
keyMappings.put(key, item);
-
+
+
if (delay > 0) {
+ mail.setAttribute(NEXT_DELIVERY, System.currentTimeMillis() +
unit.toMillis(delay));
// The message should get delayed so schedule it for later
scheduler.schedule(new Runnable() {
@Override
public void run() {
- try {
+ try {
inmemoryQueue.put(key);
+
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to init", e);
@@ -220,9 +249,6 @@ public class FileMailQueue implements Ma
}
- private String getFileNameWithoutExtension(String parentdir, String name,
long delay) {
- return parentdir + "/" + System.currentTimeMillis() + delay + "-" +
name;
- }
@Override
@@ -237,7 +263,9 @@ public class FileMailQueue implements Ma
String k = null;
while (item == null) {
k = inmemoryQueue.take();
+
item = keyMappings.get(k);
+
}
final String key = k;
ObjectInputStream oin = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org