Author: norman Date: Mon Oct 18 19:56:20 2010 New Revision: 1023968 URL: http://svn.apache.org/viewvc?rev=1023968&view=rev Log: Just some refactoring to make it easier to extend JMSMailQueue
Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1023968&r1=1023967&r2=1023968&view=diff ============================================================================== --- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java (original) +++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java Mon Oct 18 19:56:20 2010 @@ -20,6 +20,8 @@ package org.apache.james.queue.activemq; import java.io.IOException; import java.net.MalformedURLException; +import java.util.Iterator; +import java.util.Map; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -27,6 +29,8 @@ import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; @@ -152,8 +156,7 @@ public class ActiveMQMailQueue extends J * org.apache.james.queue.jms.JMSMailQueue#createMessage(javax.jms.Session, * org.apache.mailet.Mail, long) */ - protected Message createMessage(Session session, Mail mail, long delayInMillis) throws JMSException, IOException, MessagingException { - + protected void produceMail(Session session, Map<String,Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException { boolean useBlob = false; if (messageTreshold != -1) { try { @@ -166,16 +169,36 @@ public class ActiveMQMailQueue extends J } } if (useBlob) { - ActiveMQSession 
amqSession; - if (session instanceof PooledSession) { - amqSession = ((PooledSession) session).getInternalSession(); - } else { - amqSession = (ActiveMQSession) session; + MessageProducer producer = null; + try { + ActiveMQSession amqSession; + if (session instanceof PooledSession) { + amqSession = ((PooledSession) session).getInternalSession(); + } else { + amqSession = (ActiveMQSession) session; + } + BlobMessage message = amqSession.createBlobMessage(new MimeMessageInputStream(mail.getMessage())); + Queue queue = session.createQueue(queuename); + + producer = session.createProducer(queue); + Iterator<String> keys = props.keySet().iterator(); + while (keys.hasNext()) { + String key = keys.next(); + message.setObjectProperty(key, props.get(key)); + } + producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE); + } finally { + + try { + if (producer != null) + producer.close(); + } catch (JMSException e) { + // ignore here + } } - BlobMessage message = amqSession.createBlobMessage(new MimeMessageInputStream(mail.getMessage())); - return message; + } else { - return super.createMessage(session, mail, delayInMillis); + super.produceMail(session, props, msgPrio, mail); } } Modified: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1023968&r1=1023967&r2=1023968&view=diff ============================================================================== --- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java (original) +++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java Mon Oct 18 19:56:20 2010 @@ -22,8 +22,10 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import 
java.util.Map; import java.util.StringTokenizer; import java.util.concurrent.TimeUnit; @@ -199,7 +201,6 @@ public class JMSMailQueue implements Mai public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException { Connection connection = null; Session session = null; - MessageProducer producer = null; long mydelay = 0; @@ -212,18 +213,17 @@ public class JMSMailQueue implements Mai connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(queuename); - producer = session.createProducer(queue); - Message message = createMessage(session, mail, mydelay); - populateJMSProperties(message, mail, mydelay); - + int msgPrio = NORMAL_PRIORITY; Object prio = mail.getAttribute(MAIL_PRIORITY); if (prio instanceof Integer) { msgPrio = (Integer) prio; } - - producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE); + + Map<String, Object> props = getJMSProperties(mail, mydelay); + + produceMail(session, props, msgPrio, mail); + } catch (Exception e) { if (session != null) { try { @@ -235,13 +235,6 @@ public class JMSMailQueue implements Mai throw new MailQueueException("Unable to enqueue mail " + mail, e); } finally { - - try { - if (producer != null) - producer.close(); - } catch (JMSException e) { - // ignore here - } try { if (session != null) session.close(); @@ -268,25 +261,42 @@ public class JMSMailQueue implements Mai } /** - * Create Message which holds the {@link MimeMessage} of the given Mail - * - * @param session - * @param mail - * @return jmsMessage - * @throws JMSException - * @throws IOException - * @throws MessagingException - * @throws IOException + * Produce the mail to the JMS Queue */ - protected Message createMessage(Session session, Mail mail, long delayInMillis) throws JMSException, MessagingException, IOException { - BytesMessage message = session.createBytesMessage(); - 
mail.getMessage().writeTo(new BytesMessageOutputStream(message)); + protected void produceMail(Session session, Map<String,Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException { + MessageProducer producer = null; + + try { + Queue queue = session.createQueue(queuename); + + producer = session.createProducer(queue); + BytesMessage message = session.createBytesMessage(); + + Iterator<String> keys = props.keySet().iterator(); + while(keys.hasNext()) { + String key = keys.next(); + message.setObjectProperty(key, props.get(key)); + } + + mail.getMessage().writeTo(new BytesMessageOutputStream(message)); + producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE); + + } finally { + + try { + if (producer != null) + producer.close(); + } catch (JMSException e) { + // ignore here + } + } + + - return message; } /** - * Populate JMS Message properties with values + * Get JMS Message properties with values * * @param message * @param mail @@ -295,17 +305,18 @@ public class JMSMailQueue implements Mai * @throws MessagingException */ @SuppressWarnings("unchecked") - protected void populateJMSProperties(Message message, Mail mail, long delayInMillis) throws JMSException, MessagingException { + protected Map<String,Object> getJMSProperties(Mail mail, long delayInMillis) throws JMSException, MessagingException { + Map<String, Object> props = new HashMap<String, Object>(); long nextDelivery = -1; if (delayInMillis > 0) { nextDelivery = System.currentTimeMillis() + delayInMillis; } - message.setLongProperty(JAMES_NEXT_DELIVERY, nextDelivery); - message.setStringProperty(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage()); - message.setLongProperty(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime()); - message.setLongProperty(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize()); - message.setStringProperty(JAMES_MAIL_NAME, mail.getName()); + props.put(JAMES_NEXT_DELIVERY, nextDelivery); + 
props.put(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage()); + props.put(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime()); + props.put(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize()); + props.put(JAMES_MAIL_NAME, mail.getName()); StringBuilder recipientsBuilder = new StringBuilder(); @@ -317,9 +328,9 @@ public class JMSMailQueue implements Mai recipientsBuilder.append(JAMES_MAIL_SEPERATOR); } } - message.setStringProperty(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString()); - message.setStringProperty(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr()); - message.setStringProperty(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost()); + props.put(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString()); + props.put(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr()); + props.put(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost()); String sender; MailAddress s = mail.getSender(); @@ -336,16 +347,16 @@ public class JMSMailQueue implements Mai attrsBuilder.append(attrName); Object value = convertAttributeValue(mail.getAttribute(attrName)); - message.setObjectProperty(attrName, value); + props.put(attrName, value); if (attrs.hasNext()) { attrsBuilder.append(JAMES_MAIL_SEPERATOR); } } - message.setStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString()); - message.setStringProperty(JAMES_MAIL_SENDER, sender); - message.setStringProperty(JAMES_MAIL_STATE, mail.getState()); - + props.put(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString()); + props.put(JAMES_MAIL_SENDER, sender); + props.put(JAMES_MAIL_STATE, mail.getState()); + return props; } /** @@ -392,7 +403,6 @@ public class JMSMailQueue implements Mai * @throws JMSException */ protected void populateMail(Message message, MailImpl mail) throws JMSException { - mail.setErrorMessage(message.getStringProperty(JAMES_MAIL_ERROR_MESSAGE)); mail.setLastUpdated(new Date(message.getLongProperty(JAMES_MAIL_LAST_UPDATED))); mail.setName(message.getStringProperty(JAMES_MAIL_NAME)); 
--------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org For additional commands, e-mail: server-dev-help@james.apache.org