Author: chamikara
Date: Sun Aug 27 20:10:51 2006
New Revision: 437515

URL: http://svn.apache.org/viewvc?rev=437515&view=rev
Log:
Added the WorkerLock class, which is used by Invoker and Sender to make sure that
two threads of the thread pools are not working on the same item.

Corrected a bug in RM elements. The RMElements class was not detecting the
SequenceAck
part due to a recent change, but it is supposed to detect all parts in an RM
message.
Since there can be multiple Sequence Ack parts in the same message, the
implementation of
RMElements and RMMsgContext was changed to support this kind of scenario.



Added:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java
Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java 
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java 
Sun Aug 27 20:10:51 2006
@@ -17,6 +17,7 @@
 
 package org.apache.sandesha2;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 
@@ -32,6 +33,7 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.wsrm.IOMRMElement;
 import org.apache.sandesha2.wsrm.IOMRMPart;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 
 /**
  * This class is used to hold a MessageContext within Sandesha. This is used to
@@ -90,16 +92,25 @@
                SOAPEnvelope envelope = msgContext.getEnvelope();
                Iterator keys = rmMessageParts.keySet().iterator();
                while (keys.hasNext()) {
-                       Object key = keys.next();
-                       IOMRMPart rmPart = (IOMRMPart) rmMessageParts.get(key);
-                       rmPart.toSOAPEnvelope(envelope);
+                       Integer key = (Integer) keys.next();
+                       int partId = key.intValue();
+                       
+                       if (isMultiPart(partId)) {
+                               for (Iterator 
it=getMessageParts(partId);it.hasNext();) {
+                                       IOMRMPart rmPart = (IOMRMPart) 
it.next();
+                                       rmPart.toSOAPEnvelope(envelope);
+                               }
+                       } else {
+                               IOMRMPart rmPart = (IOMRMPart) 
rmMessageParts.get(key);
+                               rmPart.toSOAPEnvelope(envelope);
+                       }
                }
        }
 
        public int getMessageType() {
                return messageType;
        }
-
+       
        
        /**
         * The message type can be used to easily identify what this message is.
@@ -121,12 +132,51 @@
         * @param part
         */
        public void setMessagePart(int partId, IOMRMPart part) {
-               if (partId >= 0 && partId <= 
Sandesha2Constants.MessageParts.MAX_MSG_PART_ID)
-                       rmMessageParts.put(new Integer(partId), part);
+               if (partId >= 0 && partId <= 
Sandesha2Constants.MessageParts.MAX_MSG_PART_ID) {
+                       
+                       if 
(partId==Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT) {
+                               ArrayList sequenceAckList = (ArrayList) 
rmMessageParts.get(new Integer (partId));
+                               if (sequenceAckList==null) {
+                                       sequenceAckList = new ArrayList ();
+                                       rmMessageParts.put(new Integer 
(partId),sequenceAckList);
+                               }
+                       } else {
+                               rmMessageParts.put(new Integer(partId), part); 
+                       }
+               }
        }
+       
 
