jaliya 2005/01/11 05:52:02
Modified: sandesha/src/org/apache/sandesha/server
AcknowledgementProcessor.java
CompositeProcessor.java RMInvoker.java Sender.java
ServerStorageManager.java
sandesha/src/org/apache/sandesha/server/dao IServerDAO.java
ServerDatabaseDAO.java ServerQueueDAO.java
sandesha/src/org/apache/sandesha/server/queue
ResponseSequenceHash.java ServerQueue.java
Log:
Added the functionality to send the TerminateSeq message from the client
Revision Changes Path
1.12 +1 -0
ws-fx/sandesha/src/org/apache/sandesha/server/AcknowledgementProcessor.java
Index: AcknowledgementProcessor.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/AcknowledgementProcessor.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- AcknowledgementProcessor.java 12 Nov 2004 06:18:25 -0000 1.11
+++ AcknowledgementProcessor.java 11 Jan 2005 13:51:57 -0000 1.12
@@ -53,6 +53,7 @@
AcknowledgementRange ackRange = (AcknowledgementRange)
ite.next();
long msgNumber = ackRange.getMinValue();
while (ackRange.getMaxValue() >= msgNumber) {
+ storageManger. setAckReceived(seqID, msgNumber);
storageManger.setAcknowledged(seqID, msgNumber);
msgNumber++;
}
1.12 +1 -2
ws-fx/sandesha/src/org/apache/sandesha/server/CompositeProcessor.java
Index: CompositeProcessor.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/CompositeProcessor.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- CompositeProcessor.java 2 Dec 2004 13:15:48 -0000 1.11
+++ CompositeProcessor.java 11 Jan 2005 13:51:57 -0000 1.12
@@ -98,8 +98,7 @@
//TODO: Add to log.
}
- System.out
- .println("INFO: Inserting the request message
....\n");
+ System.out.println("INFO: Inserting the request message
....\n");
//Insert the message to the INQUEUE
storageManger.insertIncomingMessage(rmMsgContext);
}
1.8 +15 -1
ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java
Index: RMInvoker.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- RMInvoker.java 10 Jan 2005 23:00:09 -0000 1.7
+++ RMInvoker.java 11 Jan 2005 13:51:57 -0000 1.8
@@ -74,7 +74,8 @@
if(rmMessageContext.isLastMessage()){
//Insert Terminate Sequnce.
-
//storageManager.insertTerminateSeqMessage(getTerminateSeqMessage(requestMesssageContext));
+ System.out.println("LAST MESSAGE IS THERE");
+
storageManager.insertTerminateSeqMessage(getTerminateSeqMessage(rmMessageContext));
}
rpcProvider.invoke(rmMessageContext.getMsgContext());
@@ -177,5 +178,18 @@
}
+ }
+
+ private RMMessageContext getTerminateSeqMessage(RMMessageContext
rmMessageContext) {
+ RMMessageContext terSeqRMMsgContext = new RMMessageContext();
+ MessageContext terSeqMsgContext = new
MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
+ terSeqRMMsgContext.setSequenceID(rmMessageContext.getSequenceID());
+
terSeqRMMsgContext.setAddressingHeaders(rmMessageContext.getAddressingHeaders());
+ //RMMessageContext.copyMessageContext(msgContext, messageContext);
+
terSeqRMMsgContext.setOutGoingAddress(rmMessageContext.getOutGoingAddress());
+ terSeqRMMsgContext.setMsgContext(terSeqMsgContext);
+
terSeqRMMsgContext.setMessageType(Constants.MSG_TYPE_TERMINATE_SEQUENCE);
+ // TODO Auto-generated method stub
+ return terSeqRMMsgContext;
}
}
1.18 +4 -5 ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java
Index: Sender.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- Sender.java 10 Jan 2005 23:00:37 -0000 1.17
+++ Sender.java 11 Jan 2005 13:51:57 -0000 1.18
@@ -149,11 +149,10 @@
* @param rmMessageContext
*/
private void sendTerminateSequenceRequest(RMMessageContext
rmMessageContext) {
+ SOAPEnvelope
terSeqEnv=EnvelopeCreator.createTerminatSeqMessage(rmMessageContext);
+ Message terSeqMsg= new Message(terSeqEnv);
+ rmMessageContext.getMsgContext().setRequestMessage(terSeqMsg);
- if (rmMessageContext.getMsgContext().getRequestMessage() == null) {
- //The code should not come to this point.
- System.err.println("ERROR: NULL REQUEST MESSAGE");
- } else {
Call call;
try {
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
@@ -183,7 +182,7 @@
// TODO Auto-generated catch block
e.printStackTrace();
}
- }
+
}
1.14 +8 -3
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
Index: ServerStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- ServerStorageManager.java 11 Jan 2005 07:16:09 -0000 1.13
+++ ServerStorageManager.java 11 Jan 2005 13:51:57 -0000 1.14
@@ -109,7 +109,12 @@
if (msg == null)
msg = accessor.getNextOutgoingMsgContextToSend();
+ //if(msg==null){
+ // msg = accessor.getNextLowPriorityMessageContextToSend(); //
checks whether all the request messages hv been acked
+ //}
+
return msg;
+
}
/**
@@ -307,7 +312,7 @@
* @see
org.apache.sandesha.IStorageManager#insertTerminateSeqMessage(org.apache.sandesha.RMMessageContext)
*/
public void insertTerminateSeqMessage(RMMessageContext
terminateSeqMessage) {
- accessor.addPriorityMessage(terminateSeqMessage);
+ accessor.addLowPriorityMessage(terminateSeqMessage);
}
/* (non-Javadoc)
@@ -335,8 +340,8 @@
}
- public void setAckReceived(RMMessageContext responseMsg) {
- accessor.setAckReceived(responseMsg);
+ public void setAckReceived(String seqId,long msgNo) {
+ accessor.setAckReceived(seqId,msgNo);
}
}
1.12 +1 -1
ws-fx/sandesha/src/org/apache/sandesha/server/dao/IServerDAO.java
Index: IServerDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/IServerDAO.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- IServerDAO.java 11 Jan 2005 07:18:54 -0000 1.11
+++ IServerDAO.java 11 Jan 2005 13:51:57 -0000 1.12
@@ -87,7 +87,7 @@
public void setResponseReceived(RMMessageContext msg);
- public void setAckReceived(RMMessageContext responseMsg);
+ public void setAckReceived(String seqId,long msgNo);
public void addLowPriorityMessage(RMMessageContext msg);
1.13 +1 -1
ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerDatabaseDAO.java
Index: ServerDatabaseDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerDatabaseDAO.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ServerDatabaseDAO.java 11 Jan 2005 07:20:14 -0000 1.12
+++ ServerDatabaseDAO.java 11 Jan 2005 13:51:57 -0000 1.13
@@ -27,7 +27,7 @@
public class ServerDatabaseDAO implements IServerDAO {
- public void setAckReceived(RMMessageContext responseMsg) {
+ public void setAckReceived(String seqId,long msgNo) {
// TODO Auto-generated method stub
}
1.13 +3 -4
ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerQueueDAO.java
Index: ServerQueueDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerQueueDAO.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ServerQueueDAO.java 11 Jan 2005 07:19:44 -0000 1.12
+++ ServerQueueDAO.java 11 Jan 2005 13:51:57 -0000 1.13
@@ -443,7 +443,7 @@
ServerQueue sq = ServerQueue.getInstance();
try{
- return sq.nextLowPriorityMessageToSend();
+ return sq.getLowPriorityMessageIfAcked();
}catch(Exception e){
log.error(e);
}
@@ -451,9 +451,8 @@
return null;
}
- public void setAckReceived(RMMessageContext responseMsg) {
+ public void setAckReceived(String seqId,long msgNo) {
ServerQueue sq = ServerQueue.getInstance();
- sq.setAckReceived(responseMsg);
-
+ sq.setAckReceived(seqId,msgNo);
}
}
1.13 +35 -15
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java
Index: ResponseSequenceHash.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ResponseSequenceHash.java 11 Jan 2005 07:21:02 -0000 1.12
+++ ResponseSequenceHash.java 11 Jan 2005 13:52:01 -0000 1.13
@@ -269,7 +269,6 @@
}
public boolean markMessageDeleted(Long messageNo){
- System.out.println("********* MARKING AS DELETE");
if(hash.containsKey(messageNo)){
markedAsDelete.add(messageNo);
String msgId = ((RMMessageContext)
hash.get(messageNo)).getMessageID();
@@ -339,34 +338,55 @@
msg.setAckReceived(true);
}
}
-
-
+ public void setAckReceived(long msgNo){
+
+ RMMessageContext msg = (RMMessageContext) hash.get(new Long(msgNo));
+ if(msg!=null){
+ msg.setAckReceived(true);
+ }else
+ System.out.println("ERROR: MESSAGE IS NULL IN ResponseSeqHash");
+
+ }
+
public boolean isAckComplete(){
- if(!hasLastMessage())
+ try{
+ long lastMsgNo = getLastMessage();
+ if(lastMsgNo<=0){
return false;
-
+ }
Iterator it = hash.keySet().iterator();
-
+ for(long i=1;i<lastMsgNo;i++){
+ if(!hasMessage(new Long(i))){
+ return false;
+ }
+ }
+
+ it = hash.keySet().iterator();
while(it.hasNext()){
- RMMessageContext msg = (RMMessageContext) it.next();
- if(!msg.isAckReceived())
+ RMMessageContext msg = (RMMessageContext) hash.get(it.next());
+ if(!msg.isAckReceived()){
return false;
+ }
}
-
+
return true;
+ }catch(Exception e){
+ e.printStackTrace();
+ return false;
+ }
}
- private boolean hasLastMessage(){
+ private long getLastMessage(){
Iterator it = hash.keySet().iterator();
-
while(it.hasNext()){
- RMMessageContext msg = (RMMessageContext) it.next();
- if(msg.isLastMessage())
- return true;
+ RMMessageContext msg = (RMMessageContext) hash.get(it.next());
+ if(msg.isLastMessage()){
+ return msg.getMsgNumber();
+ }
}
- return false;
+ return -1;
}
}
1.13 +38 -48
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
Index: ServerQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ServerQueue.java 11 Jan 2005 07:22:28 -0000 1.12
+++ ServerQueue.java 11 Jan 2005 13:52:01 -0000 1.13
@@ -77,8 +77,7 @@
/**
* This will not replace messages automatically.
*/
- public boolean addMessageToIncomingSequence(String seqId, Long messageNo,
- RMMessageContext msgCon) throws QueueException {
+ public boolean addMessageToIncomingSequence(String seqId, Long
messageNo,RMMessageContext msgCon) throws QueueException {
boolean successful = false;
if (seqId == null || msgCon == null)
@@ -336,20 +335,6 @@
}
- public RMMessageContext nextLowPriorityMessageToSend() throws
QueueException {
-
- synchronized (lowPriorityQueue)
- {
- if(lowPriorityQueue.size() > 0){
- RMMessageContext msg = (RMMessageContext)
lowPriorityQueue.get(0);
- lowPriorityQueue.remove(0);
- return msg;
- }
- }
-
- return null;
-
- }
/*
* public RMMessageContext getNextToProcessIfHasNew(String sequenceId){
@@ -708,7 +693,6 @@
}
public boolean isRequestMsgPresent(String seqId,String messageId){
- System.out.println("******************* "+messageId);
ResponseSequenceHash rsh = (ResponseSequenceHash)
outgoingMap.get(seqId);
if (rsh == null) {
@@ -748,12 +732,9 @@
Vector msgNumbers = new Vector();
Iterator it = highPriorityQueue.iterator();
- System.out.println("################## 1");
- while(it.hasNext()){
- System.out.println("################## 2 ");
- RMMessageContext msg = (RMMessageContext) it.next();
- System.out.println("################## 2 -"+msg.getMessageID() );
- if(msg.getMessageType()!=Constants.MSG_TYPE_ACKNOWLEDGEMENT)
+ while(it.hasNext()){
+ RMMessageContext msg = (RMMessageContext) it.next();
+ if(msg.getMessageType()!=Constants.MSG_TYPE_ACKNOWLEDGEMENT)
continue;
SequenceAcknowledgement seqAck =
msg.getRMHeaders().getSequenceAcknowledgement();
@@ -769,9 +750,7 @@
AcknowledgementRange ackRng = (AcknowledgementRange)
ackIt.next();
long min = ackRng.getMinValue();
long temp=min;
- System.out.println("################## min -"+min);
while(temp<=ackRng.getMaxValue()){
- System.out.println("################## temp -"+temp);
Long lng = new Long(temp);
if(!msgNumbers.contains(lng)) //vector cant hv
duplicate entries.
msgNumbers.add(new Long(temp));
@@ -821,25 +800,27 @@
}
- public void setAckReceived(RMMessageContext responseMsg)
+ public void setAckReceived(String seqId,long msgNo)
{
- String requestMsgID =
responseMsg.getAddressingHeaders().getRelatesTo().toString();
-
+
Iterator it = outgoingMap.keySet().iterator();
String key =null;
while(it.hasNext()){
key = (String) it.next();
Object obj=outgoingMap.get(key);
- if(obj!=null){
- ResponseSequenceHash hash = (ResponseSequenceHash)obj;
- boolean hasMsg = hash.hasMessageWithId(requestMsgID);
- if(!hasMsg)
- //set the property response received
- hash.setAckReceived(requestMsgID);
- }
+
+
+ if(obj!=null){
+ ResponseSequenceHash hash = (ResponseSequenceHash)obj;
+ //System.out.println("************** HASH SEQ IS " +
hash.getSequenceId() + " SEQ IS " + seqId + " OUT SEQ IS " +
hash.getOutSequenceId());
+ if(hash.getOutSequenceId().equals(seqId)){
+
+ hash.setAckReceived(msgNo);
+ }
+ }
}
-
+
}
@@ -854,31 +835,40 @@
}
public RMMessageContext getLowPriorityMessageIfAcked(){
+
int size = lowPriorityQueue.size();
-
-
+
RMMessageContext terminateMsg = null;
for(int i=0;i<size;i++) {
+
RMMessageContext temp;
temp = (RMMessageContext) lowPriorityQueue.get(i);
String seqId = temp.getSequenceID();
-
- boolean foundSeq = false;
- ResponseSequenceHash hash = null;
- Iterator it1 = outgoingMap.keySet().iterator();
+ // System.out.println(" HASH NOT FOUND SEQ ID " + seqId);
+
+ ResponseSequenceHash hash = null;
+ hash = (ResponseSequenceHash) outgoingMap.get(seqId);
+ if(hash==null){
+ System.out.println("ServerQueue: ERROR: HASH NOT FOUND SEQ
ID " + seqId);
+ }
+
+ /*Iterator it1 = outgoingMap.keySet().iterator();
while(it1.hasNext()){
hash = (ResponseSequenceHash) it1.next();
- if(hash.getSequenceId()==seqId){
+ if(hash.getOutSequenceId().equals(seqId)){
+ System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
FOUND SEQ "+ seqId);
foundSeq = true;
break;
}
- }
-
- if(foundSeq && hash!=null){
+ } */
+
+ if(hash!=null){
boolean complete = hash.isAckComplete();
- if(complete)
- lowPriorityQueue.remove(i);
+ if(complete)
+ //lowPriorityQueue.remove(i);
terminateMsg = temp;
+ terminateMsg.setSequenceID(hash.getOutSequenceId());
+ lowPriorityQueue.remove(i);
break;
}