Author: gatfora
Date: Fri Mar  9 07:51:40 2007
New Revision: 516440

URL: http://svn.apache.org/viewvc?view=rev&rev=516440
Log:
Enable duplicate messages to be processed by Reliable Messaging.  A duplicate 
message is detected by the SandeshaGlobalInHandler and then processed by 
SequenceProcessor.processReliableMessage.

Modified:
    webservices/sandesha/trunk/java/config/module.xml
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
    
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java

Modified: webservices/sandesha/trunk/java/config/module.xml
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/config/module.xml?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- webservices/sandesha/trunk/java/config/module.xml (original)
+++ webservices/sandesha/trunk/java/config/module.xml Fri Mar  9 07:51:40 2007
@@ -75,6 +75,10 @@
         <messageReceiver 
class="org.apache.sandesha2.msgreceivers.RMMessageReceiver"/>
     </operation>
 
+    <operation name="RMInOutDuplicateMessageOperation" 
mep="http://www.w3.org/2006/01/wsdl/in-out">
+        <messageReceiver 
class="org.apache.sandesha2.msgreceivers.RMMessageReceiver"/>
+    </operation>
+
    <supported-policy-namespaces 
namespaces="http://ws.apache.org/sandesha2/policy" />
 
        <wsp:Policy xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy"

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
 Fri Mar  9 07:51:40 2007
@@ -249,8 +249,10 @@
                int MAKE_CONNECTION_MSG = 11;
                
                int LAST_MESSAGE = 12;
