Author: mlovett
Date: Thu Dec 14 01:59:01 2006
New Revision: 487154
URL: http://svn.apache.org/viewvc?view=rev&rev=487154
Log:
Support the RM anon URI template, see SANDESHA2-62.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageHelper.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
Thu Dec 14 01:59:01 2006
@@ -60,9 +60,10 @@
public interface SPEC_2006_08 {
- String NS_URI = "http://docs.oasis-open.org/ws-rx/wsrm/200608";
- String SEC_NS_URI =
"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd";
-
+ String NS_URI =
"http://docs.oasis-open.org/ws-rx/wsrm/200608";
+ String ANONYMOUS_URI_PREFIX =
"http://docs.oasis-open.org/ws-rx/wsrm/200608/anonymous?id=";
+ String SEC_NS_URI =
"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd";
+
public interface Actions {
String ACTION_CREATE_SEQUENCE =
"http://docs.oasis-open.org/ws-rx/wsrm/200608/CreateSequence";
@@ -551,8 +552,6 @@
String MODULE_CLASS_LOADER = "Sandesha2ModuleClassLoader";
String SECURITY_MANAGER = "Sandesha2SecurityManager";
-
- String WSRM_ANONYMOUS_URI_PREFIX =
"http://docs.oasis-open.org/ws-rx/wsrm/200608/anonymous";
String RM_IN_OUT_OPERATION_NAME = "RMInOutOperation";
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageHelper.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageHelper.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageHelper.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageHelper.java
Thu Dec 14 01:59:01 2006
@@ -1,3 +1,20 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
package org.apache.sandesha2.i18n;
import java.util.Locale;
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
Thu Dec 14 01:59:01 2006
@@ -1,3 +1,20 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ *
+ */
+
package org.apache.sandesha2.i18n;
public class SandeshaMessageKeys {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
Thu Dec 14 01:59:01 2006
@@ -187,11 +187,7 @@
// the
//
RMMsgCreator
- String addressingNamespaceURI =
SandeshaUtil.getSequenceProperty(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,
storageManager);
- String anonymousURI =
SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
-
- if (anonymousURI.equals(acksTo.getAddress())) {
+ if (acksTo.hasAnonymousAddress()) {
AxisEngine engine = new
AxisEngine(ackRMMsgCtx.getMessageContext().getConfigurationContext());
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
Thu Dec 14 01:59:01 2006
@@ -48,7 +48,6 @@
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
-import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.wsrm.Accept;
import org.apache.sandesha2.wsrm.CreateSequence;
import org.apache.sandesha2.wsrm.CreateSequenceResponse;
@@ -267,12 +266,7 @@
}
EndpointReference toEPR = new
EndpointReference(toBean.getValue());
-
- String addressingNamespaceURI =
SandeshaUtil.getSequenceProperty(newSequenceId,
-
Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,
storageManager);
- String anonymousURI =
SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceURI);
-
- if (anonymousURI.equals(toEPR.getAddress())) {
+ if (toEPR.hasAnonymousAddress()) {
createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
"true");
} else {
createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
"false");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Thu Dec 14 01:59:01 2006
@@ -22,7 +22,6 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.addressing.RelatesTo;
import org.apache.axis2.context.ConfigurationContext;
@@ -131,6 +130,19 @@
String sequencePropertyKey =
SandeshaUtil.getSequencePropertyKey(createSeqResponseRMMsgCtx);
createSeqBean.setSequenceID(newOutSequenceId);
+
+ // We must poll for any reply-to that uses the anonymous URI.
If it is a ws-a reply to then
+ // the create must include an offer (or this client cannot be
identified). If the reply-to
+ // is the RM anon URI template then the offer is not required.
+ if
(Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion()))
{
+ String replyToAddress =
SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+
Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, storageManager);
+ if(SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
+ createSeqBean.setPollingMode(true);
+ SandeshaUtil.startPollingManager(configCtx);
+ }
+ }
+
createSeqMgr.update(createSeqBean);
SenderBean createSequenceSenderBean =
retransmitterMgr.retrieve(createSeqMsgId);
@@ -154,6 +166,7 @@
sequencePropMgr.insert(newToken);
}
+
// processing for accept (offer has been sent)
Accept accept = createSeqResponsePart.getAccept();
if (accept != null) {
@@ -183,20 +196,6 @@
rMDBean.setNextMsgNoToProcess(1);
- boolean pollingMode = false;
- if
(Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion()))
{
- String replyToAddress =
SandeshaUtil.getSequenceProperty(sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, storageManager);
- if (replyToAddress!=null) {
- if
(AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(replyToAddress))
- pollingMode = true;
- else if
(AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(replyToAddress))
- pollingMode = true;
- else if
(replyToAddress.startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
- pollingMode = true;
- }
- }
-
//Storing the referenceMessage of the sending side
sequence as the reference message
//of the receiving side as well.
//This can be used when creating new outgoing messages.
@@ -208,12 +207,19 @@
storageManager.storeMessageContext(newMessageStoreKey,referenceMsg);
rMDBean.setReferenceMessageKey(newMessageStoreKey);
-
- rMDBean.setPollingMode(pollingMode);
-
- //if PollingMode is true, starting the pollingmanager.
- if (pollingMode)
- SandeshaUtil.startPollingManager(configCtx);
+
+ // If this is an offered sequence that needs polling
then we need to setup the
+ // rmdBean for polling too, so that it still gets
serviced after the outbound
+ // sequence terminates.
+ if
(Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion()))
{
+ String replyToAddress =
SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+
Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, storageManager);
+ EndpointReference ref = new
EndpointReference(replyToAddress);
+ if(!createSeqBean.isPollingMode() &&
ref.hasAnonymousAddress()) {
+ rMDBean.setPollingMode(true);
+
SandeshaUtil.startPollingManager(configCtx);
+ }
+ }
RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
nextMsgMgr.insert(rMDBean);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Thu Dec 14 01:59:01 2006
@@ -296,13 +296,15 @@
RMMsgContext ackRMMsgCtx =
AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey,
sequenceId, storageManager);
MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
- boolean anonAck = ackRMMsgCtx.getTo().hasAnonymousAddress();
+ EndpointReference ackTo = ackRMMsgCtx.getTo();
EndpointReference replyTo = rmMsgCtx.getReplyTo();
+ boolean anonAck = ackTo == null || ackTo.hasAnonymousAddress();
+ boolean anonReply = replyTo == null ||
replyTo.hasAnonymousAddress();
// Only use the backchannel for ack messages if we are sure
that the application
// doesn't need it. A 1-way MEP should be complete by now.
boolean complete = ackMsgCtx.getOperationContext().isComplete();
- if (anonAck && !complete && (replyTo == null ||
replyTo.hasAnonymousAddress())) {
+ if (anonAck && anonReply && !complete) {
if (log.isDebugEnabled()) log.debug("Exit:
SequenceProcessor::sendAckIfNeeded, avoiding using backchannel");
return;
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Thu Dec 14 01:59:01 2006
@@ -140,16 +140,7 @@
engine.send(outMessage);
- String addressingNamespaceURI = SandeshaUtil
- .getSequenceProperty(
- sequencePropertyKey,
-
Sandesha2Constants.SequenceProperties.ADDRESSING_NAMESPACE_VALUE,
- storageManager);
-
- String anonymousURI = SpecSpecificConstants
-
.getAddressingAnonymousURI(addressingNamespaceURI);
-
- if (anonymousURI.equals(toEPR.getAddress())) {
+ if (toEPR.hasAnonymousAddress()) {
terminateSeqMsg.getOperationContext().setProperty(
org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
} else {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
Thu Dec 14 01:59:01 2006
@@ -19,7 +19,6 @@
import java.util.HashMap;
import java.util.List;
-import java.util.Random;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
@@ -30,11 +29,14 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RMMsgCreator;
@@ -50,6 +52,11 @@
private ConfigurationContext configurationContext = null;
private StorageManager storageManager = null;
private boolean poll = false;
+
+ // Variables used to help round-robin across the sequences that we can
poll for
+ private int rmsIndex = 0;
+ private int rmdIndex = 0;
+
/**
* By adding an entry to this, the PollingManager will be asked to do a
polling request on this sequence.
*/
@@ -62,8 +69,13 @@
Transaction t = null;
try {
t = storageManager.getTransaction();
- internalRun();
+ pollRMDSide();
t.commit();
+
+ t = storageManager.getTransaction();
+ pollRMSSide();
+ t.commit();
+
t = null;
} catch (Exception e) {
if(log.isDebugEnabled()) log.debug("Exception",
e);
@@ -84,50 +96,74 @@
}
}
- private void internalRun() throws AxisFault {
- RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
+ private void pollRMSSide() throws AxisFault {
+ if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::pollRMSSide");
+ RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
+ RMSBean findRMS = new RMSBean();
+ findRMS.setPollingMode(true);
+ List results = rmsBeanManager.find(findRMS);
+ int size = results.size();
+ log.debug("Choosing one from " + size + " RMS sequences");
+ if(rmsIndex >= size) {
+ rmsIndex = 0;
+ if (size == 0) {
+ if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::pollRMSSide, nothing to poll");
+ return;
+ }
+ }
+ RMSBean beanToPoll = (RMSBean) results.get(rmsIndex++);
+ pollForSequence(beanToPoll.getSequenceID(),
beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey());
+
+ if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::pollRMSSide");
+ }
+
+ private void pollRMDSide() throws AxisFault {
+ if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::pollRMDSide");
//geting the sequences to be polled.
//if shedule contains any requests, do the earliest one.
//else pick one randomly.
-
+ RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
String sequenceId = getNextSheduleEntry ();
- RMDBean nextMsgBean = null;
RMDBean findBean = new RMDBean();
findBean.setPollingMode(true);
findBean.setSequenceID(sequenceId); // Note that this may be
null
List results = nextMsgMgr.find(findBean);
int size = results.size();
- if (size>0) {
- Random random = new Random ();
- int item = random.nextInt(size);
- nextMsgBean = (RMDBean) results.get(item);
- }
- //If not valid entry is found, try again later.
- if (nextMsgBean==null) {
- if(log.isDebugEnabled()) log.debug("No polling requests
queued");
- return;
+ log.debug("Choosing one from " + size + " RMD sequences");
+ if(rmdIndex >= size) {
+ rmdIndex = 0;
+ if (size == 0) {
+ if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::pollRMDSide, nothing to poll");
+ return;
+ }
}
- sequenceId = nextMsgBean.getSequenceID();
-
- if(log.isDebugEnabled()) log.debug("Polling for sequence " +
sequenceId);
+ RMDBean nextMsgBean = (RMDBean) results.get(rmdIndex++);
+ pollForSequence(nextMsgBean.getSequenceID(),
nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey());
+
+ if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::pollRMDSide");
+ }
+
+ private void pollForSequence(String sequenceId, String
sequencePropertyKey, String referenceMsgKey) throws SandeshaException,
SandeshaStorageException, AxisFault {
+ if(log.isDebugEnabled()) log.debug("Entry:
PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey +
", " + referenceMsgKey);
//create a MakeConnection message
- String referenceMsgKey = nextMsgBean.getReferenceMessageKey();
-
- String sequencePropertyKey = sequenceId;
String replyTo =
SandeshaUtil.getSequenceProperty(sequencePropertyKey,
Sandesha2Constants.SequenceProperties.REPLY_TO_EPR,storageManager);
String WSRMAnonReplyToURI = null;
- if (SandeshaUtil.isWSRMAnonymousReplyTo(replyTo))
+ if (SandeshaUtil.isWSRMAnonymous(replyTo)) {
+ // If we are polling on a RM anon URI then we don't
want to include the sequence id
+ // in the MakeConnection message.
+ sequenceId = null;
WSRMAnonReplyToURI = replyTo;
+ }
MessageContext referenceMessage =
storageManager.retrieveMessageContext(referenceMsgKey,configurationContext);
RMMsgContext referenceRMMessage =
MsgInitializer.initializeMessage(referenceMessage);
RMMsgContext makeConnectionRMMessage =
RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
- sequenceId , WSRMAnonReplyToURI,storageManager);
+ sequenceId, WSRMAnonReplyToURI, storageManager);
// Put our transaction onto the message context
makeConnectionRMMessage.setProperty(Sandesha2Constants.WITHIN_TRANSACTION,
Sandesha2Constants.VALUE_TRUE);
@@ -159,7 +195,9 @@
SandeshaUtil.executeAndStore(makeConnectionRMMessage,
makeConnectionMsgStoreKey);
- senderBeanMgr.insert(makeConnectionSenderBean);
+ senderBeanMgr.insert(makeConnectionSenderBean);
+
+ if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::pollForSequence");
}
private synchronized String getNextSheduleEntry () {
@@ -168,12 +206,11 @@
if (sheduledPollingRequests.size()>0) {
sequenceId = (String)
sheduledPollingRequests.keySet().iterator().next();
- Integer sequencEntryCount = (Integer)
sheduledPollingRequests.get(sequenceId);
+ Integer sequencEntryCount = (Integer)
sheduledPollingRequests.remove(sequenceId);
Integer leftCount = new Integer
(sequencEntryCount.intValue() -1 );
- if (leftCount.intValue()==0)
- sheduledPollingRequests.remove(sequenceId);
-
+ if (leftCount.intValue() > 0)
+ sheduledPollingRequests.put(sequenceId,
leftCount);
}
if(log.isDebugEnabled()) log.debug("Exit:
PollingManager::getNextSheduleEntry, " + sequenceId);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
Thu Dec 14 01:59:01 2006
@@ -54,7 +54,11 @@
private String securityTokenData;
-// private boolean pollingMode;
+ /**
+ * Set to true if this RMS used a WS-RM anon replyTo, so that we can
poll for messages
+ * related to that URI.
+ */
+ private boolean pollingMode;
/**
@@ -124,6 +128,14 @@
this.referenceMessageStoreKey = referenceMessageStoreKey;
}
+ public boolean isPollingMode() {
+ return pollingMode;
+ }
+
+ public void setPollingMode(boolean pollingMode) {
+ this.pollingMode = pollingMode;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
@@ -133,6 +145,7 @@
result.append("\nHas SecurityToken: ");
result.append(securityTokenData != null && securityTokenData.length() > 0);
result.append("\nCreateSeq Msg Key: ");
result.append(createSequenceMsgStoreKey);
result.append("\nReference Msg Key: ");
result.append(referenceMessageStoreKey);
+ result.append("\nPolling : ");
result.append(pollingMode);
return result.toString();
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
Thu Dec 14 01:59:01 2006
@@ -153,10 +153,6 @@
return invokerBeanMgr;
}
- public void init(ConfigurationContext context) {
- setContext(context);
- }
-
public static InMemoryStorageManager getInstance(
ConfigurationContext context) {
if (instance == null)
@@ -196,6 +192,7 @@
}
public void storeMessageContext(String key,MessageContext msgContext) {
+ if(log.isDebugEnabled()) log.debug("Entry:
InMemoryStorageManager::storeMessageContext, key: " + key);
HashMap storageMap = (HashMap)
getContext().getProperty(MESSAGE_MAP_KEY);
if (storageMap==null) {
@@ -224,9 +221,12 @@
envMap.put(key, clonedEnvelope);
}
+ if(log.isDebugEnabled()) log.debug("Exit:
InMemoryStorageManager::storeMessageContext, key: " + key);
}
public void updateMessageContext(String key,MessageContext msgContext)
throws SandeshaStorageException {
+ if(log.isDebugEnabled()) log.debug("Entry:
InMemoryStorageManager::updateMessageContext, key: " + key);
+
HashMap storageMap = (HashMap)
getContext().getProperty(MESSAGE_MAP_KEY);
if (storageMap==null) {
@@ -246,9 +246,13 @@
envMap.remove(key);
storeMessageContext(key,msgContext);
+
+ if(log.isDebugEnabled()) log.debug("Exit:
InMemoryStorageManager::updateMessageContext, key: " + key);
}
public void removeMessageContext(String key) throws
SandeshaStorageException {
+ if(log.isDebugEnabled()) log.debug("Entry:
InMemoryStorageManager::removeMessageContext, key: " + key);
+
HashMap storageMap = (HashMap)
getContext().getProperty(MESSAGE_MAP_KEY);
HashMap envelopeMap = (HashMap)
getContext().getProperty(ENVELOPE_MAP_KEY);
@@ -259,23 +263,13 @@
if (envelopeMap!=null)
envelopeMap.remove(key);
+ if(log.isDebugEnabled()) log.debug("Exit:
InMemoryStorageManager::removeMessageContext, key: " + key);
}
public void initStorage (AxisModule moduleDesc) {
}
- public SOAPEnvelope retrieveSOAPEnvelope(String key) throws
SandeshaStorageException {
- // TODO no real value
- return null;
- }
-
- public void storeSOAPEnvelope(SOAPEnvelope envelope, String key) throws
SandeshaStorageException {
- // TODO no real value
- }
-
-
-
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
Thu Dec 14 01:59:01 2006
@@ -87,7 +87,7 @@
if(log.isDebugEnabled())
log.debug("Piggybacking ack for " + inboundSequenceId);
RMMsgCreator.addAckMessage(rmMessageContext,
inboundSequenceId, inboundSequenceId, storageManager);
}
- if(log.isDebugEnabled()) log.debug("Enter:
AcknowledgementManager::piggybackAcksIfPresent, anon");
+ if(log.isDebugEnabled()) log.debug("Exit:
AcknowledgementManager::piggybackAcksIfPresent, anon");
return;
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
Thu Dec 14 01:59:01 2006
@@ -26,7 +26,6 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.addressing.EndpointReferenceHelper;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
@@ -141,12 +140,7 @@
}
// Finally fall back to using an anonymous endpoint
if (offeredEndpoint==null) {
- String anon = null;
-
if(AddressingConstants.Final.WSA_NAMESPACE.equals(addressingNamespaceValue)) {
- anon =
AddressingConstants.Final.WSA_ANONYMOUS_URL;
- } else {
- anon =
AddressingConstants.Submission.WSA_ANONYMOUS_URL;
- }
+ String anon =
SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespaceValue);
offeredEndpoint = new EndpointReference(anon);
}
if (offeredSequence != null &&
!"".equals(offeredSequence)) {
@@ -172,14 +166,8 @@
if (replyTo == null) {
// using wsa:Anonymous as ReplyTo
-
- String addressingNamespace = applicationRMMsg
- .getAddressingNamespaceValue();
- if (AddressingConstants.Submission.WSA_NAMESPACE
- .equals(addressingNamespace))
- replyTo =
AddressingConstants.Submission.WSA_ANONYMOUS_URL;
- else
- replyTo =
AddressingConstants.Final.WSA_ANONYMOUS_URL;
+ String addressingNamespace =
applicationRMMsg.getAddressingNamespaceValue();
+ replyTo =
SpecSpecificConstants.getAddressingAnonymousURI(addressingNamespace);
}
if (to == null) {
@@ -288,11 +276,6 @@
if (terminateMessage == null)
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgContextNotSet));
- // setUpMessage(referenceMessage, terminateMessage);
-
- SOAPFactory factory =
SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(referenceMessage
- .getEnvelope()));
-
terminateMessage.setMessageID(SandeshaUtil.getUUID());
AxisOperation referenceMsgOperation =
referenceMessage.getAxisOperation();
@@ -595,8 +578,12 @@
makeConnection.setAddress(address);
}
- //setting the addressing properties
- makeConnectionMessageCtx.setTo(new EndpointReference
(referenceMessage.getTo().getAddress()));
+ // Setting the addressing properties. As this is a poll we must
send it to an non-anon
+ // EPR, so we check both To and ReplyTo from the reference
message
+ EndpointReference epr = referenceMessage.getTo();
+ if(epr.hasAnonymousAddress()) epr =
referenceMessage.getReplyTo();
+
+ makeConnectionMessageCtx.setTo(epr);
makeConnectionMessageCtx.setWSAAction(SpecSpecificConstants.getMakeConnectionAction(rmVersion));
makeConnectionMessageCtx.setMessageID(SandeshaUtil.getUUID());
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Thu Dec 14 01:59:01 2006
@@ -65,7 +65,6 @@
import org.apache.axis2.util.UUIDGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.rampart.RampartMessageData;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
@@ -86,7 +85,6 @@
import org.apache.sandesha2.wsrm.CloseSequenceResponse;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-import org.apache.ws.security.handler.WSHandlerConstants;
/**
* Contains utility methods that are used in many plases of Sandesha2.
@@ -1088,27 +1086,12 @@
}
}
- public static boolean isWSRMAnonymousReplyTo (String replyTo) {
- if (replyTo!=null &&
replyTo.startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
+ public static boolean isWSRMAnonymous(String address) {
+ if (address!=null &&
address.startsWith(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX))
return true;
else
return false;
}
-
- public static boolean isAnonymousURI (String address) {
- if (address==null)
- return false;
-
- if
(AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address.trim()))
- return true;
- else if
(AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address.trim()))
- return true;
- else if (isWSRMAnonymousReplyTo(address))
- return true;
-
- return false;
- }
-
public static void executeAndStore (RMMsgContext rmMsgContext, String
storageKey) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: SandeshaUtil::executeAndStore, " +
storageKey);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
Thu Dec 14 01:59:01 2006
@@ -8,7 +8,6 @@
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collection;
import javax.xml.namespace.QName;
@@ -22,8 +21,6 @@
import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.engine.ListenerManager;
-import org.apache.axis2.i18n.Messages;
-import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
@@ -52,7 +49,7 @@
private static Log log = LogFactory.getLog(SequenceManager.class);
/**
- * Set up a new outbound sequence, triggered by the arrival of a create
sequence message. As this
+ * Set up a new inbound sequence, triggered by the arrival of a create
sequence message. As this
* is an inbound sequence, the sequencePropertyKey is the sequenceId.
*/
public static String setupNewSequence(RMMsgContext createSequenceMsg,
StorageManager storageManager, SecurityManager securityManager, SecurityToken
token)
@@ -85,7 +82,8 @@
throw new AxisFault(message);
}
- ConfigurationContext configurationContext =
createSequenceMsg.getMessageContext().getConfigurationContext();
+ MessageContext createSeqContext =
createSequenceMsg.getMessageContext();
+ ConfigurationContext configurationContext =
createSeqContext.getConfigurationContext();
SequencePropertyBeanMgr seqPropMgr =
storageManager.getSequencePropertyBeanMgr();
@@ -130,8 +128,22 @@
}
RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
- nextMsgMgr.insert(new RMDBean(sequenceId, 1)); // 1 will be the
-
// next
+
+ RMDBean rmdBean = new RMDBean();
+ rmdBean.setSequenceID(sequenceId);
+ rmdBean.setNextMsgNoToProcess(1);
+
+ // If this sequence has a 'To' address that is anonymous then
we must have got the
+ // message as a response to a poll. We need to make sure that
we keep polling until
+ // the sequence is closed.
+ if(to.hasAnonymousAddress()) {
+ String newKey = SandeshaUtil.getUUID();
+ rmdBean.setPollingMode(true);
+ rmdBean.setReferenceMessageKey(newKey);
+ storageManager.storeMessageContext(newKey,
createSeqContext);
+ }
+
+ nextMsgMgr.insert(rmdBean);
// message to invoke. This will apply for only in-order
invocations.
Modified:
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java?view=diff&rev=487154&r1=487153&r2=487154
==============================================================================
---
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java
(original)
+++
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java
Thu Dec 14 01:59:01 2006
@@ -17,6 +17,7 @@
package org.apache.sandesha2.scenarios;
import java.io.File;
+import java.util.ArrayList;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.Constants;
@@ -25,6 +26,7 @@
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.log4j.BasicConfigurator;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaTestCase;
import org.apache.sandesha2.client.SandeshaClient;
@@ -50,21 +52,39 @@
startServer(repoPath, axis2_xml);
}
- public void testSyncEcho () throws Exception {
+ public void testSyncEchoWithOffer() throws Exception {
+
+ Options clientOptions = new Options ();
+ String offeredSequenceID = SandeshaUtil.getUUID();
+
clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,offeredSequenceID);
+
+ runSyncEchoTest(clientOptions);
+
+ }
+
+ public void testSyncEchoWithRMAnon() throws Exception {
+
+ Options clientOptions = new Options ();
+ EndpointReference ref = new EndpointReference(
+
Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX +
+ SandeshaUtil.getUUID());
+ clientOptions.setReplyTo(ref);
+ runSyncEchoTest(clientOptions);
+
+ }
+
+ public void runSyncEchoTest(Options clientOptions) throws Exception {
String to = "http://127.0.0.1:" + serverPort +
"/axis2/services/RMSampleService";
String repoPath = "target" + File.separator + "repos" +
File.separator + "client";
String axis2_xml = repoPath + File.separator +
"client_axis2.xml";
ConfigurationContext configContext =
ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
- Options clientOptions = new Options ();
clientOptions.setAction(echoAction);
clientOptions.setTo(new EndpointReference (to));
String sequenceKey = SandeshaUtil.getUUID();
- String offeredSequenceID = SandeshaUtil.getUUID();
clientOptions.setProperty(SandeshaClientConstants.LAST_MESSAGE,
"true");
clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
-
clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,offeredSequenceID);
clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1);
clientOptions.setTransportInProtocol(Constants.TRANSPORT_HTTP);
@@ -93,7 +113,12 @@
assertEquals(SequenceReport.SEQUENCE_STATUS_TERMINATED,
sequenceReport.getSequenceStatus());
//assertions for the in sequence
- sequenceReport =
SandeshaClient.getIncomingSequenceReport(offeredSequenceID, configContext);
+ ArrayList reports =
SandeshaClient.getIncomingSequenceReports(configContext);
+ assertEquals("Reports count", 1,
reports.size());
+
+ sequenceReport = (SequenceReport)
reports.get(0);
+ String offer = (String)
clientOptions.getProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID);
+ if(offer != null) assertEquals("Offered id",
offer, sequenceReport.getSequenceID());
assertTrue(sequenceReport.getCompletedMessages().contains(new Long(1)));
assertEquals(SequenceReport.SEQUENCE_DIRECTION_IN,
sequenceReport.getSequenceDirection());
assertEquals(SequenceReport.SEQUENCE_STATUS_TERMINATED,
sequenceReport.getSequenceStatus());
@@ -110,5 +135,4 @@
configContext.getListenerManager().stop();
serviceClient.cleanup();
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]