Author: gatfora
Date: Wed Feb 21 02:55:08 2007
New Revision: 509965

URL: http://svn.apache.org/viewvc?view=rev&rev=509965
Log:
Modify SequenceProcessor so that acks are also sent to non-anonymous acks-to 
addresses when a duplicate message is processed

Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=509965&r1=509964&r2=509965
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
 Wed Feb 21 02:55:08 2007
@@ -223,37 +223,32 @@
                        if((replyTo==null || replyTo.hasAnonymousAddress()) &&
                           (specVersion!=null && 
specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
 
-                           SenderBeanMgr senderBeanMgr = 
storageManager.getSenderBeanMgr();
-                           SenderBean findSenderBean = new SenderBean ();
-                           
findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
-                           
findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
-                           
findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
-                           findSenderBean.setSend(true);
+                         SenderBeanMgr senderBeanMgr = 
storageManager.getSenderBeanMgr();
+                         SenderBean findSenderBean = new SenderBean ();
+                         
findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+                         
findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
+                         
findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
+                         findSenderBean.setSend(true);
                
-                           SenderBean replyMessageBean = 
senderBeanMgr.findUnique(findSenderBean);
+                         SenderBean replyMessageBean = 
senderBeanMgr.findUnique(findSenderBean);
                            
-                           // this is effectively a poll for the replyMessage, 
wo re-use the logic in the MakeConnection
-                           // processor. This will use this thread to re-send 
the reply, writing it into the transport.
-                           // As the reply is now written we do not want to 
continue processing, or suspend, so we abort.
-                           if(replyMessageBean != null) {
-                               if(log.isDebugEnabled()) log.debug("Found 
matching reply for replayed message");
-                               MakeConnectionProcessor.replyToPoll(rmMsgCtx, 
replyMessageBean, storageManager, false, null);
+                         // this is effectively a poll for the replyMessage, 
wo re-use the logic in the MakeConnection
+                         // processor. This will use this thread to re-send 
the reply, writing it into the transport.
+                         // As the reply is now written we do not want to 
continue processing, or suspend, so we abort.
+                         if(replyMessageBean != null) {
+                               if(log.isDebugEnabled()) log.debug("Found 
matching reply for replayed message");
+                               MakeConnectionProcessor.replyToPoll(rmMsgCtx, 
replyMessageBean, storageManager, false, null);
                                        result = InvocationResponse.ABORT;
                                        if (log.isDebugEnabled())
                                                log.debug("Exit: 
SequenceProcessor::processReliableMessage, replayed message: " + result);
                                        return result;
-                           }
-                   }
+                         }
+                 }
+                       
                        EndpointReference acksTo = new EndpointReference 
(bean.getAcksToEPR());
-                       if (acksTo.hasAnonymousAddress()) {
-                               RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId, 
storageManager,false,true);
-                               
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
 Constants.VALUE_TRUE);
-                               
AcknowledgementManager.sendAckNow(ackRMMsgContext);
-                               result = InvocationResponse.ABORT;
-                               if (log.isDebugEnabled())
-                                       log.debug("Exit: 
SequenceProcessor::processReliableMessage, acking duplicate message: " + 
result);
-                               return result;
-                       }
+                       
+                       // Send an Ack if needed.
+                       sendAckIfNeeded(sequenceId, rmMsgCtx, storageManager, 
true, acksTo.hasAnonymousAddress());                      
                        
                        result = InvocationResponse.ABORT;
                        if (log.isDebugEnabled())
@@ -383,59 +378,28 @@
        }
 
 
-       public static void sendAckIfNeeded(RMMsgContext rmMsgCtx, 
StorageManager storageManager, boolean serverSide)
+       private static void sendAckIfNeeded(String sequenceId, RMMsgContext 
rmMsgCtx, 
+                       StorageManager storageManager, boolean serverSide, 
boolean anonymousAcksTo)
                                        throws AxisFault {
 
                if (log.isDebugEnabled())
-                       log.debug("Enter: SequenceProcessor::sendAckIfNeeded");
-               
-               Sequence sequence = (Sequence) rmMsgCtx
-                               
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-               
-               if(sequence!=null){
-                       String sequenceId = 
sequence.getIdentifier().getIdentifier();
-                       ConfigurationContext configCtx = 
rmMsgCtx.getMessageContext()
-                                       .getConfigurationContext();
-                       if (configCtx == null) {
-                               String message = SandeshaMessageHelper
-                                               
.getMessage(SandeshaMessageKeys.configContextNotSet);
-                               if (log.isDebugEnabled())
-                                       log.debug(message);
-                               throw new SandeshaException(message);
-                       }
+                       log.debug("Enter: SequenceProcessor::sendAckIfNeeded " 
+ sequenceId);
 
                        RMMsgContext ackRMMsgCtx = 
AcknowledgementManager.generateAckMessage(
                                        rmMsgCtx , sequenceId, storageManager,
                                        false, serverSide);
-                       MessageContext ackMsgCtx = 
ackRMMsgCtx.getMessageContext();
-
-                       EndpointReference acksTo = ackRMMsgCtx.getTo();
-                       EndpointReference replyTo = rmMsgCtx.getReplyTo();
-                       boolean anonAck = (acksTo == null) || 
acksTo.hasAnonymousAddress();
-                       boolean anonReply = (replyTo == null) || 
replyTo.hasAnonymousAddress();
-
-                       // Only use the backchannel for ack messages if we are 
sure that the
-                       // application
-                       // doesn't need it. A 1-way MEP should be complete by 
now.
-                       boolean complete = 
ackMsgCtx.getOperationContext().isComplete();
-                       if (anonAck && anonReply && !complete) {
-                               if (log.isDebugEnabled())
-                                       log
-                                                       .debug("Exit: 
SequenceProcessor::sendAckIfNeeded, avoiding using backchannel");
-                               return;
-                       }
-
-                       long ackInterval = SandeshaUtil.getPropertyBean(
-                                       
rmMsgCtx.getMessageContext().getAxisService())
-                                       .getAcknowledgementInterval();
 
-                       long timeToSend = System.currentTimeMillis() + 
ackInterval;
-                       if (anonAck) {
+                       if (anonymousAcksTo) {
+                               
rmMsgCtx.getMessageContext().getOperationContext().
+                                       
setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
                                AcknowledgementManager.sendAckNow(ackRMMsgCtx);
-                       } else if (!anonAck) {
+                       } else {                                
+                               long ackInterval = SandeshaUtil.getPropertyBean(
+                                               
rmMsgCtx.getMessageContext().getAxisService())
+                                               .getAcknowledgementInterval();
+                               long timeToSend = System.currentTimeMillis() + 
ackInterval;
                                
AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend, 
storageManager);
                        }                       
-               }
 
                if (log.isDebugEnabled())
                        log.debug("Exit: SequenceProcessor::sendAckIfNeeded");

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=509965&r1=509964&r2=509965
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
 Wed Feb 21 02:55:08 2007
@@ -268,6 +268,7 @@
                result.append(this.getClass().getName());
                result.append("\nSequence Id    : "); result.append(sequenceID);
                result.append("\nInternal Seq Id: "); 
result.append(internalSequenceID);
+               result.append("\nTo             : "); result.append(toAddress);
                result.append("\nMessage Number : "); 
result.append(messageNumber);
                result.append("\nMessage Type   : "); 
result.append(messageType);
                result.append("\nMessage Key    : "); 
result.append(messageContextRefKey);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to