Author: amilas
Date: Wed Sep 24 02:44:17 2008
New Revision: 698486

URL: http://svn.apache.org/viewvc?rev=698486&view=rev
Log:
Added support for synchronous invocations over the mail transport. For Axis2 
this should be handled at the transport level.
For mail, the In-Reply-To header value is used to match each response to its request.

Modified:
    
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
    
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
    
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java
    
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
    
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java

Modified: 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java
 Wed Sep 24 02:44:17 2008
@@ -272,7 +272,19 @@
         // send the message context through the axis engine
         try {
                 try {
-                    AxisEngine.receive(msgCtx);
+                    // check if an callback has register for this message
+                    Map callBackMap = (Map) 
msgCtx.getConfigurationContext().getProperty(BaseConstants.CALLBACK_TABLE);
+                    Object replyToMessageID = 
trpHeaders.get(BaseConstants.HEADER_IN_REPLY_TO);
+                    // if there is a call back register with this replyto ID 
then this has
+                    // to handle as a synchronized message
+                    if ((replyToMessageID != null) && 
(callBackMap.get(replyToMessageID) != null)) {
+                        SynchronousCallback synchronousCallback = 
(SynchronousCallback) callBackMap.get(replyToMessageID);
+                        synchronousCallback.setInMessageContext(msgCtx);
+                        callBackMap.remove(replyToMessageID);
+                    } else {
+                        AxisEngine.receive(msgCtx);
+                    }
+
                 } catch (AxisFault e) {
                     e.printStackTrace();
                     if (log.isDebugEnabled()) {

Modified: 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java
 Wed Sep 24 02:44:17 2008
@@ -85,4 +85,10 @@
      * The default poll interval in milliseconds.
      */
     public static final int DEFAULT_POLL_INTERVAL = 5 * 60 * 1000; // 5 mins 
by default
+
+    public static final String CALLBACK_TABLE = "callbackTable";
+    public static final String HEADER_IN_REPLY_TO = "In-Reply-To";
+
+    // this is an property required by axis2
+    public final static String MAIL_CONTENT_TYPE = "mail.contenttype";
 }

Modified: 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailConstants.java
 Wed Sep 24 02:44:17 2008
@@ -90,4 +90,5 @@
     // Custom headers
     /** @see org.apache.axis2.transport.mail.WSMimeMessage */
     public static final String MAIL_HEADER_X_MESSAGE_ID= "X-Message-ID";
+    
 }

Modified: 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportListener.java
 Wed Sep 24 02:44:17 2008
@@ -42,6 +42,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This mail transport lister implementation uses the base transport framework 
and is a polling
@@ -52,6 +53,7 @@
  * (e.g. with imap). When checking for new mail, the transport ignores 
messages already flaged as
  * SEEN and DELETED
  */
+
 public class MailTransportListener extends 
AbstractPollingTransportListener<PollTableEntry>
     implements ManagementSupport {
 
@@ -69,6 +71,11 @@
     public void init(ConfigurationContext cfgCtx, TransportInDescription 
trpInDesc)
         throws AxisFault {
         super.init(cfgCtx, trpInDesc);
+
+        // set the synchronise callback table
+        if (cfgCtx.getProperty(BaseConstants.CALLBACK_TABLE) == null){
+            cfgCtx.setProperty(BaseConstants.CALLBACK_TABLE, new 
ConcurrentHashMap());
+        }
     }
 
     @Override
@@ -260,6 +267,11 @@
             }
         } catch (MessagingException ignore) {}
 
+        // some times the mail server sends a special mail message which is 
not relavent
+        // in processing. ignore this message.
+        if ((trpHeaders.get("Status") != null) && 
(trpHeaders.get("Status").equals("RO"))){
+            return;
+        }
         // figure out content type of primary request. If the content type is 
