Author: mlovett
Date: Tue Jan 30 10:23:44 2007
New Revision: 501504

URL: http://svn.apache.org/viewvc?view=rev&rev=501504
Log:
Fix some sync-2-way errors that stopped the backchannel working with WSRM 1.1

Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=501504&r1=501503&r2=501504
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Tue Jan 30 10:23:44 2007
@@ -169,20 +169,12 @@
                String specVersion = rmMsgCtx.getRMSpecVersion();
                Boolean duplicateMessage = (Boolean) 
rmMsgCtx.getProperty(Sandesha2Constants.DUPLICATE_MESSAGE);
                String mep = 
msgCtx.getAxisOperation().getMessageExchangePattern();
+               boolean syncReply = replyTo == null || 
replyTo.hasAnonymousAddress();
                
-               if ((replyTo!=null && replyTo.hasAnonymousAddress()) &&
+               if (syncReply && 
!WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep) && 
                        (specVersion!=null && 
specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0)) &&
                        (duplicateMessage!=null && 
duplicateMessage.equals(Boolean.TRUE))){
                        
-                       if (WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep)) {
-                               //This scenario has to be handled only for meps 
with response messages
-                               result = InvocationResponse.SUSPEND;
-                               
-                               if (log.isDebugEnabled())
-                                       log.debug("Exit: 
SequenceProcessor::processReliableMessage"     + result);
-                               return result;
-                       }
-                       
                    String outgoingSideInternalSequenceId = 
SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
                    RMSBean rmsBean = 
SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, 
outgoingSideInternalSequenceId);
                    if (rmsBean==null) {
@@ -243,11 +235,10 @@
                                                // pausing the thread, causing 
the transport to wait.
                                                // Sender will send the 
outMessage correctly, using
                                                // 
RequestResponseTransportListner.
-
-                                               if (log.isDebugEnabled())
-                                                       log.debug("Exit: 
SequenceProcessor::processReliableMessage"     + result);
                                                
                                                result = 
InvocationResponse.SUSPEND;
+                                               if (log.isDebugEnabled())
+                                                       log.debug("Exit: 
SequenceProcessor::processReliableMessage" + result);
                                                return result;
                                        }
                                }
@@ -257,8 +248,11 @@
                        throw new SandeshaException (message);
                    
                } else if (duplicateMessage!=null && 
duplicateMessage.equals(Boolean.TRUE)) {
-                       String message = "Unexpected scenario. This message 
should have been dropped in the pre-dispatch level";
-                       throw new SandeshaException (message);
+                       // Abort processing this duplicate
+                       result = InvocationResponse.ABORT;
+                       if (log.isDebugEnabled())
+                               log.debug("Exit: 
SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
+                       return result;
                }               
                
                String key = SandeshaUtil.getUUID(); // key to store the 
message.
@@ -290,8 +284,10 @@
                                && 
(Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == 
Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
                        // this is a duplicate message and the invocation type 
is
                        // EXACTLY_ONCE.
-                       rmMsgCtx.pause();
-                       result = InvocationResponse.SUSPEND;
+                       result = InvocationResponse.ABORT;
+                       if (log.isDebugEnabled())
+                               log.debug("Exit: 
SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
+                       return result;
                }
 
                if (!msgNoPresentInList)
@@ -311,9 +307,6 @@
                
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID,sequenceId);
                
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.MESSAGE_NUMBER,new
 Long (msgNo));
                
-               
-               
-               
 //             adding of acks
                
 //             if acksTo anonymous 
@@ -352,47 +345,15 @@
 
                if (inOrderInvocation) {
 
-                       //if replyTo is anonymous and this is not an InOnly 
message
-                               //SUSPEND the execution for RM 1.0    
-                               //Sender will attach a sync responseusing the 
RequestResponseTransport object.
-                       //else
-                               //ABORT the execution
-                       
-                       // if (acksTo is anonymous and no response message has 
been added)
-                               //send an ack to the back channel now.
-                       
-                       //add an antry to the invoker
-                       
-                       if ((replyTo!=null && replyTo.hasAnonymousAddress() && 
-                                
!WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep))) {
-                               
-                               if (specVersion!=null && 
specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0)) {
-                                       result = InvocationResponse.SUSPEND;
-                                       //in case of RM 1.0 result will be 
suspended, causing the anon-response to be added using RequestResponseTransport
-                                       //ack bean entry added previously may 
cause an ack to be piggybacked.
-                               } else {
-                                       result = InvocationResponse.ABORT;
-                               }
-                       } else {
-                               result = InvocationResponse.ABORT;
-                       }
-
-                       
+                       // Whatever the MEP, we suspend processing here and the 
invoker will do the real work
+                       result = InvocationResponse.SUSPEND;
                        InvokerBeanMgr storageMapMgr = 
