Author: mlovett
Date: Thu Feb 15 06:20:22 2007
New Revision: 507936

URL: http://svn.apache.org/viewvc?view=rev&rev=507936
Log:
Improve performance of MakeConnection by polling less often, and paying 
attention to MessagePending headers

Added:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
   (with props)
Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
 Thu Feb 15 06:20:22 2007
@@ -502,6 +502,7 @@
                static final String INBOUND_SEQUENCE_ID    = 
"Sandesha2InboundSequenceId";
                static final String INBOUND_MESSAGE_NUMBER = 
"Sandesha2InboundMessageNumber";
                static final String INBOUND_LAST_MESSAGE   = 
"Sandesha2InboundLastMessage";
+               static final String MAKECONNECTION_ENTRY   = 
"Sandesha2MakeConnectionEntry";
        }
     
     public interface Assertions {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
 Thu Feb 15 06:20:22 2007
@@ -34,6 +34,7 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.msgprocessors.AckRequestedProcessor;
 import org.apache.sandesha2.msgprocessors.AcknowledgementProcessor;
+import org.apache.sandesha2.msgprocessors.MessagePendingProcessor;
 import org.apache.sandesha2.msgprocessors.SequenceProcessor;
 import org.apache.sandesha2.policy.SandeshaPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
@@ -115,6 +116,10 @@
                        if(reqProcessor.processAckRequestedHeaders(rmMsgCtx)){
                                returnValue = InvocationResponse.SUSPEND;
                        }
+                       
+                       // Process MessagePending headers
+                       MessagePendingProcessor pendingProcessor = new 
MessagePendingProcessor();
+                       pendingProcessor.processMessagePendingHeaders(rmMsgCtx);
 
                        // Process the Sequence header, if there is one
                        SequenceProcessor seqProcessor = new 
SequenceProcessor();

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Thu Feb 15 06:20:22 2007
@@ -20,13 +20,11 @@
 import org.apache.axiom.soap.SOAPBody;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.ServiceContext;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.commons.logging.Log;
@@ -38,7 +36,6 @@
 import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.policy.SandeshaPolicyBean;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
@@ -90,15 +87,13 @@
 
                // Re-write the WS-A anonymous URI, if we support the RM 
anonymous URI. We only
                // need to rewrite the replyTo EPR if we have an out-in MEP.
-               SandeshaPolicyBean policy = 
SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
-               if(policy.isEnableRMAnonURI()) {
-                       AxisOperation op = msgContext.getAxisOperation();
-                       if(op != null) {
-                               int mep = op.getAxisSpecifMEPConstant();
-                               if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
-                                       EndpointReference replyTo = 
rewriteEPR(msgContext.getReplyTo(), msgContext);
-                                       msgContext.setReplyTo(replyTo);
-                               }
+               AxisOperation op = msgContext.getAxisOperation();
+               int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+               if(op != null) {
+                       mep = op.getAxisSpecifMEPConstant();
+                       if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+                               EndpointReference replyTo = 
SandeshaUtil.rewriteEPR(msgContext.getReplyTo(), msgContext);
+                               msgContext.setReplyTo(replyTo);
                        }
                }
                
@@ -279,9 +274,14 @@
                // set this as the response highest message.
                rmsBean.setHighestOutMessageNumber(messageNumber);
                
-               // saving the used message number
-               if (!dummyMessage)
+               // saving the used message number, and the expected reply count
+               if (!dummyMessage) {
                        rmsBean.setNextMessageNumber(messageNumber);
+                       if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+                               long expectedReplies = 
rmsBean.getExpectedReplies();
+                               rmsBean.setExpectedReplies(expectedReplies + 1);
+                       }
+               }
                
                RelatesTo relatesTo = msgContext.getRelatesTo();
                if(relatesTo != null) {
@@ -491,45 +491,6 @@
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
ApplicationMsgProcessor::processResponseMessage");
-       }
-
-       private EndpointReference rewriteEPR(EndpointReference epr, 
MessageContext mc)
-       throws SandeshaException
-       {
-               if (log.isDebugEnabled())
-                       log.debug("Exit: SandeshaOutHandler::rewriteEPR " + 
epr);
-
-               // Handle EPRs that have not yet been set. These are 
effectively WS-A anon, and therefore
-               // we can rewrite them.
-               if(epr == null) epr = new EndpointReference(null);
-               
-               String address = epr.getAddress();
-               if(address == null ||
-                  AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) 
||
-                  
AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
-                       // We use the service context to co-ordinate the RM 
anon uuid, so that several
-                       // invocations of the same target will yield stable 
replyTo addresses.
-                       String uuid = null;
-                       ServiceContext sc = mc.getServiceContext();
-                       if(sc == null) {
-                               String msg = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
-                               throw new SandeshaException(msg);
-                       }
-                       synchronized (sc) {
-                               uuid = (String) 
sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
-                               if(uuid == null) {
-                                       uuid = SandeshaUtil.getUUID();
-                                       
sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
-                               }
-                       }
-                       
-                       if(log.isDebugEnabled()) log.debug("Rewriting EPR with 
UUID " + uuid);
-                       
epr.setAddress(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX + uuid);
-               }
-               
-               if (log.isDebugEnabled())
-                       log.debug("Exit: SandeshaOutHandler::rewriteEPR " + 
epr);
-               return epr;
        }
 
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 Thu Feb 15 06:20:22 2007
@@ -164,6 +164,7 @@
                        rMDBean.setAcksToEPR(acksToEPR.getAddress());
                        rMDBean.setSequenceID(rmsBean.getOfferedSequence());
                        rMDBean.setNextMsgNoToProcess(1);
