Author: gatfora
Date: Fri Mar 9 07:51:40 2007
New Revision: 516440
URL: http://svn.apache.org/viewvc?view=rev&rev=516440
Log:
Enable duplicate messages to be processed by Reliable messaging. Duplicate
message is detected by the SandeshaGlobalInHandler, then processed by the
SequenceProcessor.processReliableMessage
Modified:
webservices/sandesha/trunk/java/config/module.xml
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
Modified: webservices/sandesha/trunk/java/config/module.xml
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/config/module.xml?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
--- webservices/sandesha/trunk/java/config/module.xml (original)
+++ webservices/sandesha/trunk/java/config/module.xml Fri Mar 9 07:51:40 2007
@@ -75,6 +75,10 @@
<messageReceiver
class="org.apache.sandesha2.msgreceivers.RMMessageReceiver"/>
</operation>
+ <operation name="RMInOutDuplicateMessageOperation"
mep="http://www.w3.org/2006/01/wsdl/in-out">
+ <messageReceiver
class="org.apache.sandesha2.msgreceivers.RMMessageReceiver"/>
+ </operation>
+
<supported-policy-namespaces
namespaces="http://ws.apache.org/sandesha2/policy" />
<wsp:Policy xmlns:wsp="http://schemas.xmlsoap.org/ws/2004/09/policy"
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
Fri Mar 9 07:51:40 2007
@@ -249,8 +249,10 @@
int MAKE_CONNECTION_MSG = 11;
int LAST_MESSAGE = 12;
-
- int MAX_MESSAGE_TYPE = 12;
+
+ int DUPLICATE_MESSAGE = 13;
+
+ int MAX_MESSAGE_TYPE = 13;
}
public interface MessageParts {
@@ -541,6 +543,7 @@
static final String INBOUND_MESSAGE_NUMBER =
"Sandesha2InboundMessageNumber";
static final String INBOUND_LAST_MESSAGE =
"Sandesha2InboundLastMessage";
static final String MAKECONNECTION_ENTRY =
"Sandesha2MakeConnectionEntry";
+ static final String RM_MESSAGE_CONTEXT = "RMMessageContext";
}
public interface Assertions {
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
Fri Mar 9 07:51:40 2007
@@ -17,15 +17,34 @@
package org.apache.sandesha2.handlers;
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPBody;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPHeader;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.wsrm.Sequence;
/**
@@ -84,10 +103,103 @@
}
}
-
+
+ // Check if this is an application message and if it is a duplicate
+ RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgContext);
+
+ // Set the RMMMessageContext as a property on the message so we can
retrieve it later
+
msgContext.setProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT,
rmMsgCtx);
+
+ if (rmMsgCtx.getMessageType() ==
Sandesha2Constants.MessageTypes.APPLICATION) {
+ processApplicationMessage(rmMsgCtx);
+ }
+
if (log.isDebugEnabled())
- log.debug("Exit: SandeshaGlobalInHandler::invoke,
continuing");
+ log.debug("Exit: SandeshaGlobalInHandler::invoke " +
InvocationResponse.CONTINUE);
return InvocationResponse.CONTINUE;
}
+ private static void processApplicationMessage(RMMsgContext rmMsgCtx) throws
AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaGlobalInHandler::processApplicationMessage");
+ // Check if this is a duplicate message
+ Sequence sequence = (Sequence)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceId = sequence.getIdentifier().getIdentifier();
+ long msgNo = sequence.getMessageNumber().getMessageNumber();
+
+ StorageManager storageManager =
+
SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getConfigurationContext(),
+ rmMsgCtx.getConfigurationContext().getAxisConfiguration());
+
+ Transaction transaction = storageManager.getTransaction();
+
+ try {
+
+ // Check that both the Sequence header and message body have been
secured properly
+ RMDBeanMgr mgr = storageManager.getRMDBeanMgr();
+ RMDBean bean = mgr.retrieve(sequenceId);
+
+ if(bean != null && bean.getSecurityTokenData() != null) {
+ SecurityManager secManager =
SandeshaUtil.getSecurityManager(rmMsgCtx.getConfigurationContext());
+
+ QName seqName = new QName(rmMsgCtx.getRMNamespaceValue(),
Sandesha2Constants.WSRM_COMMON.SEQUENCE);
+
+ SOAPEnvelope envelope = rmMsgCtx.getSOAPEnvelope();
+ OMElement body = envelope.getBody();
+ OMElement seqHeader =
envelope.getHeader().getFirstChildWithName(seqName);
+
+ SecurityToken token =
secManager.recoverSecurityToken(bean.getSecurityTokenData());
+
+ secManager.checkProofOfPossession(token, seqHeader,
rmMsgCtx.getMessageContext());
+ secManager.checkProofOfPossession(token, body,
rmMsgCtx.getMessageContext());
+ }
+
+ if (bean != null) {
+
+ if (msgNo == 0) {
+ String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
+ .toString(msgNo));
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ // Get the server completed message ranges list
+ RangeString serverCompletedMessageRanges =
bean.getServerCompletedMessages();
+
+ // See if the message is in the list of completed ranges
+ boolean msgNoPresentInList =
+ serverCompletedMessageRanges.isMessageNumberInRanges(msgNo);
+
+ if (!msgNoPresentInList) {
+ serverCompletedMessageRanges.addRange(new Range(msgNo));
+
+ storageManager.getRMDBeanMgr().update(bean);
+ }
+ else {
+ // Add the duplicate RM AxisOperation to the message
+ AxisOperation duplicateMessageOperation =
SpecSpecificConstants.getWSRMOperation(
+ Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE,
+ Sandesha2Constants.SPEC_VERSIONS.v1_0,
+ rmMsgCtx.getMessageContext().getAxisService());
+
rmMsgCtx.getMessageContext().setAxisOperation(duplicateMessageOperation);
+ }
+
+ } else {
+ // Add the duplicate RM AxisOperation to the message
+ AxisOperation duplicateMessageOperation =
SpecSpecificConstants.getWSRMOperation(
+ Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE,
+ Sandesha2Constants.SPEC_VERSIONS.v1_0,
+ rmMsgCtx.getMessageContext().getAxisService());
+
rmMsgCtx.getMessageContext().setAxisOperation(duplicateMessageOperation);
+ }
+ transaction.commit();
+ transaction = null;
+ }
+ finally {
+ if (transaction != null)
+ transaction.rollback();
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaGlobalInHandler::processApplicationMessage");
+ }
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
Fri Mar 9 07:51:40 2007
@@ -90,9 +90,14 @@
throw new AxisFault(message);
}
- //processing any incoming faults.
- RMMsgContext rmMsgCtx =
MsgInitializer.initializeMessage(msgCtx);
+ RMMsgContext rmMsgCtx = null;
+
+ if
(msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT)
!= null)
+ rmMsgCtx =
(RMMsgContext)msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT);
+ else
+ rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+ //processing any incoming faults.
//This is responsible for Sandesha2 specific
FaultManager.processMessagesForFaults(rmMsgCtx);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Fri Mar 9 07:51:40 2007
@@ -51,8 +51,6 @@
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.Range;
-import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.workers.SandeshaThread;
@@ -171,13 +169,6 @@
return InvocationResponse.ABORT;
}
- if (msgNo == 0) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
- .toString(msgNo));
- log.debug(message);
- throw new SandeshaException(message);
- }
-
// Pause the messages bean if not the right message to invoke.
// updating the last activated time of the sequence.
@@ -207,15 +198,9 @@
bean.setHighestInMessageId(messageId);
bean.setHighestInMessageNumber(msgNo);
}
-
- // Get the server completed message ranges list
- RangeString serverCompletedMessageRanges =
bean.getServerCompletedMessages();
- // See if the message is in the list of completed ranges
- boolean msgNoPresentInList =
-
serverCompletedMessageRanges.isMessageNumberInRanges(msgNo);
String specVersion = rmMsgCtx.getRMSpecVersion();
- if (msgNoPresentInList
+ if
(rmMsgCtx.getMessageContext().getAxisOperation().getName().getLocalPart().equals("RMInOutDuplicateMessageOperation")
&&
(Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE ==
Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
// this is a duplicate message and the invocation type
is EXACTLY_ONCE. We try to return
// ack messages at this point, as if someone is sending
duplicates then they may have
@@ -254,11 +239,6 @@
if (log.isDebugEnabled())
log.debug("Exit:
SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
return result;
- }
-
- if (!msgNoPresentInList)
- {
- serverCompletedMessageRanges.addRange(new Range(msgNo));
}
// If the message is a reply to an outbound message then we can
update the RMSBean that
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java
Fri Mar 9 07:51:40 2007
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.msgprocessors.MsgProcessor;
@@ -48,7 +49,13 @@
public final void receive(MessageContext msgCtx) throws AxisFault {
if(log.isDebugEnabled()) log.debug("Entry:
RMMessageReceiver::receive");
- RMMsgContext rmMsgCtx =
MsgInitializer.initializeMessage(msgCtx);
+ RMMsgContext rmMsgCtx = null;
+
+ if
(msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT)
!= null)
+ rmMsgCtx =
(RMMsgContext)msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT);
+ else
+ rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
if(log.isDebugEnabled()) log.debug("MsgReceiver got type: " +
SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
// Note that some messages (such as stand-alone acks) will be
routed here, but
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
Fri Mar 9 07:51:40 2007
@@ -346,8 +346,11 @@
case Sandesha2Constants.MessageTypes.ACK_REQUEST:
case Sandesha2Constants.MessageTypes.LAST_MESSAGE:
result = service.getOperation(new
QName("RMOutOnlyOperation"));
- break;
- }
+ break;
+ case Sandesha2Constants.MessageTypes.DUPLICATE_MESSAGE:
+ result = service.getOperation(new
QName("RMInOutDuplicateMessageOperation"));
+ break;
+ }
} else
if(rmSpecLevel.equals(Sandesha2Constants.SPEC_VERSIONS.v1_1)) {
switch(messageType) {
case Sandesha2Constants.MessageTypes.CREATE_SEQ:
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Fri Mar 9 07:51:40 2007
@@ -169,6 +169,14 @@
// sending the message
boolean successfullySent = false;
+ // have to commit the transaction before sending. This
may
+ // get changed when WS-AT is available.
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ transaction = storageManager.getTransaction();
+ }
+
// Although not actually sent yet, update the send
count to indicate an attempt
if (senderBean.isReSend()) {
SenderBean bean2 =
senderBeanMgr.retrieve(senderBean.getMessageID());
Modified:
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java?view=diff&rev=516440&r1=516439&r2=516440
==============================================================================
---
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
(original)
+++
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
Fri Mar 9 07:51:40 2007
@@ -101,6 +101,8 @@
public void testSyncEcho() throws Exception {
// Test sync echo with an offer, and the 1.1 spec
Options clientOptions = new Options();
+// org.apache.log4j.BasicConfigurator.configure();
+// to = "http://127.0.0.1:" + 9999 +
"/axis2/services/RMSampleService";
clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,SandeshaUtil.getUUID());
clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1);
runEcho(clientOptions, false, false, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]