Author: gatfora
Date: Tue Dec  5 07:43:24 2006
New Revision: 482691

URL: http://svn.apache.org/viewvc?view=rev&rev=482691
Log:
Refactor outgoing ack request, close sequence and terminate sequence processing to use the new shared WSRMMessageSender base class (patch for SANDESHA2-58)

Added:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 Tue Dec  5 07:43:24 2006
@@ -30,7 +30,6 @@
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -41,7 +40,6 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
@@ -50,7 +48,6 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.FaultManager;
@@ -59,13 +56,14 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
 import org.apache.sandesha2.wsrm.AckRequested;
 
 /**
  * Responsible for processing ack requested headers on incoming messages.
  */
 
-public class AckRequestedProcessor {
+public class AckRequestedProcessor extends WSRMMessageSender {
 
        private static final Log log = 
LogFactory.getLog(AckRequestedProcessor.class);
 
@@ -274,11 +272,8 @@
 
                        if (it.hasNext()) {
                                SenderBean oldAckBean = (SenderBean) it.next();
-                               timeToSend = oldAckBean.getTimeToSend(); // If 
there is an
-                                                                               
                                        // old ack. This ack
-                                                                               
                                        // will be sent in
-                                                                               
                                        // the old
-                                                                               
                                        // timeToSend.
+                               // If there is an old ack. This ack will be 
sent in the old timeToSend.
+                               timeToSend = oldAckBean.getTimeToSend(); 
                                
retransmitterBeanMgr.delete(oldAckBean.getMessageID());
                        }
 
@@ -315,58 +310,18 @@
                if (log.isDebugEnabled())
                        log.debug("Enter: 
AckRequestedProcessor::processOutgoingAckRequestMessage");
 
-               MessageContext msgContext = ackRequestRMMsg.getMessageContext();
-               ConfigurationContext configurationContext = 
msgContext.getConfigurationContext();
-               Options options = msgContext.getOptions();
-
-               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext,
-                               configurationContext.getAxisConfiguration());
-
-               String toAddress = ackRequestRMMsg.getTo().getAddress();
-               String sequenceKey = (String) 
options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-               String internalSequenceID = 
SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
-               // Does the sequence exist ?
-               boolean sequenceExists = false;
-               String outSequenceID = null;
-               
-               // Get the Create sequence bean with the matching internal 
sequenceid 
-               CreateSeqBean createSeqFindBean = new CreateSeqBean();
-               createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
-               CreateSeqBean createSeqBean = 
storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-               
-               if (createSeqBean == null)
-               {
-                       if (log.isDebugEnabled())
-                               log.debug("Exit: 
AckRequestedProcessor::processOutMessage Sequence doesn't exist");
-                       
-                       throw new 
SandeshaException(SandeshaMessageHelper.getMessage(
-                                       
SandeshaMessageKeys.couldNotSendCloseSeqNotFound, internalSequenceID));         
        
-               }
-               
-               if (createSeqBean.getSequenceID() != null)
-               {
-                       sequenceExists = true;          
-                       outSequenceID = createSeqBean.getSequenceID();
-               }
-               else
-                       outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;    
                
-
-               String rmVersion = 
SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
-               if (rmVersion == null)
-                       throw new 
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+               setupOutMessage(ackRequestRMMsg);
 
                AxisOperation ackOperation = 
SpecSpecificConstants.getWSRMOperation(
                                Sandesha2Constants.MessageTypes.ACK,
-                               rmVersion,
-                               msgContext.getAxisService());
-               msgContext.setAxisOperation(ackOperation);
+                               getRMVersion(),
+                               getMsgContext().getAxisService());
+               getMsgContext().setAxisOperation(ackOperation);
 
                OperationContext opcontext = new OperationContext(ackOperation);
-               opcontext.setParent(msgContext.getServiceContext());
-               
configurationContext.registerOperationContext(ackRequestRMMsg.getMessageId(), 
opcontext);
-               msgContext.setOperationContext(opcontext);
+               opcontext.setParent(getMsgContext().getServiceContext());
+               
getConfigurationContext().registerOperationContext(ackRequestRMMsg.getMessageId(),
 opcontext);
+               getMsgContext().setOperationContext(opcontext);
                
                Iterator iterator = 
ackRequestRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
                
@@ -383,70 +338,13 @@
                        throw new SandeshaException 
(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAckRequestPartFound));
                }
                