+                       rMDBean.setOutboundSequence(rmsBean.getSequenceID());
 
                        //Storing the referenceMessage of the sending side 
sequence as the reference message
                        //of the receiving side as well.

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
 Thu Feb 15 06:20:22 2007
@@ -10,7 +10,6 @@
 import org.apache.axis2.context.ContextFactory;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.TransportOutDescription;
 import org.apache.commons.logging.Log;

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
 Thu Feb 15 06:20:22 2007
@@ -2,43 +2,35 @@
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.polling.PollingManager;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SequenceEntry;
 import org.apache.sandesha2.wsrm.MessagePending;
-import org.apache.sandesha2.wsrm.Sequence;
 
 public class MessagePendingProcessor {
 
        private static final Log log = 
LogFactory.getLog(MessagePendingProcessor.class);
        
-       public boolean processMessagePendingHeaders (MessageContext message) 
throws AxisFault {
+       public void processMessagePendingHeaders(RMMsgContext message) throws 
AxisFault {
                
                if (log.isDebugEnabled())
                        log.debug("Enter: 
MessagePendingProcessor::processMessagePendingHeaders");
 
-               boolean messagePaused = false;
-               
-               RMMsgContext rmMsgContext = 
MsgInitializer.initializeMessage(message);
-               Sequence sequence = (Sequence) 
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-               MessagePending messagePending = (MessagePending) 
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
-               
-               if (sequence!=null) {
-                       String sequenceId = 
sequence.getIdentifier().getIdentifier();
-                       
-                       if (messagePending!=null) {
-                               boolean pending = messagePending.isPending();
-                               if (pending) {
-                                       ConfigurationContext context = 
rmMsgContext.getConfigurationContext();
+               MessagePending messagePending = (MessagePending) 
message.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
+               if (messagePending!=null) {
+                       boolean pending = messagePending.isPending();
+                       if (pending) {
+                               SequenceEntry entry = (SequenceEntry) 
message.getProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY);
+                               if(entry != null) {
+                                       ConfigurationContext context = 
message.getConfigurationContext();
                                        StorageManager storage = 
SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
                                        PollingManager pollingManager = 
storage.getPollingManager();
-                                       if(pollingManager != null) 
pollingManager.schedulePollingRequest(sequenceId, false);
+                                       if(pollingManager != null) 
pollingManager.schedulePollingRequest(entry.getSequenceId(), 
entry.isRmSource());
                                }
                        }
                }
@@ -47,8 +39,6 @@
                
                if (log.isDebugEnabled())
                        log.debug("Exit: 
MessagePendingProcessor::processMessagePendingHeaders");
-
-               return messagePaused;
        }
 
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Thu Feb 15 06:20:22 2007
@@ -43,6 +43,7 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.InvokerBean;
 import org.apache.sandesha2.storage.beans.RMDBean;
@@ -263,6 +264,18 @@
                if (!msgNoPresentInList)
                {
                        serverCompletedMessageRanges.addRange(new Range(msgNo));
+               }
+               
+               // If the message is a reply to an outbound message then we can 
update the RMSBean that
+               // matches.
+               String outboundSequence = bean.getOutboundSequence();
+               if(outboundSequence != null) {
+                       RMSBean outBean = 
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outboundSequence);
+                       if(outBean != null && outBean.getExpectedReplies() > 0 
) {
+                               
outBean.setExpectedReplies(outBean.getExpectedReplies() - 1);
+                               RMSBeanMgr outMgr = 
storageManager.getRMSBeanMgr();
+                               outMgr.update(outBean);
+                       }
                }
                
                // Update the RMD bean

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
 Thu Feb 15 06:20:22 2007
@@ -23,6 +23,7 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
@@ -37,10 +38,12 @@
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.RMSequenceBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SequenceEntry;
 
 /**
  * This class is responsible for sending MakeConnection requests. This is a 
seperate thread that
@@ -68,11 +71,14 @@
                Transaction t = null;
                try {
                        // If we have request scheduled, handle them first, and 
then pick
-                       // pick a sequence using a round-robin approach.
+                       // pick a sequence using a round-robin approach. 
Scheduled polls
+                       // bypass the normal polling checks, to make sure that 
they happen
+                       boolean forcePoll = false;
                        SequenceEntry entry = null;
                        synchronized (this) {
                                if(!scheduledPollingRequests.isEmpty()) {
                                        entry = (SequenceEntry) 
scheduledPollingRequests.removeFirst();
+                                       forcePoll = true;
                                }
                        }
                        if(entry == null) {
@@ -93,9 +99,9 @@
 
                        t = storageManager.getTransaction();
                        if(entry.isRmSource()) {
-                               pollRMSSide(entry);
+                               pollRMSSide(entry, forcePoll);
                        } else {
-                               pollRMDSide(entry);
+                               pollRMDSide(entry, forcePoll);
                        }
                        if(t != null) t.commit();
                        t = null;
@@ -115,8 +121,8 @@
                return false;
        }
        
-       private void pollRMSSide(SequenceEntry entry) throws AxisFault {
-               if(log.isDebugEnabled()) log.debug("Enter: 
PollingManager::pollRMSSide");
+       private void pollRMSSide(SequenceEntry entry, boolean force) throws 
AxisFault {
+               if(log.isDebugEnabled()) log.debug("Enter: 
PollingManager::pollRMSSide, force: " + force);
                
                RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
                RMSBean findRMS = new RMSBean();
@@ -129,14 +135,19 @@
                        // This sequence must have been terminated, or deleted
                        stopThreadForSequence(entry.getSequenceId(), true);
                } else {
-                       pollForSequence(beanToPoll.getSequenceID(), 
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), 
beanToPoll);
+                       // The sequence is there, but we still only poll if we 
are expecting reply messages,
+                       // or if we don't have clean ack state.
+                       boolean cleanAcks = 
AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(),
 beanToPoll.getNextMessageNumber());
+                       long  repliesExpected = beanToPoll.getExpectedReplies();
+                       if(force ||     !cleanAcks || repliesExpected > 0)
+                               pollForSequence(beanToPoll.getSequenceID(), 
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), 
beanToPoll, entry);
                }
 
                if(log.isDebugEnabled()) log.debug("Exit: 
PollingManager::pollRMSSide");
        }
 
-       private void pollRMDSide(SequenceEntry entry) throws AxisFault {
-               if(log.isDebugEnabled()) log.debug("Enter: 
PollingManager::pollRMDSide");
+       private void pollRMDSide(SequenceEntry entry, boolean force) throws 
AxisFault {
+               if(log.isDebugEnabled()) log.debug("Enter: 
PollingManager::pollRMDSide, force: " + force);
                RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
                RMDBean findBean = new RMDBean();
                findBean.setPollingMode(true);
@@ -148,7 +159,22 @@
                        // This sequence must have been terminated, or deleted
                        stopThreadForSequence(entry.getSequenceId(), false);
                } else {
-                       pollForSequence(nextMsgBean.getSequenceID(), 
nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean);
+                       // The sequence is still there, but if we have a 
running related sequence
+                       // that is not expecting replies then there is no need 
to poll.
+                       boolean doPoll = true;
+                       String outboundSequence = 
nextMsgBean.getOutboundSequence();
+                       if(outboundSequence != null) {
+                               RMSBean findRMS = new RMSBean();
+                               findRMS.setSequenceID(outboundSequence);
+                               findRMS.setTerminated(false);
+                               RMSBeanMgr mgr = storageManager.getRMSBeanMgr();
+                               RMSBean outbound = mgr.findUnique(findRMS);
+                               if(outbound != null && 
outbound.getExpectedReplies() == 0) {
+                                       doPoll = false;
+                               }
+                       }
+                       if(force || doPoll)
+                               pollForSequence(nextMsgBean.getSequenceID(), 
nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean, 
entry);
                }
 
                if(log.isDebugEnabled()) log.debug("Exit: 
PollingManager::pollRMDSide");
@@ -157,25 +183,34 @@
        private void pollForSequence(String sequenceId,
                                                                 String 
sequencePropertyKey,
                                                                 String 
referenceMsgKey,
-                                                                RMSequenceBean 
rmBean)
+                                                                RMSequenceBean 
rmBean,
+                                                                SequenceEntry 
entry)
        throws SandeshaException, SandeshaStorageException, AxisFault
        {
                if(log.isDebugEnabled()) log.debug("Enter: 
PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey + 
", " + referenceMsgKey + ", " + rmBean);
                
                //create a MakeConnection message  
                String replyTo = rmBean.getReplyToEPR();
-               String WSRMAnonReplyToURI = null;
+               String wireSeqId = null;
+               String wireAddress = null;
                if (SandeshaUtil.isWSRMAnonymous(replyTo)) {
                        // If we are polling on a RM anon URI then we don't 
want to include the sequence id
                        // in the MakeConnection message.
-                       sequenceId = null;
-                       WSRMAnonReplyToURI = replyTo;
+                       wireAddress = replyTo;
+               } else {
+                       wireSeqId = sequenceId;
                }
                
                MessageContext referenceMessage = 
storageManager.retrieveMessageContext(referenceMsgKey,context);
                RMMsgContext referenceRMMessage = 
MsgInitializer.initializeMessage(referenceMessage);
                RMMsgContext makeConnectionRMMessage = 
RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
-                               rmBean, sequenceId, WSRMAnonReplyToURI, 
storageManager);
+                               rmBean, wireSeqId, wireAddress, storageManager);
+               
+               // Store properties so that we know which sequence we are 
polling for. This can be used
+               // to match reply sequences up to requests, as well as to help 
process messagePending
+               // headers.
+               OperationContext ctx = 
makeConnectionRMMessage.getMessageContext().getOperationContext();
+               
ctx.setProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY,
 entry);
                
                
makeConnectionRMMessage.setProperty(MessageContext.TRANSPORT_IN,null);
                //storing the MakeConnection message.

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
 Thu Feb 15 06:20:22 2007
@@ -58,6 +58,12 @@
         * To Address of the messages that will be received for this sequence.
         */
        private String toAddress;
+       
+       /**
+        * Client side, we keep track of inbound and outbound sequence pairs. 
Each
+        * inbound sequence has the identifier of the associated outbound 
sequence.
+        */
+       private String outboundSequence;
 
        /**
         * Comment for <code>nextMsgNoToProcess</code>
@@ -157,6 +163,15 @@
                this.toAddress = toAddress;
        }
 
+       public String getOutboundSequence() {
+               return outboundSequence;
+       }
+
+       public void setOutboundSequence(String outboundSequence) {
+               this.outboundSequence = outboundSequence;
+       }
+
+
        public String toString() {
                StringBuffer result = new StringBuffer();
                result.append(this.getClass().getName());
@@ -195,6 +210,9 @@
                        equal = false;
 
                else if(bean.getToAddress() != null && 
!bean.getToAddress().equals(this.getToAddress()))
+                       equal = false;
+               
+               else if(bean.getOutboundSequence() != null && 
!bean.getOutboundSequence().equals(this.getOutboundSequence()))
                        equal = false;
                
                else if ((bean.rmdFlags & NEXT_MSG_NO_FLAG) != 0 && 
bean.getNextMsgNoToProcess() != this.getNextMsgNoToProcess())

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
 Thu Feb 15 06:20:22 2007
@@ -122,6 +122,11 @@
        private long numberOfMessagesAcked = 0;
 
        /**
+        * The number of reply messages that we expect
+        */
+       private long expectedReplies = 0;
+       
+       /**
         * Flags that are used to check if the primitive types on this bean
         * have been set. If a primitive type has not been set then it will
         * be ignored within the match method.
@@ -136,6 +141,7 @@
        private static final int SEQ_CLOSED_CLIENT_FLAG    = 0x01000000;
        private static final int ACKED_MESSAGES_FLAG       = 0x10000000;
        private static final int TERM_PAUSER_FOR_CS        = 0x00000002;
+       private static final int EXPECTED_REPLIES          = 0x00000020;
 
   /**
    * In WSRM Anon URI scenario, we may not want to terminate a perticular 
sequence until the CreateSequence has been received
@@ -310,6 +316,15 @@
        }
 
 
+       public long getExpectedReplies() {
+               return expectedReplies;
+       }
+
+       public void setExpectedReplies(long expectedReplies) {
+               this.expectedReplies = expectedReplies;
+               this.rmsFlags |= EXPECTED_REPLIES;
+       }
+
        public String toString() {
                StringBuffer result = new StringBuffer();
                result.append(this.getClass().getName());
@@ -327,6 +342,7 @@
                result.append("\nTimedOut         : "); result.append(timedOut);
                result.append("\nClosedClient     : "); 
result.append(sequenceClosedClient);
                result.append("\nNumAckedMsgs     : "); 
result.append(numberOfMessagesAcked);
+               result.append("\nExpectedReplies  : "); 
result.append(expectedReplies);
                result.append("\nTransportTo      : "); 
result.append(transportTo);
                result.append("\nOfferedEndPoint  : "); 
result.append(offeredEndPoint);
                result.append("\nOfferedSequence  : "); 
result.append(offeredSequence);
@@ -403,7 +419,11 @@
                
                else if((bean.rmsFlags & TERM_PAUSER_FOR_CS) != 0 && 
bean.isTerminationPauserForCS() != this.isTerminationPauserForCS())
                        match = false;
-               
+
+               else if((bean.rmsFlags & EXPECTED_REPLIES) != 0 && 
bean.getExpectedReplies() != this.getExpectedReplies())
+                       match = false;
+
                return match;
        }
+
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
Thu Feb 15 06:20:22 2007
@@ -941,4 +941,53 @@
     String stackTrace = baos.toString();
     return stackTrace;
        }
+
+       public static EndpointReference rewriteEPR(EndpointReference epr, 
MessageContext mc)
+       throws SandeshaException
+       {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: SandeshaUtil::rewriteEPR " + epr);
+
+               ConfigurationContext configContext = 
mc.getConfigurationContext();
+               SandeshaPolicyBean policy = 
SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
+               if(!policy.isEnableRMAnonURI()) {
+                       if (log.isDebugEnabled())
+                               log.debug("Exit: SandeshaUtil::rewriteEPR, anon 
uri is disabled");
+                       return epr;
+               }
+
+               // Handle EPRs that have not yet been set. These are 
effectively WS-A anon, and therefore
+               // we can rewrite them.
+               if(epr == null) epr = new EndpointReference(null);
+               
+               String address = epr.getAddress();
+               if(address == null ||
+                  AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) 
||
+                  
AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
+                       // We use the service context to co-ordinate the RM 
anon uuid, so that several
+                       // invocations of the same target will yield stable 
replyTo addresses.
+                       String uuid = null;
+                       ServiceContext sc = mc.getServiceContext();
+                       if(sc == null) {
+                               String msg = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
+                               throw new SandeshaException(msg);
+                       }
+                       synchronized (sc) {
+                               uuid = (String) 
sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
+                               if(uuid == null) {
+                                       uuid = SandeshaUtil.getUUID();
+                                       
sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
+                               }
+                       }
+                       
+                       if(log.isDebugEnabled()) log.debug("Rewriting EPR with 
UUID " + uuid);
+                       
epr.setAddress(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX + uuid);
+               }
+               
+               if (log.isDebugEnabled())
+                       log.debug("Exit: SandeshaUtil::rewriteEPR " + epr);
+               return epr;
+       }
+
+
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
 Thu Feb 15 06:20:22 2007
@@ -31,6 +31,7 @@
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.workers.SequenceEntry;
 import org.apache.sandesha2.wsrm.CreateSequence;
 
 /**
@@ -82,6 +83,13 @@
                }
 
                MessageContext createSeqContext = 
createSequenceMsg.getMessageContext();
+               
+               // If this create is the result of a MakeConnection, then we 
must have a related
+               // outbound sequence.
+               SequenceEntry entry = (SequenceEntry) 
createSeqContext.getProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY);
+               if(entry != null && entry.isRmSource()) {
+                       rmdBean.setOutboundSequence(entry.getSequenceId());
+               }
 
                rmdBean.setServerCompletedMessages(new RangeString());
                
@@ -260,7 +268,11 @@
                                }
                        }
                }
-               // Store both the acksTo and replyTo
+               // In case either of the replyTo or AcksTo is anonymous, 
rewrite them using the AnonURI template
+               replyToEPR = SandeshaUtil.rewriteEPR(replyToEPR, 
firstAplicationMsgCtx);
+               acksToEPR = SandeshaUtil.rewriteEPR(acksToEPR, 
firstAplicationMsgCtx);
+               
+               // Store both the acksTo and replyTo 
                if(replyToEPR != null) 
rmsBean.setReplyToEPR(replyToEPR.getAddress());
                if(acksToEPR  != null) 
rmsBean.setAcksToEPR(acksToEPR.getAddress());
                

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
 Thu Feb 15 06:20:22 2007
@@ -299,42 +299,4 @@
                        }
                }
        }
-       
-       protected class SequenceEntry {
-               private String  sequenceId;
-               private boolean rmSource;
-               
-               public SequenceEntry(String sequenceId, boolean rmSource) {
-                       this.sequenceId = sequenceId;
-                       this.rmSource = rmSource;
-               }
-               public boolean isRmSource() {
-                       return rmSource;
-               }
-               public String getSequenceId() {
-                       return sequenceId;
-               }
-
-
-               public boolean equals(Object o) {
-                       if(o == null) return false;
-                       if(o == this) return true;
-                       if(o.getClass() != getClass()) return false;
-                       
-                       SequenceEntry other = (SequenceEntry) o;
-                       if(sequenceId != null) {
-                               if(!sequenceId.equals(other.sequenceId)) return 
false;
-                       } else {
-                               if(other.sequenceId != null) return false;
-                       }
-                       
-                       return rmSource == other.rmSource;
-               }
-               public int hashCode() {
-                       int result = 1;
-                       if(sequenceId != null) result = sequenceId.hashCode();
-                       if(rmSource) result = -result;
-                       return result;
-               }
-       }
 }

Added: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java?view=auto&rev=507936
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
 (added)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
 Thu Feb 15 06:20:22 2007
@@ -0,0 +1,59 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ *  
+ */
+package org.apache.sandesha2.workers;
+
+import java.io.Serializable;
+
+public class SequenceEntry implements Serializable {
+       private static final long serialVersionUID = -6823171634616402792L;
+
+       private String  sequenceId;
+       private boolean rmSource;
+       
+       public SequenceEntry(String sequenceId, boolean rmSource) {
+               this.sequenceId = sequenceId;
+               this.rmSource = rmSource;
+       }
+       public boolean isRmSource() {
+               return rmSource;
+       }
+       public String getSequenceId() {
+               return sequenceId;
+       }
+
+
+       public boolean equals(Object o) {
+               if(o == null) return false;
+               if(o == this) return true;
+               if(o.getClass() != getClass()) return false;
+               
+               SequenceEntry other = (SequenceEntry) o;
+               if(sequenceId != null) {
+                       if(!sequenceId.equals(other.sequenceId)) return false;
+               } else {
+                       if(other.sequenceId != null) return false;
+               }
+               
+               return rmSource == other.rmSource;
+       }
+       public int hashCode() {
+               int result = 1;
+               if(sequenceId != null) result = sequenceId.hashCode();
+               if(rmSource) result = -result;
+               return result;
+       }
+}

Propchange: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to