Author: mlovett
Date: Wed Feb  7 02:45:23 2007
New Revision: 504494

URL: http://svn.apache.org/viewvc?view=rev&rev=504494
Log:
Make the Sender take each sequence in turn, to avoid starving them

Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -263,8 +263,6 @@
                        // inserting the new ack.
                        retransmitterBeanMgr.insert(ackBean);
 
-                       
SandeshaUtil.startSenderForTheSequence(configurationContext, sequenceId);
-
                        msgContext.pause();
 
                        if (log.isDebugEnabled())

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -217,7 +217,7 @@
                        
                        if (rMDBean!=null && rMDBean.isPollingMode()) {
                                PollingManager manager = 
storageManager.getPollingManager();
-                               
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+                               if(manager != null) 
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
                        }
                }
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -180,8 +180,6 @@
                if (internalSequenceId!=null)
                        
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
 
-               String sequencePropertyKey = 
SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
                /*
                 * checking weather the user has given the messageNumber (most 
of the
                 * cases this will not be the case where the system will 
generate the
@@ -284,7 +282,7 @@
 
                        // if first message - setup the sending side sequence - 
both for the
                        // server and the client sides
-                       rmsBean = 
SequenceManager.setupNewClientSequence(msgContext, sequencePropertyKey, 
specVersion, storageManager);
+                       rmsBean = 
SequenceManager.setupNewClientSequence(msgContext, specVersion, storageManager);
                        
                        EndpointReference acksToEPR = null;
 
@@ -331,32 +329,29 @@
                                if (transportIn == null)
                                        transportIn = 
org.apache.axis2.Constants.TRANSPORT_HTTP;
                        } else if (acksToEPR == null && serverSide) {
-//                                     String incomingSequencId = SandeshaUtil
-//                                                     
.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
-                                       
                                try {
                                        MessageContext requestMsgContext = 
operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
                                        RMMsgContext requestRMMsgContext = 
MsgInitializer.initializeMessage(requestMsgContext);
-                                                                               
        
-                                       String requestSideSequencePropertyKey = 
SandeshaUtil.getSequencePropertyKey(requestRMMsgContext);
-                                       RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, 
requestSideSequencePropertyKey);
-                                               
-                                       if (rmdBean != null && 
rmdBean.getReplyToEPR() != null) {
-                                               String beanAcksToValue = 
rmdBean.getReplyToEPR();
-                                               if (beanAcksToValue != null)
-                                                       acksToEPR = new 
EndpointReference(beanAcksToValue);
+                                       Sequence sequence = (Sequence) 
requestRMMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+                                       if(sequence != null) {
+                                               String id = 
sequence.getIdentifier().getIdentifier();
+                                               RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, id);
+                                                       
+                                               if (rmdBean != null && 
rmdBean.getReplyToEPR() != null) {
+                                                       String beanAcksToValue 
= rmdBean.getReplyToEPR();
+                                                       if (beanAcksToValue != 
null)
+                                                               acksToEPR = new 
EndpointReference(beanAcksToValue);
+                                               }
                                        }
                                } catch (AxisFault e) {
                                        throw new SandeshaException (e);
                                }
                        }
-                       rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, 
sequencePropertyKey ,internalSequenceId, acksToEPR, storageManager);
+                       rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, 
internalSequenceId, acksToEPR, storageManager);
                }
                
                if (rmsBean == null) {
-                       RMSBean findBean = new RMSBean();
-                       findBean.setInternalSequenceID(internalSequenceId);
-                       rmsBean = 
storageManager.getRMSBeanMgr().findUnique(findBean);
+                       rmsBean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
internalSequenceId);
                }
                
                // the message number that was last used.
@@ -396,11 +391,6 @@
                if (!dummyMessage)
                        rmsBean.setNextMessageNumber(messageNumber);
                
-               if (messageNumber == 1 && !sendCreateSequence) {
-                       // Start the sender for the service side.
-                       SandeshaUtil.startSenderForTheSequence(configContext, 
outSequenceID);
-               }
-
                RelatesTo relatesTo = msgContext.getRelatesTo();
                if(relatesTo != null) {
                        rmsBean.setHighestOutRelatesTo(relatesTo.getValue());
@@ -465,7 +455,7 @@
                return true;
        }
 
-       private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, 
RMSBean rmsBean, String sequencePropertyKey, String internalSequenceId, 
EndpointReference acksTo,
+       private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, 
RMSBean rmsBean, String internalSequenceId, EndpointReference acksTo,
                        StorageManager storageManager) throws AxisFault {
 
                if (log.isDebugEnabled())
@@ -543,7 +533,7 @@
                
createSeqEntry.setMessageContextRefKey(createSequenceMessageStoreKey);
                createSeqEntry.setTimeToSend(System.currentTimeMillis());
                createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
-               createSeqEntry.setInternalSequenceID(sequencePropertyKey);
+               
createSeqEntry.setInternalSequenceID(rmsBean.getInternalSequenceID());
                // this will be set to true in the sender
                createSeqEntry.setSend(true);
                // Indicate that this message is a create sequence
@@ -558,6 +548,9 @@
 
                retransmitterMgr.insert(createSeqEntry);
 
+               // Setup enough of the workers to get this create sequence off 
the box.
+               SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
+               
                if (log.isDebugEnabled())
                        log.debug("Exit: 
ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
                return rmsBean;

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -187,9 +187,7 @@
 
                                        rmsBeanMgr.insert(rMSBean);
                                        
-                                       if(rMSBean.isPollingMode()) {
-                                               
SandeshaUtil.startPollingForTheSequence(context, rmdBean.getSequenceID(), 
false);
-                                       }
+                                       
SandeshaUtil.startWorkersForSequence(context, rMSBean);
                                        
                                } else {
                                        // removing the accept part.
@@ -215,9 +213,7 @@
                        
                        storageManager.getRMDBeanMgr().update(rmdBean);
        
-                       if(rmdBean.isPollingMode()) {
-                               
SandeshaUtil.startPollingForTheSequence(context, rmdBean.getSequenceID(), 
false);
-                       }
+                       SandeshaUtil.startWorkersForSequence(context, rmdBean);
 
                        AxisEngine engine = new AxisEngine(context);
                        try{

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -210,18 +210,12 @@
                        
rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
                        
                        rmdBeanMgr.insert(rMDBean);
-                       
-                       if(rMDBean.isPollingMode()) {
-                               
SandeshaUtil.startPollingForTheSequence(configCtx, rMDBean.getSequenceID(), 
false);
-                       }
+                       SandeshaUtil.startWorkersForSequence(configCtx, 
rMDBean);
                }
                
                rmsBean.setLastActivatedTime(System.currentTimeMillis());
                rmsBeanMgr.update(rmsBean);
-               
-               if(rmsBean.isPollingMode()) {
-                       SandeshaUtil.startPollingForTheSequence(configCtx, 
rmsBean.getSequenceID(), true);
-               }
+               SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
 
                // Locate and update all of the messages for this sequence, now 
that we know
                // the sequence id.

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -38,7 +38,7 @@
                                        ConfigurationContext context = 
rmMsgContext.getConfigurationContext();
                                        StorageManager storage = 
SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
                                        PollingManager pollingManager = 
storage.getPollingManager();
-                                       
pollingManager.schedulePollingRequest(sequenceId, false);
+                                       if(pollingManager != null) 
pollingManager.schedulePollingRequest(sequenceId, false);
                                }
                        }
                }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -53,6 +53,7 @@
 import org.apache.sandesha2.util.RangeString;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.workers.SandeshaThread;
 import org.apache.sandesha2.wsrm.Sequence;
 
 /**
@@ -283,11 +284,6 @@
                        }
                }
 
-               // inorder invocation is still a global property
-               boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
-                               
msgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
-
-
                //setting properties for the messageContext
                
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
                
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new
 Long (msgNo));
@@ -328,7 +324,10 @@
                        AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, 
sequenceId, timeToSend, storageManager);
                }
 
-               if (inOrderInvocation) {
+               // If the storage manager has an invoker, then they may be 
implementing inOrder, or
+               // transactional delivery. Either way, if they have one we 
should use it.
+               SandeshaThread invoker = storageManager.getInvoker();
+               if (invoker != null) {
                        // Whatever the MEP, we stop processing here and the 
invoker will do the real work. We only
                        // SUSPEND if we need to keep the backchannel open for 
the response... we may as well ABORT
                        // to let other cases end more quickly.
@@ -345,8 +344,6 @@
                        // This will avoid performing application processing 
more than once.
                        
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 
-                       // Starting the invoker if stopped.
-                       
SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), 
sequenceId);
                }
                
                if (log.isDebugEnabled())

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -87,8 +87,6 @@
                        throw new SandeshaException(message);
                }
                
-               String sequencePropertyKey = 
SandeshaUtil.getSequencePropertyKey(terminateSeqRMMsg);
-
                ConfigurationContext context = 
terminateSeqMsg.getConfigurationContext();
                StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
                
@@ -110,9 +108,9 @@
                // add the terminate sequence response if required.
                RMMsgContext terminateSequenceResponse = null;
                if 
(SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
-                       terminateSequenceResponse = 
getTerminateSequenceResponse(terminateSeqRMMsg, rmdBean, sequencePropertyKey, 
sequenceId, storageManager);
+                       terminateSequenceResponse = 
getTerminateSequenceResponse(terminateSeqRMMsg, rmdBean, sequenceId, 
storageManager);
 
-               setUpHighestMsgNumbers(context, 
storageManager,sequencePropertyKey, sequenceId, terminateSeqRMMsg);
+               setUpHighestMsgNumbers(context, storageManager, sequenceId, 
terminateSeqRMMsg);
                
                
                
@@ -139,10 +137,10 @@
                }
                
                if (doFullTermination) {
-                       
TerminateManager.cleanReceivingSideAfterInvocation(context, 
sequencePropertyKey, sequenceId, storageManager);
-                       
TerminateManager.cleanReceivingSideOnTerminateMessage(context, 
sequencePropertyKey, sequenceId, storageManager);
+                       
TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId, 
storageManager);
+                       
TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, 
storageManager);
                } else
-                       
TerminateManager.cleanReceivingSideOnTerminateMessage(context, 
sequencePropertyKey, sequenceId, storageManager);
+                       
TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, 
storageManager);
                
 
                
@@ -223,7 +221,7 @@
        }
 
        private void setUpHighestMsgNumbers(ConfigurationContext configCtx, 
StorageManager storageManager,
-                       String requestSidesequencePropertyKey, String 
sequenceId, RMMsgContext terminateRMMsg) throws SandeshaException {
+                       String sequenceId, RMMsgContext terminateRMMsg) throws 
SandeshaException {
 
                if (log.isDebugEnabled())
                        log.debug("Enter: 
TerminateSeqMsgProcessor::setUpHighestMsgNumbers, " + sequenceId);
@@ -293,7 +291,7 @@
                        log.debug("Exit: 
TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
        }
 
-       private RMMsgContext getTerminateSequenceResponse(RMMsgContext 
terminateSeqRMMsg, RMDBean rmdBean, String sequencePropertyKey,String 
sequenceId,
+       private RMMsgContext getTerminateSequenceResponse(RMMsgContext 
terminateSeqRMMsg, RMDBean rmdBean, String sequenceId,
                        StorageManager storageManager) throws AxisFault {
 
                if (log.isDebugEnabled())

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
 Wed Feb  7 02:45:23 2007
@@ -75,7 +75,7 @@
                        
                        if (rMDBean!=null && rMDBean.isPollingMode()) {
                                PollingManager manager = 
storageManager.getPollingManager();
-                               
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+                               if(manager != null) 
manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
                        }
                }
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
 Wed Feb  7 02:45:23 2007
@@ -120,7 +120,7 @@
                
                RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
                RMSBean findRMS = new RMSBean();
-               findRMS.setSequenceID(entry.getSequenceId());
+               findRMS.setInternalSequenceID(entry.getSequenceId());
                findRMS.setPollingMode(true);
                findRMS.setTerminated(false);
                RMSBean beanToPoll = rmsBeanManager.findUnique(findRMS);
@@ -192,7 +192,7 @@
                
makeConnectionSenderBean.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
                makeConnectionSenderBean.setReSend(false);
                makeConnectionSenderBean.setSend(true);
-               makeConnectionSenderBean.setSequenceID(sequenceId);
+               makeConnectionSenderBean.setSequenceID(rmBean.getSequenceID());
                EndpointReference to = makeConnectionRMMessage.getTo();
                if (to!=null)
                        makeConnectionSenderBean.setToAddress(to.getAddress());

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
 Wed Feb  7 02:45:23 2007
@@ -52,8 +52,12 @@
        public void shutdown(){
                //shutdown the running threads
                getSender().stopRunning();
-               getInvoker().stopRunning();
-               getPollingManager().stopRunning();
+               
+               SandeshaThread thread = getInvoker();
+               if(thread != null) thread.stopRunning();
+               
+               thread = getPollingManager();
+               if(thread != null) thread.stopRunning();
        }
        
        public abstract void initStorage (AxisModule moduleDesc) throws 
SandeshaStorageException;
@@ -62,8 +66,20 @@
        
        public abstract SandeshaThread getSender();
        
+       /**
+        * Get the invoker that hands inbound messages over to the application. 
This
+        * may be null, in which case the inbound messages will be dispatched 
directly
+        * to the application without switching them over to the invoker.
+        * @return null if messages should be delivered directly to the 
application,
+        * otherwise return a SandeshaThread.
+        */
        public abstract SandeshaThread getInvoker();
        