-               ackRequested.getIdentifier().setIndentifer(outSequenceID);
-               
-               
msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-               
ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction 
(rmVersion));
-               
ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction 
(rmVersion));
-
-               String transportTo = 
SandeshaUtil.getSequenceProperty(internalSequenceID,
-                               
Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
-               if (transportTo != null) {
-                       
ackRequestRMMsg.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
-               }
-               
-               //setting msg context properties
-               
ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,
 outSequenceID);
-               
ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
 internalSequenceID);
-               
ackRequestRMMsg.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY
 , sequenceKey);
+               
ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction 
(getRMVersion()));
+               
ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction 
(getRMVersion()));
 
-               ackRequestRMMsg.addSOAPEnvelope();
+               sendOutgoingMessage(ackRequestRMMsg, 
Sandesha2Constants.MessageTypes.ACK_REQUEST, 0);
                
-               // Ensure the outbound message us secured using the correct 
token
-               String tokenData = 
SandeshaUtil.getSequenceProperty(internalSequenceID,
-                               
Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
-                               storageManager);
-               if(tokenData != null) {
-                       SecurityManager secMgr = 
SandeshaUtil.getSecurityManager(configurationContext);
-                       SecurityToken token = 
secMgr.recoverSecurityToken(tokenData);
-                       secMgr.applySecurityToken(token, msgContext);
-               }
-
-               String key = SandeshaUtil.getUUID();
-
-               SenderBean ackRequestBean = new SenderBean();
-               
ackRequestBean.setMessageType(Sandesha2Constants.MessageTypes.ACK_REQUEST);
-               ackRequestBean.setMessageContextRefKey(key);
-
-               ackRequestBean.setTimeToSend(System.currentTimeMillis());
-
-               ackRequestBean.setMessageID(msgContext.getMessageID());
-               
-               EndpointReference to = msgContext.getTo();
-               if (to!=null)
-                       ackRequestBean.setToAddress(to.getAddress());
-               
-               // this will be set to true at the sender.
-               if (sequenceExists)
-                 ackRequestBean.setSend(true);
-               else
-                       ackRequestBean.setSend(false);
-
-               
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
-
-               ackRequestBean.setReSend(false);
-
-               SenderBeanMgr retramsmitterMgr = 
storageManager.getRetransmitterBeanMgr();
-
-               // Set the sequence id and internal sequence id in the 
SenderBean
-               ackRequestBean.setInternalSequenceID(internalSequenceID);
-               if (sequenceExists)
-                       ackRequestBean.setSequenceID(outSequenceID);
-               
-               SandeshaUtil.executeAndStore(ackRequestRMMsg, key);
-
-               retramsmitterMgr.insert(ackRequestBean);
+               // Pause the message context
+               ackRequestRMMsg.pause();
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
 Tue Dec  5 07:43:24 2006
@@ -23,9 +23,6 @@
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -37,16 +34,12 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
@@ -54,6 +47,7 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
 import org.apache.sandesha2.wsrm.CloseSequence;
 import org.apache.sandesha2.wsrm.Identifier;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -63,7 +57,7 @@
  * by the WSRM 1.1 specification)
  */
 
