jaliya 2005/02/20 00:49:45
Modified: sandesha/src/org/apache/sandesha Constants.java
EnvelopeCreator.java
sandesha/src/org/apache/sandesha/client
ClientStorageManager.java RMSender.java
sandesha/src/org/apache/sandesha/server
MessageValidator.java RMInvoker.java
RMMessageProcessorIdentifier.java Sender.java
ServerStorageManager.java
sandesha/src/org/apache/sandesha/server/msgprocessors
AcknowledgementProcessor.java
CompositeProcessor.java
CreateSequenceProcessor.java FaultProcessor.java
sandesha/src/org/apache/sandesha/storage/dao
SandeshaQueueDAO.java
sandesha/src/org/apache/sandesha/storage/queue
SandeshaQueue.java
sandesha/src/org/apache/sandesha/ws/rm/providers
RMProvider.java
Log:
Modified the code to handle the faults
Revision Changes Path
1.27 +6 -0 ws-fx/sandesha/src/org/apache/sandesha/Constants.java
Index: Constants.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/Constants.java,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -r1.26 -r1.27
--- Constants.java 18 Feb 2005 13:00:33 -0000 1.26
+++ Constants.java 20 Feb 2005 08:49:45 -0000 1.27
@@ -272,8 +272,10 @@
public static final String NO_RM_HEADES="No RM Headers Available in
this Message";
+ public static final String INVALID_ACKNOWLEDGEMENT="The
SequenceAcknowledgement violates the cumulative acknowledgement invariant.";
public static final String UNKNOWN_SEQUENCE="The value of
wsrm:Identifier is not a known Sequence identifier.";
public static final String MSG_NO_ROLLOVER="The maximum value for
wsrm:MessageNumber has been exceeded.";
+ public static final String LAST_MSG_NO_EXCEEDED="The value for
wsrm:MessageNumber exceeds the value of the MessageNumber accompanying a
LastMessage element in this Sequence.";
}
@@ -283,8 +285,10 @@
public static final String IN_CORRECT_MESSAGE = "Incorrect Message";
+ public static final String
WSRM_FAULT_INVALID_ACKNOWLEDGEMENT="wsrm:InvalidAcknowledgement";
public static final String
WSRM_FAULT_UNKNOWN_SEQUENCE="wsrm:UnknownSequence";
public static final String
WSRM_FAULT_MSG_NO_ROLLOVER="wsrm:MessageNumberRollover";
+ public static final String
WSRM_FAULR_LAST_MSG_NO_EXCEEDED="wsrm:LastMessageNumberExceeded";
}
public interface ErrorMessages{
@@ -298,6 +302,8 @@
public static final double MAX_MSG_NO=18446744073709551615d;
+ public static final String UUID="uuid:";
+
}
1.16 +6 -9
ws-fx/sandesha/src/org/apache/sandesha/EnvelopeCreator.java
Index: EnvelopeCreator.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/EnvelopeCreator.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- EnvelopeCreator.java 16 Feb 2005 04:21:20 -0000 1.15
+++ EnvelopeCreator.java 20 Feb 2005 08:49:45 -0000 1.16
@@ -62,8 +62,7 @@
//Set the messageID
UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
- MessageID messageId = new MessageID(new URI("uuid:"
- + uuidGen.nextUUID()));
+ MessageID messageId = new MessageID(new URI(Constants.UUID+
uuidGen.nextUUID()));
outGoingAddressingHaders.setMessageID(messageId);
//Set the <wsa:From> address from the incoming <wsa:To>
@@ -90,7 +89,7 @@
//now set the body elements
CreateSequenceResponse response = new CreateSequenceResponse();
Identifier id = new Identifier();
- id.setIdentifier("uuid:" + uuid);
+ id.setIdentifier(Constants.UUID + uuid);
response.setIdentifier(id);
response.toSoapEnvelop(envelope);
} catch (MalformedURIException e) {
@@ -124,7 +123,7 @@
SOAPHeaderElement acionElement =
action.toSOAPHeaderElement(envelope, null);
outGoingAddressingHaders.setAction(action);
- MessageID messageId = new MessageID(new URI("uuid:" + uuid));
+ MessageID messageId = new MessageID(new URI(Constants.UUID+
uuid));
outGoingAddressingHaders.setMessageID(messageId);
if (endPoint == 0) {
@@ -216,8 +215,7 @@
//Set the messageID
UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
- MessageID messageId = new MessageID(new URI("uuid:"
- + uuidGen.nextUUID()));
+ MessageID messageId = new MessageID(new URI(Constants.UUID +
uuidGen.nextUUID()));
outGoingAddressingHaders.setMessageID(messageId);
//Set the <wsa:From> address from the incoming <wsa:To>
@@ -286,8 +284,7 @@
//Set the messageID
UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
- MessageID messageId = new MessageID(new URI("uuid:"
- + uuidGen.nextUUID()));
+ MessageID messageId = new MessageID(new URI(Constants.UUID +
uuidGen.nextUUID()));
outGoingAddressingHaders.setMessageID(messageId);
//Set the <wsa:From> address from the incoming <wsa:To>
@@ -442,7 +439,7 @@
outGoingAddressingHaders.setAction(terSeqAction);
UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
- MessageID messageId = new MessageID(new URI("uuid:" +
uuidGen.nextUUID()));
+ MessageID messageId = new MessageID(new URI(Constants.UUID+
uuidGen.nextUUID()));
outGoingAddressingHaders.setMessageID(messageId);
outGoingAddressingHaders.setFrom(addressingHeaders.getFrom());
1.19 +6 -7
ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
Index: ClientStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- ClientStorageManager.java 18 Feb 2005 13:00:33 -0000 1.18
+++ ClientStorageManager.java 20 Feb 2005 08:49:45 -0000 1.19
@@ -328,9 +328,6 @@
}
public RMMessageContext checkForResponseMessage(String sequenceId,
String requestMsgId) {
- if (sequenceId == null)
- sequenceId = Constants.CLIENT_DEFAULD_SEQUENCE_ID;
-
RMMessageContext response =
accessor.checkForResponseMessage(requestMsgId, sequenceId);
return response;
@@ -406,7 +403,9 @@
* @see
org.apache.sandesha.IStorageManager#addSendMsgNo(java.lang.String, long)
*/
public void addSendMsgNo(String seqId, long msgNo) {
- accessor.addSendMsgNo(seqId,msgNo);
+
System.out.println("2222222222222222222222222222222222222222222222222222");
+ System.out.println( accessor.getSequenceOfOutSequence(seqId));
+
accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId),msgNo);
}
/* (non-Javadoc)
@@ -432,18 +431,18 @@
}
public void addRequestedSequence(String seqId) {
- //To change body of implemented methods use File | Settings | File
Templates.
+ accessor.addRequestedSequence(seqId);
}
public boolean isRequestedSeqPresent(String seqId) {
- return false; //To change body of implemented methods use File |
Settings | File Templates.
+ return accessor.isRequestedSeqPresent(seqId);
}
/* (non-Javadoc)
* @see org.apache.sandesha.IStorageManager#isSentMsg(java.lang.String,
long)
*/
public boolean isSentMsg(String seqId, long msgNo) {
- return accessor.isSentMsg(seqId,msgNo);
+ return
accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId) ,msgNo);
}
public String getOutgoingSeqOfMsg(String msgId){
1.24 +8 -6
ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java
Index: RMSender.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- RMSender.java 18 Feb 2005 13:00:33 -0000 1.23
+++ RMSender.java 20 Feb 2005 08:49:45 -0000 1.24
@@ -58,8 +58,9 @@
try {
String sequenceID = requestMesssageContext.getSequenceID();
AddressingHeaders addrHeaders =
getAddressingHeaders(requestMesssageContext);
-
- if (requestMesssageContext.getMsgNumber() == 1) {
+ long msgNo=requestMesssageContext.getMsgNumber();
+
+ if (msgNo == 1) {
requestMesssageContext =
processFirstMessage(requestMesssageContext, addrHeaders,
requestMesssageContext.getSync());
} else {
@@ -76,6 +77,7 @@
//TODO Need to check for errors in the queue.
//If the queue has an error message, then need to report
it
// to client.
+
responseMessageContext =
checkTheQueueForResponse(sequenceID,
requestMesssageContext.getMessageID());
Thread.sleep(Constants.CLIENT_RESPONSE_CHECKING_INTERVAL);
@@ -224,11 +226,11 @@
//Set the tempUUID
String tempUUID = uuidGen.nextUUID();
RMMessageContext createSeqRMMsgContext =
getCreateSeqRMContext(reqRMMsgContext, addrHeaders, tempUUID);
- createSeqRMMsgContext.setMessageID("uuid:" + tempUUID);
+ createSeqRMMsgContext.setMessageID(Constants.UUID + tempUUID);
//Create a sequence first.
storageManager.addOutgoingSequence(reqRMMsgContext.getSequenceID());
-
storageManager.setTemporaryOutSequence(reqRMMsgContext.getSequenceID(), "uuid:"
+ tempUUID);
+
storageManager.setTemporaryOutSequence(reqRMMsgContext.getSequenceID(),Constants.UUID
+ tempUUID);
//Set the processing state to the RMMessageContext
createSeqRMMsgContext.setSync(sync);
@@ -238,7 +240,7 @@
reqRMMsgContext.setOutGoingAddress(addrHeaders.getTo().toString());
reqRMMsgContext.setMsgNumber(nextMsgNumber);
reqRMMsgContext.setMessageType(Constants.MSG_TYPE_SERVICE_REQUEST);
- reqRMMsgContext.setMessageID("uuid:" + uuidGen.nextUUID());
+ reqRMMsgContext.setMessageID(Constants.UUID+ uuidGen.nextUUID());
storageManager.insertOutgoingMessage(reqRMMsgContext);
return reqRMMsgContext;
}
@@ -249,7 +251,7 @@
reqRMMsgContext.setAddressingHeaders(addrHeaders);
reqRMMsgContext.setOutGoingAddress(addrHeaders.getTo().toString());
reqRMMsgContext.setMessageType(Constants.MSG_TYPE_SERVICE_REQUEST);
- reqRMMsgContext.setMessageID("uuid:" + uuidGen.nextUUID());
+ reqRMMsgContext.setMessageID(Constants.UUID + uuidGen.nextUUID());
//Set the processing state of the RMMessageContext
reqRMMsgContext.setSync(sync);
storageManager.insertOutgoingMessage(reqRMMsgContext);
1.6 +15 -16
ws-fx/sandesha/src/org/apache/sandesha/server/MessageValidator.java
Index: MessageValidator.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/MessageValidator.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- MessageValidator.java 18 Feb 2005 13:00:34 -0000 1.5
+++ MessageValidator.java 20 Feb 2005 08:49:45 -0000 1.6
@@ -59,37 +59,36 @@
return;
throw new AxisFault(new
QName(Constants.FaultCodes.IN_CORRECT_MESSAGE),
Constants.FaultMessages.NO_RM_HEADES, null, null);
-
}
private static void validateForFaults(RMMessageContext rmMsgCtx) throws
AxisFault {
RMHeaders rmHeaders = rmMsgCtx.getRMHeaders();
Sequence sequence = rmHeaders.getSequence();
-
- // if (sequence != null) {
- // if
(!storageMgr.isSequenceExist(sequence.getIdentifier().getIdentifier()))
- // throw new AxisFault(new
QName(Constants.FaultCodes.WSRM_FAULT_UNKNOWN_SEQUENCE),
Constants.FaultMessages.UNKNOWN_SEQUENCE, null, null);
-
- // }
-
- if (rmHeaders.getSequenceAcknowledgement() != null) {
- // if
(!storageMgr.isSequenceExist(sequence.getIdentifier().getIdentifier()))
- // throw new AxisFault(new
QName(Constants.FaultCodes.WSRM_FAULT_UNKNOWN_SEQUENCE),
Constants.FaultMessages.UNKNOWN_SEQUENCE, null, null);
+ if (sequence != null) {
+ String seqId = sequence.getIdentifier().getIdentifier();
+ if (!storageMgr.isRequestedSeqPresent(seqId)){
+ System.out.println("I am Here
lllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllllll");
+ throw new AxisFault(new
QName(Constants.FaultCodes.WSRM_FAULT_UNKNOWN_SEQUENCE),
Constants.FaultMessages.UNKNOWN_SEQUENCE, null, null);
+ }
+ if (sequence.getMessageNumber() != null) {
+ long msgNo = sequence.getMessageNumber().getMessageNumber();
+ if
(storageMgr.hasLastMsgReceived(sequence.getIdentifier().getIdentifier())) {
+ long lastMsg = storageMgr.getLastMsgNo(seqId);
+ if (msgNo > lastMsg)
+ throw new AxisFault(new
QName(Constants.FaultCodes.WSRM_FAULR_LAST_MSG_NO_EXCEEDED),
Constants.FaultMessages.LAST_MSG_NO_EXCEEDED, null, null);
+ }
+ }
+ }
}
-
- }
-
private static void validateAddrHeaders(AddressingHeaders addrHeaders)
throws AxisFault {
if (addrHeaders == null) {
throw new AxisFault(new
QName(Constants.FaultCodes.IN_CORRECT_MESSAGE),
Constants.FaultMessages.NO_ADDRESSING_HEADERS, null, null);
}
-
if (addrHeaders.getMessageID() == null)
throw new AxisFault(new
QName(Constants.FaultCodes.IN_CORRECT_MESSAGE),
Constants.FaultMessages.NO_MESSAGE_ID, null, null);
}
-
}
1.11 +3 -4
ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java
Index: RMInvoker.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- RMInvoker.java 16 Feb 2005 04:21:21 -0000 1.10
+++ RMInvoker.java 20 Feb 2005 08:49:45 -0000 1.11
@@ -136,9 +136,8 @@
//Posible Problem
//TODO
storageManager.setTemporaryOutSequence(rmMsgContext
- .getSequenceID(), "uuid:" + id);
- SOAPEnvelope createSequenceEnvelope =
EnvelopeCreator
- .createCreateSequenceEnvelope(id,
+ .getSequenceID(), Constants.UUID + id);
+ SOAPEnvelope createSequenceEnvelope =
EnvelopeCreator.createCreateSequenceEnvelope(id,
rmMsgContext, Constants.SERVER);
rmMsgContext.getMsgContext().setRequestMessage(new
Message(createSequenceEnvelope));
@@ -148,7 +147,7 @@
.getAddressingHeaders().getReplyTo()
.getAddress().toString());
- rmMsgContext.setMessageID("uuid:" + id);
+ rmMsgContext.setMessageID(Constants.UUID + id);
storageManager
.addCreateSequenceRequest(rmMsgContext);
1.11 +1 -0
ws-fx/sandesha/src/org/apache/sandesha/server/RMMessageProcessorIdentifier.java
Index: RMMessageProcessorIdentifier.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/RMMessageProcessorIdentifier.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- RMMessageProcessorIdentifier.java 16 Feb 2005 04:21:21 -0000 1.10
+++ RMMessageProcessorIdentifier.java 20 Feb 2005 08:49:45 -0000 1.11
@@ -42,6 +42,7 @@
AddressingHeaders addrHeaders =
rmMessageContext.getAddressingHeaders();
RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
+
if
(addrHeaders.getAction().toString().equals(Constants.ACTION_CREATE_SEQUENCE)) {
return new CreateSequenceProcessor(storageManager);
} else if
(addrHeaders.getAction().toString().equals(Constants.ACTION_CREATE_SEQUENCE_RESPONSE))
{
1.22 +6 -1 ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java
Index: Sender.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- Sender.java 16 Feb 2005 04:21:21 -0000 1.21
+++ Sender.java 20 Feb 2005 08:49:45 -0000 1.22
@@ -217,6 +217,8 @@
.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
//We are not expecting the ack over the
// same HTTP connection.
+
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),rmMessageContext.getMsgNumber());
+
call.invoke();
//System.out.println(call.getResponseMessage().getSOAPPartAsString());
@@ -355,7 +357,9 @@
Call call;
try {
call = prepareCall(rmMessageContext);
- //Send the createSequnceRequest Synchronously
+
+ //CHECK THIS
+
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),rmMessageContext.getMsgNumber());
call.invoke();
if (call.getResponseMessage() != null) {
System.out.println("RESPONSE MESSAGE IS NOT NULL");
@@ -387,6 +391,7 @@
try {
Call call = prepareCall(rmMessageContext);
//Send the createSequnceRequest Asynchronously.
+
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),rmMessageContext.getMsgNumber());
call.invoke();
} catch (AxisFault e) {
System.err.println("ERROR: SENDING REQUEST ....");
1.19 +3 -3
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
Index: ServerStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- ServerStorageManager.java 18 Feb 2005 13:00:34 -0000 1.18
+++ ServerStorageManager.java 20 Feb 2005 08:49:45 -0000 1.19
@@ -378,7 +378,7 @@
* @see
org.apache.sandesha.IStorageManager#addSendMsgNo(java.lang.String, long)
*/
public void addSendMsgNo(String seqId, long msgNo) {
- // TODO Auto-generated method stub
+
accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId),msgNo);
}
/* (non-Javadoc)
@@ -392,8 +392,8 @@
* @see org.apache.sandesha.IStorageManager#isSentMsg(java.lang.String,
long)
*/
public boolean isSentMsg(String seqId, long msgNo) {
- // TODO Auto-generated method stub
- return false;
+ return
accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId) ,msgNo);
+
}
1.3 +11 -9
ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/AcknowledgementProcessor.java
Index: AcknowledgementProcessor.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/AcknowledgementProcessor.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- AcknowledgementProcessor.java 18 Feb 2005 06:57:17 -0000 1.2
+++ AcknowledgementProcessor.java 20 Feb 2005 08:49:45 -0000 1.3
@@ -34,15 +34,14 @@
* @author
*/
public final class AcknowledgementProcessor implements IRMMessageProcessor {
- private IStorageManager storageManger = null;
+ private IStorageManager storageManager = null;
- public AcknowledgementProcessor(IStorageManager storageManger) {
- this.storageManger = storageManger;
+ public AcknowledgementProcessor(IStorageManager storageManager) {
+ this.storageManager = storageManager;
}
public final boolean processMessage(RMMessageContext rmMessageContext)
throws AxisFault {
- SequenceAcknowledgement seqAcknowledgement = rmMessageContext
- .getRMHeaders().getSequenceAcknowledgement();
+ SequenceAcknowledgement seqAcknowledgement =
rmMessageContext.getRMHeaders().getSequenceAcknowledgement();
String seqID = seqAcknowledgement.getIdentifier().getIdentifier();
List ackRanges = seqAcknowledgement.getAckRanges();
Iterator ite = ackRanges.iterator();
@@ -51,8 +50,11 @@
AcknowledgementRange ackRange = (AcknowledgementRange)
ite.next();
long msgNumber = ackRange.getMinValue();
while (ackRange.getMaxValue() >= msgNumber) {
- storageManger.setAckReceived(seqID, msgNumber);
- storageManger.setAcknowledged(seqID, msgNumber);
+ if(!storageManager.isSentMsg(seqID, msgNumber)){
+ throw new AxisFault(new
javax.xml.namespace.QName(Constants.FaultCodes.WSRM_FAULT_INVALID_ACKNOWLEDGEMENT),
Constants.FaultMessages.INVALID_ACKNOWLEDGEMENT, null, null);
+ }
+ storageManager.setAckReceived(seqID, msgNumber);
+ storageManager.setAcknowledged(seqID, msgNumber);
msgNumber++;
}
}
@@ -70,7 +72,7 @@
long messageNumber = rmMessageContext.getRMHeaders().getSequence()
.getMessageNumber().getMessageNumber();
//Assume that the list is sorted and in the ascending order.
- Map listOfMsgNumbers = storageManger.getListOfMessageNumbers(seqID);
+ Map listOfMsgNumbers = storageManager.getListOfMessageNumbers(seqID);
if (null == listOfMsgNumbers)
System.out.println("MSG Number list is NULL");
@@ -114,7 +116,7 @@
//Store the asynchronize ack in the queue.
//The name for this queue is not yet fixed.
//RENAME insertAcknowledgement(rmMessageContext)
- storageManger.addAcknowledgement(rmMsgContext);
+ storageManager.addAcknowledgement(rmMsgContext);
return false;
}
1.4 +18 -8
ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/CompositeProcessor.java
Index: CompositeProcessor.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/CompositeProcessor.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- CompositeProcessor.java 18 Feb 2005 13:00:34 -0000 1.3
+++ CompositeProcessor.java 20 Feb 2005 08:49:45 -0000 1.4
@@ -19,6 +19,8 @@
import org.apache.axis.Message;
import org.apache.axis.MessageContext;
import org.apache.axis.AxisFault;
+import org.apache.axis.message.addressing.AddressingHeaders;
+import org.apache.axis.message.addressing.RelatesTo;
import org.apache.sandesha.Constants;
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMException;
@@ -30,10 +32,10 @@
*/
public class CompositeProcessor implements IRMMessageProcessor {
- IStorageManager storageManger = null;
+ IStorageManager storageManager = null;
public CompositeProcessor(IStorageManager storageManger) {
- this.storageManger = storageManger;
+ this.storageManager = storageManger;
}
public boolean processMessage(RMMessageContext rmMessageContext) throws
AxisFault {
@@ -42,7 +44,8 @@
//if the message has a body then insert it to the queue
RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
- AcknowledgementProcessor ackProcessor = new
AcknowledgementProcessor(this.storageManger);
+ AddressingHeaders addrHeaders
=rmMessageContext.getAddressingHeaders();
+ AcknowledgementProcessor ackProcessor = new
AcknowledgementProcessor(this.storageManager);
if (rmHeaders.getSequenceAcknowledgement() != null) {
ackProcessor.processMessage(rmMessageContext);
}
@@ -56,18 +59,25 @@
// System.out.println("EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
"+storageManger.isMessageExist(sequenceID, messageNumber));
- String seqId =
storageManger.getOutgoingSeqenceIdOfIncomingMsg(rmMessageContext);
+ String seqId =
storageManager.getOutgoingSeqenceIdOfIncomingMsg(rmMessageContext);
+ boolean hasSequence = storageManager.isSequenceExist(seqId);
+
- boolean hasSequence = storageManger.isSequenceExist(seqId);
+ if(addrHeaders.getRelatesTo()!=null &&
!addrHeaders.getRelatesTo().isEmpty()){
+ RelatesTo relatesTo = (RelatesTo)
addrHeaders.getRelatesTo().get(0);
+ String messageId = relatesTo.getURI().toString();
+ seqId=storageManager.getOutgoingSeqOfMsg(messageId);
+
System.out.println("(((((((99999999999999999999999999999999999999 "+seqId);
+ }
if(!hasSequence){
- storageManger.addIncomingSequence(seqId);
+ storageManager.addIncomingSequence(seqId);
}
- if (storageManger.isMessageExist(seqId, messageNumber) !=
true) {
+ if (storageManager.isMessageExist(seqId, messageNumber) !=
true) {
//Create a copy of the RMMessageContext.
RMMessageContext rmMsgContext = new RMMessageContext();
//Copy the RMMEssageContext
@@ -104,7 +114,7 @@
System.out.println("INFO: Inserting the request message
....\n");
//Insert the message to the INQUEUE
- storageManger.insertIncomingMessage(rmMsgContext);
+ storageManager.insertIncomingMessage(rmMsgContext);
}
//Send an Ack for every message received by the server.
1.5 +1 -1
ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/CreateSequenceProcessor.java
Index: CreateSequenceProcessor.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/CreateSequenceProcessor.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- CreateSequenceProcessor.java 18 Feb 2005 13:00:34 -0000 1.4
+++ CreateSequenceProcessor.java 20 Feb 2005 08:49:45 -0000 1.5
@@ -65,7 +65,7 @@
UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
String uuid = uuidGen.nextUUID();
- storageManager.addRequestedSequence(uuid);
+
storageManager.addRequestedSequence(org.apache.sandesha.Constants.UUID+uuid);
SOAPEnvelope resEnvelope =
EnvelopeCreator.createCreateSequenceResponseEnvelope(uuid, rmMessageContext);
rmMessageContext.setMessageType(org.apache.sandesha.Constants.MSG_TYPE_CREATE_SEQUENCE_RESPONSE);
1.4 +11 -0
ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/FaultProcessor.java
Index: FaultProcessor.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/FaultProcessor.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- FaultProcessor.java 18 Feb 2005 13:00:34 -0000 1.3
+++ FaultProcessor.java 20 Feb 2005 08:49:45 -0000 1.4
@@ -64,10 +64,21 @@
//return false
SOAPFault soapFault = null;
+ /*
if
(Constants.FaultCodes.IN_CORRECT_MESSAGE.equalsIgnoreCase(axisFault.getFaultCode().getLocalPart())){
soapFault = new SOAPFault(this.axisFault);
}
+ if
(Constants.FaultCodes.WSRM_FAULT_UNKNOWN_SEQUENCE.equalsIgnoreCase(axisFault.getFaultCode().getLocalPart())){
+ soapFault = new SOAPFault(this.axisFault);
+ }
+
+ if
(Constants.FaultCodes.WSRM_FAULR_LAST_MSG_NO_EXCEEDED.equalsIgnoreCase(axisFault.getFaultCode().getLocalPart())){
+ soapFault = new SOAPFault(this.axisFault);
+ }
+ */
+
+ soapFault = new SOAPFault(this.axisFault);
return sendFault(rmMessageContext, soapFault);
1.4 +1 -1
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java
Index: SandeshaQueueDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SandeshaQueueDAO.java 18 Feb 2005 13:00:34 -0000 1.3
+++ SandeshaQueueDAO.java 20 Feb 2005 08:49:45 -0000 1.4
@@ -469,7 +469,7 @@
public boolean hasLastMsgReceived(String seqId){
SandeshaQueue sq = SandeshaQueue.getInstance();
- return sq.hasLastMsgReceived(seqId);
+ return sq.hasLastMsgReceived(seqId);
}
public long getLastMsgNo(String seqId){
1.4 +6 -3
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
Index: SandeshaQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SandeshaQueue.java 18 Feb 2005 13:00:34 -0000 1.3
+++ SandeshaQueue.java 20 Feb 2005 08:49:45 -0000 1.4
@@ -875,9 +875,9 @@
public void addSendMsgNo(String seqId,long msgNo){
OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
-
+
System.out.println("5555555555555555555555555555555555555555555555555555555555555555");
if (rsh == null) {
- System.out.println("ERROR: SEQ IS NULL");
+ System.out.println("ERROR: SEQ IS NULL "+seqId+" "+msgNo);
}
synchronized(rsh){
@@ -897,11 +897,13 @@
}
}
- public boolean hasLastMsgReceived(String seqId){
+ public boolean hasLastMsgReceived(String seqId) {
+
OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
if (rsh == null) {
System.out.println("ERROR: SEQ IS NULL");
+
}
synchronized(rsh){
@@ -914,6 +916,7 @@
if (rsh == null) {
System.out.println("ERROR: SEQ IS NULL");
+
}
synchronized(rsh){
1.33 +2 -1
ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java
Index: RMProvider.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -r1.32 -r1.33
--- RMProvider.java 18 Feb 2005 13:00:34 -0000 1.32
+++ RMProvider.java 20 Feb 2005 08:49:45 -0000 1.33
@@ -60,7 +60,7 @@
IStorageManager storageManager = RMInitiator.init(client);
storageManager.init();
-
+
RMMessageContext rmMessageContext = new RMMessageContext();
rmMessageContext.setMsgContext(msgContext);
try {
@@ -75,6 +75,7 @@
return;
}
+ System.out.println("VALIDATION IS PASSED
.................................................");
RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
AddressingHeaders addrHeaders =
rmMessageContext.getAddressingHeaders();