+       /**
+        * Get the thread that generates polling requests to send to remote 
endpoints.
+        * This may be null, in which case the storage manager does not support 
polling.
+        * @return null if polling is diabled, otherwise return a 
PollingManager.
+        */
        public abstract PollingManager getPollingManager();
 
        public abstract RMSBeanMgr getRMSBeanMgr();

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beanmanagers/SenderBeanMgr.java
 Wed Feb  7 02:45:23 2007
@@ -40,7 +40,7 @@
        
        public SenderBean findUnique (SenderBean bean) throws SandeshaException;
        
-       public SenderBean getNextMsgToSend() throws SandeshaStorageException;
+       public SenderBean getNextMsgToSend(String sequenceId) throws 
SandeshaStorageException;
 
        public boolean update(SenderBean bean) throws SandeshaStorageException;
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 Wed Feb  7 02:45:23 2007
@@ -66,20 +66,19 @@
                return super.find(bean);
        }
 
-       //TODO remove this method, and move this logic out of the 
StorageManager. We should not hv any RM logic inside the StorageManagers.
-       //Otherwise we will hv to repeat that logic inside every SM Impl.
-       public SenderBean getNextMsgToSend() throws SandeshaStorageException {
+       public SenderBean getNextMsgToSend(String sequenceId) throws 
SandeshaStorageException {
+               if(log.isDebugEnabled()) log.debug("Entry: 
InMemorySenderBeanManager::getNextMessageToSend " + sequenceId);
+               
                // Set up match criteria
                SenderBean matcher = new SenderBean();
                matcher.setSend(true);
+               matcher.setSequenceID(sequenceId);
                matcher.setTimeToSend(System.currentTimeMillis());
                
                List matches = super.find(matcher);
+               if(log.isDebugEnabled()) log.debug("Found " + matches.size() + 
" messages");
                
-               // We either return an application message or an RM message. If 
we find
-               // an application message first then we carry on through the 
list to be
-               // sure that we send the lowest app message avaliable. If we 
hit a RM
-               // message first then we are done.
+               // Look for the message with the lowest send time, and send 
that one.
                SenderBean result = null;
                Iterator i = matches.iterator();
                while(i.hasNext()) {
@@ -87,20 +86,17 @@
                        if (bean.getTimeToSend()<0)
                                continue; //Beans with negative timeToSend 
values are not considered as candidates for sending.
                        
-                       if(bean.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION) {
-                               long number = bean.getMessageNumber();
-                               if(result == null || result.getMessageNumber() 
> number) {
-                                       result = bean;
-                               }
-                       } else if(result == null) {
-                               //making sure that the bean passes the reSend 
test as well
-                               if (bean.getSentCount()==0 || 
(bean.getSentCount()>0 && bean.isReSend())) { 
-                                       result = bean;
-                                       break;
-                               }
+                       if (bean.getSentCount() > 0 && !bean.isReSend())
+                               continue; //Avoid re-sending messages that we 
should not resend
+                       
+                       if(result == null) {
+                               result = bean;
+                       } else if(result.getTimeToSend() > 
bean.getTimeToSend()) {
+                               result = bean;
                        }
                }
                
+               if(log.isDebugEnabled()) log.debug("Exit: 
InMemorySenderBeanManager::getNextMessageToSend");
                return result;
        }
        

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 Wed Feb  7 02:45:23 2007
@@ -75,14 +75,20 @@
                
                SandeshaPolicyBean policy = 
SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
                useSerialization = policy.isUseMessageSerialization();
-
+               
+               // Note that while inOrder is a global property we can decide 
if we need the
+               // invoker thread at this point. If we change this to be a 
sequence-level
+               // property then we'll need to revisit this.
+               boolean inOrder = policy.isInOrder();
+               boolean polling = policy.isEnableMakeConnection();
+               
                this.rMSBeanMgr = new InMemoryRMSBeanMgr (this, context);
                this.rMDBeanMgr = new InMemoryRMDBeanMgr (this, context);
                this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
                this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, 
context);
                this.sender = new Sender();
-               this.invoker = new Invoker();
-               this.pollingManager = new PollingManager();
+               if(inOrder) this.invoker = new Invoker();
+               if(polling) this.pollingManager = new PollingManager();
        }
 
        public Transaction getTransaction() {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
 Wed Feb  7 02:45:23 2007
@@ -28,6 +28,7 @@
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beans.RMBean;
+import org.apache.sandesha2.workers.SandeshaThread;
 
 /**
  * This class does not really implement transactions, but it is a good
@@ -57,7 +58,10 @@
        public void commit() {
                releaseLocks();
                if(sentMessages) manager.getSender().wakeThread();
-               if(receivedMessages) manager.getInvoker().wakeThread();
+               if(receivedMessages) {
+                       SandeshaThread invoker = manager.getInvoker();
+                       if(invoker != null) invoker.wakeThread();
+               }
        }
 
        public void rollback() {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
 Wed Feb  7 02:45:23 2007
@@ -389,8 +389,6 @@
                // inserting the new ack.
                retransmitterBeanMgr.insert(ackBean);
 
-               
SandeshaUtil.startSenderForTheSequence(ackRMMsgContext.getConfigurationContext(),
 sequenceId);
-
        }
        
        public static void sendAckNow (RMMsgContext ackRMMsgContext) throws 
AxisFault {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
 Wed Feb  7 02:45:23 2007
@@ -54,8 +54,6 @@
                
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceID);
                
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, 
sequenceID);
                
-               String sequencePropertyKey = 
SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-               
                // operation is the lowest level Sandesha2 could be attached.
                SandeshaPolicyBean propertyBean = 
SandeshaUtil.getPropertyBean(rmMsgCtx.getMessageContext().getAxisOperation());
 
@@ -85,7 +83,7 @@
                                // Only messages of outgoing sequences get 
retransmitted. So named
                                // following method according to that.
                                
-                               
finalizeTimedOutSequence(sequencePropertyKey,internalSequenceID, sequenceID, 
rmMsgCtx.getMessageContext(), storageManager);
+                               finalizeTimedOutSequence(internalSequenceID, 
sequenceID, rmMsgCtx.getMessageContext(), storageManager);
                                continueSending = false;
                        }
                }
@@ -140,13 +138,13 @@
                return interval;
        }
 
-       private static void finalizeTimedOutSequence(String sequencePropertyKey 
,String internalSequenceID, String sequenceID, MessageContext messageContext,
+       private static void finalizeTimedOutSequence(String internalSequenceID, 
String sequenceID, MessageContext messageContext,
                        StorageManager storageManager) throws SandeshaException 
{
                ConfigurationContext configurationContext = 
messageContext.getConfigurationContext();
 
                // Already an active transaction, so don't want a new one
                SequenceReport report = 
SandeshaClient.getOutgoingSequenceReport(internalSequenceID, 
configurationContext, false);
-               TerminateManager.timeOutSendingSideSequence(sequencePropertyKey 
,internalSequenceID, false, storageManager);
+               TerminateManager.timeOutSendingSideSequence(internalSequenceID, 
false, storageManager);
 
                SandeshaListener listener = (SandeshaListener) messageContext
                                
.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
Wed Feb  7 02:45:23 2007
@@ -77,6 +77,7 @@
 import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
 import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
 import org.apache.sandesha2.workers.SandeshaThread;
 import org.apache.sandesha2.wsrm.AckRequested;
@@ -142,46 +143,39 @@
                return ackRanges;
        }
 
-       public static void startSenderForTheSequence(ConfigurationContext 
context, String sequenceID) throws SandeshaException {
+       public static void startWorkersForSequence(ConfigurationContext 
context, RMSequenceBean sequence)
+       throws SandeshaException {
                if (log.isDebugEnabled())
-                       log.debug("Enter: 
SandeshaUtil::startSenderForTheSequence , context " + context + ", sequenceID " 
+ sequenceID);
+                       log.debug("Enter: 
SandeshaUtil::startWorkersForSequence, sequence " + sequence);
                
-               SandeshaThread sender = getSandeshaStorageManager(context, 
context.getAxisConfiguration()).getSender();         
-               sender.runThreadForSequence(context, sequenceID, true);
+               StorageManager mgr = getSandeshaStorageManager(context, 
context.getAxisConfiguration());
+               boolean polling = sequence.isPollingMode();
                
-               if (log.isDebugEnabled())
-                       log.debug("Exit: 
SandeshaUtil::startSenderForTheSequence");
-       }
-
-       public static void startInvokerForTheSequence(ConfigurationContext 
context, String sequenceID) throws SandeshaException {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: 
SandeshaUtil::startInvokerForTheSequence , context " + context + ", sequenceID 
" + sequenceID);
+               SandeshaThread sender = mgr.getSender();
+               SandeshaThread invoker = mgr.getInvoker();
+               SandeshaThread pollMgr = mgr.getPollingManager();
                
-               SandeshaThread invoker = getSandeshaStorageManager(context, 
context.getAxisConfiguration()).getInvoker();
-               invoker.runThreadForSequence(context, sequenceID, false);
-
-               if (log.isDebugEnabled())
-                       log.debug("Exit: 
SandeshaUtil::startInvokerForTheSequence");                    
-       }
-
-       public static void startPollingForTheSequence(ConfigurationContext 
configurationContext, String sequenceID, boolean rmSource) throws 
SandeshaException {
-               if (log.isDebugEnabled())
-                       log.debug("Enter: 
SandeshaUtil::startPollingForTheSequence , context " + configurationContext + 
", sequenceID " + sequenceID + ", rmSource");
-
                // Only start the polling manager if we are configured to use 
MakeConnection
-               SandeshaPolicyBean policy = 
getPropertyBean(configurationContext.getAxisConfiguration());
-               if(!policy.isEnableMakeConnection()) {
+               if(polling && pollMgr == null) {
                        String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.makeConnectionDisabled);
                        throw new SandeshaException(message);
                }
-               
-               SandeshaThread polling = 
getSandeshaStorageManager(configurationContext, 
configurationContext.getAxisConfiguration()).getPollingManager();
-               polling.runThreadForSequence(configurationContext, sequenceID, 
rmSource);
 
-               if (log.isDebugEnabled())
-                       log.debug("Exit: 
SandeshaUtil::startPollingForTheSequence");                    
+               if(sequence instanceof RMSBean) {
+                       // We pass in the internal sequence id for internal 
sequences.
+                       String sequenceId = 
((RMSBean)sequence).getInternalSequenceID();
+                       sender.runThreadForSequence(context, sequenceId, true);
+                       if(polling) pollMgr.runThreadForSequence(context, 
sequenceId, true);
+               } else {
+                       String sequenceId = sequence.getSequenceID();
+                       sender.runThreadForSequence(context, sequenceId, false);
+                       if(invoker != null) 
invoker.runThreadForSequence(context, sequenceId, false);
+                       if(polling) pollMgr.runThreadForSequence(context, 
sequenceId, false);
+               }
+               
+               if (log.isDebugEnabled()) log.debug("Exit: 
SandeshaUtil::startWorkersForSequence");
        }
-       
+
        public static String getMessageTypeString(int messageType) {
                switch (messageType) {
                case Sandesha2Constants.MessageTypes.CREATE_SEQ:
@@ -885,63 +879,6 @@
                }
        }
        
-       /**This returns the Key used when store SequencePropertyBeans for the 
passed message.
-        * For the sending side this will be the internal sequence ID.
-        * For the receiving side this is the sequenceId.
-        * 
-        * @param rmMsgContext
-        * @return
-        */
-       
-       public static String getSequencePropertyKey (RMMsgContext rmMsgContext) 
throws AxisFault {
-               String propertyKey = (String) 
rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY);
-               if (propertyKey!=null)
-                       return propertyKey;
-               
-               String sequenceId = (String) 
rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID);
-               String internalSequenceId = (String) 
rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
-
-               int type = rmMsgContext.getMessageType();
-               int flow = rmMsgContext.getMessageContext().getFLOW();
-               
-               if (flow==MessageContext.OUT_FLOW) {
-                       if (isSequenceResponseMessageType(type))
-                               propertyKey = sequenceId;
-                       else
-                               propertyKey = internalSequenceId;
-               } else if (flow==MessageContext.IN_FLOW || 
-                                                        
flow==MessageContext.IN_FAULT_FLOW) {
-                       if (isSequenceResponseMessageType(type))
-                               propertyKey = internalSequenceId;
-                       else
-                               propertyKey = sequenceId;
-               } else if (flow==MessageContext.OUT_FAULT_FLOW) {
-                       propertyKey = internalSequenceId;
-               }
-               
-               //TODO handler cases not covered from above.
-               
-               if (propertyKey==null) {
-                       String typeStr = 
SandeshaUtil.getMessageTypeString(rmMsgContext.getMessageType());
-                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotFindPropertyKey,typeStr);
-                       throw new SandeshaException (message);
-               }
-               
-               return propertyKey;
-       }
-       
-       private static boolean isSequenceResponseMessageType (int messageType) {
-               if 
(messageType==Sandesha2Constants.MessageTypes.CREATE_SEQ_RESPONSE ||
-                       messageType==Sandesha2Constants.MessageTypes.ACK ||
-                       
messageType==Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE_RESPONSE ||
-                       
messageType==Sandesha2Constants.MessageTypes.TERMINATE_SEQ_RESPONSE) {
-                       
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
        public static boolean isWSRMAnonymous(String address) {
                if (address!=null && 
address.startsWith(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX))
                        return true;

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
 Wed Feb  7 02:45:23 2007
@@ -87,7 +87,6 @@
                }
 
                MessageContext createSeqContext = 
createSequenceMsg.getMessageContext();
-               ConfigurationContext configurationContext = 
createSeqContext.getConfigurationContext();
 
                rmdBean.setServerCompletedMessages(new RangeString());
                
@@ -121,10 +120,6 @@
                        storageManager.storeMessageContext(newKey, 
createSeqContext);
                }
 
-               // message to invoke. This will apply for only in-order 
invocations.
-
-               SandeshaUtil.startSenderForTheSequence(configurationContext, 
sequenceId);
-
                String messageRMNamespace = createSequence.getNamespaceValue();
 
                String specVersion = null;
@@ -151,13 +146,12 @@
 
        }
 