-       public IOMRMElement getMessagePart(int partId) {
+       public IOMRMElement getMessagePart(int partId) throws SandeshaException 
{
+               if (isMultiPart(partId)) {
+                       String message = "It is possible for a multiple 
MessageParts of this type to exit. Please call the 'getMessageParts' method";
+                       throw new SandeshaException (message);
+               }
+               
                return (IOMRMElement) rmMessageParts.get(new Integer(partId));
+       }
+       
+       public Iterator getMessageParts (int partId) {
+               Object obj = rmMessageParts.get(new Integer (partId));
+               if (obj==null)
+                       return new ArrayList().iterator();
+               
+               if (obj instanceof ArrayList) {
+                       return ((ArrayList) obj).iterator();
+               } else {
+                       ArrayList arr = new ArrayList ();
+                       arr.add(obj);
+                       return arr.iterator();
+               }
+       }
+       
+       //checks weather there can be multiple elements of these parts,
+       //if so getMessageParts method has to be called to get a ArrayList of 
parts..
+       public boolean isMultiPart (int messagePartId) {
+               if 
(messagePartId==Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT)
+                       return true;
+               
+               return false;
        }
 
        public EndpointReference getFrom() {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 Sun Aug 27 20:10:51 2006
@@ -64,6 +64,9 @@
        public static final String cannotInnitMessage="cannotInnitMessage";
        public static final String propertyInvalidValue="propertyInvalidValue";
        public static final String 
couldNotCopyParameters="couldNotCopyParameters";
+       public static final String senderBeanNotFound="couldNotCopyParameters";
+       public static final String workAlreadyAssigned="couldNotCopyParameters";
+       
 
 
        public static final String 
rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence";

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
 Sun Aug 27 20:10:51 2006
@@ -56,8 +56,11 @@
 axisOperationError=Sandesha2 Internal Error: could not create the 
''AxisOperation'' due to an error {0}
 axisOperationRegisterError=Sandesha2 Internal Error: could not register an 
''OutInAxisOperation'' due to exception {0}
 transportOutNotPresent=Sandesha2 Internal Error: original transport sender is 
not present
-storedKeyNotPresent=Sandesha2 Internal Error: stored key not present in the 
retransmittable message.
+storedKeyNotPresent=Sandesha2 Internal Error: stored key not present in the 
retransmittable message
 invalidQName=Invalid QName string: {0}
+senderBeanNotFound=SenderBean was not found
+workAlreadyAssigned=Work '{0}' is already assigned to a different Worker. Will 
try the next one
+
 
 dummyCallback=Sandesha2 Internal Error: dummy callback was called but this 
should never happen.
 dummyCallbackError=Sandesha2 Internal Error: dummy callback received an error 
but this should never happen.

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
 Sun Aug 27 20:10:51 2006
@@ -49,6 +49,7 @@
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
 import org.apache.sandesha2.wsrm.AcknowledgementRange;
 import org.apache.sandesha2.wsrm.Nack;
@@ -252,9 +253,11 @@
                        }
                }
 
-               // TODO - surely this is only appropriate for standalone ack 
messages?
-               // stopping the progress of the message further.
-               rmMsgCtx.pause();
+
+               String action = msgCtx.getOptions().getAction();
+               if (action!=null && 
action.equals(SpecSpecificConstants.getAckRequestAction(rmMsgCtx.getRMSpecVersion())))
 {
+                       rmMsgCtx.pause();
+               }
 
                if (log.isDebugEnabled())
                        log.debug("Exit: 
AcknowledgementProcessor::processAckHeader");

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
 Sun Aug 27 20:10:51 2006
@@ -17,6 +17,8 @@
 
 package org.apache.sandesha2.msgprocessors;
 
+import java.util.Iterator;
+
 import javax.xml.namespace.QName;
 
 import org.apache.axiom.om.OMElement;
@@ -272,12 +274,24 @@
 
                RMMsgContext ackRMMessage = 
AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, sequenceID,
                                storageManager);
-               SequenceAcknowledgement seqAck = (SequenceAcknowledgement) 
ackRMMessage
-                               
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-               
terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
 seqAck);
-
+               
+               Iterator iter = 
ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+               
+               if (iter.hasNext()) {
+                       SequenceAcknowledgement seqAck = 
(SequenceAcknowledgement) iter.next();
+                       if (seqAck==null) {
+                               String message = "No SequenceAcknowledgement 
part is present";
+                               throw new SandeshaException (message);
+                       }
+               
+                       
terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
 seqAck);
+               } else {
+                       //TODO 
+               }
+               
                terminateSeqResponseRMMsg.addSOAPEnvelope();
 
+               
                terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
                
terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
 "true");
 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java
 Sun Aug 27 20:10:51 2006
@@ -17,6 +17,9 @@
 
 package org.apache.sandesha2.util;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -103,10 +106,11 @@
                        rmNamespace = 
elements.getSequence().getNamespaceValue();
                }
 
