jaliya 2005/02/25 05:41:48
Modified: sandesha/src/org/apache/sandesha IStorageManager.java
RMInitiator.java
sandesha/src/org/apache/sandesha/client
ClientPropertyValidator.java
ClientStorageManager.java RMSender.java
sandesha/src/org/apache/sandesha/server RMInvoker.java
Sender.java ServerStorageManager.java
sandesha/src/org/apache/sandesha/server/msgprocessors
AcknowledgementProcessor.java
TerminateSequenceProcessor.java
sandesha/src/org/apache/sandesha/storage/dao
ISandeshaDAO.java SandeshaDatabaseDAO.java
SandeshaQueueDAO.java
sandesha/src/org/apache/sandesha/storage/queue
IncomingSequence.java OutgoingSequence.java
SandeshaQueue.java
sandesha/src/org/apache/sandesha/util RMMessageCreator.java
sandesha/src/org/apache/sandesha/ws/rm/providers
RMProvider.java
Added: sandesha/src/org/apache/sandesha RMReport.java RMStatus.java
Log:
Functionality-wise, the code is now complete. The only work that remains is to
make the minor changes required to bring Sandesha in line with the latest
released WS-RM spec.
Revision Changes Path
1.17 +6 -0
ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java
Index: IStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- IStorageManager.java 20 Feb 2005 17:42:38 -0000 1.16
+++ IStorageManager.java 25 Feb 2005 13:41:45 -0000 1.17
@@ -151,4 +151,10 @@
public String getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg);
+ public void setTerminateSend(String seqId);
+
+ public void setTerminateReceived(String seqId);
+
+ public String getKeyFromOutgoingSeqId(String seqId);
+
}
\ No newline at end of file
1.5 +15 -14 ws-fx/sandesha/src/org/apache/sandesha/RMInitiator.java
Index: RMInitiator.java
===================================================================
RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/RMInitiator.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- RMInitiator.java 24 Feb 2005 13:51:13 -0000 1.4
+++ RMInitiator.java 25 Feb 2005 13:41:45 -0000 1.5
@@ -26,7 +26,7 @@
import org.apache.sandesha.server.Sender;
import org.apache.sandesha.server.ServerStorageManager;
import org.apache.sandesha.ws.rm.providers.RMProvider;
-import org.apache.util.PropertyLoader;
+import org.apache.sandesha.util.PropertyLoader;
import org.w3c.dom.Document;
import javax.xml.namespace.QName;
@@ -97,23 +97,24 @@
//This should check whether we have received all the acks or
reponses if any
IStorageManager storageManager = new ClientStorageManager();
+ storageManager.isAllSequenceComplete();
-// while(!storageManager.isAllSequenceComplete()){
-// try {
-// System.out.println("Checking to stop the
client......................");
-// Thread.sleep(1000);
-// } catch (InterruptedException e) {
-// e.printStackTrace(); //To change body of catch statement
use File | Settings | File Templates.
-// }
-// }
+ while(!storageManager.isAllSequenceComplete()){
+ try {
+ System.out.println("Checking to stop the
client......................");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement
use File | Settings | File Templates.
+ }
+ }
if (listenerStarted)
sas.stop();
- try {
- Thread.sleep(Constants.CLIENT_WAIT_PERIOD_FOR_COMPLETE);
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use
File | Settings | File Templates.
- }
+// try {
+// Thread.sleep(Constants.CLIENT_WAIT_PERIOD_FOR_COMPLETE);
+// } catch (InterruptedException e) {
+// e.printStackTrace(); //To change body of catch statement use
File | Settings | File Templates.
+// }
sender.setRunning(false);
return new RMStatus();
1.1 ws-fx/sandesha/src/org/apache/sandesha/RMReport.java
Index: RMReport.java
===================================================================
package org.apache.sandesha;
/**
 * Report object handed out by {@code RMStatus}.
 *
 * <p>Every accessor in this revision is a stub that returns a fixed value;
 * no state is tracked by this class.
 */
public class RMReport {

    /**
     * Stub accessor; judging by its name it is meant to tell whether every
     * message has been acknowledged.
     *
     * @return always {@code false} in this implementation
     */
    public boolean isAllAcked() {
        final boolean allAcked = false;
        return allAcked;
    }

    /**
     * Stub accessor; judging by its name it is meant to give the count of
     * return (response) messages.
     *
     * @return always {@code 0} in this implementation
     */
    public int getNumberOfReturnMessages() {
        final int returnMessageCount = 0;
        return returnMessageCount;
    }
}
1.1 ws-fx/sandesha/src/org/apache/sandesha/RMStatus.java
Index: RMStatus.java
===================================================================
package org.apache.sandesha;
/**
 * Status object that exposes a completion flag and an {@code RMReport}.
 *
 * <p>In this revision the class holds no state: the completion flag is a
 * fixed value and a new report is created on every request.
 */
public class RMStatus {

    /**
     * Stub accessor for the completion flag.
     *
     * @return always {@code false} in this implementation
     */
    public boolean isComplete() {
        final boolean complete = false;
        return complete;
    }

    /**
     * Creates a report for the caller.
     *
     * @return a freshly constructed {@code RMReport}; a new instance is
     *         returned on each call
     */
    public RMReport getReport() {
        final RMReport report = new RMReport();
        return report;
    }
}
1.11 +1 -1
ws-fx/sandesha/src/org/apache/sandesha/client/ClientPropertyValidator.java
Index: ClientPropertyValidator.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientPropertyValidator.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- ClientPropertyValidator.java 24 Feb 2005 13:51:13 -0000 1.10
+++ ClientPropertyValidator.java 25 Feb 2005 13:41:46 -0000 1.11
@@ -20,7 +20,7 @@
import org.apache.axis.client.Call;
import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
-import org.apache.util.PropertyLoader;
+import org.apache.sandesha.util.PropertyLoader;
import javax.xml.namespace.QName;
import java.net.InetAddress;
1.23 +20 -7
ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
Index: ClientStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -r1.22 -r1.23
--- ClientStorageManager.java 24 Feb 2005 13:51:13 -0000 1.22
+++ ClientStorageManager.java 25 Feb 2005 13:41:46 -0000 1.23
@@ -274,13 +274,15 @@
* @see org.apache.sandesha.IStorageManager#isAllSequenceComplete()
*/
public boolean isAllSequenceComplete() {
- boolean result=false;
- Iterator ite=accessor.getAllOutgoingSequences();
- while(ite.hasNext()){
- result=isAckComplete((String)ite.next());
- }
- return result;
+ boolean outTerminateSent = accessor.isAllOutgoingTerminateSent();
+
+ boolean incomingTerminateReceived =
accessor.isAllIncommingTerminateReceived();
+
+ if(outTerminateSent && incomingTerminateReceived)
+ return true;
+ else
+ return false;
}
/* (non-Javadoc)
@@ -373,5 +375,16 @@
return accessor.searchForSequenceId(msgId);
}
-
+ public void setTerminateSend(String seqId) {
+ accessor.setTerminateSend(seqId);
+ }
+
+ public void setTerminateReceived(String seqId) {
+ accessor.setTerminateReceived(seqId);
+ }
+
+ public String getKeyFromOutgoingSeqId(String seqId){
+ return accessor.getKeyFromOutgoingSequenceId(seqId);
+ }
+
}
\ No newline at end of file
1.27 +1 -1
ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java
Index: RMSender.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -r1.26 -r1.27
--- RMSender.java 24 Feb 2005 13:51:13 -0000 1.26
+++ RMSender.java 25 Feb 2005 13:41:46 -0000 1.27
@@ -22,7 +22,7 @@
import org.apache.sandesha.Constants;
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMMessageContext;
-import org.apache.util.RMMessageCreator;
+import org.apache.sandesha.util.RMMessageCreator;
public class RMSender extends BasicHandler {
1.13 +23 -17
ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java
Index: RMInvoker.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- RMInvoker.java 21 Feb 2005 12:08:21 -0000 1.12
+++ RMInvoker.java 25 Feb 2005 13:41:46 -0000 1.13
@@ -20,11 +20,13 @@
import org.apache.axis.MessageContext;
import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
+import org.apache.axis.message.addressing.AddressingHeaders;
import org.apache.axis.providers.java.RPCProvider;
import org.apache.sandesha.Constants;
import org.apache.sandesha.EnvelopeCreator;
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMMessageContext;
+import org.apache.sandesha.util.RMMessageCreator;
import javax.xml.soap.SOAPEnvelope;
@@ -71,15 +73,28 @@
// runtime.
RPCProvider rpcProvider = new RPCProvider();
- if (rmMessageContext.isLastMessage()) {
- //Insert Terminate Sequnce.
-
storageManager.insertTerminateSeqMessage(getTerminateSeqMessage(rmMessageContext));
- }
+
rpcProvider.invoke(rmMessageContext.getMsgContext());
//Check whether we have an output (response) or not.
if
(rmMessageContext.getMsgContext().getOperation().getMethod().getReturnType() !=
Void.TYPE) {
+ if (rmMessageContext.isLastMessage()) {
+ //Insert Terminate Sequnce.
+ AddressingHeaders addrHeaders =
rmMessageContext.getAddressingHeaders();
+
+ if (addrHeaders.getReplyTo() != null) {
+ String replyTo =
addrHeaders.getReplyTo().getAddress().toString();
+
+ RMMessageContext terminateMsg=
RMMessageCreator.createTerminateSeqMsg(rmMessageContext);
+ terminateMsg.setOutGoingAddress(replyTo);
+
storageManager.insertTerminateSeqMessage(terminateMsg);
+ }else{
+ System.out.println("SERVER ERROR , CANNOT
SEND THE TERMINTATION");
+ //TODO LOG THE ERROR
+ }
+
+ }
//System.out
// .println("STORING THE RESPONSE
MESSAGE.....\n");
//Store the message in the response queue.
@@ -114,6 +129,8 @@
if (firstMsgOfResponseSeq) {
// System.out.println("NO RESPONSE SEQUENCE");
+
+
RMMessageContext rmMsgContext = new
RMMessageContext();
rmMessageContext.copyContents(rmMsgContext);
@@ -137,7 +154,7 @@
storageManager.setTemporaryOutSequence(rmMsgContext
.getSequenceID(), Constants.UUID + id);
SOAPEnvelope createSequenceEnvelope =
EnvelopeCreator.createCreateSequenceEnvelope(id,
- rmMsgContext, Constants.SERVER);
+ rmMsgContext, Constants.SERVER);
rmMsgContext.getMsgContext().setRequestMessage(new
Message(createSequenceEnvelope));
@@ -172,16 +189,5 @@
}
- private RMMessageContext getTerminateSeqMessage(RMMessageContext
rmMessageContext) {
- RMMessageContext terSeqRMMsgContext = new RMMessageContext();
- MessageContext terSeqMsgContext = new
MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
- terSeqRMMsgContext.setSequenceID(rmMessageContext.getSequenceID());
-
terSeqRMMsgContext.setAddressingHeaders(rmMessageContext.getAddressingHeaders());
- //RMMessageContext.copyMessageContext(msgContext, messageContext);
-
terSeqRMMsgContext.setOutGoingAddress(rmMessageContext.getOutGoingAddress());
- terSeqRMMsgContext.setMsgContext(terSeqMsgContext);
-
terSeqRMMsgContext.setMessageType(Constants.MSG_TYPE_TERMINATE_SEQUENCE);
- // TODO Auto-generated method stub
- return terSeqRMMsgContext;
- }
+
}
\ No newline at end of file
1.24 +1 -0 ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java
Index: Sender.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- Sender.java 24 Feb 2005 13:51:13 -0000 1.23
+++ Sender.java 25 Feb 2005 13:41:46 -0000 1.24
@@ -265,6 +265,7 @@
{
System.out.println("INFO: SENDING TERMINATE SEQUENCE
REQUEST ....");
sendTerminateSequenceRequest(rmMessageContext);
+
storageManager.setTerminateSend(storageManager.getKeyFromOutgoingSeqId(rmMessageContext.getSequenceID()));
break;
}
case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
1.21 +15 -1
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
Index: ServerStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- ServerStorageManager.java 20 Feb 2005 17:42:38 -0000 1.20
+++ ServerStorageManager.java 25 Feb 2005 13:41:46 -0000 1.21
@@ -37,6 +37,14 @@
public class ServerStorageManager implements IStorageManager {
+ public void setTerminateSend(String seqId) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void setTerminateReceived(String seqId) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
protected static Log log = LogFactory.getLog(ServerStorageManager.class
.getName());
private String tempSeqId = null; // used by getNextMessageToProcess();
@@ -74,7 +82,8 @@
public void setAcknowledged(String seqID, long msgNumber) {
//TODO decide this in implementing the ServerSender.
- accessor.moveOutgoingMessageToBin(seqID, new Long(msgNumber));
+ //accessor.moveOutgoingMessageToBin(seqID, new Long(msgNumber));
+ accessor.markOutgoingMessageToDelete(seqID, new Long(msgNumber));
}
public void init() {
@@ -103,6 +112,8 @@
msg = accessor.getNextPriorityMessageContextToSend();
if (msg == null)
msg = accessor.getNextOutgoingMsgContextToSend();
+ if (msg == null)
+ msg = accessor.getNextLowPriorityMessageContextToSend();
return msg;
}
@@ -383,4 +394,7 @@
return accessor.hasLastIncomingMsgReceived(seqId);
}
+ public String getKeyFromOutgoingSeqId(String seqId) {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
}
\ No newline at end of file
1.4 +5 -7
ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/AcknowledgementProcessor.java
Index: AcknowledgementProcessor.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/AcknowledgementProcessor.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- AcknowledgementProcessor.java 20 Feb 2005 08:49:45 -0000 1.3
+++ AcknowledgementProcessor.java 25 Feb 2005 13:41:46 -0000 1.4
@@ -69,16 +69,15 @@
//else set the response env of the messageContext.
String seqID = rmMessageContext.getSequenceID();
- long messageNumber = rmMessageContext.getRMHeaders().getSequence()
- .getMessageNumber().getMessageNumber();
+ long messageNumber = rmMessageContext.getRMHeaders().getSequence()
.getMessageNumber().getMessageNumber();
//Assume that the list is sorted and in the ascending order.
Map listOfMsgNumbers = storageManager.getListOfMessageNumbers(seqID);
if (null == listOfMsgNumbers)
System.out.println("MSG Number list is NULL");
- else {
- Iterator ite = listOfMsgNumbers.keySet().iterator();
- }
+// else {
+// Iterator ite = listOfMsgNumbers.keySet().iterator();
+// }
Vector ackRangeVector = null;
if (listOfMsgNumbers != null) {
@@ -124,8 +123,7 @@
private static RMMessageContext getAckRMMsgCtx(RMMessageContext
rmMessageContext, Vector ackRangeVector) {
- SOAPEnvelope ackEnvelope = EnvelopeCreator
- .createAcknowledgementEnvelope(rmMessageContext,
ackRangeVector);
+ SOAPEnvelope ackEnvelope =
EnvelopeCreator.createAcknowledgementEnvelope(rmMessageContext, ackRangeVector);
//Add the envelope to the response message of the messageContext.
//rmMessageContext.getMsgContext().setResponseMessage(new
// Message(ackEnvelope));
1.3 +3 -0
ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/TerminateSequenceProcessor.java
Index: TerminateSequenceProcessor.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/TerminateSequenceProcessor.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- TerminateSequenceProcessor.java 18 Feb 2005 06:57:17 -0000 1.2
+++ TerminateSequenceProcessor.java 25 Feb 2005 13:41:46 -0000 1.3
@@ -39,6 +39,7 @@
if (terminateSeq != null && terminateSeq.getIdentifier() != null) {
String seqID = terminateSeq.getIdentifier().getIdentifier();
+ storageManger.setTerminateReceived(seqID);
}
@@ -50,4 +51,6 @@
return false;
}
+
+
}
\ No newline at end of file
1.6 +7 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java
Index: ISandeshaDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- ISandeshaDAO.java 24 Feb 2005 13:51:14 -0000 1.5
+++ ISandeshaDAO.java 25 Feb 2005 13:41:46 -0000 1.6
@@ -113,5 +113,12 @@
public Iterator getAllOutgoingSequences();
+ public void setTerminateSend (String seqId);
+
+ public void setTerminateReceived (String seqId);
+
+ public boolean isAllOutgoingTerminateSent();
+
+ public boolean isAllIncommingTerminateReceived();
}
\ No newline at end of file
1.6 +25 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java
Index: SandeshaDatabaseDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SandeshaDatabaseDAO.java 24 Feb 2005 13:51:14 -0000 1.5
+++ SandeshaDatabaseDAO.java 25 Feb 2005 13:41:46 -0000 1.6
@@ -44,6 +44,14 @@
// TODO Auto-generated method stub
}
+ public boolean isOutgoingTerminateSent() {
+ return false; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public boolean isIncommingTerminateReceived() {
+ return false; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
public void addRequestedSequence(String seqId) {
//To change body of implemented methods use File | Settings | File
Templates.
}
@@ -391,4 +399,21 @@
public Iterator getAllOutgoingSequences() {
return null; //To change body of implemented methods use File |
Settings | File Templates.
}
+
+ public void setTerminateSend(String seqId) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void setTerminateReceived(String seqId) {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public boolean isAllOutgoingTerminateSent() {
+ return false; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public boolean isAllIncommingTerminateReceived() {
+ return false; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
}
\ No newline at end of file
1.7 +21 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java
Index: SandeshaQueueDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SandeshaQueueDAO.java 24 Feb 2005 13:51:14 -0000 1.6
+++ SandeshaQueueDAO.java 25 Feb 2005 13:41:46 -0000 1.7
@@ -449,4 +449,25 @@
SandeshaQueue sq=SandeshaQueue.getInstance();
return sq.getAllOutgoingSequences();
}
+
+ public boolean isAllOutgoingTerminateSent() {
+ SandeshaQueue sq=SandeshaQueue.getInstance();
+ return sq.isAllOutgoingTerminateSent();
+ }
+
+ public boolean isAllIncommingTerminateReceived() {
+ SandeshaQueue sq=SandeshaQueue.getInstance();
+ return sq.isAllIncommingTerminateReceived();
+ }
+
+ public void setTerminateSend(String seqId) {
+ SandeshaQueue sq=SandeshaQueue.getInstance();
+ sq.setTerminateSend(seqId);
+ }
+
+ public void setTerminateReceived(String seqId) {
+ SandeshaQueue sq=SandeshaQueue.getInstance();
+ sq.setTerminateReceived(seqId);
+ }
+
}
1.3 +12 -1
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java
Index: IncomingSequence.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- IncomingSequence.java 20 Feb 2005 17:42:38 -0000 1.2
+++ IncomingSequence.java 25 Feb 2005 13:41:47 -0000 1.3
@@ -42,7 +42,18 @@
private HashMap hash;
private boolean beingProcessedLock = false; //When true messages are
private long lastMsgNo = -1;
-
+
+
+ private boolean terminateReceived = false;
+
+ public boolean isTerminateReceived() {
+ return terminateReceived;
+ }
+
+ public void setTerminateReceived(boolean terminateReceived) {
+ this.terminateReceived = terminateReceived;
+ }
+
private static final Log log =
LogFactory.getLog(IncomingSequence.class.getName());
public IncomingSequence(String sequenceId) {
1.3 +19 -1
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/OutgoingSequence.java
Index: OutgoingSequence.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/OutgoingSequence.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- OutgoingSequence.java 20 Feb 2005 17:42:38 -0000 1.2
+++ OutgoingSequence.java 25 Feb 2005 13:41:47 -0000 1.3
@@ -47,6 +47,24 @@
private long lastMsgNo = -1;
private long nextAutoNumber; // key for storing messages.
private static final Log log =
LogFactory.getLog(OutgoingSequence.class.getName());
+ public boolean terminateSent = false;
+ private boolean hasResponse = false;
+
+ public boolean hasResponse() {
+ return hasResponse;
+ }
+
+ public void setHasResponse(boolean hasResponse) {
+ this.hasResponse = hasResponse;
+ }
+
+ public boolean isTerminateSent() {
+ return terminateSent;
+ }
+
+ public void setTerminateSent(boolean terminateSent) {
+ this.terminateSent = terminateSent;
+ }
public OutgoingSequence(String sequenceId) {
this.sequenceId = sequenceId;
@@ -268,7 +286,7 @@
public boolean isAckComplete() {
try {
- long lastMsgNo = getLastMessage();
+ long lastMsgNo = getLastMsgNumber();
if (lastMsgNo <= 0) {
return false;
}
1.7 +73 -21
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
Index: SandeshaQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SandeshaQueue.java 24 Feb 2005 13:51:14 -0000 1.6
+++ SandeshaQueue.java 25 Feb 2005 13:41:47 -0000 1.7
@@ -66,12 +66,12 @@
* This will not replace messages automatically.
*/
- public Iterator getAllOutgoingSequences(){
+ public Iterator getAllOutgoingSequences() {
return outgoingMap.keySet().iterator();
}
public boolean addMessageToIncomingSequence(String seqId, Long messageNo,
- RMMessageContext msgCon) throws QueueException {
+ RMMessageContext msgCon)
throws QueueException {
boolean successful = false;
if (seqId == null || msgCon == null)
@@ -124,6 +124,9 @@
if (msgCon.isLastMessage())
resSeqHash.setLastMsg(msgCon.getMsgNumber());
+ if (msgCon.isHasResponse())
+ resSeqHash.setHasResponse(true);
+
}
}
return successful;
@@ -241,6 +244,7 @@
synchronized (highPriorityQueue) {
if (msg == null)
throw new QueueException(Constants.Queue.MESSAGE_ID_NULL);
+
highPriorityQueue.add(msg);
}
}
@@ -253,6 +257,7 @@
}
}
+
public RMMessageContext nextPriorityMessageToSend() throws
QueueException {
synchronized (highPriorityQueue) {
@@ -268,22 +273,22 @@
RMMessageContext tempMsg = (RMMessageContext)
highPriorityQueue.get(i);
if (tempMsg != null) {
switch (tempMsg.getMessageType()) {
- //Create seq messages will not be removed.
- case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
- long lastSentTime = tempMsg.getLastSentTime();
- Date d = new Date();
- long currentTime = d.getTime();
- if (currentTime >= lastSentTime +
Constants.RETRANSMISSION_INTERVAL) {
- tempMsg.setLastSentTime(currentTime);
+ //Create seq messages will not be removed.
+ case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
+ long lastSentTime =
tempMsg.getLastSentTime();
+ Date d = new Date();
+ long currentTime = d.getTime();
+ if (currentTime >= lastSentTime +
Constants.RETRANSMISSION_INTERVAL) {
+ tempMsg.setLastSentTime(currentTime);
+ msg = tempMsg;
+ break forLoop;
+ }
+ break;
+ default:
+ highPriorityQueue.remove(i);
+ queueBin.put(tempMsg.getMessageID(),
tempMsg);
msg = tempMsg;
break forLoop;
- }
- break;
- default:
- highPriorityQueue.remove(i);
- queueBin.put(tempMsg.getMessageID(), tempMsg);
- msg = tempMsg;
- break forLoop;
}
}
}
@@ -630,7 +635,7 @@
while (temp <= ackRng.getMaxValue()) {
Long lng = new Long(temp);
if (!msgNumbers.contains(lng)) //vector cant hv duplicate
- // entries.
+ // entries.
msgNumbers.add(new Long(temp));
temp++;
}
@@ -664,7 +669,7 @@
OutgoingSequence hash = (OutgoingSequence) obj;
boolean hasMsg = hash.hasMessageWithId(requestMsgID);
if (!hasMsg)
- //set the property response received
+ //set the property response received
hash.setResponseReceived(requestMsgID);
}
}
@@ -816,8 +821,9 @@
public String getKeyFromOutgoingSequenceId(String seqId) {
Iterator it = outgoingMap.keySet().iterator();
+ String key = null;
while (it.hasNext()) {
- String key = (String) it.next();
+ key = (String) it.next();
OutgoingSequence os = (OutgoingSequence) outgoingMap.get(key);
String seq = os.getSequenceId();
@@ -825,9 +831,55 @@
continue;
if (seq.equals(seqId))
- return key;
+ break;
+
}
- return null;
+ return key;
+ }
+
+ public boolean isAllOutgoingTerminateSent() {
+ synchronized (outgoingMap) {
+ Iterator keys = outgoingMap.keySet().iterator();
+
+ while (keys.hasNext()) {
+ OutgoingSequence ogs = (OutgoingSequence)
outgoingMap.get(keys.next());
+ if (!ogs.isTerminateSent())
+ return false;
+ }
+
+ return true;
+ }
+ }
+
+ public boolean isAllIncommingTerminateReceived() {
+ synchronized (incomingMap) {
+ Iterator keys = incomingMap.keySet().iterator();
+
+ while (keys.hasNext()) {
+ Object key = keys.next();
+ IncomingSequence ics = (IncomingSequence)
incomingMap.get(key);
+ OutgoingSequence ogs = (OutgoingSequence)
outgoingMap.get(key);
+
+ boolean hasResponse = ogs.hasResponse();
+
+ if (hasResponse && !ics.isTerminateReceived())
+ return false;
+ }
+
+ return true;
+ }
+ }
+
+ public void setTerminateSend(String seqId) {
+ OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
+ ogs.setTerminateSent(true);
}
+
+ public void setTerminateReceived(String seqId) {
+ IncomingSequence ics = (IncomingSequence)
incomingMap.get(getKeyFromIncomingSequenceId(seqId));
+ ics.setTerminateReceived(true);
+ }
+
+
}
1.2 +7 -2
ws-fx/sandesha/src/org/apache/sandesha/util/RMMessageCreator.java
Index: RMMessageCreator.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/util/RMMessageCreator.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- RMMessageCreator.java 25 Feb 2005 02:53:07 -0000 1.1
+++ RMMessageCreator.java 25 Feb 2005 13:41:47 -0000 1.2
@@ -14,6 +14,8 @@
import org.apache.sandesha.EnvelopeCreator;
import org.apache.sandesha.client.ClientPropertyValidator;
+import java.util.Vector;
+
/**
* Created by IntelliJ IDEA.
* User: Jaliya
@@ -47,8 +49,7 @@
//Set the outgoing address these need to be corrected.
createSeqRMMsgContext.setOutGoingAddress(toAddress);
- SOAPEnvelope resEnvelope =
EnvelopeCreator.createCreateSequenceEnvelope(uuid,
- createSeqRMMsgContext, Constants.CLIENT);
+ SOAPEnvelope resEnvelope =
EnvelopeCreator.createCreateSequenceEnvelope(uuid,createSeqRMMsgContext,
Constants.CLIENT);
MessageContext createSeqMsgContext = new
MessageContext(msgContext.getAxisEngine());
//This should be a clone operation.
@@ -77,6 +78,10 @@
}
public static RMMessageContext createAcknowledgementMsg(RMMessageContext
rmMsgCtx) throws Exception {
+
+
+
+
return new RMMessageContext();
}
1.35 +5 -4
ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java
Index: RMProvider.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -r1.34 -r1.35
--- RMProvider.java 21 Feb 2005 12:08:21 -0000 1.34
+++ RMProvider.java 25 Feb 2005 13:41:47 -0000 1.35
@@ -23,10 +23,7 @@
import org.apache.axis.message.addressing.AddressingHeaders;
import org.apache.axis.providers.java.RPCProvider;
import org.apache.commons.logging.Log;
-import org.apache.sandesha.IStorageManager;
-import org.apache.sandesha.RMException;
-import org.apache.sandesha.RMInitiator;
-import org.apache.sandesha.RMMessageContext;
+import org.apache.sandesha.*;
import org.apache.sandesha.server.MessageValidator;
import org.apache.sandesha.server.RMMessageProcessorIdentifier;
import org.apache.sandesha.server.msgprocessors.FaultProcessor;
@@ -92,6 +89,10 @@
try {
if (!rmMessageProcessor.processMessage(rmMessageContext)) {
msgContext.setResponseMessage(null);
+ }else{
+ // TODO Get the from envecreator
+
+ // SOAPEnvelope
resEn=EnvelopeCreator.createAcknowledgementEnvelope()
}
} catch (AxisFault af) {
RMProvider.log.error(af);