Author: chamikara
Date: Thu Sep 14 02:32:31 2006
New Revision: 443299
URL: http://svn.apache.org/viewvc?view=rev&rev=443299
Log:
Changed the methods of the MessageProcessor interface to throw AxisFaults.
SandeshaOutHandler was changed so that it does not pause messages that have the
wsa:anonymous endpoint as their wsa:To address.
This allows at least a synchronous response to be sent (but no retransmissions
will be possible).
A bug fix - CreateSequence should have a different replyTo from the application
message.
Some bug fixes in close sequence and terminate sequence processing logic.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
Thu Sep 14 02:32:31 2006
@@ -223,7 +223,7 @@
log.debug("Exit: SandeshaGlobalInHandler::msgContext");
}
- private boolean dropIfDuplicate(RMMsgContext rmMsgContext,
StorageManager storageManager) throws SandeshaException {
+ private boolean dropIfDuplicate(RMMsgContext rmMsgContext,
StorageManager storageManager) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
SandeshaGlobalInHandler::dropIfDuplicate");
@@ -335,7 +335,7 @@
}
private void processDroppedMessage(RMMsgContext rmMsgContext,
StorageManager storageManager)
- throws SandeshaException {
+ throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
SandeshaGlobalInHandler::processDroppedMessage");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
Thu Sep 14 02:32:31 2006
@@ -26,6 +26,7 @@
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -88,7 +89,7 @@
private static final Log log =
LogFactory.getLog(ApplicationMsgProcessor.class);
- public void processInMessage(RMMsgContext rmMsgCtx) throws
SandeshaException {
+ public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
ApplicationMsgProcessor::processInMessage");
@@ -369,7 +370,7 @@
}
public void sendAckIfNeeded(RMMsgContext rmMsgCtx, String messagesStr,
StorageManager storageManager)
- throws SandeshaException {
+ throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
ApplicationMsgProcessor::sendAckIfNeeded");
@@ -406,7 +407,7 @@
log.debug("Exit:
ApplicationMsgProcessor::sendAckIfNeeded");
}
- public void processOutMessage(RMMsgContext rmMsgCtx) throws
SandeshaException {
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
ApplicationMsgProcessor::processOutMessage");
@@ -793,14 +794,23 @@
if (!dummyMessage)
processResponseMessage(rmMsgCtx, internalSequenceId,
messageNumber, storageKey, storageManager);
- msgContext.pause(); // the execution will be stopped.
-
+
+ if (!isWSAAnonymous (to)) {
+ //If message has a real to address or if it is in the
polling-mode it shoud be send by the sender or should
+ //be taken away by make connections, so pausing it.
+
+ msgContext.pause(); // the execution will be stopped.
+ }
+
+ //If to address is wsa:anonymous it wont be possible to send
the this message so, letting it go in the current thread.
+ //(it might get the the other end in the back-channel of the
request message, no retransmissions possible).
+
if (log.isDebugEnabled())
log.debug("Exit:
ApplicationMsgProcessor::processOutMessage");
}
private void addCreateSequenceMessage(RMMsgContext applicationRMMsg,
String sequencePropertyKey, String internalSequenceId, String acksTo,
- StorageManager storageManager) throws SandeshaException
{
+ StorageManager storageManager) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
ApplicationMsgProcessor::addCreateSequenceMessage, " + internalSequenceId);
@@ -1164,5 +1174,13 @@
if (log.isDebugEnabled())
log.debug("Exit:
ApplicationMsgProcessor::setNextMsgNo");
+ }
+
+ private boolean isWSAAnonymous (String address) {
+ if (AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address)
||
+
AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address))
+ return true;
+
+ return false;
}
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
Thu Sep 14 02:32:31 2006
@@ -17,6 +17,8 @@
package org.apache.sandesha2.msgprocessors;
+import java.util.Iterator;
+
import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFactory;
@@ -54,7 +56,7 @@
private static final Log log =
LogFactory.getLog(CloseSequenceProcessor.class);
- public void processInMessage(RMMsgContext rmMsgCtx) throws
SandeshaException {
+ public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
CloseSequenceProcessor::processInMessage");
@@ -123,8 +125,8 @@
}
// adding the ack part to the envelope.
- SequenceAcknowledgement sequenceAcknowledgement =
(SequenceAcknowledgement) ackRMMsgCtx
-
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ Iterator sequenceAckIter = ackRMMsgCtx
+
.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext();
@@ -139,9 +141,12 @@
RMMsgContext closeSeqResponseRMMsg =
RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, closeSequenceResponseMsg,
storageManager);
-
closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
- sequenceAcknowledgement);
-
+ while (sequenceAckIter.hasNext()) {
+ SequenceAcknowledgement sequenceAcknowledgement =
(SequenceAcknowledgement) sequenceAckIter.next();
+
closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+ sequenceAcknowledgement);
+ }
+
closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
"true");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
Thu Sep 14 02:32:31 2006
@@ -64,7 +64,7 @@
private static final Log log =
LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
- public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx)
throws SandeshaException {
+ public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx)
throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
CreateSeqResponseMsgProcessor::processInMessage");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
Thu Sep 14 02:32:31 2006
@@ -17,6 +17,7 @@
package org.apache.sandesha2.msgprocessors;
+import org.apache.axis2.AxisFault;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
@@ -25,6 +26,6 @@
*/
public interface MsgProcessor {
- public void processInMessage(RMMsgContext rmMsgCtx) throws
SandeshaException;
- public void processOutMessage(RMMsgContext rmMsgCtx) throws
SandeshaException;
+ public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
Thu Sep 14 02:32:31 2006
@@ -72,7 +72,7 @@
private static final Log log =
LogFactory.getLog(TerminateSeqMsgProcessor.class);
- public void processInMessage(RMMsgContext terminateSeqRMMsg) throws
SandeshaException {
+ public void processInMessage(RMMsgContext terminateSeqRMMsg) throws
AxisFault {
if (log.isDebugEnabled())
log.debug("Enter:
TerminateSeqMsgProcessor::processInMessage");
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
Thu Sep 14 02:32:31 2006
@@ -18,6 +18,7 @@
package org.apache.sandesha2.msgprocessors;
import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
@@ -41,7 +42,7 @@
private static final Log log =
LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
public void processInMessage(RMMsgContext terminateResRMMsg)
- throws SandeshaException {
+ throws AxisFault {
if(log.isDebugEnabled()) log.debug("Enter:
TerminateSeqResponseMsgProcessor::processInMessage");
MessageContext msgContext =
terminateResRMMsg.getMessageContext();
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/FaultManager.java
Thu Sep 14 02:32:31 2006
@@ -415,6 +415,7 @@
// end hack
+ //TODO this fails when the in message is in only. Fault
is thrown at the InOnlyAxisOperation
MessageContext faultMsgContext =
Utils.createOutMessageContext(referenceMessage);
// setting contexts.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
Thu Sep 14 02:32:31 2006
@@ -36,6 +36,7 @@
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisOperationFactory;
import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -200,7 +201,7 @@
* @throws SandeshaException
*/
public static RMMsgContext createCreateSeqMsg(RMMsgContext
applicationRMMsg, String sequencePropertyKey,
- String acksTo, StorageManager storageManager) throws
SandeshaException {
+ String acksTo, StorageManager storageManager) throws
AxisFault {
MessageContext applicationMsgContext =
applicationRMMsg.getMessageContext();
if (applicationMsgContext == null)
@@ -222,7 +223,7 @@
createSeqmsgContext = SandeshaUtil
.createNewRelatedMessageContext(applicationRMMsg, createSequenceOperation);
-
+
initializeCreation(applicationMsgContext,
createSeqmsgContext);
OperationContext createSeqOpCtx =
createSeqmsgContext.getOperationContext();
@@ -249,8 +250,24 @@
createSeqmsgContext.setAxisOperation(createSeqOperation);
createSeqmsgContext.setTo(applicationRMMsg.getTo());
- createSeqmsgContext.setReplyTo(applicationRMMsg.getReplyTo());
-
+
+// createSeqmsgContext.setReplyTo(applicationRMMsg.getReplyTo());
+ //generating a new replyTo address for the
CreateSequenceMessage.
+ //Otherwise there will be errors when the app msg is InOnly.
+
+ QName axisOperationName =
createSeqmsgContext.getAxisOperation().getName();
+ TransportInDescription transportIn =
createSeqmsgContext.getTransportIn();
+ QName transportInName = null;
+ if (transportIn!=null)
+ transportInName = transportIn.getName();
+
+ EndpointReference replyTo =
context.getListenerManager().getEPRforService(
+ createSeqmsgContext.getAxisService().getName(),
+
axisOperationName!=null?axisOperationName.getLocalPart():null,
+
transportInName!=null?transportInName.getLocalPart():null);
+
+ createSeqmsgContext.setReplyTo(replyTo);
+
RMMsgContext createSeqRMMsg = new
RMMsgContext(createSeqmsgContext);
String rmVersion =
SandeshaUtil.getRMVersion(sequencePropertyKey, storageManager);
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
Thu Sep 14 02:32:31 2006
@@ -962,7 +962,7 @@
* @return
*/
- public static String getSequencePropertyKey (RMMsgContext rmMsgContext)
{
+ public static String getSequencePropertyKey (RMMsgContext rmMsgContext)
throws AxisFault {
String sequenceId = (String)
rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID);
String internalSequenceId = (String)
rmMsgContext.getProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID);
@@ -979,9 +979,11 @@
propertyKey = internalSequenceId;
else
propertyKey = sequenceId;
+ } else if (flow==MessageContext.OUT_FAULT_FLOW) {
+ propertyKey = internalSequenceId;
}
- //TODO handler faults
+ //TODO handler cases not covered from above.
return propertyKey;
}
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=443299&r1=443298&r2=443299
==============================================================================
---
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Thu Sep 14 02:32:31 2006
@@ -104,10 +104,14 @@
// RMMsgCreator.addAckMessage(rmMsgCtx);
//} else
- if (isAckPiggybackableMsgType(messageType)) {
// checking weather this message can carry piggybacked acks
+
+ if (isAckPiggybackableMsgType(messageType)) { //
checking weather this message can carry piggybacked acks
// piggybacking if an ack if available for the
same
// sequence.
// TODO do piggybacking based on wsa:To
+
+
+
AcknowledgementManager.piggybackAcksIfPresent(rmMsgCtx, storageManager);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]