specified, use it
         String contentType = entry.getContentType();
         if (BaseUtils.isBlank(contentType)) {
@@ -386,7 +398,7 @@
      * @param message the email message to be moved or deleted
      */
     private void moveOrDeleteAfterProcessing(final PollTableEntry entry, Store 
store,
-        Folder folder, Message message) {
+                                             Folder folder, Message message) {
 
         String moveToFolder = null;
         try {
@@ -456,6 +468,7 @@
                 }
             }
 
+
             entry.setContentType(
                 ParamUtils.getOptionalParam(service, 
MailConstants.TRANSPORT_MAIL_CONTENT_TYPE));
             entry.setReplyAddress(
@@ -493,7 +506,7 @@
                 
entry.setReconnectTimeout(Integer.parseInt(strReconnectTimeout) * 1000);
 
             return entry;
-            
+
         } catch (AxisFault axisFault) {
             String msg = "Error configuring the Mail transport for Service : " 
+
                 service.getName() + " :: " + axisFault.getMessage();

Modified: 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java
URL: 
http://svn.apache.org/viewvc/webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java?rev=698486&r1=698485&r2=698486&view=diff
==============================================================================
--- 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java
 (original)
+++ 
webservices/commons/trunk/modules/transport/modules/mail/src/main/java/org/apache/axis2/transport/mail/MailTransportSender.java
 Wed Sep 24 02:44:17 2008
@@ -21,15 +21,14 @@
 
 import org.apache.axis2.format.MessageFormatterEx;
 import org.apache.axis2.format.MessageFormatterExAdapter;
-import org.apache.axis2.transport.base.AbstractTransportSender;
-import org.apache.axis2.transport.base.BaseConstants;
-import org.apache.axis2.transport.base.BaseUtils;
-import org.apache.axis2.transport.base.ManagementSupport;
+import org.apache.axis2.transport.base.*;
 import org.apache.commons.logging.LogFactory;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.TransportOutDescription;
 import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.OutOnlyAxisOperation;
+import org.apache.axis2.description.TransportInDescription;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.transport.OutTransportInfo;
@@ -43,12 +42,14 @@
 import javax.activation.CommandMap;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.io.IOException;
 
 /**
  * The mail transport sender sends mail using an SMTP server configuration 
defined
  * in the axis2.xml's transport sender definition
  */
+
 public class MailTransportSender extends AbstractTransportSender
     implements ManagementSupport {
 
@@ -135,6 +136,11 @@
         CommandMap.setDefaultCommandMap(mc);
         
         session.setDebug(log.isTraceEnabled());
+
+        // set the synchronise callback table
+        if (cfgCtx.getProperty(BaseConstants.CALLBACK_TABLE) == null){
+            cfgCtx.setProperty(BaseConstants.CALLBACK_TABLE, new 
ConcurrentHashMap());
+        }
     }
 
     /**
@@ -181,7 +187,12 @@
 
         if (mailOutInfo != null) {
             try {
-                sendMail(mailOutInfo, msgCtx);
+                String messageID = sendMail(mailOutInfo, msgCtx);
+                // this is important in axis2 client side if the mail 
transport uses anonymous addressing
+                // the sender have to wait util the response comes.
+                if (!msgCtx.getOptions().isUseSeparateListener() && 
!msgCtx.isServerSide()){
+                    waitForReply(msgCtx, messageID);
+                }
             } catch (MessagingException e) {
                 handleException("Error generating mail message", e);
             } catch (IOException e) {
@@ -192,13 +203,49 @@
         }
     }
 
+    private void waitForReply(MessageContext msgContext, String mailMessageID) 
throws AxisFault {
+        // piggy back message constant is used to pass a piggy back
+        // message context in asnych model
+        if (msgContext.getAxisOperation() instanceof OutOnlyAxisOperation &&
+                
(msgContext.getProperty(org.apache.axis2.Constants.PIGGYBACK_MESSAGE) == null)) 
{
+            return;
+        }
+
+        ConfigurationContext configContext = 
msgContext.getConfigurationContext();
+        // if the mail message listner has not started we need to start it
+        if 
(!configContext.getListenerManager().isListenerRunning(MailConstants.TRANSPORT_NAME))
 {
+            TransportInDescription mailTo =
+                    
configContext.getAxisConfiguration().getTransportIn(MailConstants.TRANSPORT_NAME);
+            if (mailTo == null) {
+                throw new AxisFault("Could not found the transport receiver 
for " + MailConstants.TRANSPORT_NAME);
+            }
+            configContext.getListenerManager().addListener(mailTo, false);
+        }
+
+        SynchronousCallback synchronousCallback = new 
SynchronousCallback(msgContext);
+        Map callBackMap = (Map) 
msgContext.getConfigurationContext().getProperty(BaseConstants.CALLBACK_TABLE);
+        callBackMap.put(mailMessageID, synchronousCallback);
+        synchronized (synchronousCallback) {
+            try {
+                
synchronousCallback.wait(msgContext.getOptions().getTimeOutInMilliSeconds());
+            } catch (InterruptedException e) {
+                throw new AxisFault("Error occured while waiting ..", e);
+            }
+        }
+
+        if (!synchronousCallback.isComplete()){
+            throw new AxisFault("Timeout while waiting from a response");
+        }
+    }
+
     /**
      * Populate email with a SOAP formatted message
      * @param outInfo the out transport information holder
      * @param msgContext the message context that holds the message to be 
written
      * @throws AxisFault on error
+     * @return id of the send mail message
      */
-    private void sendMail(MailOutTransportInfo outInfo, MessageContext 
msgContext)
+    private String sendMail(MailOutTransportInfo outInfo, MessageContext 
msgContext)
         throws AxisFault, MessagingException, IOException {
 
         OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext);
@@ -428,5 +475,6 @@
             handleException("Error creating mail message or sending it to the 
configured server", e);
             
         }
+        return message.getMessageID();
     }
 }


Reply via email to