Author: mckierna
Date: Wed Dec 12 08:12:56 2007
New Revision: 603654
URL: http://svn.apache.org/viewvc?rev=603654&view=rev
Log:
Ensure RMD can close sequences
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=603654&r1=603653&r2=603654&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
Wed Dec 12 08:12:56 2007
@@ -38,6 +38,8 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.RMMsgCreator;
@@ -71,10 +73,13 @@
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
.getAxisConfiguration());
- RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+ RMSequenceBean rmBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+ if(rmBean==null){
+ rmBean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceId);
+ }
//check the security credentials
- SandeshaUtil.assertProofOfPossession(rmdBean, msgCtx,
msgCtx.getEnvelope().getBody());
+ SandeshaUtil.assertProofOfPossession(rmBean, msgCtx,
msgCtx.getEnvelope().getBody());
if (FaultManager.checkForUnknownSequence(rmMsgCtx, sequenceId,
storageManager, false)) {
if (log.isDebugEnabled())
@@ -83,32 +88,35 @@
}
// throwing a fault if the sequence is terminated
- if (FaultManager.checkForSequenceTerminated(rmMsgCtx,
sequenceId, rmdBean, false)) {
+ if (FaultManager.checkForSequenceTerminated(rmMsgCtx,
sequenceId, rmBean, false)) {
if (log.isDebugEnabled())
log.debug("Exit:
CloseSequenceProcessor::processInMessage, Sequence terminated");
return false;
}
- rmdBean.setClosed(true);
- storageManager.getRMDBeanMgr().update(rmdBean);
-
- RMMsgContext ackRMMsgCtx =
AcknowledgementManager.generateAckMessage(rmMsgCtx, rmdBean, sequenceId,
storageManager, true);
- // adding the ack part(s) to the envelope.
- Iterator sequenceAckIter =
ackRMMsgCtx.getSequenceAcknowledgements();
+ rmBean.setClosed(true);
+ Iterator sequenceAckIter = null;
+ if(rmBean instanceof RMDBean){
+ storageManager.getRMDBeanMgr().update((RMDBean)rmBean);
+ RMMsgContext ackRMMsgCtx =
AcknowledgementManager.generateAckMessage(rmMsgCtx, (RMDBean)rmBean,
sequenceId, storageManager, true);
+ // adding the ack part(s) to the envelope.
+ sequenceAckIter =
ackRMMsgCtx.getSequenceAcknowledgements();
+ }
+ else{
+ storageManager.getRMSBeanMgr().update((RMSBean)rmBean);
+ }
- RMMsgContext closeSeqResponseRMMsg =
RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, rmdBean);
+ RMMsgContext closeSeqResponseRMMsg =
RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, rmBean);
MessageContext closeSequenceResponseMsg =
closeSeqResponseRMMsg.getMessageContext();
- while (sequenceAckIter.hasNext()) {
+ while (sequenceAckIter!=null && sequenceAckIter.hasNext()) {
SequenceAcknowledgement sequenceAcknowledgement =
(SequenceAcknowledgement) sequenceAckIter.next();
closeSeqResponseRMMsg.addSequenceAcknowledgement(sequenceAcknowledgement);
}
closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
"true");
-
closeSequenceResponseMsg.setResponseWritten(true);
-
closeSeqResponseRMMsg.addSOAPEnvelope();
//
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=603654&r1=603653&r2=603654&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Wed Dec 12 08:12:56 2007
@@ -43,6 +43,7 @@
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
@@ -91,10 +92,13 @@
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(context,context.getAxisConfiguration());
// Check that the sender of this TerminateSequence holds the
correct token
- RMDBean rmdBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+ RMSequenceBean rmBean =
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+ if(rmBean==null){
+ rmBean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceId);
+ }
//check security credentials
- SandeshaUtil.assertProofOfPossession(rmdBean, terminateSeqMsg,
+ SandeshaUtil.assertProofOfPossession(rmBean, terminateSeqMsg,
terminateSeqMsg.getEnvelope().getBody());
if (FaultManager.checkForUnknownSequence(terminateSeqRMMsg,
sequenceId, storageManager, false)) {
@@ -106,24 +110,21 @@
// add the terminate sequence response if required.
RMMsgContext terminateSequenceResponse = null;
if
(SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
- terminateSequenceResponse =
getTerminateSequenceResponse(terminateSeqRMMsg, rmdBean, sequenceId,
storageManager);
+ terminateSequenceResponse =
getTerminateSequenceResponse(terminateSeqRMMsg, rmBean, sequenceId,
storageManager);
setUpHighestMsgNumbers(context, storageManager, sequenceId,
terminateSeqRMMsg);
-
-
boolean inOrderInvocation =
SandeshaUtil.getDefaultPropertyBean(context.getAxisConfiguration()).isInOrder();
-
- //if the invocation is inOrder and if this is RM 1.1 there is a
posibility of all the messages having eleady being invoked.
+ //if the invocation is inOrder and if this is RM 1.1 there is a
possibility of all the messages having already been invoked.
//In this case we should do the full termination.
boolean doFullTermination = false;
- if (inOrderInvocation) {
+ if (inOrderInvocation && rmBean instanceof RMDBean) {
- long highestMsgNo = rmdBean.getHighestInMessageNumber();
- long nextMsgToProcess = rmdBean.getNextMsgNoToProcess();
+ long highestMsgNo =
((RMDBean)rmBean).getHighestInMessageNumber();
+ long nextMsgToProcess =
((RMDBean)rmBean).getNextMsgNoToProcess();
if (nextMsgToProcess>highestMsgNo) {
//all the messages have been invoked, u can do
the full termination
@@ -140,10 +141,14 @@
} else
TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId,
storageManager);
- rmdBean.setTerminated(true);
- rmdBean.setLastActivatedTime(System.currentTimeMillis());
- storageManager.getRMDBeanMgr().update(rmdBean);
-
+ rmBean.setTerminated(true);
+ rmBean.setLastActivatedTime(System.currentTimeMillis());
+ if(rmBean instanceof RMDBean){
+ storageManager.getRMDBeanMgr().update((RMDBean)rmBean);
+ }
+ else{
+ storageManager.getRMSBeanMgr().update((RMSBean)rmBean);
+ }
//sending the terminate sequence response
if (terminateSequenceResponse != null) {
@@ -313,25 +318,26 @@
log.debug("Exit:
TerminateSeqMsgProcessor::setUpHighestMsgNumbers");
}
- private RMMsgContext getTerminateSequenceResponse(RMMsgContext
terminateSeqRMMsg, RMDBean rmdBean, String sequenceId,
+ private RMMsgContext getTerminateSequenceResponse(RMMsgContext
terminateSeqRMMsg, RMSequenceBean rmBean, String sequenceId,
StorageManager storageManager) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
TerminateSeqMsgProcessor::addTerminateSequenceResponse, " + sequenceId);
- RMMsgContext terminateSeqResponseRMMsg =
RMMsgCreator.createTerminateSeqResponseMsg(terminateSeqRMMsg, rmdBean);
+ RMMsgContext terminateSeqResponseRMMsg =
RMMsgCreator.createTerminateSeqResponseMsg(terminateSeqRMMsg, rmBean);
MessageContext outMessage =
terminateSeqResponseRMMsg.getMessageContext();
-
- RMMsgContext ackRMMessage =
AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, rmdBean,
- sequenceId, storageManager, true);
-
- // copy over the ack parts
- Iterator iter = ackRMMessage.getSequenceAcknowledgements();
- while (iter.hasNext()) {
- SequenceAcknowledgement seqAck =
(SequenceAcknowledgement) iter.next();
-
terminateSeqResponseRMMsg.addSequenceAcknowledgement(seqAck);
- }
+ if(rmBean instanceof RMDBean){
+ RMMsgContext ackRMMessage =
AcknowledgementManager.generateAckMessage(terminateSeqRMMsg, (RMDBean)rmBean,
+ sequenceId, storageManager, true);
+
+ // copy over the ack parts
+ Iterator iter =
ackRMMessage.getSequenceAcknowledgements();
+ while (iter.hasNext()) {
+ SequenceAcknowledgement seqAck =
(SequenceAcknowledgement) iter.next();
+
terminateSeqResponseRMMsg.addSequenceAcknowledgement(seqAck);
+ }
+ }
terminateSeqResponseRMMsg.addSOAPEnvelope();
terminateSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]