-public class CloseSequenceProcessor implements MsgProcessor {
+public class CloseSequenceProcessor extends WSRMMessageSender implements 
MsgProcessor {
 
        private static final Log log = 
LogFactory.getLog(CloseSequenceProcessor.class);
 
@@ -174,61 +168,26 @@
                if (log.isDebugEnabled()) 
                        log.debug("Enter: 
CloseSequenceProcessor::processOutMessage");
                
-               MessageContext msgContext = rmMsgCtx.getMessageContext();
-               ConfigurationContext configurationContext = 
msgContext.getConfigurationContext();
-               Options options = msgContext.getOptions();
-
-               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext,
-                               configurationContext.getAxisConfiguration());
-
-               String toAddress = rmMsgCtx.getTo().getAddress();
-               String sequenceKey = (String) 
options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-               String internalSequenceID = 
SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
-               // Does the sequence exist ?
-               boolean sequenceExists = false;
-               String outSequenceID = null;
-               
-               // Get the Create sequence bean with the matching internal 
sequenceid 
-               CreateSeqBean createSeqFindBean = new CreateSeqBean();
-               createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
-               CreateSeqBean createSeqBean = 
storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-               
-               if (createSeqBean == null)
-               {
-                       if (log.isDebugEnabled())
-                               log.debug("Exit: 
CloseSequenceProcessor::processOutMessage Sequence doesn't exist");
-                       
-                       throw new 
SandeshaException(SandeshaMessageHelper.getMessage(
-                                       
SandeshaMessageKeys.couldNotSendCloseSeqNotFound, internalSequenceID));         
        
-               }
-               
-               if (createSeqBean.getSequenceID() != null)
-               {
-                       sequenceExists = true;          
-                       outSequenceID = createSeqBean.getSequenceID();
-               }
-               else
-                       outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;    
                
+               // Get the data from the message context
+               setupOutMessage(rmMsgCtx);
 
                //write into the sequence proeprties that the client is now 
closed
                SequencePropertyBean sequenceClosedBean = new 
SequencePropertyBean();
-               sequenceClosedBean.setSequencePropertyKey(internalSequenceID);
+               
sequenceClosedBean.setSequencePropertyKey(getInternalSequenceID());
                
sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED_CLIENT);
                sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
-               
storageManager.getSequencePropertyBeanMgr().insert(sequenceClosedBean);
+               
getStorageManager().getSequencePropertyBeanMgr().insert(sequenceClosedBean);
 
                AxisOperation closeOperation = 
SpecSpecificConstants.getWSRMOperation(
                                Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE,
                                rmMsgCtx.getRMSpecVersion(),
                                rmMsgCtx.getMessageContext().getAxisService());
-               msgContext.setAxisOperation(closeOperation);
+               getMsgContext().setAxisOperation(closeOperation);
 
                OperationContext opcontext = new 
OperationContext(closeOperation);
-               opcontext.setParent(msgContext.getServiceContext());
-               
configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
-               msgContext.setOperationContext(opcontext);
+               opcontext.setParent(getMsgContext().getServiceContext());
+               
getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
+               getMsgContext().setOperationContext(opcontext);
                
                CloseSequence closeSequencePart = (CloseSequence) rmMsgCtx
                                
.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
@@ -238,76 +197,14 @@
                        closeSequencePart.setIdentifier(identifier);
                }
                
-               identifier.setIndentifer(outSequenceID);
-
-               
msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-               String rmVersion = 
SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
-               if (rmVersion == null)
-                       throw new 
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
-               
rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(rmVersion));
-               
rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction 
(rmVersion));
-
-               String transportTo = 
SandeshaUtil.getSequenceProperty(internalSequenceID,
-                               
Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
-               if (transportTo != null) {
-                       
rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
-               }
-               
-               //setting msg context properties
-               
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, 
outSequenceID);
-               
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
 internalSequenceID);
-               
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY
 , sequenceKey);
-
-               rmMsgCtx.addSOAPEnvelope();
-
-               // Ensure the outbound message us secured using the correct 
token
-               String tokenData = 
SandeshaUtil.getSequenceProperty(internalSequenceID,
-                               
Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
-                               storageManager);
-               if(tokenData != null) {
-                       SecurityManager secMgr = 
SandeshaUtil.getSecurityManager(configurationContext);
-                       SecurityToken token = 
secMgr.recoverSecurityToken(tokenData);
-                       secMgr.applySecurityToken(token, msgContext);
-               }
-
-               String key = SandeshaUtil.getUUID();
-
-               SenderBean closeBean = new SenderBean();
-               // Indicate that this is a close sequence message
-               
closeBean.setMessageType(Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE);
-               closeBean.setMessageContextRefKey(key);
-
-               closeBean.setTimeToSend(System.currentTimeMillis());
-
-               closeBean.setMessageID(msgContext.getMessageID());
-               
-               EndpointReference to = msgContext.getTo();
-               if (to!=null)
-                       closeBean.setToAddress(to.getAddress());
-               
-               // this will be set to true at the sender.
-               if (sequenceExists)
-                 closeBean.setSend(true);
-               else
-                       closeBean.setSend(false);
-
-               
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
-
-               closeBean.setReSend(false);
-
-               SenderBeanMgr retramsmitterMgr = 
storageManager.getRetransmitterBeanMgr();
-               
-               // Add the sequence id and internal sequenceid to the closeBean
-               if (sequenceExists)
-                 closeBean.setSequenceID(outSequenceID);
-               
-               closeBean.setInternalSequenceID(internalSequenceID);
+               
rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(getRMVersion()));
+               
rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction 
(getRMVersion()));
 
