Author: gatfora
Date: Tue Jun 24 01:22:54 2008
New Revision: 671059

URL: http://svn.apache.org/viewvc?rev=671059&view=rev
Log:
Applying patches from SANDESHA2-164, thanks David and Sara

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
 Tue Jun 24 01:22:54 2008
@@ -42,6 +42,7 @@
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.workers.Sender;
 
 /**
  * Contains logic for managing acknowledgements.
@@ -58,96 +59,102 @@
         * @param applicationRMMsgContext
         * @throws SandeshaException
         */
-       public static void piggybackAcksIfPresent(RMMsgContext 
rmMessageContext, StorageManager storageManager)
-                       throws SandeshaException {
+       public static void piggybackAcksIfPresent(RMMsgContext 
rmMessageContext, StorageManager storageManager) throws SandeshaException {
                if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
                        log.debug("Enter: 
AcknowledgementManager::piggybackAcksIfPresent");
-               
+
                SenderBeanMgr retransmitterBeanMgr = 
storageManager.getSenderBeanMgr();
 
-               // If this message is going to an anonymous address, and the 
inbound sequence has
+               // If this message is going to an anonymous address, and the 
inbound
+               // sequence has
                // anonymous acksTo, then we add in an ack for the inbound 
sequence.
                EndpointReference target = rmMessageContext.getTo();
-               if(target == null || target.hasAnonymousAddress()) {
-                       // We have no good indicator of the identity of the 
destination, so the only sequence
-                       // we can ack is the inbound one that caused us to 
create this response.
+               if (target == null || target.hasAnonymousAddress()) {
+                       // We have no good indicator of the identity of the 
destination, so
+                       // the only sequence
+                       // we can ack is the inbound one that caused us to 
create this
+                       // response.
                        String inboundSequence = (String) 
rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
-                       if(inboundSequence != null) {
+                       if (inboundSequence != null) {
                                RMDBean inboundBean = 
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
-                               if(inboundBean != null && 
!inboundBean.isTerminated()) {
+                               if (inboundBean != null && 
!inboundBean.isTerminated()) {
                                        EndpointReference acksToEPR = 
inboundBean.getAcksToEndpointReference();
 
-                                       if(acksToEPR == null || 
acksToEPR.hasAnonymousAddress()) {
-                                               
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
+                                       if (acksToEPR == null || 
acksToEPR.hasAnonymousAddress()) {
+                                               if 
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+                                                       log.debug("Piggybacking 
ack for inbound sequence: " + inboundSequence);
                                                
RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, inboundBean, 
false);
                                        }
                                }
                        }
-                       if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent, anon");
+                       if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                               log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent, anon");
                        return;
-               }
-               else{
-                       //an addressable EPR
-                       if(SandeshaUtil.hasReferenceParameters(target)){
-                               //we should not proceed since we cannot 
properly compare ref params
-                               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
+               } else {
+                       // an addressable EPR
+                       if (SandeshaUtil.hasReferenceParameters(target)) {
+                               // we should not proceed since we cannot 
properly compare ref
+                               // params
+                               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                                       log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
                                return;
                        }
-                       
-                   // From here on, we must be dealing with a real address. 
Piggyback all sequences that have an
-                   // acksTo that matches the To address, and that have an 
ackMessage queued up for sending. We
-                   // search for RMDBeans first, to avoid a deadlock.
-                   //
-                   // As a special case, if this is a terminate sequence 
message then add in ack messages for
-                   // any sequences that have an acksTo that matches the 
target address. This helps to ensure
-                   // that request-response sequence pairs end cleanly.
-                   RMDBean findRMDBean = new RMDBean();
-                   findRMDBean.setAcksToEndpointReference(target);
-                   findRMDBean.setTerminated(false);
-                   Collection rmdBeans = 
storageManager.getRMDBeanMgr().find(findRMDBean);
-                   Iterator sequences = rmdBeans.iterator();
-                   while(sequences.hasNext()) {
-                     RMDBean sequence = (RMDBean) sequences.next();
-                     
if(SandeshaUtil.hasReferenceParameters(sequence.getAcksToEndpointReference())){
-                         //we should not piggy back if there are reference 
parameters in the acksTo EPR since we cannot compare them
-                         if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
-                         break;
-                     }
-                                       
-                     String sequenceId = sequence.getSequenceID();
-                     
-                     // Look for the SenderBean that carries the ack, there 
should be at most one
-                     SenderBean findBean = new SenderBean();
-                     
findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-                     findBean.setSend(true);
-                     findBean.setSequenceID(sequenceId);
-                     findBean.setToAddress(target.getAddress());
-                     
-                     SenderBean ackBean = 
retransmitterBeanMgr.findUnique(findBean);
-                     
-                               // Piggybacking will happen only if the end of 
ack interval (timeToSend) is not reached.
-                               long timeNow = System.currentTimeMillis();
-                           if (ackBean != null && ackBean.getTimeToSend() > 
timeNow) {
-                                       // Delete the beans that would have 
sent the ack
-                                       
retransmitterBeanMgr.delete(ackBean.getMessageID());
-                                       
storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
-
-                               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
-                       RMMsgCreator.addAckMessage(rmMessageContext, 
sequenceId, sequence, false);
-
-
-                           } else if(rmMessageContext.getMessageType() == 
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-                               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Adding extra acks, as this is a terminate");
-                                 
-                               if(sequence.getHighestInMessageNumber() > 0) {
-                                                 
if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) 
log.debug("Piggybacking ack for sequence: " + sequenceId);
 
-                                       
RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false);
+                       String inboundSequence = (String) 
rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
+                       // If there's an inbound sequence (i.e. we're provider 
side) we'll
+                       // use that, otherwise
+                       // we'll go to the expense of looking the sequence up 
by the acksTo
+                       // address.
+                       if (inboundSequence != null) {
+                               // We used to look for an ack sender bean 
before piggybacking an
+                               // ack, but in the high-throughput
+                               // scenarios there always was one, and in the 
low throughput
+                               // scenarios it's less of an issue if
+                               // we piggyback when we don't have to. So for 
now, let's mimic
+                               // the old high-throughput behaviour
+                               // in a cheap way by always piggybacking.
+                               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                                       log.debug("Piggybacking ack for 
sequence: " + inboundSequence);
+                               RMDBean sequence = 
storageManager.getRMDBeanMgr().retrieve(inboundSequence);
+                               RMMsgCreator.addAckMessage(rmMessageContext, 
inboundSequence, sequence, false);
+                               ((Sender) 
storageManager.getSender()).removeScheduledAcknowledgement(inboundSequence);
+                       } else {
+                               RMDBean findRMDBean = new RMDBean();
+                               findRMDBean.setAcksToEndpointReference(target);
+                               findRMDBean.setTerminated(false);
+                               Collection rmdBeans = 
storageManager.getRMDBeanMgr().find(findRMDBean);
+                               Iterator sequences = rmdBeans.iterator();
+                               while (sequences.hasNext()) {
+                                       RMDBean sequence = (RMDBean) 
sequences.next();
+                                       if 
(SandeshaUtil.hasReferenceParameters(sequence.getAcksToEndpointReference())) {
+                                               // we should not piggy back if 
there are reference
+                                               // parameters in the acksTo EPR 
since we cannot compare
+                                               // them
+                                               if 
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+                                                       log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
+                                               break;
                                        }
+
+                                       String sequenceId = 
sequence.getSequenceID();
+
+                                       // We used to look for an ack sender 
bean before
+                                       // piggybacking an ack, but in the 
high-throughput
+                                       // scenarios there always was one, and 
in the low throughput
+                                       // scenarios it's less of an issue if
+                                       // we piggyback when we don't have to. 
So for now, let's
+                                       // mimic the old high-throughput 
behaviour
+                                       // in a cheap way by always 
piggybacking.
+                                       if 
(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
+                                               log.debug("Piggybacking ack for 
sequence: " + sequenceId);
+
+                                       
RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false);
+
+                                       ((Sender) 
storageManager.getSender()).removeScheduledAcknowledgement(sequenceId);
+
                                }
                        }
                }
-               
+
                if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
                        log.debug("Exit: 
AcknowledgementManager::piggybackAcksIfPresent");
                return;
@@ -159,20 +166,18 @@
         * @param sequencePropertyKey
         * @param sequenceId
         * @param storageManager
-        * @param makeResponse Some work will be done to make the new ack 
message the response of the reference message.
+        * @param makeResponse
+        *            Some work will be done to make the new ack message the
+        *            response of the reference message.
         * @return
         * @throws AxisFault
         */
        public static RMMsgContext generateAckMessage(
-                       
-                       RMMsgContext referenceRMMessage,
-                       RMDBean rmdBean,
-                       String sequenceId,
-                       StorageManager storageManager, 
-                       boolean serverSide
-                       
-                       ) throws AxisFault {
-               
+
+       RMMsgContext referenceRMMessage, RMDBean rmdBean, String sequenceId, 
StorageManager storageManager, boolean serverSide
+
+       ) throws AxisFault {
+
                if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
                        log.debug("Enter: 
AcknowledgementManager::generateAckMessage " + rmdBean);
 
@@ -180,16 +185,14 @@
 
                EndpointReference acksTo = rmdBean.getAcksToEndpointReference();
 
-               if (acksTo==null || acksTo.getAddress() == null)
+               if (acksTo == null || acksTo.getAddress() == null)
                        throw new 
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
 
-               AxisOperation ackOperation = 
SpecSpecificConstants.getWSRMOperation(
-                               Sandesha2Constants.MessageTypes.ACK,
-                               rmdBean.getRMVersion(),
-                               referenceMsg.getAxisService());
+               AxisOperation ackOperation = 
SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.ACK, 
rmdBean.getRMVersion(), referenceMsg
+                               .getAxisService());
 
                MessageContext ackMsgCtx = 
SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);
-               
+
                
ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
 
                RMMsgContext ackRMMsgCtx = 
MsgInitializer.initializeMessage(ackMsgCtx);
@@ -198,8 +201,7 @@
 
                ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
 
-               SOAPFactory factory = 
SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
-                               .getSOAPVersion(referenceMsg.getEnvelope()));
+               SOAPFactory factory = 
SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(referenceMsg.getEnvelope()));
 
                // Setting new envelope
                SOAPEnvelope envelope = factory.getDefaultEnvelope();
@@ -207,7 +209,7 @@
                ackMsgCtx.setEnvelope(envelope);
 
                ackMsgCtx.setTo(acksTo);
-               
+
                ackMsgCtx.setServerSide(serverSide);
 
                // adding the SequenceAcknowledgement part.
@@ -218,16 +220,13 @@
                return ackRMMsgCtx;
        }
 
-       
-       
-
        public static boolean verifySequenceCompletion(RangeString ackRanges, 
long lastMessageNo) {
                if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
                        log.debug("Enter: 
AcknowledgementManager::verifySequenceCompletion");
 
                boolean result = false;
                Range complete = new Range(1, lastMessageNo);
-               if(ackRanges.isRangeCompleted(complete)) {
+               if (ackRanges.isRangeCompleted(complete)) {
                        result = true;
                }
 
@@ -235,17 +234,14 @@
                        log.debug("Exit: 
AcknowledgementManager::verifySequenceCompletion " + result);
                return result;
        }
-       
-       public static void addAckBeanEntry (
-                       RMMsgContext ackRMMsgContext,
-                       String sequenceId, 
-                       long timeToSend,
-                       StorageManager storageManager) throws AxisFault {
-               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Enter: 
AcknowledgementManager::addAckBeanEntry");
+
+       public static void addAckBeanEntry(RMMsgContext ackRMMsgContext, String 
sequenceId, long timeToSend, StorageManager storageManager) throws AxisFault {
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Enter: 
AcknowledgementManager::addAckBeanEntry");
 
                // Write the acks into the envelope
                ackRMMsgContext.addSOAPEnvelope();
-               
+
                MessageContext ackMsgContext = 
ackRMMsgContext.getMessageContext();
 
                SenderBeanMgr retransmitterBeanMgr = 
storageManager.getSenderBeanMgr();
@@ -258,7 +254,7 @@
                ackBean.setReSend(false);
                ackBean.setSequenceID(sequenceId);
                EndpointReference to = ackMsgContext.getTo();
-               if (to!=null)
+               if (to != null)
                        ackBean.setToAddress(to.getAddress());
 
                ackBean.setSend(true);
@@ -275,9 +271,9 @@
                Collection coll = retransmitterBeanMgr.find(findBean);
                Iterator it = coll.iterator();
 
-               while(it.hasNext()) {
+               while (it.hasNext()) {
                        SenderBean oldAckBean = (SenderBean) it.next();
-                       if(oldAckBean.getTimeToSend() < timeToSend)
+                       if (oldAckBean.getTimeToSend() < timeToSend)
                                timeToSend = oldAckBean.getTimeToSend();
 
                        // removing the retransmitted entry for the oldAck
@@ -290,42 +286,44 @@
                ackBean.setTimeToSend(timeToSend);
 
                
ackMsgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, 
Sandesha2Constants.VALUE_FALSE);
-               
+
                // passing the message through sandesha2sender
                ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, 
Sandesha2Constants.VALUE_TRUE);
-               
-           SandeshaUtil.executeAndStore(ackRMMsgContext, key, storageManager);
+
+               SandeshaUtil.executeAndStore(ackRMMsgContext, key, 
storageManager);
 
                // inserting the new ack.
                retransmitterBeanMgr.insert(ackBean);
 
-               if(LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled()) log.debug("Exit: 
AcknowledgementManager::addAckBeanEntry");
+               if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
+                       log.debug("Exit: 
AcknowledgementManager::addAckBeanEntry");
        }
-       
-       public static void sendAckNow (RMMsgContext ackRMMsgContext) throws 
AxisFault {
+
+       public static void sendAckNow(RMMsgContext ackRMMsgContext) throws 
AxisFault {
                if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
                        log.debug("Enter: AcknowledgementManager::sendAckNow");
 
                // Write the acks into the envelope
                ackRMMsgContext.addSOAPEnvelope();
-               
+
                MessageContext ackMsgContext = 
ackRMMsgContext.getMessageContext();
-               
+
                // setting CONTEXT_WRITTEN since acksto is anonymous
                if (ackRMMsgContext.getMessageContext().getOperationContext() 
== null) {
                        // operation context will be null when doing in a GLOBAL
                        // handler.
                        AxisOperation op = ackMsgContext.getAxisOperation();
 
-                       OperationContext opCtx = 
OperationContextFactory.createOperationContext(op.getAxisSpecificMEPConstant(), 
op, ackRMMsgContext.getMessageContext().getServiceContext());
+                       OperationContext opCtx = 
OperationContextFactory.createOperationContext(op.getAxisSpecificMEPConstant(), 
op, ackRMMsgContext.getMessageContext()
+                                       .getServiceContext());
                        
ackRMMsgContext.getMessageContext().setOperationContext(opCtx);
                }
 
                ackRMMsgContext.getMessageContext().setServerSide(true);
-               
+
                AxisEngine.send(ackMsgContext);
-               
+
                if (LoggingControl.isAnyTracingEnabled() && 
log.isDebugEnabled())
-                       log.debug("Exit: AcknowledgementManager::sendAckNow");  
        
-       }       
+                       log.debug("Exit: AcknowledgementManager::sendAckNow");
+       }
 }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
 Tue Jun 24 01:22:54 2008
@@ -19,10 +19,15 @@
 
 package org.apache.sandesha2.util;
 
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMFactory;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.Parameter;
@@ -204,10 +209,11 @@
                        // Server side, we want the replyTo and AcksTo EPRs to 
point into this server.
                        // We can work that out by looking at the RMD bean that 
pulled the message in,
                        // and copying its 'ReplyTo' address.
-                       if(inboundBean != null && 
inboundBean.getReplyToEndpointReference() != null) {
-                               acksToEPR = 
inboundBean.getReplyToEndpointReference();
-                               replyToEPR = 
inboundBean.getReplyToEndpointReference();
-                       } else {
+                       EndpointReference strippedReplyToEpr = 
stripAddress(inboundBean.getReplyToEndpointReference());
+                       if(inboundBean != null && strippedReplyToEpr != null) {
+                               acksToEPR = strippedReplyToEpr;
+                               replyToEPR = strippedReplyToEpr;
+               } else {
                                String beanInfo = (inboundBean == null) ? 
"null" : inboundBean.toString();
                                String message = 
SandeshaMessageHelper.getMessage(
                                                
SandeshaMessageKeys.cannotChooseAcksTo, inboundSequence, beanInfo);
@@ -384,4 +390,40 @@
 
                return specVersion;
        }
+       
+       /* Because RM reuses the incoming EPRs, we need to use only the address.
+        */
+       private static EndpointReference stripAddress(EndpointReference eprIn){
+               if(log.isDebugEnabled()) log.debug("stripAddress from 
EndpointReference : " + eprIn);
+               EndpointReference epr = new 
EndpointReference(eprIn.getAddress());
+               return epr;
+/**            
+               String schemaNs = 
"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd";
+               String securityNs = 
"http://schemas.xmlsoap.org/ws/2003/06/utility";
+               
+               Iterator it = eprOut.getAttributes().iterator();
+               while(it.hasNext()){
+                       OMAttribute attribute = (OMAttribute)it.next();
+                       String ns = attribute.getNamespace().getNamespaceURI();
+                       String name = attribute.getLocalName();
+                       if((schemaNs.equals(ns) || securityNs.equals(ns)) && 
"Id".equals(name)){
+                               //delete attribute
+                               it.remove();
+                       } 
+               }
+               Iterator it2 = eprOut.getAddressAttributes().iterator();
+               while(it2.hasNext()){
+                       OMAttribute attribute = (OMAttribute)it2.next();
+                       String ns = attribute.getNamespace().getNamespaceURI();
+                       String name = attribute.getLocalName();
+                       if((schemaNs.equals(ns) || securityNs.equals(ns)) && 
"Id".equals(name)){
+                               //delete attribute
+                               it2.remove();
+                       } 
+               }
+               
+               return eprOut;
+               **/
+}
+
 }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=671059&r1=671058&r2=671059&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 Tue Jun 24 01:22:54 2008
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.MessageContext;
@@ -63,17 +64,39 @@
        // If this sender is working for several sequences, we use round-robin 
to
        // try and give them all a chance to invoke messages.
        int nextIndex = 0;
+
        boolean processedMessage = false;
+
        long lastHousekeeping = 0;
-       
+
        private static int HOUSEKEEPING_INTERVAL = 20000;
-       
-       public Sender () {
+
+       private ConcurrentHashMap ackMap = new ConcurrentHashMap();
+
+       private static class AckHolder {
+               public long tts = 0;
+
+               public RMMsgContext refMsg;
+       }
+
+       public Sender() {
                super(Sandesha2Constants.SENDER_SLEEP_TIME);
        }
 
+       public void scheduleAddressableAcknowledgement(String sequenceId, long 
ackInterval, RMMsgContext ref) {
+               AckHolder ackH = new AckHolder();
+               ackH.tts = System.currentTimeMillis() + ackInterval;
+               ackH.refMsg = ref;
+               ackMap.putIfAbsent(sequenceId, ackH);
+       }
+
+       public void removeScheduledAcknowledgement(String sequenceId) {
+               ackMap.remove(sequenceId);
+       }
+
        protected boolean internalRun() {
-               if (log.isDebugEnabled()) log.debug("Enter: 
Sender::internalRun");
+               if (log.isDebugEnabled())
+                       log.debug("Enter: Sender::internalRun");
 
                Transaction transaction = null;
                boolean sleep = false;
@@ -85,33 +108,38 @@
 
                        if (log.isDebugEnabled())
                                log.debug("Choosing one from " + size + " 
sequences");
-                       if(nextIndex >= size) {
+                       if (nextIndex >= size) {
                                nextIndex = 0;
 
-                               // We just looped over the set of sequences. If 
we didn't process any
+                               // We just looped over the set of sequences. If 
we didn't
+                               // process any
                                // messages on this loop then we sleep before 
the next one
-                               if(size == 0 || !processedMessage) {
+                               if (size == 0 || !processedMessage) {
                                        sleep = true;
                                }
                                processedMessage = false;
-                               
-                               if(System.currentTimeMillis()-lastHousekeeping 
> HOUSEKEEPING_INTERVAL){
-                                       // At this point - delete any sequences 
that have timed out, or been terminated.
+
+                               if (System.currentTimeMillis() - 
lastHousekeeping > HOUSEKEEPING_INTERVAL) {
+                                       // At this point - delete any sequences 
that have timed out,
+                                       // or been terminated.
                                        
deleteTerminatedSequences(storageManager);
 
-                                       // Also clean up and sender beans that 
are not yet eligible for sending, but
+                                       // Also clean up and sender beans that 
are not yet eligible
+                                       // for sending, but
                                        // are blocking the transport threads.
                                        unblockTransportThreads(storageManager);
 
-                                       // Finally, check for messages that can 
only be serviced by polling, and warn
+                                       // Finally, check for messages that can 
only be serviced by
+                                       // polling, and warn
                                        // the user if they are too old
                                        checkForOrphanMessages(storageManager);
                                        lastHousekeeping = 
System.currentTimeMillis();
                                }
-                               if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, looped over all sequences, sleep " + sleep);
+                               if (log.isDebugEnabled())
+                                       log.debug("Exit: Sender::internalRun, 
looped over all sequences, sleep " + sleep);
                                return sleep;
                        }
-                       
+
                        transaction = storageManager.getTransaction();
 
                        SequenceEntry entry = (SequenceEntry) 
allSequencesList.get(nextIndex++);
@@ -122,62 +150,78 @@
                        String rmVersion = null;
                        // Check that the sequence is still valid
                        boolean found = false;
-                       if(entry.isRmSource()) {
+                       if (entry.isRmSource()) {
                                RMSBean matcher = new RMSBean();
                                matcher.setInternalSequenceID(sequenceId);
                                matcher.setTerminated(false);
                                RMSBean rms = 
storageManager.getRMSBeanMgr().findUnique(matcher);
-                               if(rms != null && !rms.isTerminated() && 
!rms.isTimedOut()) {
-                                       sequenceId = rms.getSequenceID();       
                                
-                                       if 
(SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))          
                             
+                               if (rms != null && !rms.isTerminated() && 
!rms.isTimedOut()) {
+                                       sequenceId = rms.getSequenceID();
+                                       if 
(SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
                                                
SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, 
storageManager);
                                        else
                                                found = true;
                                        rmVersion = rms.getRMVersion();
                                }
-                               
+
                        } else {
                                RMDBean matcher = new RMDBean();
                                matcher.setSequenceID(sequenceId);
                                matcher.setTerminated(false);
                                RMDBean rmd = 
storageManager.getRMDBeanMgr().findUnique(matcher);
-                               if(rmd != null) {
+                               if (rmd != null) {
                                        found = true;
                                        rmVersion = rmd.getRMVersion();
                                }
                        }
                        if (!found) {
                                stopThreadForSequence(sequenceId, 
entry.isRmSource());
-                               if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, sequence has ended");
-                               
-                               if(transaction != null && 
transaction.isActive()) {
+                               if (log.isDebugEnabled())
+                                       log.debug("Exit: Sender::internalRun, 
sequence has ended");
+
+                               if (transaction != null && 
transaction.isActive()) {
                                        transaction.commit();
                                        transaction = null;
                                }
-                               
+
                                return false;
                        }
-                       
+                       if (sequenceId != null) {
+                               AckHolder acktts = (AckHolder) 
ackMap.get(sequenceId);
+                               if (acktts != null && acktts.tts < 
System.currentTimeMillis()) {
+                                       ackMap.remove(sequenceId);
+                                       RMDBean rmd = 
storageManager.getRMDBeanMgr().retrieve(sequenceId);
+                                       if (rmd != null) {
+                                               RMMsgContext ackRMMsgContext = 
AcknowledgementManager.generateAckMessage(acktts.refMsg, rmd, sequenceId, 
storageManager, true);
+
+                                               
AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, acktts.tts, 
storageManager);
+                                               transaction.commit();
+                                               transaction = 
storageManager.getTransaction();
+                                       }
+
+                               }
+                       }
                        SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
                        SenderBean senderBean = 
mgr.getNextMsgToSend(sequenceId);
-                       
+
                        if (senderBean == null) {
-                               if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, no message for this sequence");
-                               
-                               if(transaction != null && 
transaction.isActive()) {
+                               if (log.isDebugEnabled())
+                                       log.debug("Exit: Sender::internalRun, 
no message for this sequence");
+
+                               if (transaction != null && 
transaction.isActive()) {
                                        transaction.commit();
                                        transaction = null;
                                }
-                               
+
                                return false; // Move on to the next sequence 
in the list
                        }
 
                        // work Id is used to define the piece of work that 
will be
                        // assigned to the Worker thread,
                        // to handle this Sender bean.
-                       
-                       //workId contains a timeTiSend part to cater for 
retransmissions.
-                       //This will cause retransmissions to be treated as new 
work.
+
+                       // workId contains a timeToSend part to cater for 
retransmissions.
+                       // This will cause retransmissions to be treated as new 
work.
                        String workId = senderBean.getMessageID() + 
senderBean.getTimeToSend();
 
                        // check weather the bean is already assigned to a 
worker.
@@ -185,51 +229,51 @@
                                // As there is already a worker running we are 
probably looping
                                // too fast, so sleep on the next loop.
                                if (log.isDebugEnabled()) {
-                                       String message = 
SandeshaMessageHelper.getMessage(
-                                                                       
SandeshaMessageKeys.workAlreadyAssigned,
-                                                                       workId);
+                                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, 
workId);
                                        log.debug("Exit: Sender::internalRun, " 
+ message + ", sleeping");
                                }
-                               
-                               if(transaction != null && 
transaction.isActive()) {
+
+                               if (transaction != null && 
transaction.isActive()) {
                                        transaction.commit();
                                        transaction = null;
                                }
-                               
+
                                return true;
                        }
 
-                       //commiting the transaction here to release resources 
early.
-                       if(transaction != null && transaction.isActive()) 
transaction.commit();
+                       // committing the transaction here to release resources 
early.
+                       if (transaction != null && transaction.isActive())
+                               transaction.commit();
                        transaction = null;
 
                        // start a worker which will work on this messages.
                        SenderWorker worker = new SenderWorker(context, 
senderBean, rmVersion);
                        worker.setLock(getWorkerLock());
                        worker.setWorkId(workId);
-                       
+
                        try {
                                // Set the lock up before we start the thread, 
but roll it back
                                // if we hit any problems
                                getWorkerLock().addWork(workId, worker);
                                threadPool.execute(worker);
-                       } catch(Exception e) {
+                       } catch (Exception e) {
                                getWorkerLock().removeWork(workId);
-                       }                       
+                       }
 
-                       // If we got to here then we found work to do on the 
sequence, so we should
+                       // If we got to here then we found work to do on the 
sequence, so we
+                       // should
                        // remember not to sleep at the end of the list of 
sequences.
                        processedMessage = true;
-                       
+
                } catch (Exception e) {
 
                        // TODO : when this is the client side throw the 
exception to
                        // the client when necessary.
 
-                       
-                       //TODO rollback only if a SandeshaStorageException.
-                       //This allows the other Exceptions to be used within 
the Normal flow.
-                       
+                       // TODO rollback only if a SandeshaStorageException.
+                       // This allows the other Exceptions to be used within 
the Normal
+                       // flow.
+
                        String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, 
e.toString());
                        log.debug(message, e);
                } finally {
@@ -238,111 +282,113 @@
                                        transaction.rollback();
                                        transaction = null;
                                } catch (Exception e) {
-                                       String message = SandeshaMessageHelper
-                                                       
.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
+                                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, 
e.toString());
                                        log.debug(message, e);
                                }
                        }
                }
-               if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, 
not sleeping");
+               if (log.isDebugEnabled())
+                       log.debug("Exit: Sender::internalRun, not sleeping");
                return false;
        }
 
        /**
-        * Finds any RMDBeans that have not been used inside the set 
InnactivityTimeoutInterval
+        * Finds any RMDBeans that have not been used inside the set
+        * InactivityTimeoutInterval
+        * 
+        * Iterates through RMSBeans and RMDBeans that have been terminated or 
timed
+        * out and deletes them.
         * 
-        * Iterates through RMSBeans and RMDBeans that have been terminated or 
timed out and 
-        * deletes them.
-        *
         */
        private void deleteTerminatedSequences(StorageManager storageManager) {
-               if (log.isDebugEnabled()) 
+               if (log.isDebugEnabled())
                        log.debug("Enter: Sender::deleteTerminatedSequences");
 
                RMSBean finderBean = new RMSBean();
                finderBean.setTerminated(true);
-               
+
                Transaction transaction = null;
-               
+
                try {
                        transaction = storageManager.getTransaction();
-                       
-                       SandeshaPolicyBean propertyBean = 
-                               
SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
                       
-
-       long deleteTime = propertyBean.getSequenceRemovalTimeoutInterval();
-       if (deleteTime < 0)
-               deleteTime = 0;
 
-       if (deleteTime > 0) {
+                       SandeshaPolicyBean propertyBean = 
SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
+
+                       long deleteTime = 
propertyBean.getSequenceRemovalTimeoutInterval();
+                       if (deleteTime < 0)
+                               deleteTime = 0;
+
+                       if (deleteTime > 0) {
                                // Find terminated sequences.
-                   List rmsBeans = 
storageManager.getRMSBeanMgr().find(finderBean);
-                   
-                   deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
-                   
-                   finderBean.setTerminated(false);
-                   finderBean.setTimedOut(true);
-                   
-                   // Find timed out sequences
-                   rmsBeans = storageManager.getRMSBeanMgr().find(finderBean);
-                           
-                   deleteRMSBeans(rmsBeans, propertyBean, deleteTime);
-                   
-                   // Remove any terminated RMDBeans.
-                   RMDBean finderRMDBean = new RMDBean();
-                   finderRMDBean.setTerminated(true);
-                   
-                   List rmdBeans = 
storageManager.getRMDBeanMgr().find(finderRMDBean);
-       
-                   Iterator beans = rmdBeans.iterator();
-                   while (beans.hasNext()) {
-                       RMDBean rmdBean = (RMDBean)beans.next();
-                       
-                       long timeNow = System.currentTimeMillis();
-                       long lastActivated = rmdBean.getLastActivatedTime();
-       
-                       // delete sequences that have been timedout or deleted 
for more than 
-                       // the SequenceRemovalTimeoutInterval
-                       if ((lastActivated + deleteTime) < timeNow) {
-                               if (log.isDebugEnabled())
-                                       log.debug("Deleting RMDBean " + 
deleteTime + " : " + rmdBean);
-                               
storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
-                       }                               
-                   }
-       }
+                               List rmsBeans = 
storageManager.getRMSBeanMgr().find(finderBean);
+
+                               deleteRMSBeans(rmsBeans, propertyBean, 
deleteTime);
+
+                               finderBean.setTerminated(false);
+                               finderBean.setTimedOut(true);
+
+                               // Find timed out sequences
+                               rmsBeans = 
storageManager.getRMSBeanMgr().find(finderBean);
+
+                               deleteRMSBeans(rmsBeans, propertyBean, 
deleteTime);
+
+                               // Remove any terminated RMDBeans.
+                               RMDBean finderRMDBean = new RMDBean();
+                               finderRMDBean.setTerminated(true);
+
+                               List rmdBeans = 
storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+                               Iterator beans = rmdBeans.iterator();
+                               while (beans.hasNext()) {
+                                       RMDBean rmdBean = (RMDBean) 
beans.next();
 
-           // Terminate RMD Sequences that have been inactive.                 
+                                       long timeNow = 
System.currentTimeMillis();
+                                       long lastActivated = 
rmdBean.getLastActivatedTime();
+
+                                       // delete sequences that have been 
timedout or deleted for
+                                       // more than
+                                       // the SequenceRemovalTimeoutInterval
+                                       if ((lastActivated + deleteTime) < 
timeNow) {
+                                               if (log.isDebugEnabled())
+                                                       log.debug("Deleting 
RMDBean " + deleteTime + " : " + rmdBean);
+                                               
storageManager.getRMDBeanMgr().delete(rmdBean.getSequenceID());
+                                       }
+                               }
+                       }
+
+                       // Terminate RMD Sequences that have been inactive.
                        if (propertyBean.getInactivityTimeoutInterval() > 0) {
-                   RMDBean finderRMDBean = new RMDBean();
-                   finderRMDBean.setTerminated(false);
-                               
-                   List rmdBeans = 
storageManager.getRMDBeanMgr().find(finderRMDBean);
-                       
-                   Iterator beans = rmdBeans.iterator();
-                   while (beans.hasNext()) {
-                       RMDBean rmdBean = (RMDBean)beans.next();
-                       
-                       long timeNow = System.currentTimeMillis();
-                       long lastActivated = rmdBean.getLastActivatedTime();
-                       
-                       if ((lastActivated + 
propertyBean.getInactivityTimeoutInterval()) < timeNow) {
-                               // Terminate
-                               rmdBean.setTerminated(true);
-                               rmdBean.setLastActivatedTime(timeNow);
-                               if (log.isDebugEnabled())
-                                       log.debug(System.currentTimeMillis() + 
"Marking RMDBean as terminated " + rmdBean);
-                               storageManager.getRMDBeanMgr().update(rmdBean);
-                       }                               
-                   }
-                       }           
-           
-                       if(transaction != null && transaction.isActive()) 
transaction.commit();
-                       
+                               RMDBean finderRMDBean = new RMDBean();
+                               finderRMDBean.setTerminated(false);
+
+                               List rmdBeans = 
storageManager.getRMDBeanMgr().find(finderRMDBean);
+
+                               Iterator beans = rmdBeans.iterator();
+                               while (beans.hasNext()) {
+                                       RMDBean rmdBean = (RMDBean) 
beans.next();
+
+                                       long timeNow = 
System.currentTimeMillis();
+                                       long lastActivated = 
rmdBean.getLastActivatedTime();
+
+                                       if ((lastActivated + 
propertyBean.getInactivityTimeoutInterval()) < timeNow) {
+                                               // Terminate
+                                               rmdBean.setTerminated(true);
+                                               
rmdBean.setLastActivatedTime(timeNow);
+                                               if (log.isDebugEnabled())
+                                                       
log.debug(System.currentTimeMillis() + "Marking RMDBean as terminated " + 
rmdBean);
+                                               
storageManager.getRMDBeanMgr().update(rmdBean);
+                                       }
+                               }
+                       }
+
+                       if (transaction != null && transaction.isActive())
+                               transaction.commit();
+
                } catch (SandeshaException e) {
                        if (log.isErrorEnabled())
                                log.error(e);
                } finally {
-                       if(transaction != null && transaction.isActive()) {
+                       if (transaction != null && transaction.isActive()) {
                                try {
                                        transaction.rollback();
                                } catch (SandeshaStorageException e) {
@@ -351,67 +397,72 @@
                                }
                        }
                }
-               
-               if (log.isDebugEnabled()) 
+
+               if (log.isDebugEnabled())
                        log.debug("Exit: Sender::deleteTerminatedSequences");
        }
-       
-       private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean 
propertyBean, long deleteTime) 
 
-       throws SandeshaStorageException {               
-               if (log.isDebugEnabled()) 
+       private void deleteRMSBeans(List rmsBeans, SandeshaPolicyBean 
propertyBean, long deleteTime)
+
+       throws SandeshaStorageException {
+               if (log.isDebugEnabled())
                        log.debug("Enter: Sender::deleteRMSBeans");
 
-    Iterator beans = rmsBeans.iterator();
-    
-    while (beans.hasNext())
-    {
-       RMSBean rmsBean = (RMSBean)beans.next();
-       long timeNow = System.currentTimeMillis();
-       long lastActivated = rmsBean.getLastActivatedTime();
-       // delete sequences that have been timedout or deleted for more than 
-       // the SequenceRemovalTimeoutInterval
-       
-       if ((lastActivated + deleteTime) < timeNow) {
-               if (log.isDebugEnabled())
-                       log.debug("Removing RMSBean " + rmsBean);
-               
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
-               storageManager.removeMessageContext( 
rmsBean.getReferenceMessageStoreKey() );
-       }               
-    }
+               Iterator beans = rmsBeans.iterator();
 
-               if (log.isDebugEnabled()) 
+               while (beans.hasNext()) {
+                       RMSBean rmsBean = (RMSBean) beans.next();
+                       long timeNow = System.currentTimeMillis();
+                       long lastActivated = rmsBean.getLastActivatedTime();
+                       // delete sequences that have been timedout or deleted 
for more than
+                       // the SequenceRemovalTimeoutInterval
+
+                       if ((lastActivated + deleteTime) < timeNow) {
+                               if (log.isDebugEnabled())
+                                       log.debug("Removing RMSBean " + 
rmsBean);
+                               
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+                               
storageManager.removeMessageContext(rmsBean.getReferenceMessageStoreKey());
+                       }
+               }
+
+               if (log.isDebugEnabled())
                        log.debug("Exit: Sender::deleteRMSBeans");
        }
 
-       private void unblockTransportThreads(StorageManager manager)
-       throws SandeshaStorageException
-       {
-               if (log.isDebugEnabled()) log.debug("Enter: 
Sender::unblockTransportThreads");
+       private void unblockTransportThreads(StorageManager manager) throws 
SandeshaStorageException {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: Sender::unblockTransportThreads");
 
                Transaction transaction = null;
                try {
                        transaction = manager.getTransaction();
-                       
-                       // This finder will look for beans that have been 
locking the transport for longer than
-                       // the TRANSPORT_WAIT_TIME. The match method for 
SenderBeans does the time comparison
+
+                       // This finder will look for beans that have been 
locking the
+                       // transport for longer than
+                       // the TRANSPORT_WAIT_TIME. The match method for 
SenderBeans does
+                       // the time comparison
                        // for us.
                        SenderBean finder = new SenderBean();
                        finder.setSend(false);
                        finder.setTransportAvailable(true);
                        finder.setTimeToSend(System.currentTimeMillis() - 
Sandesha2Constants.TRANSPORT_WAIT_TIME);
-                       
+
                        List beans = manager.getSenderBeanMgr().find(finder);
                        Iterator beanIter = beans.iterator();
-                       while(beanIter.hasNext()) {
-                               // The beans we have found are assigned to an 
internal sequence id, but the create
-                               // sequence has not completed yet (and perhaps 
never will). Server-side, most of the
-                               // info that we can usefully print is 
associated with the inbound sequence that generated
+                       while (beanIter.hasNext()) {
+                               // The beans we have found are assigned to an 
internal sequence
+                               // id, but the create
+                               // sequence has not completed yet (and perhaps 
never will).
+                               // Server-side, most of the
+                               // info that we can usefully print is 
associated with the
+                               // inbound sequence that generated
                                // this message.
                                SenderBean bean = (SenderBean) beanIter.next();
-                               
-                               // Load the message, so that we can free the 
transport (if there is one there). The
-                               // case we are trying to free up is when there 
is a request-response transport, and
+
+                               // Load the message, so that we can free the 
transport (if there
+                               // is one there). The
+                               // case we are trying to free up is when there 
is a
+                               // request-response transport, and
                                // it's still there waiting.
                                MessageContext msgCtx = 
manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
 
@@ -422,111 +473,126 @@
                                        inMsg = 
op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
                                if (inMsg != null)
                                        t = (RequestResponseTransport) 
inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
-       
-                               if((t != null && 
RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
-                                       if(log.isWarnEnabled()) {
+
+                               if ((t != null && 
RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
+                                       if (log.isWarnEnabled()) {
                                                String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
                                                log.warn(message);
                                        }
-                                       // If the message is a reply, then the 
request may need to be acked. Rather
-                                       // than just return a HTTP 202, we 
should try to send an ack.
+                                       // If the message is a reply, then the 
request may need to
+                                       // be acked. Rather
+                                       // than just return a HTTP 202, we 
should try to send an
+                                       // ack.
                                        boolean sendAck = false;
                                        RMDBean inbound = null;
                                        String inboundSeq = 
bean.getInboundSequenceId();
-                                       if(inboundSeq != null) 
+                                       if (inboundSeq != null)
                                                inbound = 
SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
-                                       
-                                       if(inbound != null) {
+
+                                       if (inbound != null) {
                                                EndpointReference acksToEPR = 
inbound.getAcksToEndpointReference();
-                                               if(acksToEPR!=null && 
acksToEPR.hasAnonymousAddress())
+                                               if (acksToEPR != null && 
acksToEPR.hasAnonymousAddress())
                                                        sendAck = true;
                                        }
-                                       
-                                       if(sendAck) {
+
+                                       if (sendAck) {
                                                RMMsgContext rmMsgCtx = 
MsgInitializer.initializeMessage(msgCtx);
-                                               RMMsgContext ackRMMsgCtx = 
AcknowledgementManager.generateAckMessage(
-                                                               rmMsgCtx, 
inbound, inbound.getSequenceID(), storageManager, true);
+                                               RMMsgContext ackRMMsgCtx = 
AcknowledgementManager.generateAckMessage(rmMsgCtx, inbound, 
inbound.getSequenceID(), storageManager, true);
                                                
AcknowledgementManager.sendAckNow(ackRMMsgCtx);
                                                
TransportUtils.setResponseWritten(msgCtx, true);
                                        } else {
                                                
TransportUtils.setResponseWritten(msgCtx, false);
                                        }
-       
-                                       // Mark the bean so that we know the 
transport is missing, and reset the send time
+
+                                       // Mark the bean so that we know the 
transport is missing,
+                                       // and reset the send time
                                        bean.setTransportAvailable(false);
                                        
bean.setTimeToSend(System.currentTimeMillis());
-                                       
+
                                        // Update the bean
                                        manager.getSenderBeanMgr().update(bean);
                                }
                        }
-       
-                       if(transaction != null && transaction.isActive()) 
transaction.commit();
+
+                       if (transaction != null && transaction.isActive())
+                               transaction.commit();
                        transaction = null;
-                       
-               } catch(Exception e) {
-                       // There isn't much we can do here, so log the 
exception and continue.
-                       if(log.isDebugEnabled()) log.debug("Exception", e);
+
+               } catch (Exception e) {
+                       // There isn't much we can do here, so log the 
exception and
+                       // continue.
+                       if (log.isDebugEnabled())
+                               log.debug("Exception", e);
                } finally {
-                       if(transaction != null && transaction.isActive()) 
transaction.rollback();
+                       if (transaction != null && transaction.isActive())
+                               transaction.rollback();
                }
-               
-               if (log.isDebugEnabled()) log.debug("Exit: Sender::unblockTransportThreads");
+
+               if (log.isDebugEnabled())
+                       log.debug("Exit: Sender::unblockTransportThreads");
        }
-               
-       private void checkForOrphanMessages(StorageManager manager)
-       throws SandeshaStorageException
-       {
-               if(log.isDebugEnabled()) log.debug("Enter: Sender::checkForOrphanMessages");
-               
+
+       private void checkForOrphanMessages(StorageManager manager) throws SandeshaStorageException {
+               if (log.isDebugEnabled())
+                       log.debug("Enter: Sender::checkForOrphanMessages");
+
                Transaction tran = null;
                try {
                        tran = manager.getTransaction();
-       
-                       // This finder will look for beans that should have been sent, but could not be sent
-                       // because they need a MakeConnection message to come in to pick it up. We also factor
-                       // in TRANSPORT_WAIT_TIME to give the MakeConnection a chance to arrive.
+
+                       // This finder will look for beans that should have been sent, but
+                       // could not be sent
+                       // because they need a MakeConnection message to come in to pick it
+                       // up. We also factor
+                       // in TRANSPORT_WAIT_TIME to give the MakeConnection a chance to
+                       // arrive.
                        SenderBean finder = new SenderBean();
                        finder.setSend(true);
                        finder.setTransportAvailable(false);
                        finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
-                       
+
                        List beans = manager.getSenderBeanMgr().find(finder);
                        Iterator beanIter = beans.iterator();
-                       while(beanIter.hasNext()) {
+                       while (beanIter.hasNext()) {
                                SenderBean bean = (SenderBean) beanIter.next();
-                               
-                               // Emit a message to warn the user that MakeConnections are not arriving to pick
+
+                               // Emit a message to warn the user that MakeConnections are not
+                               // arriving to pick
                                // messages up
-                               if(log.isWarnEnabled()) {
+                               if (log.isWarnEnabled()) {
                                        String message = null;
                                        String internalSequenceID = bean.getInternalSequenceID();
                                        String sequenceID = bean.getSequenceID();
-                                       if (bean.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION)                                    
   
-                                               message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, 
internalSequenceID);                              
-                                       else
-                                       {
+                                       if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION)
+                                               message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
+                                       else {
                                                String messageType = Integer.toString(bean.getMessageType());
                                                message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
                                        }
                                        log.warn(message);
                                }
-                               
-                               // Update the bean so that we won't emit another message for another TRANSPORT_WAIT_TIME
+
+                               // Update the bean so that we won't emit another message for
+                               // another TRANSPORT_WAIT_TIME
                                bean.setTimeToSend(System.currentTimeMillis());
                                manager.getSenderBeanMgr().update(bean);
                        }
-       
-                       if(tran != null && tran.isActive()) tran.commit();
+
+                       if (tran != null && tran.isActive())
+                               tran.commit();
                        tran = null;
-       
-               } catch(Exception e) {
-                       // There isn't much we can do here, so log the exception and continue.
-                       if(log.isDebugEnabled()) log.debug("Exception", e);
+
+               } catch (Exception e) {
+                       // There isn't much we can do here, so log the exception and
+                       // continue.
+                       if (log.isDebugEnabled())
+                               log.debug("Exception", e);
                } finally {
-                       if(tran != null && tran.isActive()) tran.rollback();
+                       if (tran != null && tran.isActive())
+                               tran.rollback();
                }
-               
-               if(log.isDebugEnabled()) log.debug("Exit: Sender::checkForOrphanMessages");
+
+               if (log.isDebugEnabled())
+                       log.debug("Exit: Sender::checkForOrphanMessages");
        }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to