Author: mlovett
Date: Thu Feb 1 05:53:37 2007
New Revision: 502213
URL: http://svn.apache.org/viewvc?view=rev&rev=502213
Log:
Avoid loading and saving message contexts when we get a create sequence response
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Thu Feb 1 05:53:37 2007
@@ -53,11 +53,7 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.util.SpecSpecificConstants;
-import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.CreateSequence;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.LastMessage;
-import org.apache.sandesha2.wsrm.MessageNumber;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceOffer;
@@ -592,16 +588,8 @@
else
rmMsg.setTo(toEPR);
- String rmVersion = rmsBean.getRMVersion();
-
- String rmNamespaceValue =
SpecSpecificConstants.getRMNamespaceValue(rmVersion);
-
- Sequence sequence = new Sequence(rmNamespaceValue);
- MessageNumber msgNumber = new MessageNumber(rmNamespaceValue);
- msgNumber.setMessageNumber(messageNumber);
- sequence.setMessageNumber(msgNumber);
-
// setting last message
+ boolean lastMessage = false;
if (msg.isServerSide()) {
MessageContext requestMsg = null;
@@ -617,7 +605,7 @@
}
if (requestSequence.getLastMessage() != null) {
- sequence.setLastMessage(new
LastMessage(rmNamespaceValue));
+ lastMessage = true;
}
} else {
@@ -627,51 +615,15 @@
if (operationContext != null) {
Object obj =
msg.getProperty(SandeshaClientConstants.LAST_MESSAGE);
if (obj != null && "true".equals(obj)) {
-
- if
(SpecSpecificConstants.isLastMessageIndicatorRequired(rmVersion))
- sequence.setLastMessage(new
LastMessage(rmNamespaceValue));
+ lastMessage = true;
}
}
}
- AckRequested ackRequested = null;
-
- boolean addAckRequested = false;
- // if (!lastMessage)
- // addAckRequested = true; //TODO decide the policy to add the
- // ackRequested tag
-
- // setting the Sequence id.
- // Set send = true/false depending on the availability of the
out
- // sequence id.
- String identifierStr = null;
- if (outSequenceID == null) {
- identifierStr = Sandesha2Constants.TEMP_SEQUENCE_ID;
-
- } else {
- identifierStr = outSequenceID;
- }
-
- Identifier id1 = new Identifier(rmNamespaceValue);
- id1.setIndentifer(identifierStr);
- sequence.setIdentifier(id1);
- rmMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE,
sequence);
-
- if (addAckRequested) {
- ackRequested = new AckRequested(rmNamespaceValue);
- Identifier id2 = new Identifier(rmNamespaceValue);
- id2.setIndentifer(identifierStr);
- ackRequested.setIdentifier(id2);
-
rmMsg.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST, ackRequested);
- }
-
- // Now that we have added the headers to the message, make sure that we secure it with
- // the correct token.
+ // Now that we have decided which sequence to use for the message, make sure that we secure
+ // it with the correct token.
RMMsgCreator.secureOutboundMessage(rmsBean, msg);
- rmMsg.addSOAPEnvelope();
-
-
// Retransmitter bean entry for the application message
SenderBean appMsgEntry = new SenderBean();
@@ -680,6 +632,7 @@
appMsgEntry.setTimeToSend(System.currentTimeMillis());
appMsgEntry.setMessageID(rmMsg.getMessageId());
appMsgEntry.setMessageNumber(messageNumber);
+ appMsgEntry.setLastMessage(lastMessage);
appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
if (outSequenceID == null) {
appMsgEntry.setSend(false);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Thu Feb 1 05:53:37 2007
@@ -44,17 +44,10 @@
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.wsrm.Accept;
-import org.apache.sandesha2.wsrm.AckRequested;
-import org.apache.sandesha2.wsrm.CloseSequence;
import org.apache.sandesha2.wsrm.CreateSequenceResponse;
-import org.apache.sandesha2.wsrm.Identifier;
-import org.apache.sandesha2.wsrm.Sequence;
-import org.apache.sandesha2.wsrm.TerminateSequence;
/**
* Responsible for processing an incoming Create Sequence Response message.
@@ -226,105 +219,20 @@
rmsBean.setLastActivatedTime(System.currentTimeMillis());
rmsBeanMgr.update(rmsBean);
+ // Locate and update all of the messages for this sequence, now that we know
+ // the sequence id.
SenderBean target = new SenderBean();
target.setInternalSequenceID(internalSequenceId);
target.setSend(false);
-
+
Iterator iterator = retransmitterMgr.find(target).iterator();
while (iterator.hasNext()) {
SenderBean tempBean = (SenderBean) iterator.next();
- // updating the application message
- String key = tempBean.getMessageContextRefKey();
- MessageContext applicationMsg =
storageManager.retrieveMessageContext(key, configCtx);
-
- // TODO make following exception message more
understandable to the
- // user (probably some others exceptions messages as
well)
- if (applicationMsg == null)
- throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.unavailableAppMsg));
-
- String assumedRMNamespace =
SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion());
-
- RMMsgContext applicaionRMMsg =
MsgInitializer.initializeMessage(applicationMsg);
-
- if (tempBean.getMessageType() ==
Sandesha2Constants.MessageTypes.APPLICATION) {
-
- Sequence sequencePart = (Sequence)
applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (sequencePart == null) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- Identifier identifier = new
Identifier(assumedRMNamespace);
- identifier.setIndentifer(newOutSequenceId);
-
- sequencePart.setIdentifier(identifier);
-
- } else if (tempBean.getMessageType() ==
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-
- TerminateSequence sequencePart =
(TerminateSequence)
applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- if (sequencePart == null) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- Identifier identifier = new
Identifier(assumedRMNamespace);
- identifier.setIndentifer(newOutSequenceId);
-
- sequencePart.setIdentifier(identifier);
-
- } else if (tempBean.getMessageType() ==
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
-
- CloseSequence sequencePart = (CloseSequence)
applicaionRMMsg.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
- if (sequencePart == null) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- Identifier identifier = new
Identifier(assumedRMNamespace);
- identifier.setIndentifer(newOutSequenceId);
-
- sequencePart.setIdentifier(identifier);
-
- } else if (tempBean.getMessageType() ==
Sandesha2Constants.MessageTypes.ACK_REQUEST) {
-
- Iterator headerIterator =
applicaionRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
-
- AckRequested sequencePart = null;
- while (headerIterator.hasNext()) {
- sequencePart = (AckRequested)
headerIterator.next();
- }
-
- if (headerIterator.hasNext()) {
- throw new SandeshaException
(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackRequestMultipleParts));
- }
-
- if (sequencePart == null) {
- String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.seqPartIsNull);
- log.debug(message);
- throw new SandeshaException(message);
- }
-
-
sequencePart.getIdentifier().setIndentifer(newOutSequenceId);
-
- }
-
- try {
- applicaionRMMsg.addSOAPEnvelope();
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage(), e);
- }
-
// asking to send the application msssage
tempBean.setSend(true);
tempBean.setSequenceID(newOutSequenceId);
retransmitterMgr.update(tempBean);
-
- // updating the message. this will correct the SOAP envelope string.
- storageManager.updateMessageContext(key,
applicationMsg);
}
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext().setProperty(
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
Thu Feb 1 05:53:37 2007
@@ -96,6 +96,11 @@
private int messageType =0;
/**
+ * Flag to indicate if this is the last message for the sequence
+ */
+ private boolean lastMessage = false;
+
+ /**
* Flags that are used to check if the primitive types on this bean
* have been set. If a primitive type has not been set then it will
* be ignored within the match method.
@@ -107,6 +112,7 @@
private static final int RESEND_FLAG = 0x00001000;
private static final int TIME_TO_SEND_FLAG = 0x00010000;
private static final int MSG_TYPE_FLAG = 0x00100000;
+ private static final int LAST_MSG_FLAG = 0x01000000;
public SenderBean() {
@@ -216,6 +222,15 @@
public void setToAddress(String toAddress) {
this.toAddress = toAddress;
}
+
+ public boolean isLastMessage() {
+ return lastMessage;
+ }
+
+ public void setLastMessage(boolean lastMessage) {
+ this.lastMessage = lastMessage;
+ this.flags |= LAST_MSG_FLAG;
+ }
public String toString() {
StringBuffer result = new StringBuffer();
@@ -271,6 +286,10 @@
else if((bean.flags & MSG_TYPE_FLAG) != 0 &&
bean.getMessageType() != this.getMessageType())
match = false;
+ else if((bean.flags & LAST_MSG_FLAG) != 0 &&
bean.isLastMessage() != this.isLastMessage())
+ match = false;
+
return match;
}
+
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
Thu Feb 1 05:53:37 2007
@@ -91,27 +91,21 @@
if (context == null)
throw new
SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
- MessageContext createSeqmsgContext;
- try {
- // creating by copying common contents. (this will not
set contexts
- // except for configCtx).
- AxisOperation createSequenceOperation =
SpecSpecificConstants.getWSRMOperation(
-
Sandesha2Constants.MessageTypes.CREATE_SEQ,
- rmsBean.getRMVersion(),
- applicationMsgContext.getAxisService());
+ // creating by copying common contents. (this will not set contexts
+ // except for configCtx).
+ AxisOperation createSequenceOperation =
SpecSpecificConstants.getWSRMOperation(
+ Sandesha2Constants.MessageTypes.CREATE_SEQ,
+ rmsBean.getRMVersion(),
+ applicationMsgContext.getAxisService());
- createSeqmsgContext = SandeshaUtil
-
.createNewRelatedMessageContext(applicationRMMsg, createSequenceOperation);
-
- OperationContext createSeqOpCtx =
createSeqmsgContext.getOperationContext();
- String createSeqMsgId = SandeshaUtil.getUUID();
- createSeqmsgContext.setMessageID(createSeqMsgId);
- context.registerOperationContext(createSeqMsgId,
createSeqOpCtx);
+ MessageContext createSeqmsgContext = SandeshaUtil
+
.createNewRelatedMessageContext(applicationRMMsg, createSequenceOperation);
+
+ OperationContext createSeqOpCtx =
createSeqmsgContext.getOperationContext();
+ String createSeqMsgId = SandeshaUtil.getUUID();
+ createSeqmsgContext.setMessageID(createSeqMsgId);
+ context.registerOperationContext(createSeqMsgId,
createSeqOpCtx);
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage(), e);
- }
-
RMMsgContext createSeqRMMsg = new
RMMsgContext(createSeqmsgContext);
String rmNamespaceValue =
SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion());
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=502213&r1=502212&r2=502213
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Thu Feb 1 05:53:37 2007
@@ -1,6 +1,7 @@
package org.apache.sandesha2.workers;
import java.util.ArrayList;
+import java.util.Iterator;
import javax.xml.namespace.QName;
@@ -30,6 +31,7 @@
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
@@ -37,6 +39,12 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.LastMessage;
+import org.apache.sandesha2.wsrm.MessageNumber;
+import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.TerminateSequence;
public class SenderWorker extends SandeshaWorker implements Runnable {
@@ -149,16 +157,6 @@
int messageType = senderBean.getMessageType();
-// if (messageType ==
Sandesha2Constants.MessageTypes.APPLICATION) {
-// Sequence sequence = (Sequence)
rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-// String sequenceID =
sequence.getIdentifier().getIdentifier();
-// }
-
-// if (AcknowledgementManager.ackRequired (rmMsgCtx)) {
-// RMMsgCreator.addAckMessage(rmMsgCtx);
-
- //} else
-
if (isAckPiggybackableMsgType(messageType)) { //
checking weather this message can carry piggybacked acks
// checking weather this message can carry
piggybacked acks
// piggybacking if an ack if available for the
same
@@ -332,12 +330,73 @@
log.debug("Exit: SenderWorker::run");
}
+ /**
+ * Update the message before sending it. We adjust the retransmission intervals and send counts
+ * for the message. If the message is an application message then we ensure that we have added
+ * the Sequence header.
+ */
private boolean updateMessage(RMMsgContext rmMsgContext, SenderBean
senderBean, StorageManager storageManager) throws AxisFault {
boolean continueSending =
MessageRetransmissionAdjuster.adjustRetransmittion(
rmMsgContext, senderBean,
rmMsgContext.getConfigurationContext(), storageManager);
+ if(!continueSending) return false;
+
+ Identifier id = null;
+
+ if(senderBean.getMessageType() ==
Sandesha2Constants.MessageTypes.APPLICATION) {
+ RMSequenceBean bean =
SandeshaUtil.getRMSBeanFromSequenceId(storageManager,
senderBean.getSequenceID());
+ String namespace =
SpecSpecificConstants.getRMNamespaceValue(bean.getRMVersion());
+ Sequence sequence = (Sequence)
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if(sequence == null) {
+ sequence = new Sequence(namespace);
+
+ MessageNumber msgNumber = new
MessageNumber(namespace);
+
msgNumber.setMessageNumber(senderBean.getMessageNumber());
+ sequence.setMessageNumber(msgNumber);
+
+ if(senderBean.isLastMessage() &&
+
SpecSpecificConstants.isLastMessageIndicatorRequired(bean.getRMVersion())) {
+ sequence.setLastMessage(new
LastMessage(namespace));
+ }
+
+ // We just create the id here, we will add the value in later
+ id = new Identifier(namespace);
+ sequence.setIdentifier(id);
+
+
rmMsgContext.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE, sequence);
+ }
+
+ } else if(senderBean.getMessageType() ==
Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+ TerminateSequence terminate = (TerminateSequence)
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ id = terminate.getIdentifier();
+
+ } else if(senderBean.getMessageType() ==
Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE) {
+ CloseSequence close = (CloseSequence)
rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+ id = close.getIdentifier();
+
+ } else if(senderBean.getMessageType() ==
Sandesha2Constants.MessageTypes.ACK_REQUEST) {
+ // The only time that we can have a message of this type is when we are sending a
+ // stand-alone ack request, and in that case we only expect to find a single ack
+ // request header in the message.
+ Iterator ackRequests =
rmMsgContext.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
+ AckRequested ackRequest = (AckRequested)
ackRequests.next();
+ if (ackRequests.hasNext()) {
+ throw new SandeshaException
(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackRequestMultipleParts));
+ }
+ id = ackRequest.getIdentifier();
+ }
+
+ // TODO consider adding an extra ack request, as we are about to send the message and we
+ // know which sequence it is associated with.
+
+ if(id != null &&
!senderBean.getSequenceID().equals(id.getIdentifier())) {
+ id.setIndentifer(senderBean.getSequenceID());
+
+ // Write the changes back into the message context
+ rmMsgContext.addSOAPEnvelope();
+ }
- return continueSending;
+ return true;
}
private boolean isAckPiggybackableMsgType(int messageType) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]