Author: chamikara
Date: Sun Aug 27 20:10:51 2006
New Revision: 437515
URL: http://svn.apache.org/viewvc?rev=437515&view=rev
Log:
Added the WorkerLock class, which is used by Invoker and Sender to make sure that
two threads of the thread pools are not working on the same item.
Corrected a bug in RMElements: the class was not detecting the
SequenceAcknowledgement part due to a recent change, although it is supposed
to detect all parts in an RM message.
Since there can be multiple SequenceAcknowledgement parts in the same message,
the implementations of RMElements and RMMsgContext were changed to support
this kind of scenario.
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java
(original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/RMMsgContext.java
Sun Aug 27 20:10:51 2006
@@ -17,6 +17,7 @@
package org.apache.sandesha2;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -32,6 +33,7 @@
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.wsrm.IOMRMElement;
import org.apache.sandesha2.wsrm.IOMRMPart;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
/**
* This class is used to hold a MessageContext within Sandesha. This is used to
@@ -90,16 +92,25 @@
SOAPEnvelope envelope = msgContext.getEnvelope();
Iterator keys = rmMessageParts.keySet().iterator();
while (keys.hasNext()) {
- Object key = keys.next();
- IOMRMPart rmPart = (IOMRMPart) rmMessageParts.get(key);
- rmPart.toSOAPEnvelope(envelope);
+ Integer key = (Integer) keys.next();
+ int partId = key.intValue();
+
+ if (isMultiPart(partId)) {
+ for (Iterator
it=getMessageParts(partId);it.hasNext();) {
+ IOMRMPart rmPart = (IOMRMPart)
it.next();
+ rmPart.toSOAPEnvelope(envelope);
+ }
+ } else {
+ IOMRMPart rmPart = (IOMRMPart)
rmMessageParts.get(key);
+ rmPart.toSOAPEnvelope(envelope);
+ }
}
}
public int getMessageType() {
return messageType;
}
-
+
/**
* The message type can be used to easily identify what this message is.
@@ -121,12 +132,51 @@
* @param part
*/
public void setMessagePart(int partId, IOMRMPart part) {
- if (partId >= 0 && partId <=
Sandesha2Constants.MessageParts.MAX_MSG_PART_ID)
- rmMessageParts.put(new Integer(partId), part);
+ if (partId >= 0 && partId <=
Sandesha2Constants.MessageParts.MAX_MSG_PART_ID) {
+
+ if
(partId==Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT) {
+ ArrayList sequenceAckList = (ArrayList)
rmMessageParts.get(new Integer (partId));
+ if (sequenceAckList==null) {
+ sequenceAckList = new ArrayList ();
+ rmMessageParts.put(new Integer
(partId),sequenceAckList);
+ }
+ } else {
+ rmMessageParts.put(new Integer(partId), part);
+ }
+ }
}
+
- public IOMRMElement getMessagePart(int partId) {
+ public IOMRMElement getMessagePart(int partId) throws SandeshaException
{
+ if (isMultiPart(partId)) {
+ String message = "It is possible for a multiple
MessageParts of this type to exit. Please call the 'getMessageParts' method";
+ throw new SandeshaException (message);
+ }
+
return (IOMRMElement) rmMessageParts.get(new Integer(partId));
+ }
+
+ public Iterator getMessageParts (int partId) {
+ Object obj = rmMessageParts.get(new Integer (partId));
+ if (obj==null)
+ return new ArrayList().iterator();
+
+ if (obj instanceof ArrayList) {
+ return ((ArrayList) obj).iterator();
+ } else {
+ ArrayList arr = new ArrayList ();
+ arr.add(obj);
+ return arr.iterator();
+ }
+ }
+
+ //checks weather there can be multiple elements of these parts,
+ //if so getMessageParts method has to be called to get a ArrayList of
parts..
+ public boolean isMultiPart (int messagePartId) {
+ if
(messagePartId==Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT)
+ return true;
+
+ return false;
}
public EndpointReference getFrom() {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
Sun Aug 27 20:10:51 2006
@@ -64,6 +64,9 @@
public static final String cannotInnitMessage="cannotInnitMessage";
public static final String propertyInvalidValue="propertyInvalidValue";
public static final String
couldNotCopyParameters="couldNotCopyParameters";
+ public static final String senderBeanNotFound="couldNotCopyParameters";
+ public static final String workAlreadyAssigned="couldNotCopyParameters";
+
public static final String
rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence";
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
Sun Aug 27 20:10:51 2006
@@ -56,8 +56,11 @@
axisOperationError=Sandesha2 Internal Error: could not create the
''AxisOperation'' due to an error {0}
axisOperationRegisterError=Sandesha2 Internal Error: could not register an
''OutInAxisOperation'' due to exception {0}
transportOutNotPresent=Sandesha2 Internal Error: original transport sender is
not present
-storedKeyNotPresent=Sandesha2 Internal Error: stored key not present in the
retransmittable message.
+storedKeyNotPresent=Sandesha2 Internal Error: stored key not present in the
retransmittable message
invalidQName=Invalid QName string: {0}
+senderBeanNotFound=SenderBean was not found
+workAlreadyAssigned=Work '{0}' is already assigned to a different Worker. Will
try the next one
+
dummyCallback=Sandesha2 Internal Error: dummy callback was called but this
should never happen.
dummyCallbackError=Sandesha2 Internal Error: dummy callback received an error
but this should never happen.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
Sun Aug 27 20:10:51 2006
@@ -49,6 +49,7 @@
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
import org.apache.sandesha2.wsrm.Nack;
@@ -252,9 +253,11 @@
}
}
- // TODO - surely this is only appropriate for standalone ack
messages?
- // stopping the progress of the message further.
- rmMsgCtx.pause();
+
+ String action = msgCtx.getOptions().getAction();
+ if (action!=null &&
action.equals(SpecSpecificConstants.getAckRequestAction(rmMsgCtx.getRMSpecVersion())))
{
+ rmMsgCtx.pause();
+ }
if (log.isDebugEnabled())
log.debug("Exit:
AcknowledgementProcessor::processAckHeader");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Sun Aug 27 20:10:51 2006
@@ -17,6 +17,8 @@
package org.apache.sandesha2.msgprocessors;
+import java.util.Iterator;
+
import javax.xml.namespace.QName;
import org.apache.axiom.om.OMElement;
@@ -272,12 +274,24 @@
RMMsgContext ackRMMessage =
AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, sequenceID,
storageManager);
- SequenceAcknowledgement seqAck = (SequenceAcknowledgement)
ackRMMessage
-
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-
terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
seqAck);
-
+
+ Iterator iter =
ackRMMessage.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+
+ if (iter.hasNext()) {
+ SequenceAcknowledgement seqAck =
(SequenceAcknowledgement) iter.next();
+ if (seqAck==null) {
+ String message = "No SequenceAcknowledgement
part is present";
+ throw new SandeshaException (message);
+ }
+
+
terminateSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
seqAck);
+ } else {
+ //TODO
+ }
+
terminateSeqResponseRMMsg.addSOAPEnvelope();
+
terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
terminateSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
"true");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/MsgInitializer.java
Sun Aug 27 20:10:51 2006
@@ -17,6 +17,9 @@
package org.apache.sandesha2.util;
+import java.util.ArrayList;
+import java.util.Iterator;
+
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -103,10 +106,11 @@
rmNamespace =
elements.getSequence().getNamespaceValue();
}
- if (elements.getSequenceAcknowledgement() != null) {
-
rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
elements
- .getSequenceAcknowledgement());
- rmNamespace =
elements.getSequenceAcknowledgement().getNamespaceValue();
+
+ for (Iterator iter =
elements.getSequenceAcknowledgements();iter.hasNext();) {
+ SequenceAcknowledgement sequenceAck =
(SequenceAcknowledgement) iter.next();
+
rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
sequenceAck);
+ rmNamespace = sequenceAck.getNamespaceValue();
}
if (elements.getTerminateSequence() != null) {
@@ -168,8 +172,8 @@
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
TerminateSequenceResponse terminateSequenceResponse =
(TerminateSequenceResponse) rmMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ_RESPONSE);
- SequenceAcknowledgement sequenceAcknowledgement =
(SequenceAcknowledgement) rmMsgCtx
-
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ Iterator sequenceAcknowledgementsIter = rmMsgCtx
+
.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
Sequence sequence = (Sequence)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
AckRequested ackRequest = (AckRequested)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
CloseSequence closeSequence = (CloseSequence) rmMsgCtx
@@ -192,8 +196,11 @@
} else if
(rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE) != null) {
rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
sequenceID = sequence.getIdentifier().getIdentifier();
- } else if (sequenceAcknowledgement != null) {
+ } else if (sequenceAcknowledgementsIter.hasNext()) {
rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+ SequenceAcknowledgement sequenceAcknowledgement =
(SequenceAcknowledgement) sequenceAcknowledgementsIter.next();
+
+ //picking the sequenceId of the first sequence ack.
sequenceID =
sequenceAcknowledgement.getIdentifier().getIdentifier();
} else if (ackRequest != null) {
rmMsgCtx.setMessageType(Sandesha2Constants.MessageTypes.ACK_REQUEST);
@@ -218,6 +225,10 @@
}
}
+ //In case of ack messages RM Namespace is decided based on the
sequenceId of the first
+ //sequence Ack. In other words Sandesha2 does not expect to
receive two SequenceAcknowledgements
+ //of different RM specifications in the same incoming message.
+
String rmNamespace = rmMsgCtx.getRMNamespaceValue();
if (sequenceID != null) {
String specVersion =
SandeshaUtil.getRMVersion(propertyKey, storageManager);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Sun Aug 27 20:10:51 2006
@@ -882,7 +882,7 @@
return propertyBean;
}
- public static String getSequenceIDFromRMMessage(RMMsgContext
rmMessageContext) {
+ public static String getSequenceIDFromRMMessage(RMMsgContext
rmMessageContext) throws SandeshaException {
int messageType = rmMessageContext.getMessageType();
String sequenceID = null;
@@ -890,8 +890,14 @@
Sequence sequence = (Sequence)
rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
sequenceID = sequence.getIdentifier().getIdentifier();
} else if (messageType == Sandesha2Constants.MessageTypes.ACK) {
- SequenceAcknowledgement sequenceAcknowledgement =
(SequenceAcknowledgement) rmMessageContext
-
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ Iterator sequenceAckIter = rmMessageContext
+
.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+
+ //In case of ack messages sequenceId is decided based
on the sequenceId of the first
+ //sequence Ack. In other words Sandesha2 does not
expect to receive two SequenceAcknowledgements
+ //of different RM specifications in the same incoming
message.
+
+ SequenceAcknowledgement sequenceAcknowledgement =
(SequenceAcknowledgement) sequenceAckIter.next();
sequenceID =
sequenceAcknowledgement.getIdentifier().getIdentifier();
} else if (messageType ==
Sandesha2Constants.MessageTypes.ACK_REQUEST) {
AckRequested ackRequested = (AckRequested)
rmMessageContext
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
Sun Aug 27 20:10:51 2006
@@ -123,9 +123,7 @@
public static String getAckRequestAction (String specVersion) throws
SandeshaException {
if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion))
- throw new SandeshaException
(SandeshaMessageHelper.getMessage(
-
SandeshaMessageKeys.emptyAckRequestSpecLevel,
- specVersion));
+ return null; //No action defined for ackRequests
else if
(Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion))
return
Sandesha2Constants.SPEC_2005_10.Actions.ACTION_ACK_REQUEST;
else
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
Sun Aug 27 20:10:51 2006
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Random;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.context.ConfigurationContext;
@@ -55,22 +56,20 @@
public class Invoker extends Thread {
private boolean runInvoker = false;
-
private ArrayList workingSequences = new ArrayList();
-
private ConfigurationContext context = null;
-
private static final Log log = LogFactory.getLog(Invoker.class);
-
private boolean hasStopped = false;
-
+
private transient ThreadFactory threadPool;
-
public int INVOKER_THREADPOOL_SIZE = 5;
+ private WorkerLock lock = null;
+
public Invoker() {
threadPool = new ThreadPool(INVOKER_THREADPOOL_SIZE,
INVOKER_THREADPOOL_SIZE);
+ lock = new WorkerLock ();
}
public synchronized void stopInvokerForTheSequence(String sequenceID) {
@@ -207,15 +206,22 @@
//TODO pick the sequence randomly.
ArrayList allSequencesList = SandeshaUtil
.getArrayListFromString(allSequencesBean.getValue());
- Iterator allSequencesItr =
allSequencesList.iterator();
- String sequenceId = (String)
allSequencesItr.next();
+
+
+ int size = allSequencesList.size();
+ Random r = new Random ();
+ int index = r.nextInt(size);
+
+ String sequenceId = (String)
allSequencesList.get(index);
+
NextMsgBean nextMsgBean =
nextMsgMgr.retrieve(sequenceId);
if (nextMsgBean == null) {
String message = "Next message not set
correctly. Removing invalid entry.";
log.debug(message);
- allSequencesItr.remove();
-
+
+ allSequencesList.remove(size);
+
// cleaning the invalid data of the all
sequences.
allSequencesBean.setValue(allSequencesList.toString());
sequencePropMgr.update(allSequencesBean);
@@ -236,17 +242,34 @@
new InvokerBean(null,
nextMsgno, sequenceId))
.iterator();
- if (stMapIt.hasNext()) {
-
- InvokerBean invokerBean = (InvokerBean)
stMapIt.next();
+ if (stMapIt.hasNext()) { //the next Msg entry
is present.
+ String workId = sequenceId + "::" +
nextMsgno; //creating a workId to uniquely identify the
+
//piece of work that will be
assigned to the Worker.
+
+ //check weather the bean is already
assigned to a worker.
+ if (lock.isWorkPresent(workId)) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned);
+ message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned,
workId);
+ log.debug(message);
+ continue;
+ }
+
+ lock.addWork(workId);
+
+ InvokerBean bean = (InvokerBean)
stMapIt.next();
+ String messageContextKey =
bean.getMessageContextRefKey();
+
transaction.commit();
+
// start a new worker thread and let it
do the invocation.
- InvokerWorker worker = new
InvokerWorker(context,
- invokerBean);
+ InvokerWorker worker = new
InvokerWorker(context,messageContextKey);
+
+ worker.setLock(lock);
+ worker.setWorkId(workId);
+
threadPool.execute(worker);
-
}
} catch (Exception e) {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/InvokerWorker.java
Sun Aug 27 20:10:51 2006
@@ -21,15 +21,16 @@
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.wsrm.Sequence;
-public class InvokerWorker implements Runnable {
+public class InvokerWorker extends SandeshaWorker implements Runnable {
ConfigurationContext configurationContext = null;
- InvokerBean invokerBean = null;
+ String messageContextKey;
+
Log log = LogFactory.getLog(InvokerWorker.class);
- public InvokerWorker (ConfigurationContext configurationContext,
InvokerBean invokerBean) {
+ public InvokerWorker (ConfigurationContext configurationContext, String
messageContextKey) {
this.configurationContext = configurationContext;
- this.invokerBean = invokerBean;
+ this.messageContextKey = messageContextKey;
}
public void run() {
@@ -46,9 +47,12 @@
//starting a transaction
transaction = storageManager.getTransaction();
- String key = invokerBean.getMessageContextRefKey();
+ InvokerBean invokerBean =
invokerBeanMgr.retrieve(messageContextKey);
- msgToInvoke =
storageManager.retrieveMessageContext(key, configurationContext);
+ String sequenceId = invokerBean.getSequenceID();
+ long messageNo = invokerBean.getMsgNo();
+
+ msgToInvoke =
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
RMMsgContext rmMsg =
MsgInitializer.initializeMessage(msgToInvoke);
//endint the transaction before invocation.
@@ -56,8 +60,6 @@
boolean invoked = false;
- long messageNo = invokerBean.getMsgNo();
-
try {
// Invocation is not done within a transation.
This
@@ -82,12 +84,12 @@
if (postFailureInvocation) {
makeMessageReadyForReinjection(msgToInvoke);
if (log.isDebugEnabled())
- log.debug("Receiving message,
key=" + key + ", msgCtx="
+ log.debug("Receiving message,
key=" + messageContextKey + ", msgCtx="
+
msgToInvoke.getEnvelope().getHeader());
engine.receive(msgToInvoke);
} else {
if (log.isDebugEnabled())
- log.debug("Resuming message,
key=" + key + ", msgCtx="
+ log.debug("Resuming message,
key=" + messageContextKey + ", msgCtx="
+
msgToInvoke.getEnvelope().getHeader());
msgToInvoke.setPaused(false);
engine.resumeReceive(msgToInvoke);
@@ -110,17 +112,17 @@
// Service will be invoked only once. I.e. even if an
// exception get thrown in invocation
// the service will not be invoked again.
- invokerBeanMgr.delete(key);
+ invokerBeanMgr.delete(messageContextKey);
// removing the corresponding message context as well.
- MessageContext msgCtx =
storageManager.retrieveMessageContext(key, configurationContext);
+ MessageContext msgCtx =
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
if (msgCtx != null) {
- storageManager.removeMessageContext(key);
+
storageManager.removeMessageContext(messageContextKey);
}
// updating the next msg to invoke
- String sequenceId = invokerBean.getSequenceID();
+ String s = invokerBean.getSequenceID();
NextMsgBean nextMsgBean =
nextMsgMgr.retrieve(sequenceId);
@@ -159,6 +161,10 @@
} finally {
if (transaction!=null && transaction.isActive())
transaction.commit();
+
+ if (workId !=null && lock!=null) {
+ lock.removeWork(workId);
+ }
}
}
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java?rev=437515&view=auto
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java
(added)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaWorker.java
Sun Aug 27 20:10:51 2006
@@ -0,0 +1,23 @@
+package org.apache.sandesha2.workers;
+
+public class SandeshaWorker {
+ WorkerLock lock = null;
+ String workId = null;
+
+ public WorkerLock getLock() {
+ return lock;
+ }
+ public void setLock(WorkerLock lock) {
+ this.lock = lock;
+ }
+ public String getWorkId() {
+ return workId;
+ }
+ public void setWorkId(String workId) {
+ this.workId = workId;
+ }
+
+
+
+
+}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Sun Aug 27 20:10:51 2006
@@ -51,8 +51,11 @@
private transient ThreadFactory threadPool;
public int SENDER_THREADPOOL_SIZE = 5;
+ private WorkerLock lock = null;
+
public Sender () {
threadPool = new ThreadPool
(SENDER_THREADPOOL_SIZE,SENDER_THREADPOOL_SIZE);
+ lock = new WorkerLock ();
}
public synchronized void stopSenderForTheSequence(String sequenceID) {
@@ -159,20 +162,42 @@
throw new SandeshaException(message);
}
+ //TODO make sure this locks on reads.
transaction = storageManager.getTransaction();
SenderBeanMgr mgr =
storageManager.getRetransmitterBeanMgr();
SenderBean senderBean = mgr.getNextMsgToSend();
if (senderBean == null) {
- if (log.isDebugEnabled())
- log.debug("SenderBean not
found");
+ if (log.isDebugEnabled()) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
+ message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
+ log.debug(message);
+ }
continue;
}
+ String messageId = senderBean.getMessageID();
+
+ //work Id is used to define the piece of work
that will be assigned to the Worker thread,
+ //to handle this Sender bean.
+ String workId = messageId;
+
+ //check weather the bean is already assigned to
a worker.
+ if (lock.isWorkPresent(workId)) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned);
+ message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned,
workId);
+ log.debug(message);
+ continue;
+ }
+
+ lock.addWork(workId);
+
transaction.commit();
- //start a worker which will work on this
message.s
- SenderWorker worker = new SenderWorker
(context,senderBean);
+ //start a worker which will work on this
messages.
+ SenderWorker worker = new SenderWorker
(context,messageId);
+ worker.setLock (lock);
+ worker.setWorkId(messageId);
threadPool.execute(worker);
} catch (Exception e) {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Sun Aug 27 20:10:51 2006
@@ -31,18 +31,17 @@
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.TerminateSequence;
-public class SenderWorker implements Runnable {
+public class SenderWorker extends SandeshaWorker implements Runnable {
ConfigurationContext configurationContext = null;
- SenderBean senderBean = null;
+ String messageId = null;
Log log = LogFactory.getLog(SenderWorker.class);
- public SenderWorker (ConfigurationContext configurationContext,
SenderBean senderBean) {
+ public SenderWorker (ConfigurationContext configurationContext, String
messageId) {
this.configurationContext = configurationContext;
- this.senderBean = senderBean;
+ this.messageId = messageId;
}
public void run () {
@@ -55,6 +54,7 @@
transaction = storageManager.getTransaction();
+ SenderBean senderBean =
senderBeanMgr.retrieve(messageId);
String key = senderBean.getMessageContextRefKey();
MessageContext msgCtx =
storageManager.retrieveMessageContext(key, configurationContext);
msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_TRUE);
@@ -197,6 +197,10 @@
} finally {
if (transaction!=null && transaction.isActive())
transaction.commit();
+
+ if (lock!=null && workId!=null) {
+ lock.removeWork(workId);
+ }
}
}
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java?rev=437515&view=auto
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java
(added)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/WorkerLock.java
Sun Aug 27 20:10:51 2006
@@ -0,0 +1,25 @@
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+
+public class WorkerLock {
+
+ public ArrayList workList = null;
+
+ public WorkerLock () {
+ workList = new ArrayList ();
+ }
+
+ public synchronized void addWork (String work) {
+ workList.add(work);
+ }
+
+ public synchronized void removeWork (String work) {
+ workList.remove(work);
+ }
+
+ public synchronized boolean isWorkPresent (String work) {
+ return workList.contains(work);
+ }
+
+}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/RMElements.java
Sun Aug 27 20:10:51 2006
@@ -18,7 +18,9 @@
package org.apache.sandesha2.wsrm;
import java.util.ArrayList;
+import java.util.Iterator;
+import javax.swing.text.StyledEditorKit.ItalicAction;
import javax.xml.namespace.QName;
import org.apache.axiom.om.OMElement;
@@ -43,7 +45,10 @@
public class RMElements {
private Sequence sequence = null;
- private SequenceAcknowledgement sequenceAcknowledgement = null;
+
+ //there can be more than one sequence acks in a single message.
+ private ArrayList sequenceAcknowledgements = null;
+
private CreateSequence createSequence = null;
private CreateSequenceResponse createSequenceResponse = null;
private TerminateSequence terminateSequence = null;
@@ -56,10 +61,11 @@
private String addressingNamespaceValue = null;
public RMElements () {
-
+ sequenceAcknowledgements = new ArrayList ();
}
public RMElements (String addressingNamespace) {
+ this ();
this.addressingNamespaceValue = addressingNamespace;
}
@@ -168,14 +174,26 @@
closeSequenceResponse = new CloseSequenceResponse
(factory,rmNamespaceValue);
closeSequenceResponse.fromOMElement(envelope.getBody());
}
+
+ Iterator sequenceAcknowledgementIter = envelope.getHeader()
+ .getChildrenWithName (new
QName(rmNamespaceValue,
+
Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK));
+ while (sequenceAcknowledgementIter.hasNext()) {
+ OMElement sequenceAckElement = (OMElement)
sequenceAcknowledgementIter.next();
+ SequenceAcknowledgement sequenceAcknowledgement = new
SequenceAcknowledgement (factory,rmNamespaceValue);
+
sequenceAcknowledgement.fromOMElement(sequenceAckElement);
+
+ sequenceAcknowledgements.add(sequenceAcknowledgement);
+ }
}
public SOAPEnvelope toSOAPEnvelope(SOAPEnvelope envelope) throws
SandeshaException {
if (sequence != null) {
sequence.toOMElement(envelope.getHeader());
}
- if (sequenceAcknowledgement != null) {
-
sequenceAcknowledgement.toOMElement(envelope.getHeader());
+ for (Iterator
iter=sequenceAcknowledgements.iterator();iter.hasNext();) {
+ SequenceAcknowledgement sequenceAck =
(SequenceAcknowledgement) iter.next();
+ sequenceAck.toOMElement(envelope.getHeader());
}
if (createSequence != null) {
createSequence.toOMElement(envelope.getBody());
@@ -217,8 +235,8 @@
return sequence;
}
- public SequenceAcknowledgement getSequenceAcknowledgement() {
- return sequenceAcknowledgement;
+ public Iterator getSequenceAcknowledgements() {
+ return sequenceAcknowledgements.iterator();
}
public TerminateSequence getTerminateSequence() {
@@ -242,9 +260,13 @@
this.sequence = sequence;
}
- public void setSequenceAcknowledgement(
- SequenceAcknowledgement sequenceAcknowledgement) {
- this.sequenceAcknowledgement = sequenceAcknowledgement;
+ public void setSequenceAcknowledgements(
+ ArrayList sequenceAcknowledgements) {
+ this.sequenceAcknowledgements = sequenceAcknowledgements;
+ }
+
+ public void addSequenceAcknowledgement (SequenceAcknowledgement
sequenceAcknowledgement) {
+ sequenceAcknowledgements.add(sequenceAcknowledgement);
}
public void setTerminateSequence(TerminateSequence terminateSequence) {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java?rev=437515&r1=437514&r2=437515&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java
Sun Aug 27 20:10:51 2006
@@ -69,23 +69,16 @@
return namespaceValue;
}
- public Object fromOMElement(OMElement element) throws
OMException,SandeshaException {
+ public Object fromOMElement(OMElement sequenceAckElement) throws
OMException,SandeshaException {
- OMElement sequenceAckPart = element;
- if (sequenceAckPart == null)
- throw new OMException(SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.seqAckPartIsNull));
-
- ackElement = sequenceAckPart;
-
- OMFactory factory = element.getOMFactory();
+ OMFactory factory = sequenceAckElement.getOMFactory();
if (factory==null)
factory = defaultFactory;
identifier = new Identifier(defaultFactory,namespaceValue);
- identifier.fromOMElement(sequenceAckPart);
+ identifier.fromOMElement(sequenceAckElement);
- Iterator ackRangeParts =
sequenceAckPart.getChildrenWithName(new QName(
+ Iterator ackRangeParts =
sequenceAckElement.getChildrenWithName(new QName(
namespaceValue,
Sandesha2Constants.WSRM_COMMON.ACK_RANGE));
while (ackRangeParts.hasNext()) {
@@ -96,7 +89,7 @@
acknowledgementRangeList.add(ackRange);
}
- Iterator nackParts = sequenceAckPart.getChildrenWithName(new
QName(
+ Iterator nackParts = sequenceAckElement.getChildrenWithName(new
QName(
namespaceValue,
Sandesha2Constants.WSRM_COMMON.NACK));
while (nackParts.hasNext()) {
@@ -109,18 +102,18 @@
String rmSpecVersion =
SpecSpecificConstants.getSpecVersionString (namespaceValue);
if (SpecSpecificConstants.isAckFinalAllowed(rmSpecVersion)) {
- OMElement ackFinalPart =
sequenceAckPart.getFirstChildWithName(new QName
(namespaceValue,Sandesha2Constants.WSRM_COMMON.FINAL));
+ OMElement ackFinalPart =
sequenceAckElement.getFirstChildWithName(new QName
(namespaceValue,Sandesha2Constants.WSRM_COMMON.FINAL));
if (ackFinalPart!=null) {
ackFinal = new AckFinal
(defaultFactory,namespaceValue);
- ackFinal.fromOMElement(sequenceAckPart);
+ ackFinal.fromOMElement(sequenceAckElement);
}
}
if (SpecSpecificConstants.isAckNoneAllowed(rmSpecVersion)) {
- OMElement ackNonePart =
sequenceAckPart.getFirstChildWithName(new QName
(namespaceValue,Sandesha2Constants.WSRM_COMMON.NONE));
+ OMElement ackNonePart =
sequenceAckElement.getFirstChildWithName(new QName
(namespaceValue,Sandesha2Constants.WSRM_COMMON.NONE));
if (ackNonePart!=null) {
ackNone = new AckNone
(defaultFactory,namespaceValue);
- ackNone.fromOMElement(sequenceAckPart);
+ ackNone.fromOMElement(sequenceAckElement);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]