-               if (elements.getSequenceAcknowledgement() != null) {
-                       
rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
 elements
-                                       .getSequenceAcknowledgement());
-                       rmNamespace = 
elements.getSequenceAcknowledgement().getNamespaceValue();
+               
+               for (Iterator iter = 
elements.getSequenceAcknowledgements();iter.hasNext();) {
+                       SequenceAcknowledgement sequenceAck = 
(SequenceAcknowledgement) iter.next();
+                       
rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
 sequenceAck);
+                       rmNamespace = sequenceAck.getNamespaceValue();
                }
 
                if (elements.getTerminateSequence() != null) {
@@ -168,8 +172,8 @@
                                
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
                TerminateSequenceResponse terminateSequenceResponse = 
(TerminateSequenceResponse) rmMsgCtx
                                
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
-               SequenceAcknowledgement sequenceAcknowledgement = 
(SequenceAcknowledgement) rmMsgCtx
-                               
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+               Iterator sequenceAcknowledgementsIter = rmMsgCtx
+                               
.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
                Sequence sequence = (Sequence) 
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
                AckRequested ackRequest = (AckRequested) 
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
                CloseSequence closeSequence = (CloseSequence) rmMsgCtx
@@ -192,8 +196,11 @@
                } else if 
(rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE) != null) {
                        
rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
                        sequenceID = sequence.getIdentifier().getIdentifier();
-               } else if (sequenceAcknowledgement != null) {
+               } else if (sequenceAcknowledgementsIter.hasNext()) {
                        
rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+                       SequenceAcknowledgement sequenceAcknowledgement = 
(SequenceAcknowledgement) sequenceAcknowledgementsIter.next();
+                       
+                       //picking the sequenceId of the first sequence ack.
                        sequenceID = 
sequenceAcknowledgement.getIdentifier().getIdentifier();
                } else if (ackRequest != null) {
                        
rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.ACK_REQUEST);
@@ -218,6 +225,10 @@
                        }
                }
 
+               //In case of ack messages RM Namespace is decided based on the 
sequenceId of the first 
+               //sequence Ack. In other words Sandesha2 does not expect to 
receive two SequenceAcknowledgements
+               //of different RM specifications in the same incoming message.
+               
                String rmNamespace = rmMsgCtx.getRMNamespaceValue();
                if (sequenceID != null) {
                        String specVersion = 
SandeshaUtil.getRMVersion(propertyKey, storageManager);

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java 
Sun Aug 27 20:10:51 2006
@@ -882,7 +882,7 @@
                return propertyBean;
        }
 