-               SandeshaUtil.executeAndStore(rmMsgCtx, key);
+               // Send this outgoing message
+               sendOutgoingMessage(rmMsgCtx, 
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0);
 
-               retramsmitterMgr.insert(closeBean);
+               // Pause the message context
+               rmMsgCtx.pause();
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=482691&r1=482690&r2=482691
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Tue Dec  5 07:43:24 2006
@@ -21,9 +21,7 @@
 
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
@@ -37,16 +35,12 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
@@ -55,6 +49,7 @@
 import org.apache.sandesha2.util.SequenceManager;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.util.WSRMMessageSender;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
@@ -62,7 +57,7 @@
  * Responsible for processing an incoming Terminate Sequence message.
  */
 
-public class TerminateSeqMsgProcessor implements MsgProcessor {
+public class TerminateSeqMsgProcessor extends WSRMMessageSender implements 
MsgProcessor {
 
        private static final Log log = 
LogFactory.getLog(TerminateSeqMsgProcessor.class);
 
@@ -312,49 +307,12 @@
                if (log.isDebugEnabled())
                        log.debug("Enter: 
TerminateSeqMsgProcessor::processOutMessage");
 
-               MessageContext msgContext = rmMsgCtx.getMessageContext();
-               ConfigurationContext configurationContext = 
msgContext.getConfigurationContext();
-               Options options = msgContext.getOptions();
-
-               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(configurationContext,
-                               configurationContext.getAxisConfiguration());
-
-               SequencePropertyBeanMgr seqPropMgr = 
storageManager.getSequencePropertyBeanMgr();
-
-               String toAddress = rmMsgCtx.getTo().getAddress();
-               String sequenceKey = (String) 
options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
-               String internalSequenceID = 
SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
-
-               // Does the sequence exist ?
-               boolean sequenceExists = false;
-               String outSequenceID = null;
-               
-               // Get the Create sequence bean with the matching internal 
sequenceid 
-               CreateSeqBean createSeqFindBean = new CreateSeqBean();
-               createSeqFindBean.setInternalSequenceID(internalSequenceID);
-
-               CreateSeqBean createSeqBean = 
storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
-               
-               if (createSeqBean == null)
-               {
-                       if (log.isDebugEnabled())
-                               log.debug("Exit: 
TerminateSeqMsgProcessor::processOutMessage Sequence doesn't exist");
-                       
-                       throw new 
SandeshaException(SandeshaMessageHelper.getMessage(
-                                       
SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));     
                
-               }
-               
-               if (createSeqBean.getSequenceID() != null)
-               {
-                       sequenceExists = true;          
-                       outSequenceID = createSeqBean.getSequenceID();
-               }
-               else
-                       outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;    
                
+               // Get the parent processor to setup the out message
+               setupOutMessage(rmMsgCtx);
                
                // Check if the sequence is already terminated (stored on the 
internal sequenceid)
-               String terminated = 
SandeshaUtil.getSequenceProperty(internalSequenceID,
-                               
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, storageManager);
+               String terminated = 
SandeshaUtil.getSequenceProperty(getInternalSequenceID(),
+                               
Sandesha2Constants.SequenceProperties.TERMINATE_ADDED, getStorageManager());
 
                if (terminated != null && "true".equals(terminated)) {
                        String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
@@ -367,94 +325,36 @@
                AxisOperation terminateOp = 
SpecSpecificConstants.getWSRMOperation(
                                Sandesha2Constants.MessageTypes.TERMINATE_SEQ,
                                rmMsgCtx.getRMSpecVersion(),
-                               msgContext.getAxisService());
+                               getMsgContext().getAxisService());
                OperationContext opcontext = OperationContextFactory
                                .createOperationContext(
                                                
WSDL20_2004Constants.MEP_CONSTANT_OUT_IN, terminateOp);
-               opcontext.setParent(msgContext.getServiceContext());
-               
configurationContext.registerOperationContext(rmMsgCtx.getMessageId(),  
opcontext);
+               opcontext.setParent(getMsgContext().getServiceContext());
+               
getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(),     
opcontext);
 
-               msgContext.setOperationContext(opcontext);
-               msgContext.setAxisOperation(terminateOp);
+               getMsgContext().setOperationContext(opcontext);
+               getMsgContext().setAxisOperation(terminateOp);
 
                TerminateSequence terminateSequencePart = (TerminateSequence) 
rmMsgCtx
                                
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
-               
terminateSequencePart.getIdentifier().setIndentifer(outSequenceID);
-
-               rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
-               
msgContext.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-               rmMsgCtx.setTo(new EndpointReference(toAddress));
-
-               String rmVersion = 
SandeshaUtil.getRMVersion(internalSequenceID, storageManager);
-               if (rmVersion == null)
-                       throw new 
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
-
-               
rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
-               
rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
-
-               String transportTo = 
SandeshaUtil.getSequenceProperty(internalSequenceID,
-                               
Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
-               if (transportTo != null) {
-                       
rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
-               }               
-               
-               //setting msg context properties
-               
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, 
outSequenceID);
-               
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
 internalSequenceID);
-               
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY
 , sequenceKey);