-               
-               int MAX_MESSAGE_TYPE = 12;
+
+    int DUPLICATE_MESSAGE = 13;
+
+               int MAX_MESSAGE_TYPE = 13;
        }
 
        public interface MessageParts {
@@ -541,6 +543,7 @@
                static final String INBOUND_MESSAGE_NUMBER = 
"Sandesha2InboundMessageNumber";
                static final String INBOUND_LAST_MESSAGE   = 
"Sandesha2InboundLastMessage";
                static final String MAKECONNECTION_ENTRY   = 
"Sandesha2MakeConnectionEntry";
+    static final String RM_MESSAGE_CONTEXT     = "RMMessageContext";
        }
     
     public interface Assertions {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 Fri Mar  9 07:51:40 2007
@@ -17,15 +17,34 @@
 
 package org.apache.sandesha2.handlers;
 
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPBody;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPHeader;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.handlers.AbstractHandler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.wsrm.Sequence;
 
 /**
@@ -84,10 +103,103 @@
                        }
 
                }
-               
+    
+    // Check if this is an application message and if it is a duplicate
+    RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgContext);
+    
+    // Set the RMMsgContext as a property on the message so we can 
retrieve it later
+    
msgContext.setProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT,
 rmMsgCtx);
+
+    if (rmMsgCtx.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION) {
+      processApplicationMessage(rmMsgCtx);
+    }
+    
                if (log.isDebugEnabled())
-                       log.debug("Exit: SandeshaGlobalInHandler::invoke, 
continuing");
+                       log.debug("Exit: SandeshaGlobalInHandler::invoke " + 
InvocationResponse.CONTINUE);
                return InvocationResponse.CONTINUE;
        }
        
+  private static void processApplicationMessage(RMMsgContext rmMsgCtx) throws 
AxisFault {
+    if (log.isDebugEnabled())
+      log.debug("Enter: SandeshaGlobalInHandler::processApplicationMessage");
+    // Check if this is a duplicate message
+    Sequence sequence = (Sequence) 
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+    String sequenceId = sequence.getIdentifier().getIdentifier();
+    long msgNo = sequence.getMessageNumber().getMessageNumber();
+
+    StorageManager storageManager = 
+      
SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getConfigurationContext(), 
+          rmMsgCtx.getConfigurationContext().getAxisConfiguration());
+    
+    Transaction transaction = storageManager.getTransaction();
+    
+    try {
+    
+      // Check that both the Sequence header and message body have been 
secured properly
+      RMDBeanMgr mgr = storageManager.getRMDBeanMgr();
+      RMDBean bean = mgr.retrieve(sequenceId);
+      
+      if(bean != null && bean.getSecurityTokenData() != null) {
+        SecurityManager secManager = 
SandeshaUtil.getSecurityManager(rmMsgCtx.getConfigurationContext());
+        
+        QName seqName = new QName(rmMsgCtx.getRMNamespaceValue(), 
Sandesha2Constants.WSRM_COMMON.SEQUENCE);
+        
+        SOAPEnvelope envelope = rmMsgCtx.getSOAPEnvelope();
+        OMElement body = envelope.getBody();
+        OMElement seqHeader = 
envelope.getHeader().getFirstChildWithName(seqName);
+        
+        SecurityToken token = 
secManager.recoverSecurityToken(bean.getSecurityTokenData());
+        
+        secManager.checkProofOfPossession(token, seqHeader, 
rmMsgCtx.getMessageContext());
+        secManager.checkProofOfPossession(token, body, 
rmMsgCtx.getMessageContext());
+      }
+    
+      if (bean != null) {
+        
+        if (msgNo == 0) {
+          String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
+              .toString(msgNo));
+          log.debug(message);
+          throw new SandeshaException(message);
+        }
+    
+        // Get the server completed message ranges list
+        RangeString serverCompletedMessageRanges = 
bean.getServerCompletedMessages();
+    
+        // See if the message is in the list of completed ranges
+        boolean msgNoPresentInList = 
+          serverCompletedMessageRanges.isMessageNumberInRanges(msgNo);
+          
+        if (!msgNoPresentInList) {
+          serverCompletedMessageRanges.addRange(new Range(msgNo));
+          
+          storageManager.getRMDBeanMgr().update(bean);
+        }
+        else {
+          // Add the duplicate RM AxisOperation to the message
+          AxisOperation duplicateMessageOperation = 
SpecSpecificConstants.getWSRMOperation(
+              Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE,
+              Sandesha2Constants.SPEC_VERSIONS.v1_0,
+              rmMsgCtx.getMessageContext().getAxisService());
+          
rmMsgCtx.getMessageContext().setAxisOperation(duplicateMessageOperation);
+        }
+              
+      } else {
+        // Add the duplicate RM AxisOperation to the message
+        AxisOperation duplicateMessageOperation = 
SpecSpecificConstants.getWSRMOperation(
+            Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE,
+            Sandesha2Constants.SPEC_VERSIONS.v1_0,
+            rmMsgCtx.getMessageContext().getAxisService());
+        
rmMsgCtx.getMessageContext().setAxisOperation(duplicateMessageOperation);
+      }
+      transaction.commit();
+      transaction = null;
+    }
+    finally {
+      if (transaction != null)
+        transaction.rollback();
+    }
+    if (log.isDebugEnabled())
+      log.debug("Exit: SandeshaGlobalInHandler::processApplicationMessage");
+  }
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
 Fri Mar  9 07:51:40 2007
@@ -90,9 +90,14 @@
                                throw new AxisFault(message);
                        }
 
-                       //processing any incoming faults.                       
-                       RMMsgContext rmMsgCtx = 
MsgInitializer.initializeMessage(msgCtx);
+                       RMMsgContext rmMsgCtx = null;
+      
+      if 
(msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT)
 != null)
+        rmMsgCtx = 
(RMMsgContext)msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT);
+      else
+        rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
 
+      //processing any incoming faults.     
                        //This is responsible for Sandesha2 specific 
                        FaultManager.processMessagesForFaults(rmMsgCtx);
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Fri Mar  9 07:51:40 2007
@@ -51,8 +51,6 @@
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.Range;
-import org.apache.sandesha2.util.RangeString;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.TerminateManager;
 import org.apache.sandesha2.workers.SandeshaThread;
@@ -171,13 +169,6 @@
                        return InvocationResponse.ABORT;
                }
 
-               if (msgNo == 0) {
-                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
-                                       .toString(msgNo));
-                       log.debug(message);
-                       throw new SandeshaException(message);
-               }
-
                // Pause the messages bean if not the right message to invoke.
                
                // updating the last activated time of the sequence.
@@ -207,15 +198,9 @@
                        bean.setHighestInMessageId(messageId);
                        bean.setHighestInMessageNumber(msgNo);
                }
-
-               // Get the server completed message ranges list
-               RangeString serverCompletedMessageRanges = 
bean.getServerCompletedMessages();
-               // See if the message is in the list of completed ranges
-               boolean msgNoPresentInList = 
-                       
serverCompletedMessageRanges.isMessageNumberInRanges(msgNo);
                
                String specVersion = rmMsgCtx.getRMSpecVersion();
-               if (msgNoPresentInList
+               if 
(rmMsgCtx.getMessageContext().getAxisOperation().getName().getLocalPart().equals("RMInOutDuplicateMessageOperation")
                                && 
(Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == 
Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
                        // this is a duplicate message and the invocation type 
is EXACTLY_ONCE. We try to return
                        // ack messages at this point, as if someone is sending 
duplicates then they may have
@@ -254,11 +239,6 @@
                        if (log.isDebugEnabled())
                                log.debug("Exit: 
SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
                        return result;
-               }
-
-               if (!msgNoPresentInList)
-               {
-                       serverCompletedMessageRanges.addRange(new Range(msgNo));
                }
                
                // If the message is a reply to an outbound message then we can 
update the RMSBean that

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
 Fri Mar  9 07:51:40 2007
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.msgprocessors.MsgProcessor;
@@ -48,7 +49,13 @@
        public final void receive(MessageContext msgCtx) throws AxisFault {
                if(log.isDebugEnabled()) log.debug("Entry: 
RMMessageReceiver::receive");
                
-               RMMsgContext rmMsgCtx = 
MsgInitializer.initializeMessage(msgCtx);
+    RMMsgContext rmMsgCtx = null;
+    
+    if 
(msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT)
 != null)
+      rmMsgCtx = 
(RMMsgContext)msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT);
+    else
+      rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+    
                if(log.isDebugEnabled()) log.debug("MsgReceiver got type: " + 
SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));    
 
                // Note that some messages (such as stand-alone acks) will be 
routed here, but

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
 Fri Mar  9 07:51:40 2007
@@ -346,8 +346,11 @@
                        case Sandesha2Constants.MessageTypes.ACK_REQUEST:
                        case Sandesha2Constants.MessageTypes.LAST_MESSAGE:
                                result = service.getOperation(new 
QName("RMOutOnlyOperation"));
-                               break;
-                       }
+                               break;                  
+      case Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE:
+        result = service.getOperation(new 
QName("RMInOutDuplicateMessageOperation"));
+        break;
+      }
                } else 
if(rmSpecLevel.equals(Sandesha2Constants.SPEC_VERSIONS.v1_1)) {
                        switch(messageType) {
                        case Sandesha2Constants.MessageTypes.CREATE_SEQ:

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 Fri Mar  9 07:51:40 2007
@@ -169,6 +169,14 @@
                        // sending the message
                        boolean successfullySent = false;
 
+                       // have to commit the transaction before sending. This 
may
+                       // get changed when WS-AT is available.
+                       if(transaction != null) {
+                               transaction.commit();
+                               transaction = null;
+                               transaction = storageManager.getTransaction();
+                       }
+
                        // Although not actually sent yet, update the send 
count to indicate an attempt
                        if (senderBean.isReSend()) {
                                SenderBean bean2 = 
senderBeanMgr.retrieve(senderBean.getMessageID());

Modified: 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
 (original)
+++ 
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
 Fri Mar  9 07:51:40 2007
@@ -101,6 +101,8 @@
        public void testSyncEcho() throws Exception {
                // Test sync echo with an offer, and the 1.1 spec
                Options clientOptions = new Options();
+//             org.apache.log4j.BasicConfigurator.configure();
+//             to = "http://127.0.0.1:" + 9999 + 
"/axis2/services/RMSampleService";
                
clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,SandeshaUtil.getUUID());
                
clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1);
                runEcho(clientOptions, false, false, false);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to