Author: mckierna
Date: Thu Jul 31 05:38:36 2008
New Revision: 681358
URL: http://svn.apache.org/viewvc?rev=681358&view=rev
Log:
See https://issues.apache.org/jira/browse/SANDESHA2-172, thanks Dave
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=681358&r1=681357&r2=681358&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
Thu Jul 31 05:38:36 2008
@@ -181,12 +181,13 @@
// offered seq id
String offeredSequenceID =
offer.getIdentifer().getIdentifier();
- boolean offerAccepted =
offerAccepted(offeredSequenceID, context, createSeqRMMsg, storageManager);
-
+ boolean isValidseqID =
isValidseqID(offeredSequenceID, context, createSeqRMMsg, storageManager);
+ boolean offerAccepted = true;
+
RMSBean rMSBean = null;
//Before processing this offer any further we
need to perform some extra checks
//on the offered EP if WS-RM Spec 1.1 is being
used
- if(offerAccepted &&
Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
+ if(isValidseqID &&
Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmdBean.getRMVersion())){
Endpoint endpoint = offer.getEndpoint();
if (endpoint!=null) {
//Check to see if the offer
endpoint has a value of WSA Anonymous
@@ -210,20 +211,28 @@
log.debug("Offer
Refused as it included a null endpoint");
offerAccepted = false;
}
- } else if (offerAccepted &&
Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
+ } else if (isValidseqID &&
Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmdBean.getRMVersion())){
rMSBean = new RMSBean();
}
- if (offerAccepted) {
+ String outgoingSideInternalSequenceId =
SandeshaUtil
+
.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+
+ if(isValidseqID){
// Setting the CreateSequence table
entry for the outgoing
// side.
-
rMSBean.setSequenceID(offeredSequenceID);
- String outgoingSideInternalSequenceId =
SandeshaUtil
-
.getOutgoingSideInternalSequenceID(rmdBean.getSequenceID());
+
rMSBean.setSequenceID(offeredSequenceID);
rMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
// this is a dummy value
rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
+ //Try inserting the new RMSBean
+
if(!storageManager.getRMSBeanMgr().insert(rMSBean)){
+ offerAccepted = false;
+ }
+ }
+
+ if (offerAccepted) {
if(rmdBean.getToEndpointReference() !=
null){
rMSBean.setToEndpointReference(rmdBean.getToEndpointReference());
} else {
@@ -263,7 +272,7 @@
// Set the SOAP Version for this
sequence.
rMSBean.setSoapVersion(SandeshaUtil.getSOAPVersion(createSeqRMMsg.getSOAPEnvelope()));
-
storageManager.getRMSBeanMgr().insert(rMSBean);
+
storageManager.getRMSBeanMgr().update(rMSBean);
SandeshaUtil.startWorkersForSequence(context, rMSBean);
@@ -371,34 +380,25 @@
return true;
}
- private boolean offerAccepted(String sequenceId, ConfigurationContext
configCtx, RMMsgContext createSeqRMMsg,
+ private boolean isValidseqID(String sequenceId, ConfigurationContext
configCtx, RMMsgContext createSeqRMMsg,
StorageManager storageManager) throws SandeshaException
{
if (log.isDebugEnabled())
- log.debug("Enter: CreateSeqMsgProcessor::offerAccepted,
" + sequenceId);
+ log.debug("Enter: CreateSeqMsgProcessor::isValidseqID,
" + sequenceId);
if ("".equals(sequenceId)) {
if (log.isDebugEnabled())
- log.debug("Exit:
CreateSeqMsgProcessor::offerAccepted, " + false);
- return false;
- }
-
- RMSBean createSeqFindBean = new RMSBean();
- createSeqFindBean.setSequenceID(sequenceId);
- Collection arr =
storageManager.getRMSBeanMgr().find(createSeqFindBean);
-
- if (arr.size() > 0) {
- if (log.isDebugEnabled())
- log.debug("Exit:
CreateSeqMsgProcessor::offerAccepted, " + false);
+ log.debug("Exit:
CreateSeqMsgProcessor::isValidseqID, " + false);
return false;
}
+
if (sequenceId.length() <= 1) {
if (log.isDebugEnabled())
- log.debug("Exit:
CreateSeqMsgProcessor::offerAccepted, " + false);
+ log.debug("Exit:
CreateSeqMsgProcessor::isValidseqID, " + false);
return false; // Single character offers are NOT
accepted.
}
if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqMsgProcessor::offerAccepted,
" + true);
+ log.debug("Exit: CreateSeqMsgProcessor::isValidseqID, "
+ true);
return true;
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=681358&r1=681357&r2=681358&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Thu Jul 31 05:38:36 2008
@@ -46,6 +46,7 @@
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.wsrm.Accept;
import org.apache.sandesha2.wsrm.CreateSequenceResponse;
@@ -132,7 +133,7 @@
"Existing id:" +
rmsBean.getSequenceID() + ", new id:" + newOutSequenceId);
return false;
}
-
+
// Store the new sequence id
rmsBean.setSequenceID(newOutSequenceId);
@@ -147,110 +148,119 @@
}
}
}
-
- // Get the CreateSeqBean based on the message id to take a lock on the bean
- SenderBean createSeqBean = retransmitterMgr.retrieve(createSeqMsgId);
-
- // deleting the create sequence sender bean entry.
- retransmitterMgr.delete(createSeqBean.getMessageID());
+
+ rmsBean.setLastActivatedTime(System.currentTimeMillis());
- // Remove the create sequence message
-
storageManager.removeMessageContext(rmsBean.getCreateSequenceMsgStoreKey());
-
- // processing for accept (offer has been sent)
- Accept accept = createSeqResponsePart.getAccept();
- if (accept != null) {
-
- // TODO this should be detected in the Fault manager.
- if (rmsBean.getOfferedSequence() == null) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
- log.debug(message);
- throw new SandeshaException(message);
+ if(!rmsBeanMgr.update(rmsBean)){
+ //Im not setting the createSeqBean sender bean to
resend true as the reallocation of msgs will do this
+ try{
+ TerminateManager.terminateSendingSide(rmsBean,
storageManager, true);
+ } catch(Exception e){
+ if (log.isDebugEnabled())
+ log.debug(e);
}
+ } else {
+ // processing for accept (offer has been sent)
+ Accept accept = createSeqResponsePart.getAccept();
+ if (accept != null) {
+
+ // TODO this should be detected in the Fault
manager.
+ if (rmsBean.getOfferedSequence() == null) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.accptButNoSequenceOffered);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
- RMDBean rMDBean = new RMDBean();
-
- EndpointReference acksToEPR =
accept.getAcksTo().getEPR();
- rMDBean.setAcksToEndpointReference(acksToEPR);
- rMDBean.setSequenceID(rmsBean.getOfferedSequence());
- rMDBean.setNextMsgNoToProcess(1);
-
rMDBean.setOutboundInternalSequence(rmsBean.getInternalSequenceID());
+ RMDBean rMDBean = new RMDBean();
+
+ EndpointReference acksToEPR =
accept.getAcksTo().getEPR();
+ rMDBean.setAcksToEndpointReference(acksToEPR);
+
rMDBean.setSequenceID(rmsBean.getOfferedSequence());
+ rMDBean.setNextMsgNoToProcess(1);
+
rMDBean.setOutboundInternalSequence(rmsBean.getInternalSequenceID());
-
rMDBean.setServiceName(createSeqResponseRMMsgCtx.getMessageContext().getAxisService().getName());
-
- //Storing the referenceMessage of the sending side
sequence as the reference message
- //of the receiving side as well.
- //This can be used when creating new outgoing messages.
-
- String referenceMsgStoreKey =
rmsBean.getReferenceMessageStoreKey();
- MessageContext referenceMsg =
storageManager.retrieveMessageContext(referenceMsgStoreKey, configCtx);
-
- String newMessageStoreKey = SandeshaUtil.getUUID();
-
storageManager.storeMessageContext(newMessageStoreKey,referenceMsg);
-
- rMDBean.setReferenceMessageKey(newMessageStoreKey);
+
rMDBean.setServiceName(createSeqResponseRMMsgCtx.getMessageContext().getAxisService().getName());
+
+ //Storing the referenceMessage of the sending
side sequence as the reference message
+ //of the receiving side as well.
+ //This can be used when creating new outgoing
messages.
+
+ String referenceMsgStoreKey =
rmsBean.getReferenceMessageStoreKey();
+ MessageContext referenceMsg =
storageManager.retrieveMessageContext(referenceMsgStoreKey, configCtx);
+
+ String newMessageStoreKey =
SandeshaUtil.getUUID();
+
storageManager.storeMessageContext(newMessageStoreKey,referenceMsg);
+
+
rMDBean.setReferenceMessageKey(newMessageStoreKey);
- // If this is an offered sequence that needs polling
then we need to setup the
- // rmdBean for polling too, so that it still gets
serviced after the outbound
- // sequence terminates.
- if
(Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion()))
{
- if(rmsBean.isPollingMode()) {
- rMDBean.setPollingMode(true);
+ // If this is an offered sequence that needs
polling then we need to setup the
+ // rmdBean for polling too, so that it still
gets serviced after the outbound
+ // sequence terminates.
+ if
(Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion()))
{
+ if(rmsBean.isPollingMode()) {
+ rMDBean.setPollingMode(true);
+ }
}
- }
-
- String rmSpecVersion =
createSeqResponseRMMsgCtx.getRMSpecVersion();
- rMDBean.setRMVersion(rmSpecVersion);
-
- EndpointReference toEPR =
createSeqResponseRMMsgCtx.getTo();
- if (toEPR==null) {
- //Most probably this is a sync response
message, using the replyTo of the request message
- OperationContext operationContext =
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
- if (operationContext!=null) {
- MessageContext createSequnceMessage =
operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
- if (createSequnceMessage!=null)
- toEPR =
createSequnceMessage.getReplyTo();
+
+ String rmSpecVersion =
createSeqResponseRMMsgCtx.getRMSpecVersion();
+ rMDBean.setRMVersion(rmSpecVersion);
+
+ EndpointReference toEPR =
createSeqResponseRMMsgCtx.getTo();
+ if (toEPR==null) {
+ //Most probably this is a sync response
message, using the replyTo of the request message
+ OperationContext operationContext =
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext();
+ if (operationContext!=null) {
+ MessageContext
createSequnceMessage =
operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_OUT_VALUE);
+ if (createSequnceMessage!=null)
+ toEPR =
createSequnceMessage.getReplyTo();
+ }
}
- }
-
- if (toEPR!=null)
- rMDBean.setToAddress(toEPR.getAddress());
-
- rMDBean.setServerCompletedMessages(new RangeString());
- RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+
+ if (toEPR!=null)
+
rMDBean.setToAddress(toEPR.getAddress());
+
+ rMDBean.setServerCompletedMessages(new
RangeString());
+ RMDBeanMgr rmdBeanMgr =
storageManager.getRMDBeanMgr();
- // Store the security token for the offered sequence
-
rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
-
-
rMDBean.setLastActivatedTime(System.currentTimeMillis());
+ // Store the security token for the offered
sequence
+
rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
+
+
rMDBean.setLastActivatedTime(System.currentTimeMillis());
+
+ rmdBeanMgr.insert(rMDBean);
+ SandeshaUtil.startWorkersForSequence(configCtx,
rMDBean);
+ }
- rmdBeanMgr.insert(rMDBean);
- SandeshaUtil.startWorkersForSequence(configCtx,
rMDBean);
+ // Get the CreateSeqBean based on the message id to
take a lock on the bean
+ SenderBean createSeqBean =
retransmitterMgr.retrieve(createSeqMsgId);
+
+ // deleting the create sequence sender bean entry.
+ retransmitterMgr.delete(createSeqBean.getMessageID());
+
+ // Remove the create sequence message
+
storageManager.removeMessageContext(rmsBean.getCreateSequenceMsgStoreKey());
+ SandeshaUtil.startWorkersForSequence(configCtx,
rmsBean);
+
+ // Locate and update all of the messages for this
sequence, now that we know
+ // the sequence id.
+ SenderBean target = new SenderBean();
+ target.setInternalSequenceID(internalSequenceId);
+ target.setSend(false);
+
+ Iterator iterator =
retransmitterMgr.find(target).iterator();
+ while (iterator.hasNext()) {
+ SenderBean tempBean = (SenderBean)
iterator.next();
+
+ // asking to send the application msssage
+ tempBean.setSend(true);
+ tempBean.setSequenceID(newOutSequenceId);
+ retransmitterMgr.update(tempBean);
+ }
+
+ // TODO - does this do anything?
+ createSeqResponseRMMsgCtx.pause();
}
- rmsBean.setLastActivatedTime(System.currentTimeMillis());
- rmsBeanMgr.update(rmsBean);
- SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
-
- // Locate and update all of the messages for this sequence, now
that we know
- // the sequence id.
- SenderBean target = new SenderBean();
- target.setInternalSequenceID(internalSequenceId);
- target.setSend(false);
-
- Iterator iterator = retransmitterMgr.find(target).iterator();
- while (iterator.hasNext()) {
- SenderBean tempBean = (SenderBean) iterator.next();
-
- // asking to send the application msssage
- tempBean.setSend(true);
- tempBean.setSequenceID(newOutSequenceId);
- retransmitterMgr.update(tempBean);
- }
-
- // TODO - does this do anything?
- createSeqResponseRMMsgCtx.pause();
-
if (log.isDebugEnabled())
log.debug("Exit:
CreateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
return true;
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java?rev=681358&r1=681357&r2=681358&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryRMSBeanMgr.java
Thu Jul 31 05:38:36 2008
@@ -26,6 +26,9 @@
import org.apache.axis2.context.AbstractContext;
import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
@@ -36,27 +39,46 @@
private ConcurrentHashMap seqID2csm = new ConcurrentHashMap();
private ConcurrentHashMap intSeqID2csm = new ConcurrentHashMap();
+ private ConcurrentHashMap inUseSeqIDs = new ConcurrentHashMap();
public InMemoryRMSBeanMgr(InMemoryStorageManager mgr, AbstractContext
context) {
super(mgr, context, Sandesha2Constants.BeanMAPs.CREATE_SEQUECE);
}
+
+ private boolean isSeqIDUsable(String seqID, String createSeqMsgID,
boolean isInsert){
+ boolean isUsable = true;
+ if(seqID != null) {
+ Object o = inUseSeqIDs.putIfAbsent(seqID,
createSeqMsgID);
+
+ if(isInsert && o!= null){
+ isUsable = false;
+ }
+
+ if(o != null && !o.equals(createSeqMsgID)){
+ isUsable = false;
+ }
+ }
+ return isUsable;
+ }
public boolean insert(RMSBean bean) throws SandeshaStorageException {
boolean res = false;
lock.lock();
- if(intSeqID2csm.get(bean.getInternalSequenceID())==null){
- res = super.insert(bean.getCreateSeqMsgID(), bean);
- if(res){
- if(bean.getInternalSequenceID()!=null){
-
intSeqID2csm.put(bean.getInternalSequenceID(), bean.getCreateSeqMsgID());
- }
- if(bean.getSequenceID()!=null){
- seqID2csm.put(bean.getSequenceID(),
bean.getCreateSeqMsgID());
+ if(isSeqIDUsable(bean.getSequenceID(),
bean.getCreateSeqMsgID(), true)){
+
if(intSeqID2csm.get(bean.getInternalSequenceID())==null){
+ res = super.insert(bean.getCreateSeqMsgID(),
bean);
+ if(res){
+ if(bean.getInternalSequenceID()!=null){
+
intSeqID2csm.put(bean.getInternalSequenceID(), bean.getCreateSeqMsgID());
+ }
+ if(bean.getSequenceID()!=null){
+
seqID2csm.put(bean.getSequenceID(), bean.getCreateSeqMsgID());
+ }
}
- }
- }
- lock.unlock();
+ }
+ }
+ lock.unlock();
return res;
}
@@ -65,7 +87,8 @@
RMSBean removed = (RMSBean) super.delete(msgId);
if(removed!=null){
seqID2csm.remove(removed.getSequenceID());
- intSeqID2csm.remove(removed.getInternalSequenceID());
+ intSeqID2csm.remove(removed.getInternalSequenceID());
+ inUseSeqIDs.remove(removed.getSequenceID());
}
return removed!=null;
@@ -76,15 +99,19 @@
}
public boolean update(RMSBean bean) throws SandeshaStorageException {
- boolean result = super.update(bean.getCreateSeqMsgID(), bean);
- if(bean.getInternalSequenceID()!=null){
- intSeqID2csm.put(bean.getInternalSequenceID(),
bean.getCreateSeqMsgID());
- }
- if(bean.getSequenceID()!=null){
- seqID2csm.put(bean.getSequenceID(),
bean.getCreateSeqMsgID());
- }
- return result;
+ boolean result = false;
+ if(isSeqIDUsable(bean.getSequenceID(),
bean.getCreateSeqMsgID(), false)){
+ result = super.update(bean.getCreateSeqMsgID(), bean);
+ if(bean.getInternalSequenceID()!=null){
+ intSeqID2csm.put(bean.getInternalSequenceID(),
bean.getCreateSeqMsgID());
+ }
+ if(bean.getSequenceID()!=null){
+ seqID2csm.put(bean.getSequenceID(),
bean.getCreateSeqMsgID());
+ }
+ }
+
+ return result;
}
public List find(RMSBean bean) throws SandeshaStorageException {
Modified:
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java?rev=681358&r1=681357&r2=681358&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
(original)
+++
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/storage/RMSBeanMgrTest.java
Thu Jul 31 05:38:36 2008
@@ -82,22 +82,18 @@
createSeqBean2.setCreateSeqMsgID("CreateSeqMsgId2");
createSeqBean2.setSequenceID("SeqId1");
- mgr.insert(createSeqBean1);
- mgr.insert(createSeqBean2);
+ assertTrue(mgr.insert(createSeqBean1));
+
+ //This RMSBean won't get added
+ //as we protect against adding two RMSBeans with identical Seq ID's
+ assertFalse(mgr.insert(createSeqBean2));
RMSBean target = new RMSBean();
target.setSequenceID("SeqId1");
Iterator iter = mgr.find(target).iterator();
RMSBean tmp = (RMSBean) iter.next();
- if (tmp.getCreateSeqMsgID().equals("CreateSeqMsgId1")) {
- tmp = (RMSBean) iter.next();
- assertTrue(tmp.getCreateSeqMsgID().equals("CreateSeqMsgId2"));
-
- } else {
- tmp = (RMSBean) iter.next();
- assertTrue(tmp.getCreateSeqMsgID().equals("CreateSeqMsgId1"));
- }
+ assertTrue(tmp.getCreateSeqMsgID().equals("CreateSeqMsgId1"));
}
public void testInsert() throws SandeshaStorageException{
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]