-       public static String getSequenceIDFromRMMessage(RMMsgContext 
rmMessageContext) {
+       public static String getSequenceIDFromRMMessage(RMMsgContext 
rmMessageContext) throws SandeshaException {
                int messageType = rmMessageContext.getMessageType();
 
                String sequenceID = null;
@@ -890,8 +890,14 @@
                        Sequence sequence = (Sequence) 
rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
                        sequenceID = sequence.getIdentifier().getIdentifier();
                } else if (messageType == Sandesha2Constants.MessageTypes.ACK) {
-                       SequenceAcknowledgement sequenceAcknowledgement = 
(SequenceAcknowledgement) rmMessageContext
-                                       
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+                       Iterator sequenceAckIter = rmMessageContext
+                                       
.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+                       
+                       //In case of ack messages sequenceId is decided based 
on the sequenceId of the first 
+                       //sequence Ack. In other words Sandesha2 does not 
expect to receive two SequenceAcknowledgements
+                       //of different RM specifications in the same incoming 
message.
+                       
+                       SequenceAcknowledgement sequenceAcknowledgement = 
(SequenceAcknowledgement) sequenceAckIter.next();
                        sequenceID = 
sequenceAcknowledgement.getIdentifier().getIdentifier();
                } else if (messageType == 
Sandesha2Constants.MessageTypes.ACK_REQUEST) {
                        AckRequested ackRequested = (AckRequested) 
rmMessageContext

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
 Sun Aug 27 20:10:51 2006
@@ -123,9 +123,7 @@
        
        public static String getAckRequestAction (String specVersion) throws 
SandeshaException {
                if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion)) 
-                       throw new SandeshaException 
(SandeshaMessageHelper.getMessage(
-                                       
SandeshaMessageKeys.emptyAckRequestSpecLevel,
-                                       specVersion));
+                       return null;  //No action defined for ackRequests
                else if 
(Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) 
                        return 
Sandesha2Constants.SPEC_2005_10.Actions.ACTION_ACK_REQUEST;
                else 

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
Sun Aug 27 20:10:51 2006
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Random;
 
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.context.ConfigurationContext;
@@ -55,22 +56,20 @@
 public class Invoker extends Thread {
 
        private boolean runInvoker = false;
-
        private ArrayList workingSequences = new ArrayList();
-
        private ConfigurationContext context = null;
-
        private static final Log log = LogFactory.getLog(Invoker.class);
-
        private boolean hasStopped = false;
-
+       
        private transient ThreadFactory threadPool;
-
        public int INVOKER_THREADPOOL_SIZE = 5;
 
+       private WorkerLock lock = null;
+       
        public Invoker() {
                threadPool = new ThreadPool(INVOKER_THREADPOOL_SIZE,
                                INVOKER_THREADPOOL_SIZE);
+               lock = new WorkerLock ();
        }
 
        public synchronized void stopInvokerForTheSequence(String sequenceID) {
@@ -207,15 +206,22 @@
                                //TODO pick the sequence randomly.
                                ArrayList allSequencesList = SandeshaUtil
                                                
.getArrayListFromString(allSequencesBean.getValue());
-                               Iterator allSequencesItr = 
allSequencesList.iterator();
-                               String sequenceId = (String) 
allSequencesItr.next();
+
+
+                               int size = allSequencesList.size();
+                               Random r = new Random ();
+                               int index = r.nextInt(size);
+                               
+                               String sequenceId = (String) 
allSequencesList.get(index);
+                               
 
                                NextMsgBean nextMsgBean = 
nextMsgMgr.retrieve(sequenceId);
                                if (nextMsgBean == null) {
                                        String message = "Next message not set 
correctly. Removing invalid entry.";
                                        log.debug(message);
-                                       allSequencesItr.remove();
-
+       
+                                       allSequencesList.remove(size);
+                                       
                                        // cleaning the invalid data of the all 
sequences.
                                        
allSequencesBean.setValue(allSequencesList.toString());
                                        
sequencePropMgr.update(allSequencesBean);
@@ -236,17 +242,34 @@
                                                new InvokerBean(null, 
nextMsgno, sequenceId))
                                                .iterator();
 
-                               if (stMapIt.hasNext()) {
-
-                                       InvokerBean invokerBean = (InvokerBean) 
stMapIt.next();
+                               if (stMapIt.hasNext()) { //the next Msg entry 
is present.
 
+                                       String workId = sequenceId + "::" + 
nextMsgno; //creating a workId to uniquely identify the
+                                                                               
                                                   //piece of work that will be 
assigned to the Worker.
+                                                                               
+                                       //check weather the bean is already 
assigned to a worker.
+                                       if (lock.isWorkPresent(workId)) {
+                                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned);
+                                               message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, 
workId);
+                                               log.debug(message);
+                                               continue;
+                                       }
+                                       
+                                       lock.addWork(workId);
+                                       
+                                       InvokerBean bean = (InvokerBean) 
stMapIt.next();
+                                       String messageContextKey = 
bean.getMessageContextRefKey();
+                                       
                                        transaction.commit();
+                                       
 
                                        // start a new worker thread and let it 
do the invocation.
-                                       InvokerWorker worker = new 
InvokerWorker(context,
-                                                       invokerBean);
+                                       InvokerWorker worker = new 
InvokerWorker(context,messageContextKey);
+                                       
+                                       worker.setLock(lock);
+                                       worker.setWorkId(workId);
+                                       
                                        threadPool.execute(worker);
-
                                }
 
                        } catch (Exception e) {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
 Sun Aug 27 20:10:51 2006
@@ -21,15 +21,16 @@
 import org.apache.sandesha2.util.TerminateManager;
 import org.apache.sandesha2.wsrm.Sequence;
 
-public class InvokerWorker implements Runnable {
+public class InvokerWorker extends SandeshaWorker implements Runnable {
 
        ConfigurationContext configurationContext = null;
-       InvokerBean invokerBean = null;
+       String messageContextKey;
+       
        Log log = LogFactory.getLog(InvokerWorker.class);
        
-       public InvokerWorker (ConfigurationContext configurationContext, 
InvokerBean invokerBean) {
+       public InvokerWorker (ConfigurationContext configurationContext, String 
messageContextKey) {
                this.configurationContext = configurationContext;
-               this.invokerBean = invokerBean;
+               this.messageContextKey = messageContextKey;
        }
        
        public void run() {
@@ -46,9 +47,12 @@
                        //starting a transaction
                        transaction = storageManager.getTransaction();
                        
-                       String key = invokerBean.getMessageContextRefKey();
+                       InvokerBean invokerBean = 
invokerBeanMgr.retrieve(messageContextKey);
                        
-                       msgToInvoke = 
storageManager.retrieveMessageContext(key, configurationContext);
+                       String sequenceId = invokerBean.getSequenceID();
+                       long messageNo = invokerBean.getMsgNo();
+                       
+                       msgToInvoke = 
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
                        RMMsgContext rmMsg = 
MsgInitializer.initializeMessage(msgToInvoke);
 
                        //endint the transaction before invocation.
@@ -56,8 +60,6 @@
                                
                        boolean invoked = false;
                        
-                       long messageNo = invokerBean.getMsgNo();
-                       
                        try {
 
                                // Invocation is not done within a transation. 
This
@@ -82,12 +84,12 @@
                                if (postFailureInvocation) {
                                        
makeMessageReadyForReinjection(msgToInvoke);
                                        if (log.isDebugEnabled())
-                                               log.debug("Receiving message, 
key=" + key + ", msgCtx="
+                                               log.debug("Receiving message, 
key=" + messageContextKey + ", msgCtx="
                                                                + 
msgToInvoke.getEnvelope().getHeader());
                                        engine.receive(msgToInvoke);
                                } else {
                                        if (log.isDebugEnabled())
-                                               log.debug("Resuming message, 
key=" + key + ", msgCtx="
+                                               log.debug("Resuming message, 
key=" + messageContextKey + ", msgCtx="
                                                                + 
msgToInvoke.getEnvelope().getHeader());
                                        msgToInvoke.setPaused(false);
                                        engine.resumeReceive(msgToInvoke);
@@ -110,17 +112,17 @@
                        // Service will be invoked only once. I.e. even if an
                        // exception get thrown in invocation
                        // the service will not be invoked again.
-                       invokerBeanMgr.delete(key);
+                       invokerBeanMgr.delete(messageContextKey);
 
                        // removing the corresponding message context as well.
-                       MessageContext msgCtx = 
storageManager.retrieveMessageContext(key, configurationContext);
+                       MessageContext msgCtx = 
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
                        if (msgCtx != null) {
-                               storageManager.removeMessageContext(key);
+                               
storageManager.removeMessageContext(messageContextKey);
                        }
 
                        // updating the next msg to invoke
 
-                       String sequenceId = invokerBean.getSequenceID();
+                       String s = invokerBean.getSequenceID();
                        NextMsgBean nextMsgBean = 
nextMsgMgr.retrieve(sequenceId);
 
                        
@@ -159,6 +161,10 @@
                } finally {
                        if (transaction!=null && transaction.isActive())
                                transaction.commit();
+                       
+                       if (workId !=null && lock!=null) {
+                               lock.removeWork(workId);
+                       }
                }
        }
 

Added: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java?rev=437515&view=auto
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java
 (added)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java
 Sun Aug 27 20:10:51 2006
@@ -0,0 +1,23 @@
+package org.apache.sandesha2.workers;
+/** Base class for workers (e.g. SenderWorker): holds a WorkerLock and the id of the work item assigned to this worker. */
+public class SandeshaWorker {
+       WorkerLock lock = null; // lock tracking in-progress work ids; stays null until setLock is called
+       String workId = null; // id of the work item given to this worker; stays null until setWorkId is called
+       // Plain accessors. SenderWorker.run() calls lock.removeWork(workId) in its finally block when both fields are non-null.
+       public WorkerLock getLock() {
+               return lock;
+       }
+       public void setLock(WorkerLock lock) {
+               this.lock = lock;
+       }
+       public String getWorkId() {
+               return workId;
+       }
+       public void setWorkId(String workId) {
+               this.workId = workId;
+       }
+       
+
+       
+       
+}

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
Sun Aug 27 20:10:51 2006
@@ -51,8 +51,11 @@
     private transient ThreadFactory threadPool;
     public int SENDER_THREADPOOL_SIZE = 5;
     
+    private WorkerLock lock = null;
+    
     public Sender () {
        threadPool = new ThreadPool 
(SENDER_THREADPOOL_SIZE,SENDER_THREADPOOL_SIZE);
+       lock = new WorkerLock ();
     }
 
        public synchronized void stopSenderForTheSequence(String sequenceID) {
@@ -159,20 +162,42 @@
                                        throw new SandeshaException(message);
                                }
 
+                               //TODO make sure this locks on reads.
                                transaction = storageManager.getTransaction();
 
                                SenderBeanMgr mgr = 
storageManager.getRetransmitterBeanMgr();
                                SenderBean senderBean = mgr.getNextMsgToSend();
                                if (senderBean == null) {
-                                       if (log.isDebugEnabled())
-                                               log.debug("SenderBean not 
found");
+                                       if (log.isDebugEnabled()) {
+                                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
+                                               message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
+                                               log.debug(message);
+                                       }
                                        continue;
                                }
                                
+                               String messageId = senderBean.getMessageID();
+                               
+                               //work Id is used to define the piece of work 
that will be assigned to the Worker thread,
+                               //to handle this Sender bean.
+                               String workId = messageId;
+                               
+                               //check whether the bean is already assigned to 
a worker.
+                               if (lock.isWorkPresent(workId)) {
+                                       String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned);
+                                       message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, 
workId);
+                                       log.debug(message);
+                                       continue;
+                               }
+       
+                               lock.addWork(workId);
+                               
                                transaction.commit();
                                
-                               //start a worker which will work on this 
message.s
-                               SenderWorker worker = new SenderWorker 
(context,senderBean);
+                               //start a worker which will work on this 
message.
+                               SenderWorker worker = new SenderWorker 
(context,messageId);
+                               worker.setLock (lock);
+                               worker.setWorkId(messageId);
                                threadPool.execute(worker);
 
                        } catch (Exception e) {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
 Sun Aug 27 20:10:51 2006
@@ -31,18 +31,17 @@
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.Sequence;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
-public class SenderWorker implements Runnable {
+public class SenderWorker extends SandeshaWorker implements Runnable {
 
        ConfigurationContext configurationContext = null;
-       SenderBean senderBean = null;
+       String messageId = null;
        Log log = LogFactory.getLog(SenderWorker.class);
        
-       public SenderWorker (ConfigurationContext configurationContext, 
SenderBean senderBean) {
+       public SenderWorker (ConfigurationContext configurationContext, String 
messageId) {
                this.configurationContext = configurationContext;
-               this.senderBean = senderBean;
+               this.messageId = messageId;
        }
        
        public void run () {
@@ -55,6 +54,7 @@
                        
                        transaction = storageManager.getTransaction();
 
+                       SenderBean senderBean = 
senderBeanMgr.retrieve(messageId);
                        String key = senderBean.getMessageContextRefKey();
                        MessageContext msgCtx = 
storageManager.retrieveMessageContext(key, configurationContext);
                        
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, 
Sandesha2Constants.VALUE_TRUE);
@@ -197,6 +197,10 @@
                } finally {
                        if (transaction!=null && transaction.isActive())
                                transaction.commit();
+                       
+                       if (lock!=null && workId!=null) {
+                               lock.removeWork(workId);
+                       }
                }
        }
        

Added: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java?rev=437515&view=auto
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java
 (added)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java
 Sun Aug 27 20:10:51 2006
@@ -0,0 +1,25 @@
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+/** Registry of the work ids currently handed to worker threads; Sender consults it so the same item is not dispatched twice. */
+public class WorkerLock {
+
+       public ArrayList workList = null; // ids of registered work; NOTE(review): public, so direct access bypasses the synchronized methods
+       /** Creates a lock with no work registered. */
+       public WorkerLock () {
+               workList = new ArrayList ();
+       }
+       /** Registers a work id. The list does not reject duplicates; Sender checks isWorkPresent before calling this. */
+       public synchronized void addWork (String work) {
+               workList.add(work);
+       }
+       /** Unregisters a work id: removes one matching entry from the list, if any. */
+       public synchronized void removeWork (String work) {
+               workList.remove(work);
+       }
+       /** Returns true if the given work id is currently registered. */
+       public synchronized boolean isWorkPresent (String work) {
+               return workList.contains(work);
+       }
+
+}

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java 
Sun Aug 27 20:10:51 2006
@@ -18,7 +18,9 @@
 package org.apache.sandesha2.wsrm;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
+import javax.swing.text.StyledEditorKit.ItalicAction;
 import javax.xml.namespace.QName;
 
 import org.apache.axiom.om.OMElement;
@@ -43,7 +45,10 @@
 public class RMElements {
 
        private Sequence sequence = null;
-       private SequenceAcknowledgement sequenceAcknowledgement = null;
+       
+       //there can be more than one sequence ack in a single message.
+       private ArrayList sequenceAcknowledgements = null;
+       
        private CreateSequence createSequence = null;
        private CreateSequenceResponse createSequenceResponse = null;
        private TerminateSequence terminateSequence = null;
@@ -56,10 +61,11 @@
        private String addressingNamespaceValue = null;
        
        public RMElements () {
-               
+               sequenceAcknowledgements = new ArrayList ();
        }
        
        public RMElements (String addressingNamespace) {
+               this ();
                this.addressingNamespaceValue = addressingNamespace;
        }
        
@@ -168,14 +174,26 @@
                        closeSequenceResponse = new CloseSequenceResponse  
(factory,rmNamespaceValue);
                        closeSequenceResponse.fromOMElement(envelope.getBody());
                }
+               
+               Iterator sequenceAcknowledgementIter = envelope.getHeader()
+                               .getChildrenWithName (new 
QName(rmNamespaceValue,
+                                               
Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK));
+               while (sequenceAcknowledgementIter.hasNext()) {
+                       OMElement sequenceAckElement = (OMElement) 
sequenceAcknowledgementIter.next();
+                       SequenceAcknowledgement sequenceAcknowledgement = new 
SequenceAcknowledgement  (factory,rmNamespaceValue);
+                       
sequenceAcknowledgement.fromOMElement(sequenceAckElement);
+                       
+                       sequenceAcknowledgements.add(sequenceAcknowledgement);
+               }
        }
 
        public SOAPEnvelope toSOAPEnvelope(SOAPEnvelope envelope) throws 
SandeshaException  {
                if (sequence != null) {
                        sequence.toOMElement(envelope.getHeader());
                }
-               if (sequenceAcknowledgement != null) {
-                       
sequenceAcknowledgement.toOMElement(envelope.getHeader());
+               for (Iterator 
iter=sequenceAcknowledgements.iterator();iter.hasNext();) {
+                       SequenceAcknowledgement sequenceAck = 
(SequenceAcknowledgement) iter.next();
+                       sequenceAck.toOMElement(envelope.getHeader());
                }
                if (createSequence != null) {
                        createSequence.toOMElement(envelope.getBody());
@@ -217,8 +235,8 @@
                return sequence;
        }
 
-       public SequenceAcknowledgement getSequenceAcknowledgement() {
-               return sequenceAcknowledgement;
+       public Iterator getSequenceAcknowledgements() {
+               return sequenceAcknowledgements.iterator();
        }
 
        public TerminateSequence getTerminateSequence() {
@@ -242,9 +260,13 @@
                this.sequence = sequence;
        }
 
-       public void setSequenceAcknowledgement(
-                       SequenceAcknowledgement sequenceAcknowledgement) {
-               this.sequenceAcknowledgement = sequenceAcknowledgement;
+       public void setSequenceAcknowledgements(
+                       ArrayList sequenceAcknowledgements) {
+               this.sequenceAcknowledgements = sequenceAcknowledgements;
+       }
+       
+       public void addSequenceAcknowledgement (SequenceAcknowledgement 
sequenceAcknowledgement) {
+               sequenceAcknowledgements.add(sequenceAcknowledgement);
        }
 
        public void setTerminateSequence(TerminateSequence terminateSequence) {

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java
 Sun Aug 27 20:10:51 2006
@@ -69,23 +69,16 @@
                return namespaceValue;
        }
 
-       public Object fromOMElement(OMElement element) throws 
OMException,SandeshaException {
+       public Object fromOMElement(OMElement sequenceAckElement) throws 
OMException,SandeshaException {
 
-               OMElement sequenceAckPart = element;
-               if (sequenceAckPart == null)
-                       throw new OMException(SandeshaMessageHelper.getMessage(
-                                       SandeshaMessageKeys.seqAckPartIsNull));
-
-               ackElement = sequenceAckPart;
-               
-               OMFactory factory = element.getOMFactory();
+               OMFactory factory = sequenceAckElement.getOMFactory();
                if (factory==null)
                        factory = defaultFactory;
                
                identifier = new Identifier(defaultFactory,namespaceValue);
-               identifier.fromOMElement(sequenceAckPart);
+               identifier.fromOMElement(sequenceAckElement);
 
-               Iterator ackRangeParts = 
sequenceAckPart.getChildrenWithName(new QName(
+               Iterator ackRangeParts = 
sequenceAckElement.getChildrenWithName(new QName(
                                namespaceValue, 
Sandesha2Constants.WSRM_COMMON.ACK_RANGE));
 
                while (ackRangeParts.hasNext()) {
@@ -96,7 +89,7 @@
                        acknowledgementRangeList.add(ackRange);
                }
 
-               Iterator nackParts = sequenceAckPart.getChildrenWithName(new 
QName(
+               Iterator nackParts = sequenceAckElement.getChildrenWithName(new 
QName(
                                namespaceValue, 
Sandesha2Constants.WSRM_COMMON.NACK));
 
                while (nackParts.hasNext()) {
@@ -109,18 +102,18 @@
                String rmSpecVersion = 
SpecSpecificConstants.getSpecVersionString (namespaceValue);
                
                if (SpecSpecificConstants.isAckFinalAllowed(rmSpecVersion)) {
-                       OMElement ackFinalPart = 
sequenceAckPart.getFirstChildWithName(new QName 
(namespaceValue,Sandesha2Constants.WSRM_COMMON.FINAL));
+                       OMElement ackFinalPart = 
sequenceAckElement.getFirstChildWithName(new QName 
(namespaceValue,Sandesha2Constants.WSRM_COMMON.FINAL));
                        if (ackFinalPart!=null) {
                                ackFinal = new AckFinal 
(defaultFactory,namespaceValue);
-                               ackFinal.fromOMElement(sequenceAckPart);
+                               ackFinal.fromOMElement(sequenceAckElement);
                        }
                }
                
                if (SpecSpecificConstants.isAckNoneAllowed(rmSpecVersion)) {
-                       OMElement ackNonePart = 
sequenceAckPart.getFirstChildWithName(new QName 
(namespaceValue,Sandesha2Constants.WSRM_COMMON.NONE));
+                       OMElement ackNonePart = 
sequenceAckElement.getFirstChildWithName(new QName 
(namespaceValue,Sandesha2Constants.WSRM_COMMON.NONE));
                        if (ackNonePart!=null) {
                                ackNone = new AckNone 
(defaultFactory,namespaceValue);
-                               ackNone.fromOMElement(sequenceAckPart);
+                               ackNone.fromOMElement(sequenceAckElement);
                        }
                }
                



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to