-
-               try {
-                       rmMsgCtx.addSOAPEnvelope();
-               } catch (AxisFault e) {
-                       throw new SandeshaException(e.getMessage(),e);
-               }
+               
terminateSequencePart.getIdentifier().setIndentifer(getOutSequenceID());
 
-               String key = SandeshaUtil.getUUID();
+               
rmMsgCtx.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(getRMVersion()));
+               
rmMsgCtx.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(getRMVersion()));
+               
+               SequencePropertyBean terminateAdded = new 
SequencePropertyBean();
+               
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+               terminateAdded.setSequencePropertyKey(getInternalSequenceID());
+               terminateAdded.setValue("true");
 
-               SenderBean terminateBean = new SenderBean();
-               
terminateBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
-               terminateBean.setMessageContextRefKey(key);
+               
getStorageManager().getSequencePropertyBeanMgr().insert(terminateAdded);
 
+               // Send the outgoing message
                // Set a retransmitter lastSentTime so that terminate will be 
send with
                // some delay.
                // Otherwise this get send before return of the current request 
(ack).
                // TODO: refine the terminate delay.
-               terminateBean.setTimeToSend(System.currentTimeMillis() + 
Sandesha2Constants.TERMINATE_DELAY);
-
-               terminateBean.setMessageID(msgContext.getMessageID());
-               
-               // Set the internal sequence id and outgoing sequence id for 
the terminate message
-               terminateBean.setInternalSequenceID(internalSequenceID);
-               if (sequenceExists)
-                 terminateBean.setSequenceID(outSequenceID);
-               
-               EndpointReference to = msgContext.getTo();
-               if (to!=null)
-                       terminateBean.setToAddress(to.getAddress());
-               
-               // this will be set to true at the sender.
-               if (sequenceExists)
-                       terminateBean.setSend(true);
-               else
-                       terminateBean.setSend(false);
-
-               
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
-
-               terminateBean.setReSend(false);
-
-               SenderBeanMgr retramsmitterMgr = 
storageManager.getRetransmitterBeanMgr();
-
-               SequencePropertyBean terminateAdded = new 
SequencePropertyBean();
-               
terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
-               terminateAdded.setSequencePropertyKey(internalSequenceID);
-               terminateAdded.setValue("true");
-
-               seqPropMgr.insert(terminateAdded);
-               
-               SandeshaUtil.executeAndStore(rmMsgCtx, key);
-       
-               retramsmitterMgr.insert(terminateBean);
+               sendOutgoingMessage(rmMsgCtx, 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ, 
Sandesha2Constants.TERMINATE_DELAY);               
 
                // Pause the message context
                rmMsgCtx.pause();
@@ -463,5 +363,4 @@
                        log.debug("Exit: 
TerminateSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
                return true;
        }
-
 }

Added: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java?view=auto&rev=482691
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
 (added)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/WSRMMessageSender.java
 Tue Dec  5 07:43:24 2006