-       public static RMSBean setupNewClientSequence(MessageContext 
firstAplicationMsgCtx, String sequencePropertyKey,
+       public static RMSBean setupNewClientSequence(MessageContext 
firstAplicationMsgCtx,
                        String specVersion, StorageManager storageManager) 
throws SandeshaException {
                if (log.isDebugEnabled())
-                       log.debug("Enter: 
SequenceManager::setupNewClientSequence " + sequencePropertyKey);
+                       log.debug("Enter: 
SequenceManager::setupNewClientSequence");
                
                RMSBean rmsBean = new RMSBean();
-               ConfigurationContext configurationContext = 
firstAplicationMsgCtx.getConfigurationContext();
 
                EndpointReference toEPR = firstAplicationMsgCtx.getTo();
                String acksTo = (String) 
firstAplicationMsgCtx.getProperty(SandeshaClientConstants.AcksTo);
@@ -255,8 +249,6 @@
                // updating the last activated time.
                rmsBean.setLastActivatedTime(System.currentTimeMillis());
                
-               SandeshaUtil.startSenderForTheSequence(configurationContext, 
sequencePropertyKey);
-
                updateClientSideListnerIfNeeded(firstAplicationMsgCtx, 
anonAcks);
                if (log.isDebugEnabled())
                        log.debug("Exit: 
SequenceManager::setupNewClientSequence " + rmsBean);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
 Wed Feb  7 02:45:23 2007
@@ -121,7 +121,7 @@
         * @param sequenceID
         * @throws SandeshaException
         */
-       public static void 
cleanReceivingSideOnTerminateMessage(ConfigurationContext configContext, String 
sequencePropertyKey ,String sequenceId,
+       public static void 
cleanReceivingSideOnTerminateMessage(ConfigurationContext configContext, String 
sequenceId,
                        StorageManager storageManager) throws SandeshaException 
{
 
                // clean senderMap
@@ -148,14 +148,14 @@
                        // there is no invoking by Sandesha2. So clean 
invocations storages.
                        
                        receivingSideCleanMap.put(sequenceId, 
CLEANED_ON_TERMINATE_MSG);
-                       cleanReceivingSideAfterInvocation(configContext, 
sequencePropertyKey, sequenceId, storageManager);
+                       cleanReceivingSideAfterInvocation(configContext, 
sequenceId, storageManager);
                } else {
 
                        String cleanStatus = (String) 
receivingSideCleanMap.get(sequenceId);
                        if (cleanStatus != null
                                        && 
CLEANED_AFTER_INVOCATION.equals(cleanStatus))
                                
completeTerminationOfReceivingSide(configContext,
-                                               sequencePropertyKey, 
sequenceId, storageManager);
+                                               sequenceId, storageManager);
                        else
                                receivingSideCleanMap.put(sequenceId, 
CLEANED_ON_TERMINATE_MSG);
                }
@@ -170,7 +170,7 @@
         * @param sequenceID
         * @throws SandeshaException
         */
-       public static void 
cleanReceivingSideAfterInvocation(ConfigurationContext configContext, String 
sequencePropertyKey ,String sequenceId,
+       public static void 
cleanReceivingSideAfterInvocation(ConfigurationContext configContext, String 
sequenceId,
                        StorageManager storageManager) throws SandeshaException 
{
                InvokerBeanMgr storageMapBeanMgr = 
storageManager.getInvokerBeanMgr();
 
@@ -195,7 +195,7 @@
 
                String cleanStatus = (String) 
receivingSideCleanMap.get(sequenceId);
                if (cleanStatus != null && 
CLEANED_ON_TERMINATE_MSG.equals(cleanStatus))
-                       completeTerminationOfReceivingSide(configContext, 
sequencePropertyKey, sequenceId, storageManager);
+                       completeTerminationOfReceivingSide(configContext, 
sequenceId, storageManager);
                else {
                        receivingSideCleanMap.put(sequenceId, 
CLEANED_AFTER_INVOCATION);
                }
@@ -206,7 +206,7 @@
         * methods.
         * 
         */
-       private static void 
completeTerminationOfReceivingSide(ConfigurationContext configContext, String 
sequencePropertyKey,String sequenceId,
+       private static void 
completeTerminationOfReceivingSide(ConfigurationContext configContext,String 
sequenceId,
                        StorageManager storageManager) throws SandeshaException 
{
                
                // TODO We need to remove the RMDBean, but doing so quickly can 
stop
@@ -241,20 +241,20 @@
                rmsBean.setTerminated(true);            
                storageManager.getRMSBeanMgr().update(rmsBean);
                
-               cleanSendingSideData (rmsBean.getSequenceID(), 
rmsBean.getInternalSequenceID(), serverSide, storageManager);
+               cleanSendingSideData (rmsBean.getInternalSequenceID(), 
serverSide, storageManager);
        }
 
-       public static void timeOutSendingSideSequence(String 
sequencePropertyKey,String internalSequenceId,
+       public static void timeOutSendingSideSequence(String internalSequenceId,
                        boolean serverside, StorageManager storageManager) 
throws SandeshaException {
 
                RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
internalSequenceId);
                rmsBean.setTimedOut(true);
                storageManager.getRMSBeanMgr().update(rmsBean);
 
-               cleanSendingSideData(sequencePropertyKey,internalSequenceId, 
serverside, storageManager);
+               cleanSendingSideData(internalSequenceId, serverside, 
storageManager);
        }
 
-       private static void cleanSendingSideData(String 
sequencePropertyKey,String internalSequenceId,
+       private static void cleanSendingSideData(String internalSequenceId,
                        boolean serverSide, StorageManager storageManager) 
throws SandeshaException {
 
                SenderBeanMgr retransmitterBeanMgr = 
storageManager.getSenderBeanMgr();

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
 Wed Feb  7 02:45:23 2007
@@ -57,8 +57,6 @@
                        msgToInvoke = 
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
                        RMMsgContext rmMsg = 
MsgInitializer.initializeMessage(msgToInvoke);
 
-                       String sequencePropertyKey = 
SandeshaUtil.getSequencePropertyKey(rmMsg);
-                       
                        // ending the transaction before invocation.
                        if(transaction != null) {
                                transaction.commit();
@@ -126,10 +124,7 @@
                                        //this will work for RM 1.0 only
                                        highestMessage = true;
                                } else {
-
-                                       RMDBean findBean = new RMDBean ();
-                                       findBean.setSequenceID(sequenceId);
-                                       RMDBean rmdBean = 
storageManager.getRMDBeanMgr().findUnique(findBean);
+                                       RMDBean rmdBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
                                        
                                        if (rmdBean!=null && 
rmdBean.isTerminated()) {
                                                long highestInMsgNo = 
rmdBean.getHighestInMessageNumber();
@@ -140,7 +135,7 @@
                                
                                if (highestMessage) {
                                        //do cleaning stuff that hs to be done 
after the invocation of the last message.
-                                       
TerminateManager.cleanReceivingSideAfterInvocation(configurationContext, 
sequencePropertyKey, sequenceId, storageManager);
+                                       
TerminateManager.cleanReceivingSideAfterInvocation(configurationContext, 
sequenceId, storageManager);
                                        // exit from current iteration. (since 
an entry
                                        // was removed)
                                        if(log.isDebugEnabled()) 
log.debug("Exit: InvokerWorker::run Last message return");                      
               

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
 Wed Feb  7 02:45:23 2007
@@ -142,6 +142,11 @@
        }
        
 
+       /**
+        * Ensure that the worker thread is aware of the given sequence. As 
source sequences
+        * do not have a proper sequence id at the time they are bootstrapped, 
the caller
+        * must pass in the internal sequence id when rmSource is true.
+        */
        public synchronized void runThreadForSequence(ConfigurationContext 
context, String sequenceID, boolean rmSource){
                if(log.isDebugEnabled()) log.debug("Entry: 
SandeshaThread::runThreadForSequence, " + this);
 
@@ -175,7 +180,10 @@
         * @return a List of SequenceEntry instances
         */
        public synchronized ArrayList getSequences() {
-               return workingSequences;
+               // Need to copy the list for thread safety
+               ArrayList result = new ArrayList();
+               result.addAll(workingSequences);
+               return result;
        }
 
        protected synchronized boolean hasStoppedRunning() {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=504494&r1=504493&r2=504494
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
Wed Feb  7 02:45:23 2007
@@ -17,6 +17,8 @@
 
 package org.apache.sandesha2.workers;
 
+import java.util.ArrayList;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
@@ -24,6 +26,8 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 
 /**
@@ -35,7 +39,12 @@
 public class Sender extends SandeshaThread {
 
        private static final Log log = LogFactory.getLog(Sender.class);
-           
+
+       // If this sender is working for several sequences, we use round-robin 
to
+       // try and give them all a chance to invoke messages.
+       int nextIndex = 0;
+       boolean processedMessage = false;
+       
        public Sender () {
                super(Sandesha2Constants.SENDER_SLEEP_TIME);
        }
@@ -44,20 +53,63 @@
                if (log.isDebugEnabled()) log.debug("Enter: 
Sender::internalRun");
 
                Transaction transaction = null;
+               boolean sleep = false;
 
                try {
+                       // Pick a sequence using a round-robin approach
+                       ArrayList allSequencesList = getSequences();
+                       int size = allSequencesList.size();
+                       log.debug("Choosing one from " + size + " sequences");
+                       if(nextIndex >= size) {
+                               nextIndex = 0;
+
+                               // We just looped over the set of sequences. If 
we didn't process any
+                               // messages on this loop then we sleep before 
the next one
+                               if(size == 0 || !processedMessage) {
+                                       sleep = true;
+                               }
+                               processedMessage = false;
+                               
+                               if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, looped over all sequences, sleep " + sleep);
+                               return sleep;
+                       }
+
+                       SequenceEntry entry = (SequenceEntry) 
allSequencesList.get(nextIndex++);
+                       String sequenceId = entry.getSequenceId();
+                       log.debug("Chose sequence " + sequenceId);
+
                        transaction = storageManager.getTransaction();
 
+                       // Check that the sequence is still valid
+                       boolean found = false;
+                       if(entry.isRmSource()) {
+                               RMSBean matcher = new RMSBean();
+                               matcher.setInternalSequenceID(sequenceId);
+                               matcher.setTerminated(false);
+                               RMSBean rms = 
storageManager.getRMSBeanMgr().findUnique(matcher);
+                               if(rms != null) {
+                                       sequenceId = rms.getSequenceID();
+                                       found = true;
+                               }
+                       } else {
+                               RMDBean matcher = new RMDBean();
+                               matcher.setSequenceID(sequenceId);
+                               matcher.setTerminated(false);
+                               RMDBean rmd = 
storageManager.getRMDBeanMgr().findUnique(matcher);
+                               if(rmd != null) found = true;
+                       }
+                       if (!found) {
+                               stopThreadForSequence(sequenceId, 
entry.isRmSource());
+                               if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, sequence has ended");
+                               return false;
+                       }
+                       
                        SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
-                       SenderBean senderBean = mgr.getNextMsgToSend();
+                       SenderBean senderBean = 
mgr.getNextMsgToSend(sequenceId);
                        
                        if (senderBean == null) {
-                               // As there was no work to do, we sleep for a 
while on the next loop.
-                               if (log.isDebugEnabled()) {
-                                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
-                                       log.debug("Exit: Sender::internalRun, " 
+ message + ", sleeping");
-                               }
-                               return true;
+                               if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, no message for this sequence");
+                               return false; // Move on to the next sequence 
in the list
                        }
 
                        // work Id is used to define the piece of work that 
will be
@@ -96,6 +148,10 @@
                        // makes sure
                        // that all the workIds in the Lock are handled by 
threads.
                        getWorkerLock().addWork(workId);
+
+                       // If we got to here then we found work to do on the 
sequence, so we should
+                       // remember not to sleep at the end of the list of 
sequences.
+                       processedMessage = true;
 
                } catch (Exception e) {
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to