Author: gatfora
Date: Fri Jan 26 01:09:46 2007
New Revision: 500191
URL: http://svn.apache.org/viewvc?view=rev&rev=500191
Log:
Refactor ack processing inside SequenceProcessor
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=500191&r1=500190&r2=500191
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Fri Jan 26 01:09:46 2007
@@ -400,9 +400,8 @@
rmsBean.setNextMessageNumber(messageNumber);
if (messageNumber == 1 && !sendCreateSequence) {
- // if first message - setup the sending side sequence - both for the
- // server and the client sides
- SequenceManager.setupNewClientSequence(msgContext, sequencePropertyKey, specVersion, storageManager);
+ // Start the sender for the service side.
+ SandeshaUtil.startSenderForTheSequence(configContext, outSequenceID);
}
RelatesTo relatesTo = msgContext.getRelatesTo();
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=500191&r1=500190&r2=500191
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Fri Jan 26 01:09:46 2007
@@ -30,8 +30,8 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,7 +46,6 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.InvokerBean;
import org.apache.sandesha2.storage.beans.RMDBean;
@@ -185,11 +184,7 @@
}
String outgoingSideInternalSequenceId =
SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
- RMSBean findRMSBean = new RMSBean ();
- findRMSBean.setInternalSequenceID(outgoingSideInternalSequenceId);
-
- RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
- RMSBean rmsBean = rmsBeanMgr.findUnique (findRMSBean);
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, outgoingSideInternalSequenceId);
if (rmsBean==null) {
String message = "Cannot find a entries for the
response side sequence";
throw new SandeshaException (message);
@@ -216,7 +211,7 @@
Long msgNoOfInMsg = (Long)
outMessageContext.getProperty(Sandesha2Constants.MSG_NO_OF_IN_MSG);
if (msgNoOfInMsg == null) {
MessageContext inMsgContextOfOutMessage
= outMessageContext.getOperationContext()
- .getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ .getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
RMMsgContext inRMMsgContextOfOutMessage
= MsgInitializer.initializeMessage(inMsgContextOfOutMessage);
if (inMsgContextOfOutMessage != null) {
@@ -326,34 +321,35 @@
// else
// add an ack entry here
+ EndpointReference acksTo = new EndpointReference
(bean.getAcksToEPR());
- String acksToAddress = bean.getAcksToEPR();
- EndpointReference acksTo = new EndpointReference
(acksToAddress);
-
- if (acksTo!=null && acksTo.hasAnonymousAddress()) {
-
- long timeToSend = -1; //having a negative value for
timeToSend will make this behave as having an infinite ack interval.
- RMMsgContext ackRMMsgContext =
AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId,
storageManager,false,true);
- AcknowledgementManager.removeAckBeanEntries(sequenceId,
storageManager);
- AcknowledgementManager.addAckBeanEntry(ackRMMsgContext
,sequenceId, timeToSend, storageManager);
+ if (acksTo!=null && acksTo.hasAnonymousAddress() &&
+ WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep)) {
+ Object responseWritten =
msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
+ if (responseWritten==null ||
!Constants.VALUE_TRUE.equals(responseWritten)) {
+ RMMsgContext ackRMMsgContext =
AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId,
storageManager,false,true);
+
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
Constants.VALUE_TRUE);
+
AcknowledgementManager.sendAckNow(ackRMMsgContext);
+ }
} else { //Scenario 2 and Scenario 3
SandeshaPolicyBean policyBean =
SandeshaUtil.getPropertyBean (msgCtx.getAxisOperation());
if (policyBean==null) {
String message = "Cant find the policy bean
from the passed Axis2 description";
throw new SandeshaException (message);
}
-
- long ackInterval =
policyBean.getAcknowledgementInterval();
- long timeToSend = System.currentTimeMillis() +
ackInterval;
+ // having a negative value for timeToSend will make this behave as having an infinite ack interval.
+ long timeToSend = -1;
+ if (acksTo!=null && !acksTo.hasAnonymousAddress()) {
+ long ackInterval = policyBean.getAcknowledgementInterval();
+ timeToSend = System.currentTimeMillis() + ackInterval;
+ }
RMMsgContext ackRMMsgContext =
AcknowledgementManager.generateAckMessage(rmMsgCtx, sequenceId,
storageManager,false,true);
AcknowledgementManager.removeAckBeanEntries(sequenceId,
storageManager);
AcknowledgementManager.addAckBeanEntry(ackRMMsgContext,
sequenceId, timeToSend, storageManager);
}
-
-
-
+
if (inOrderInvocation) {
//if replyTo is anonymous and this is not an InOnly
message
@@ -376,29 +372,9 @@
//ack bean entry added previously may
cause an ack to be piggybacked.
} else {
result = InvocationResponse.ABORT;
-
- //in this case, we will be adding a
sync ack if acksTo is anonymous.
- if (acksTo!=null &&
acksTo.hasAnonymousAddress()) {
- Object responseWritten =
msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
- if (responseWritten==null ||
!Constants.VALUE_TRUE.equals(responseWritten)) {
- RMMsgContext
ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx ,
sequenceId, storageManager,false,true);
-
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
Constants.VALUE_TRUE);
-
AcknowledgementManager.sendAckNow(ackRMMsgContext);
- }
- }
}
} else {
result = InvocationResponse.ABORT;
-
- //in this case, we will be adding a sync ack if
acksTo is anonymous.
- if (acksTo!=null &&
acksTo.hasAnonymousAddress()) {
- Object responseWritten =
msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
- if (responseWritten==null ||
!Constants.VALUE_TRUE.equals(responseWritten)) {
- RMMsgContext ackRMMsgContext =
AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId,
storageManager,false,true);
-
msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
Constants.VALUE_TRUE);
-
AcknowledgementManager.sendAckNow(ackRMMsgContext);
- }
- }
}
@@ -421,7 +397,6 @@
// Starting the invoker if stopped.
SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(),
sequenceId);
}
-
if (log.isDebugEnabled())
log.debug("Exit:
SequenceProcessor::processReliableMessage " + result);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]