@@ -0,0 +1,217 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *  
+ */
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.CreateSeqBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+
+/**
+ * Shared plumbing for processors that send a WS-RM protocol message on an
+ * outgoing sequence.
+ * <p>
+ * {@link #setupOutMessage(RMMsgContext)} resolves the per-message state
+ * (storage manager, internal/outgoing sequence ids, RM version, ...) and
+ * caches it in instance fields; {@link #sendOutgoingMessage(RMMsgContext, int, long)}
+ * then reads those fields to finish the message and queue it in the sender
+ * bean store. <code>setupOutMessage</code> must therefore be called first.
+ */
+public class WSRMMessageSender {
+
+       private static final Log log = LogFactory.getLog(WSRMMessageSender.class);
+
+       // State populated by setupOutMessage() and consumed by sendOutgoingMessage()
+       // and by subclasses through the getters below.
+       private MessageContext msgContext;
+       private StorageManager storageManager;
+       private ConfigurationContext configurationContext;
+       private String toAddress;
+       private String sequenceKey;
+       private String internalSequenceID;
+       private boolean sequenceExists;
+       private String outSequenceID;
+       private String rmVersion;
+
+       /**
+        * Extracts the information from the rmMsgCtx that is needed to process
+        * an out message and stores it in this object's fields.
+        * <p>
+        * The internal sequence id is derived from the message's To address and
+        * the <code>SEQUENCE_KEY</code> option. The matching create sequence bean
+        * is then looked up: if it already carries a sequence id that id becomes
+        * the outgoing sequence id, otherwise the temporary sequence id is used.
+        *
+        * @param rmMsgCtx the RM message context of the outgoing message
+        * @throws AxisFault if no create sequence bean exists for the internal
+        *         sequence id, or if the RM version of the sequence cannot be
+        *         determined (both raised as SandeshaException)
+        */
+       protected void setupOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+               if (log.isDebugEnabled()) {
+                       log.debug("Enter: WSRMParentProcessor::setupOutMessage");
+               }
+
+               msgContext = rmMsgCtx.getMessageContext();
+               configurationContext = msgContext.getConfigurationContext();
+               Options options = msgContext.getOptions();
+
+               storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+                               configurationContext.getAxisConfiguration());
+
+               toAddress = rmMsgCtx.getTo().getAddress();
+               sequenceKey = (String) options.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+               internalSequenceID = SandeshaUtil.getInternalSequenceID(toAddress, sequenceKey);
+
+               // Reset the per-call state so nothing stale survives from an earlier call
+               sequenceExists = false;
+               outSequenceID = null;
+               rmVersion = null;
+
+               // Get the Create sequence bean with the matching internal sequenceid
+               CreateSeqBean createSeqFindBean = new CreateSeqBean();
+               createSeqFindBean.setInternalSequenceID(internalSequenceID);
+
+               CreateSeqBean createSeqBean =
+                               storageManager.getCreateSeqBeanMgr().findUnique(createSeqFindBean);
+
+               if (createSeqBean == null) {
+                       if (log.isDebugEnabled()) {
+                               log.debug("Exit: WSRMParentProcessor::setupOutMessage Sequence doesn't exist");
+                       }
+
+                       throw new SandeshaException(SandeshaMessageHelper.getMessage(
+                                       SandeshaMessageKeys.couldNotSendTerminateSeqNotFound, internalSequenceID));
+               }
+
+               if (createSeqBean.getSequenceID() != null) {
+                       sequenceExists = true;
+                       outSequenceID = createSeqBean.getSequenceID();
+               } else {
+                       outSequenceID = Sandesha2Constants.TEMP_SEQUENCE_ID;
+               }
+
+               // Assign the FIELD here. Declaring a local of the same name would
+               // shadow it and leave getRMVersion() returning null for callers.
+               rmVersion = SandeshaUtil.getRMVersion(getInternalSequenceID(), getStorageManager());
+               if (rmVersion == null) {
+                       throw new SandeshaException(
+                                       SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotDecideRMVersion));
+               }
+
+               if (log.isDebugEnabled()) {
+                       log.debug("Exit: WSRMParentProcessor::setupOutMessage");
+               }
+       }
+
+       /**
+        * Completes the outgoing message and hands it to the sender bean store.
+        * <p>
+        * Relies on the state cached by {@link #setupOutMessage(RMMsgContext)}.
+        * The message is addressed, tagged with the sequence properties, given a
+        * SOAP envelope, secured if a security token is stored for the sequence,
+        * then stored under a fresh key and registered through a SenderBean.
+        *
+        * @param rmMsgCtx the RM message context of the message to send
+        * @param msgType  the message type recorded on the sender bean (one of
+        *                 the Sandesha2Constants.MessageTypes values)
+        * @param delay    milliseconds added to the current time to form the
+        *                 sender bean's time-to-send
+        * @throws AxisFault if any of the underlying Axis2/Sandesha2 calls fails
+        */
+       protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay)
+                       throws AxisFault {
+               if (log.isDebugEnabled()) {
+                       log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
+               }
+
+               rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
+               getMsgContext().setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+               rmMsgCtx.setTo(new EndpointReference(toAddress));
+
+               String transportTo = SandeshaUtil.getSequenceProperty(internalSequenceID,
+                               Sandesha2Constants.SequenceProperties.TRANSPORT_TO, storageManager);
+               if (transportTo != null) {
+                       rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
+               }
+
+               // setting msg context properties
+               rmMsgCtx.setProperty(
+                               Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
+               rmMsgCtx.setProperty(
+                               Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
+               rmMsgCtx.setProperty(
+                               Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY, sequenceKey);
+
+               rmMsgCtx.addSOAPEnvelope();
+
+               // Ensure the outbound message is secured using the correct token
+               String tokenData = SandeshaUtil.getSequenceProperty(internalSequenceID,
+                               Sandesha2Constants.SequenceProperties.SECURITY_TOKEN,
+                               storageManager);
+               if (tokenData != null) {
+                       SecurityManager secMgr = SandeshaUtil.getSecurityManager(configurationContext);
+                       SecurityToken token = secMgr.recoverSecurityToken(tokenData);
+                       secMgr.applySecurityToken(token, msgContext);
+               }
+
+               String key = SandeshaUtil.getUUID();
+
+               SenderBean senderBean = new SenderBean();
+               senderBean.setMessageType(msgType);
+               senderBean.setMessageContextRefKey(key);
+               senderBean.setTimeToSend(System.currentTimeMillis() + delay);
+               senderBean.setMessageID(msgContext.getMessageID());
+
+               // Set the internal sequence id and, when the sequence already exists,
+               // the outgoing sequence id for the outgoing message. The bean is only
+               // marked as sendable when the sequence exists.
+               senderBean.setInternalSequenceID(internalSequenceID);
+               if (sequenceExists) {
+                       senderBean.setSend(true);
+                       senderBean.setSequenceID(outSequenceID);
+               } else {
+                       senderBean.setSend(false);
+               }
+
+               EndpointReference to = msgContext.getTo();
+               if (to != null) {
+                       senderBean.setToAddress(to.getAddress());
+               }
+
+               msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+                               Sandesha2Constants.VALUE_FALSE);
+
+               senderBean.setReSend(false);
+
+               SenderBeanMgr retramsmitterMgr = storageManager.getRetransmitterBeanMgr();
+
+               // Store the message context first, then register the bean that refers to it
+               SandeshaUtil.executeAndStore(rmMsgCtx, key);
+
+               retramsmitterMgr.insert(senderBean);
+
+               if (log.isDebugEnabled()) {
+                       log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
+               }
+       }
+
+       /** @return the storage manager resolved by the last setupOutMessage call */
+       public final StorageManager getStorageManager() {
+               return storageManager;
+       }
+
+       /** @return the internal sequence id computed by the last setupOutMessage call */
+       public final String getInternalSequenceID() {
+               return internalSequenceID;
+       }
+
+       /** @return the Axis2 message context captured by the last setupOutMessage call */
+       public final MessageContext getMsgContext() {
+               return msgContext;
+       }
+
+       /**
+        * @return the outgoing sequence id, or the temporary sequence id when the
+        *         create sequence bean had no sequence id yet
+        */
+       public final String getOutSequenceID() {
+               return outSequenceID;
+       }
+
+       /** @return true if the create sequence bean already carried a sequence id */
+       public final boolean isSequenceExists() {
+               return sequenceExists;
+       }
+
+       /** @return the SEQUENCE_KEY option read by the last setupOutMessage call */
+       public final String getSequenceKey() {
+               return sequenceKey;
+       }
+
+       /** @return the To address read by the last setupOutMessage call */
+       public final String getToAddress() {
+               return toAddress;
+       }
+
+       /** @return the configuration context captured by the last setupOutMessage call */
+       public final ConfigurationContext getConfigurationContext() {
+               return configurationContext;
+       }
+
+       /** @return the RM version resolved by the last setupOutMessage call */
+       public final String getRMVersion() {
+               return rmVersion;
+       }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to