Author: mckierna
Date: Fri Dec 14 02:55:51 2007
New Revision: 604158

URL: http://svn.apache.org/viewvc?rev=604158&view=rev
Log:
fix some bugs in reallocate

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=604158&r1=604157&r2=604158&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 Fri Dec 14 02:55:51 2007
@@ -28,6 +28,7 @@
 import java.util.Map;
 
 import javax.xml.namespace.QName;
+import javax.xml.stream.events.Characters;
 
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.util.CopyUtils;
@@ -1078,14 +1079,17 @@
         //internal sequence ID is different
         String internalSequenceID = oldRMSBean.getInternalSequenceID();
         //we also need to obtain the sequenceKey from the internalSequenceID.
-        String sequenceKey = 
+        String oldSequenceKey = 
           
SandeshaUtil.getSequenceKeyFromInternalSequenceID(internalSequenceID, 
oldRMSBean.getToEndpointReference().getAddress());
-        options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, 
sequenceKey); 
+        //remove the old sequence key from the internal sequence ID
+        internalSequenceID = internalSequenceID.substring(0, 
internalSequenceID.length()-oldSequenceKey.length());
+        options.setProperty(SandeshaClientConstants.SEQUENCE_KEY, 
+                       SandeshaUtil.getUUID()); //using a new sequence Key to 
differentiate from the old sequence 
         
options.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,
 internalSequenceID);
         options.setProperty(SandeshaClientConstants.RM_SPEC_VERSION, 
oldRMSBean.getRMVersion());
        
options.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, 
Boolean.FALSE);
        
-        //send the msgs
+        //send the msgs - this will setup a new sequence to the same endpoint
        Iterator it = msgsToSend.iterator();
        while(it.hasNext()){
                MessageContext msgCtx = (MessageContext)it.next();

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=604158&r1=604157&r2=604158&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
 Fri Dec 14 02:55:51 2007
@@ -35,6 +35,7 @@
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.SandeshaStorageException;
@@ -271,7 +272,7 @@
                long lastAckedMsg = -1;
                
                if(ranges.length==1){
-                       //a single contiguous acked range
+                       //the sequence is a single contiguous acked range
                        lastAckedMsg = ranges[0].upperValue;
                }
                else{
@@ -286,20 +287,50 @@
                        
if(retransmitterBean.getMessageType()!=Sandesha2Constants.MessageTypes.TERMINATE_SEQ
 || rmsBean.isTerminated()){
                                //remove all but terminate sequence messages
                                String messageStoreKey = 
retransmitterBean.getMessageContextRefKey();
+                               //if we have been asked to reallocate we need 
to send all unacked messages to a new sequence.
+                               //We must ensure that we retrieve these messages 
in the correct order 
                                if(reallocateIfPossible
-                                       && 
retransmitterBean.getMessageType()!=Sandesha2Constants.MessageTypes.APPLICATION
+                                       && 
retransmitterBean.getMessageType()==Sandesha2Constants.MessageTypes.APPLICATION
                                        && 
retransmitterBean.getMessageNumber()==lastAckedMsg+1){
                                        
-                                       //try to reallocate application msgs
+                                       if(log.isDebugEnabled())
+                                               log.debug("adding message for 
reallocate: " + retransmitterBean.getMessageNumber());
+                                       
+                                       //try to reallocate application msgs 
that are next in the outgoing list 
                                        
msgsToReallocate.add(storageManager.retrieveMessageContext(messageStoreKey, 
storageManager.getContext()));
+                                       
retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+                                       
storageManager.removeMessageContext(messageStoreKey);   
                                        lastAckedMsg++;
                                }
-                               
retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
-                               
storageManager.removeMessageContext(messageStoreKey);                           
+                               else if(reallocateIfPossible){
+                                       //we are reallocating but this message 
does not fit the criteria. We should not delete it
+                                       if(log.isDebugEnabled())
+                                               log.debug("cannot reallocate: " 
+ retransmitterBean.getMessageNumber());
+                                       if(msgsToReallocate.size()==0){
+                                               try{
+                                                       //however we might need 
this message if there are no messages to reallocate but we still
+                                                       //need a new sequence - 
we use a dummy message
+                                                       MessageContext dummy = 
SandeshaUtil.cloneMessageContext(
+                                                                       
storageManager.retrieveMessageContext(messageStoreKey, 
storageManager.getContext()));
+                                                       
dummy.getOptions().setProperty(SandeshaClientConstants.DUMMY_MESSAGE, 
Sandesha2Constants.VALUE_TRUE);   
+                                                       
msgsToReallocate.add(dummy);                                                    
+                                               }
+                                               catch(Exception e){
+                                                       if(log.isDebugEnabled())
+                                                               
log.debug("Exit: TerminateManager::cleanSendingSideData " + e);
+                                                       throw new 
SandeshaStorageException(e);
+                                               }
+                                       }
+                               }
+                               else{
+                                       //we are not reallocating so just 
delete the messages
+                                       
retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+                                       
storageManager.removeMessageContext(messageStoreKey);                           
                
+                               }
                        }
                }
                
-               if(reallocateIfPossible && msgsToReallocate.size()>0){
+               if(reallocateIfPossible){
                        try{
                              
SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, 
msgsToReallocate);  
                              reallocatedOK = true;



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to