Author: amilas
Date: Wed Sep 24 02:44:17 2008
New Revision: 698486
URL: http://svn.apache.org/viewvc?rev=698486&view=rev
Log:
Added support for synchronous invocations with mail. For Axis2 this should be handled at
the transport level.
For mail, it uses the In-Reply-To header value to match the request and response.
Modified:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java
Modified:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
Wed Sep 24 02:44:17 2008
@@ -272,7 +272,19 @@
// send the message context through the axis engine
try {
try {
- AxisEngine.receive(msgCtx);
+ // check if a callback has been registered for this message
+ Map callBackMap = (Map)
msgCtx.getConfigurationContext().getProperty(BaseConstants.CALLBACK_TABLE);
+ Object replyToMessageID =
trpHeaders.get(BaseConstants.HEADER_IN_REPLY_TO);
+ // if there is a callback registered with this reply-to ID
then this has
+ // to be handled as a synchronous message
+ if ((replyToMessageID != null) &&
(callBackMap.get(replyToMessageID) != null)) {
+ SynchronousCallback synchronousCallback =
(SynchronousCallback) callBackMap.get(replyToMessageID);
+ synchronousCallback.setInMessageContext(msgCtx);
+ callBackMap.remove(replyToMessageID);
+ } else {
+ AxisEngine.receive(msgCtx);
+ }
+
} catch (AxisFault e) {
e.printStackTrace();
if (log.isDebugEnabled()) {
Modified:
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
Wed Sep 24 02:44:17 2008
@@ -85,4 +85,10 @@
* The default poll interval in milliseconds.
*/
public static final int DEFAULT_POLL_INTERVAL = 5 * 60 * 1000; // 5 mins
by default
+
+ public static final String CALLBACK_TABLE = "callbackTable";
+ public static final String HEADER_IN_REPLY_TO = "In-Reply-To";
+
+ // this is a property required by axis2
+ public final static String MAIL_CONTENT_TYPE = "mail.contenttype";
}
Modified:
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java
Wed Sep 24 02:44:17 2008
@@ -90,4 +90,5 @@
// Custom headers
/** @see org.apache.axis2.transport.mail.WSMimeMessage */
public static final String MAIL_HEADER_X_MESSAGE_ID= "X-Message-ID";
+
}
Modified:
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
Wed Sep 24 02:44:17 2008
@@ -42,6 +42,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
/**
* This mail transport lister implementation uses the base transport framework
and is a polling
@@ -52,6 +53,7 @@
* (e.g. with imap). When checking for new mail, the transport ignores
messages already flaged as
* SEEN and DELETED
*/
+
public class MailTransportListener extends
AbstractPollingTransportListener<PollTableEntry>
implements ManagementSupport {
@@ -69,6 +71,11 @@
public void init(ConfigurationContext cfgCtx, TransportInDescription
trpInDesc)
throws AxisFault {
super.init(cfgCtx, trpInDesc);
+
+ // set the synchronise callback table
+ if (cfgCtx.getProperty(BaseConstants.CALLBACK_TABLE) == null){
+ cfgCtx.setProperty(BaseConstants.CALLBACK_TABLE, new
ConcurrentHashMap());
+ }
}
@Override
@@ -260,6 +267,11 @@
}
} catch (MessagingException ignore) {}
+ // sometimes the mail server sends a special mail message which is
not relevant
+ // to processing. Ignore this message.
+ if ((trpHeaders.get("Status") != null) &&
(trpHeaders.get("Status").equals("RO"))){
+ return;
+ }
// figure out content type of primary request. If the content type is
specified, use it
String contentType = entry.getContentType();
if (BaseUtils.isBlank(contentType)) {
@@ -386,7 +398,7 @@
* @param message the email message to be moved or deleted
*/
private void moveOrDeleteAfterProcessing(final PollTableEntry entry, Store
store,
- Folder folder, Message message) {
+ Folder folder, Message message) {
String moveToFolder = null;
try {
@@ -456,6 +468,7 @@
}
}
+
entry.setContentType(
ParamUtils.getOptionalParam(service,
MailConstants.TRANSPORT_MAIL_CONTENT_TYPE));
entry.setReplyAddress(
@@ -493,7 +506,7 @@
entry.setReconnectTimeout(Integer.parseInt(strReconnectTimeout) * 1000);
return entry;
-
+
} catch (AxisFault axisFault) {
String msg = "Error configuring the Mail transport for Service : "
+
service.getName() + " :: " + axisFault.getMessage();
Modified:
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java
URL:
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
---
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java
(original)
+++
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java
Wed Sep 24 02:44:17 2008
@@ -21,15 +21,14 @@
import org.apache.axis2.format.MessageFormatterEx;
import org.apache.axis2.format.MessageFormatterExAdapter;
-import org.apache.axis2.transport.base.AbstractTransportSender;
-import org.apache.axis2.transport.base.BaseConstants;
-import org.apache.axis2.transport.base.BaseUtils;
-import org.apache.axis2.transport.base.ManagementSupport;
+import org.apache.axis2.transport.base.*;
import org.apache.commons.logging.LogFactory;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.OutOnlyAxisOperation;
+import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.transport.OutTransportInfo;
@@ -43,12 +42,14 @@
import javax.activation.CommandMap;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
/**
* The mail transport sender sends mail using an SMTP server configuration
defined
* in the axis2.xml's transport sender definition
*/
+
public class MailTransportSender extends AbstractTransportSender
implements ManagementSupport {
@@ -135,6 +136,11 @@
CommandMap.setDefaultCommandMap(mc);
session.setDebug(log.isTraceEnabled());
+
+ // set the synchronise callback table
+ if (cfgCtx.getProperty(BaseConstants.CALLBACK_TABLE) == null){
+ cfgCtx.setProperty(BaseConstants.CALLBACK_TABLE, new
ConcurrentHashMap());
+ }
}
/**
@@ -181,7 +187,12 @@
if (mailOutInfo != null) {
try {
- sendMail(mailOutInfo, msgCtx);
+ String messageID = sendMail(mailOutInfo, msgCtx);
+ // this is important on the axis2 client side: if the mail
transport uses anonymous addressing
+ // the sender has to wait until the response comes.
+ if (!msgCtx.getOptions().isUseSeparateListener() &&
!msgCtx.isServerSide()){
+ waitForReply(msgCtx, messageID);
+ }
} catch (MessagingException e) {
handleException("Error generating mail message", e);
} catch (IOException e) {
@@ -192,13 +203,49 @@
}
}
+ private void waitForReply(MessageContext msgContext, String mailMessageID)
throws AxisFault {
+ // the piggy back message constant is used to pass a piggy back
+ // message context in the async model
+ if (msgContext.getAxisOperation() instanceof OutOnlyAxisOperation &&
+
(msgContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == null))
{
+ return;
+ }
+
+ ConfigurationContext configContext =
msgContext.getConfigurationContext();
+ // if the mail message listener has not started we need to start it
+ if
(!configContext.getListenerManager().isListenerRunning(MailConstants.TRANSPORT_NAME))
{
+ TransportInDescription mailTo =
+
configContext.getAxisConfiguration().getTransportIn(MailConstants.TRANSPORT_NAME);
+ if (mailTo == null) {
+ throw new AxisFault("Could not found the transport receiver
for " + MailConstants.TRANSPORT_NAME);
+ }
+ configContext.getListenerManager().addListener(mailTo, false);
+ }
+
+ SynchronousCallback synchronousCallback = new
SynchronousCallback(msgContext);
+ Map callBackMap = (Map)
msgContext.getConfigurationContext().getProperty(BaseConstants.CALLBACK_TABLE);
+ callBackMap.put(mailMessageID, synchronousCallback);
+ synchronized (synchronousCallback) {
+ try {
+
synchronousCallback.wait(msgContext.getOptions().getTimeOutInMilliSeconds());
+ } catch (InterruptedException e) {
+ throw new AxisFault("Error occured while waiting ..", e);
+ }
+ }
+
+ if (!synchronousCallback.isComplete()){
+ throw new AxisFault("Timeout while waiting from a response");
+ }
+ }
+
/**
* Populate email with a SOAP formatted message
* @param outInfo the out transport information holder
* @param msgContext the message context that holds the message to be
written
* @throws AxisFault on error
+ * @return id of the sent mail message
*/
- private void sendMail(MailOutTransportInfo outInfo, MessageContext
msgContext)
+ private String sendMail(MailOutTransportInfo outInfo, MessageContext
msgContext)
throws AxisFault, MessagingException, IOException {
OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
@@ -428,5 +475,6 @@
handleException("Error creating mail message or sending it to the
configured server", e);
}
+ return message.getMessageID();
}
}