jaliya 2005/01/09 21:43:31
Modified: sandesha/src/org/apache/sandesha/server/dao
ServerQueueDAO.java ServerDatabaseDAO.java
sandesha/src/org/apache/sandesha/server/queue
ServerQueue.java ResponseSequenceHash.java
Log:
resolve some inconsistancies in the code
Revision Changes Path
1.10 +25 -0
ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerQueueDAO.java
Index: ServerQueueDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerQueueDAO.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- ServerQueueDAO.java 10 Jan 2005 05:35:35 -0000 1.9
+++ ServerQueueDAO.java 10 Jan 2005 05:43:31 -0000 1.10
@@ -388,4 +388,29 @@
ServerQueue sq = ServerQueue.getInstance();
sq.markOutgoingMessageToDelete(seqId,msgNo);
}
+
+
+ /* (non-Javadoc)
+ * @see
org.apache.sandesha.server.dao.IServerDAO#isAckComplete(java.lang.String)
+ */
+ public boolean compareAcksWithSequence(String sequenceId){
+ ServerQueue sq = ServerQueue.getInstance();
+ Vector acks = sq.getAllAckedMsgNumbers(sequenceId);
+ Vector outGoingMsgs = sq.getAllOutgoingMsgNumbers(sequenceId);
+
+ if(acks.size()< outGoingMsgs.size()) //Size must be equal (number of
msgs=number of acks)
+ return false;
+
+ boolean result = true;
+ for(int i=0;i<outGoingMsgs.size();i++){
+ if(!acks.contains(outGoingMsgs.get(i))){
+
+ //System.out.println("result false "+outGoingMsgs.get(i));
+ result = false;
+ break;
+ }
+ }
+
+ return result;
+ }
}
1.10 +12 -0
ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerDatabaseDAO.java
Index: ServerDatabaseDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerDatabaseDAO.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- ServerDatabaseDAO.java 10 Jan 2005 05:35:35 -0000 1.9
+++ ServerDatabaseDAO.java 10 Jan 2005 05:43:31 -0000 1.10
@@ -272,4 +272,16 @@
public String searchForSequenceId(String messageId){
return null;
}
+
+
+
+
+
+ /* (non-Javadoc)
+ * @see
org.apache.sandesha.server.dao.IServerDAO#compareAcksWithSequence(java.lang.String)
+ */
+ public boolean compareAcksWithSequence(String sequenceId) {
+ // TODO Auto-generated method stub
+ return false;
+ }
}
1.10 +89 -2
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
Index: ServerQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- ServerQueue.java 24 Dec 2004 17:59:04 -0000 1.9
+++ ServerQueue.java 10 Jan 2005 05:43:31 -0000 1.10
@@ -17,15 +17,20 @@
package org.apache.sandesha.server.queue;
+
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.Vector;
import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
+import org.apache.sandesha.ws.rm.AcknowledgementRange;
+import org.apache.sandesha.ws.rm.MessageNumber;
+import org.apache.sandesha.ws.rm.SequenceAcknowledgement;
/*
* Created on Aug 4, 2004 at 4:49:49 PM
@@ -445,7 +450,6 @@
return false;
}
- //To set the out sequence of the given sequence
public void setOutSequence(String seqId, String outSeqId) {
ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
.get(seqId);
@@ -473,7 +477,6 @@
}
}
-
public String getSequenceOfOutSequence(String outSequence) {
if (outSequence == null) {
@@ -587,6 +590,26 @@
}
}
+
+ public void markOutgoingMessageToDelete(String sequenceId,Long
messageNo){
+ String sequence = getSequenceOfOutSequence(sequenceId);
+ ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
+ .get(sequence);
+
+ if (rsh == null) {
+ System.out.println("ERROR: RESPONSE SEQ IS NULL "+sequence);
+ return;
+ }
+
+ synchronized (rsh) {
+ //Deleting retuns the deleted message.
+ rsh.markMessageDeleted(messageNo);
+ //If we jave already deleted then no message to return.
+ }
+
+ }
+
+
public void movePriorityMsgToBin(String messageId) {
synchronized (priorityQueue) {
@@ -686,5 +709,69 @@
return key;
}
+
+ public Vector getAllAckedMsgNumbers(String seqId){
+ Vector msgNumbers = new Vector();
+
+ Iterator it = priorityQueue.iterator();
+ System.out.println("################## 1");
+ while(it.hasNext()){
+ System.out.println("################## 2 ");
+ RMMessageContext msg = (RMMessageContext) it.next();
+ System.out.println("################## 2 -"+msg.getMessageID() );
+ if(msg.getMessageType()!=Constants.MSG_TYPE_ACKNOWLEDGEMENT)
+ continue;
+
+ SequenceAcknowledgement seqAck =
msg.getRMHeaders().getSequenceAcknowledgement();
+ String sId = seqAck.getIdentifier().getIdentifier();
+ if(seqId!=sId) //Sorry. Wrong sequence.
+ continue;
+
+ List ackList = seqAck.getAckRanges();
+
+ Iterator ackIt = ackList.iterator();
+
+ while(ackIt.hasNext()){
+ AcknowledgementRange ackRng = (AcknowledgementRange)
ackIt.next();
+ long min = ackRng.getMinValue();
+ long temp=min;
+ System.out.println("################## min -"+min);
+ while(temp<=ackRng.getMaxValue()){
+ System.out.println("################## temp -"+temp);
+ Long lng = new Long(temp);
+ if(!msgNumbers.contains(lng)) //vector cant hv
duplicate entries.
+ msgNumbers.add(new Long(temp));
+
+ temp++;
+ }
+ }
+ }
+ return msgNumbers;
+ }
+
+ public Vector getAllOutgoingMsgNumbers(String seqId){
+ Vector msgNumbers;
+
+ ResponseSequenceHash rsh = (ResponseSequenceHash)
outgoingMap.get(seqId);
+
+ if (rsh == null) {
+ System.out.println("ERROR: SEQ IS NULL "+seqId);
+ return null;
+ }
+
+ synchronized (rsh) {
+ msgNumbers = rsh.getReceivedMsgNumbers();
+ }
+
+ return msgNumbers;
+ }
+
+ /*public Vector getAllIncommingMsgNumbers(String seqId){
+ Vector msgNumbers = new Vector();
+ incomingMap.get(seqId);
+
+ //Not implemented yet.
+ return msgNumbers;
+ }*/
}
1.9 +17 -0
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java
Index: ResponseSequenceHash.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- ResponseSequenceHash.java 3 Dec 2004 09:03:58 -0000 1.8
+++ ResponseSequenceHash.java 10 Jan 2005 05:43:31 -0000 1.9
@@ -23,6 +23,8 @@
import java.util.Set;
import java.util.Vector;
+import java_cup.version;
+
import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
@@ -301,6 +303,21 @@
}
return false;
+ }
+
+ public Vector getReceivedMsgNumbers(){
+
+ Vector result = new Vector();
+ Iterator it = hash.keySet().iterator();
+
+ while(it.hasNext()){
+ Object key = it.next();
+ RMMessageContext msg = (RMMessageContext) hash.get(key);
+ long l = msg.getMsgNumber();
+ result.add(new Long(l));
+ }
+
+ return result;
}
}