Author: parsonsd
Date: Tue Jul  7 15:03:47 2009
New Revision: 791857

URL: http://svn.apache.org/viewvc?rev=791857&view=rev
Log:
- Messages are being lost because RMDBeans are updated to say that messages 
have been received before the InvokerBeans have been completely inserted.  The 
fix is to perform the update and the insertion within a single transaction.
- The sync1way reallocation code isn't triggered because the client doesn't 
receive any faults from the server.  The fix is to set the faultTo so that 
WS-Addressing has somewhere to send the fault.


Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=791857&r1=791856&r2=791857&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
 Tue Jul  7 15:03:47 2009
@@ -208,12 +208,7 @@
                
rmMsgCtx.setProperty(Constants.Configuration.DISABLE_RESPONSE_ACK, 
Boolean.TRUE);
         }
         
-        if (!msgNoPresentInList) {
-          serverCompletedMessageRanges.addRange(new Range(msgNo));
-          
-          storageManager.getRMDBeanMgr().update(bean);
-        }
-        else {
+        if (msgNoPresentInList){
            
           if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
               log.debug("Detected duplicate message " + msgNo);
@@ -262,7 +257,7 @@
       }
       
       if(transaction != null && transaction.isActive()) transaction.commit();
-      transaction = null;
+               transaction = null;
     }catch (RuntimeException e)
     {
        

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=791857&r1=791856&r2=791857&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
 Tue Jul  7 15:03:47 2009
@@ -482,6 +482,14 @@
                                                }
                                        }
                                }
+                               // Set the faultTo to anonymous to make sure we 
get Sandesha faults back.
+                               if(mep == WSDLConstants.MEP_CONSTANT_OUT_ONLY
+                                       || 
(Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion) && replyTo == null)) 
{
+                                       if (log.isDebugEnabled()) 
+                                               log.debug("Setting the faultTo 
to anonymous as a oneWay MEP is being used and fault msgs can then be delivered 
back on the backchannel");
+                                       if(msgContext.getFaultTo() == null)
+                                               msgContext.setFaultTo(new 
EndpointReference(AddressingConstants.Final.WSA_ANONYMOUS_URL));
+                               }
                        } 
                        
                        boolean startPolling = false;
@@ -501,7 +509,7 @@
                                }
                        }
 
-                       if (log.isDebugEnabled()) log.debug("App msg using 
replyTo EPR as " + msgContext.getReplyTo());
+                       if (log.isDebugEnabled()) log.debug("App msg using 
replyTo EPR as " + msgContext.getReplyTo() + " and faultTo EPR as " + 
msgContext.getFaultTo());
                        
                        RelatesTo relatesTo = msgContext.getRelatesTo();
                        if(relatesTo != null) {

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=791857&r1=791856&r2=791857&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Tue Jul  7 15:03:47 2009
@@ -22,6 +22,7 @@
 import javax.xml.namespace.QName;
 
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.context.ConfigurationContext;
@@ -53,6 +54,8 @@
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
@@ -173,7 +176,15 @@
                EndpointReference replyTo = rmMsgCtx.getReplyTo();
                if (log.isDebugEnabled())
                        log.debug("SequenceProcessor::processReliableMessage 
replyTo = " + replyTo);
-               
+
+               // Updating the server completed message ranges list
+               RangeString serverCompletedMessageRanges = 
bean.getServerCompletedMessages();
+               // See if the message is in the list of completed ranges
+               boolean msgNoPresentInList =  
serverCompletedMessageRanges.isMessageNumberInRanges(msgNo);
+               if (!msgNoPresentInList){
+                        serverCompletedMessageRanges.addRange(new 
Range(msgNo));
+               }
+
                // updating the Highest_In_Msg_No property which gives the 
highest
                // message number retrieved from this sequence.
                long highestInMsgNo = bean.getHighestInMessageNumber();

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=791857&r1=791856&r2=791857&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 Tue Jul  7 15:03:47 2009
@@ -1035,7 +1035,6 @@
 
                if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
                        log.debug("Enter: 
SandeshaUtil::reallocateMessagesToNewSequence");
-           
                ConfigurationContext ctx = storageManager.getContext();
                ServiceClient client = new ServiceClient(ctx,  null);
                
@@ -1088,10 +1087,20 @@
                        AxisOperation axisOperation = msgCtx.getAxisOperation();
 
                        //If it's oneway or async, reallocate
-                       //Fail if replyTo is annonymous as this is currently 
not supported because in twoway we can't get responses back to th eold something
-                       if(axisOperation.getAxisSpecificMEPConstant() == 
WSDLConstants.MEP_CONSTANT_OUT_ONLY){
+                       EndpointReference replyTo = 
oldRMSBean.getReplyToEndpointReference();
+                       int mep = axisOperation.getAxisSpecificMEPConstant();
+                       if(mep == WSDLConstants.MEP_CONSTANT_OUT_ONLY){
                                
client.fireAndForget(msgCtx.getEnvelope().getBody().cloneOMElement().getFirstElement());
-                       } else if 
(client.getOptions().getReplyTo().hasAnonymousAddress()){
+                       } else if (replyTo == null || 
replyTo.hasAnonymousAddress()){
+                               //It is sync2way and therefore we should fail
+                               transaction = storageManager.getTransaction();
+
+                               RMSBeanMgr mgr = storageManager.getRMSBeanMgr();
+
+                               RMSBean finder = new RMSBean();
+                               
finder.setSequenceID(oldRMSBean.getSequenceID());
+                               RMSBean bean = mgr.findUnique(finder);
+
                                
oldRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
                                
storageManager.getRMSBeanMgr().update(oldRMSBean);
                                throw new 
SandeshaException(SandeshaMessageKeys.reallocationForSyncRequestReplyNotSupported);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to