storageManager.getInvokerBeanMgr();
 
-                       // saving the message.
-                       try {
-                               storageManager.storeMessageContext(key, 
rmMsgCtx.getMessageContext());
-                               storageMapMgr.insert(new InvokerBean(key, 
msgNo, sequenceId));
-
-                               // This will avoid performing application 
processing more
-                               // than
-                               // once.
-                               
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+                       storageManager.storeMessageContext(key, 
rmMsgCtx.getMessageContext());
+                       storageMapMgr.insert(new InvokerBean(key, msgNo, 
sequenceId));
 
-                       } catch (Exception ex) {
-                               throw new SandeshaException(ex.getMessage(), 
ex);
-                       }
+                       // This will avoid performing application processing 
more than once.
+                       
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 
                        // Starting the invoker if stopped.
                        
SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), 
sequenceId);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=501504&r1=501503&r2=501504
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 Tue Jan 30 10:23:44 2007
@@ -115,42 +115,27 @@
                                return; 
                        }
 
-                       String specVersion = rmMsgCtx.getRMSpecVersion();
-                       
-                       // If we are sending to the anonymous URI and this is 
WSRM 1.0 then we _must_ have a transport waiting,
+                       // If we are sending to the anonymous URI then we 
_must_ have a transport waiting,
                        // or the message can't go anywhere. If there is 
nothing here then we leave the
-                       // message in the sender queue, and a MakeConnection 
will hopefully pick it up
-                       // soon.
-                       // If this is RM 1.1 we have the MakeConnection 
mechanism so this check is not necessary.
+                       // message in the sender queue, and a MakeConnection 
(or a retransmitted request)
+                       // will hopefully pick it up soon.
+                       Boolean makeConnection = (Boolean) 
msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE);
                        EndpointReference toEPR = msgCtx.getTo();
-                       if(toEPR.hasAnonymousAddress() ) {
-                               
-                               if (specVersion != null && 
Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) {
-                                       //for RM 1.0 we dont let this go 
forward until a valid RequestResponseTransport is available to attach a 
-                                       //async response.
-                                       RequestResponseTransport t = null;
-                                       MessageContext inMsg = null;
-                                       OperationContext op = 
msgCtx.getOperationContext();
-                                       if (op != null)
-                                               inMsg = 
op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-                                       if (inMsg != null)
-                                               t = (RequestResponseTransport) 
inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+                       if(toEPR.hasAnonymousAddress() &&
+                               (makeConnection == null || 
!makeConnection.booleanValue())) {
 
-                                       if (t == null || 
!t.getStatus().equals(RequestResponseTransportStatus.WAITING)) {
-                                               if (log.isDebugEnabled())
-                                                       log.debug("Exit: 
SenderWorker::run, no response transport for anonymous message");
-                                               return;
-                                       }
-                               } else {
-                                       //for RM 1.1 we dont let this go 
forward only if this is an response to a MakeConnection message
-                                       
-                                       Boolean makeConnectionResponse = 
(Boolean) msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE);
-                                       
-                                       if (makeConnectionResponse==null || 
!Boolean.TRUE.equals(makeConnectionResponse)) {
-                                               if (log.isDebugEnabled())
-                                                       log.debug("Exit: 
SenderWorker::run, no response transport for anonymous message");
-                                               return;
-                                       }
+                               RequestResponseTransport t = null;
+                               MessageContext inMsg = null;
+                               OperationContext op = 
msgCtx.getOperationContext();
+                               if (op != null)
+                                       inMsg = 
op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+                               if (inMsg != null)
+                                       t = (RequestResponseTransport) 
inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+                               if (t == null || 
!t.getStatus().equals(RequestResponseTransportStatus.WAITING)) {
+                                       if (log.isDebugEnabled())
+                                               log.debug("Exit: 
SenderWorker::run, no response transport for anonymous message");
+                                       return;
                                }
                        }
                        



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to