chamikara 2005/01/10 15:03:14
Modified: sandesha/src/org/apache/sandesha/server/queue
ServerQueue.java
Log:
a new queue was added for low priority messages
Revision Changes Path
1.11 +78 -20
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
Index: ServerQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- ServerQueue.java 10 Jan 2005 05:43:31 -0000 1.10
+++ ServerQueue.java 10 Jan 2005 23:03:14 -0000 1.11
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.Vector;
+import org.apache.axis.message.addressing.MessageID;
import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
import org.apache.sandesha.ws.rm.AcknowledgementRange;
@@ -49,9 +50,11 @@
HashMap outgoingMap; //Response messages
- ArrayList priorityQueue; // Acks and create seq. responses.
+ ArrayList highPriorityQueue; // Acks and create seq. responses.
HashMap queueBin; // Messaged processed from out queue will be moved
+
+ ArrayList lowPriorityQueue;
// to this.
// to this.
@@ -59,8 +62,9 @@
private ServerQueue() {
incomingMap = new HashMap();
outgoingMap = new HashMap();
- priorityQueue = new ArrayList();
+ highPriorityQueue = new ArrayList();
queueBin = new HashMap();
+ lowPriorityQueue = new ArrayList();
}
public static ServerQueue getInstance() {
@@ -256,31 +260,45 @@
*/
public void addPriorityMessage(RMMessageContext msg) throws
QueueException {
- synchronized (priorityQueue) {
+ synchronized (highPriorityQueue) {
if (msg == null)
throw new QueueException("Message is null");
- priorityQueue.add(msg);
+ highPriorityQueue.add(msg);
}
}
+
+
+
+ public void addLowPriorityMessage(RMMessageContext msg) throws
QueueException {
+
+ synchronized (lowPriorityQueue) {
+
+ if (msg == null)
+ throw new QueueException("Message is null");
+
+ lowPriorityQueue.add(msg);
+ }
+ }
+
public RMMessageContext nextPriorityMessageToSend() throws
QueueException {
- synchronized (priorityQueue) {
+ synchronized (highPriorityQueue) {
- if (priorityQueue.size() <= 0)
+ if (highPriorityQueue.size() <= 0)
return null;
- //RMMessageContext msg = (RMMessageContext) priorityQueue.get(0);
+ //RMMessageContext msg = (RMMessageContext)
highPriorityQueue.get(0);
RMMessageContext msg = null;
- int size = priorityQueue.size();
+ int size = highPriorityQueue.size();
- synchronized (priorityQueue) {
+ synchronized (highPriorityQueue) {
forLoop: //Label
for (int i = 0; i < size; i++) {
- RMMessageContext tempMsg = (RMMessageContext)
priorityQueue
+ RMMessageContext tempMsg = (RMMessageContext)
highPriorityQueue
.get(i);
if (tempMsg != null) {
@@ -302,7 +320,7 @@
//These include CreareSeqResponses and
// Acknowledgements.
default:
- priorityQueue.remove(i);
+ highPriorityQueue.remove(i);
queueBin.put(tempMsg.getMessageID(), tempMsg);
msg = tempMsg;
break forLoop;
@@ -316,6 +334,22 @@
}
}
+
+
+ public RMMessageContext nextLowPriorityMessageToSend() throws
QueueException {
+
+ synchronized (lowPriorityQueue)
+ {
+ if(lowPriorityQueue.size() > 0){
+ RMMessageContext msg = (RMMessageContext)
lowPriorityQueue.get(0);
+ lowPriorityQueue.remove(0);
+ return msg;
+ }
+ }
+
+ return null;
+
+ }
/*
* public RMMessageContext getNextToProcessIfHasNew(String sequenceId){
@@ -381,7 +415,7 @@
return;
incomingMap.clear();
- priorityQueue.clear();
+ highPriorityQueue.clear();
outgoingMap.clear();
queueBin.clear();
}
@@ -552,7 +586,7 @@
System.out.println(" DISPLAYING PRIORITY QUEUE");
System.out.println("------------------------------------");
- Iterator it = priorityQueue.iterator();
+ Iterator it = highPriorityQueue.iterator();
while (it.hasNext()) {
RMMessageContext msg = (RMMessageContext) it.next();
String id = msg.getMessageID();
@@ -612,14 +646,14 @@
public void movePriorityMsgToBin(String messageId) {
- synchronized (priorityQueue) {
- int size = priorityQueue.size();
+ synchronized (highPriorityQueue) {
+ int size = highPriorityQueue.size();
for (int i = 0; i < size; i++) {
- RMMessageContext msg = (RMMessageContext)
priorityQueue.get(i);
+ RMMessageContext msg = (RMMessageContext)
highPriorityQueue.get(i);
if (msg.getMessageID().equals(messageId)) {
- priorityQueue.remove(i);
+ highPriorityQueue.remove(i);
queueBin.put(messageId, msg);
return;
}
@@ -713,7 +747,7 @@
public Vector getAllAckedMsgNumbers(String seqId){
Vector msgNumbers = new Vector();
- Iterator it = priorityQueue.iterator();
+ Iterator it = highPriorityQueue.iterator();
System.out.println("################## 1");
while(it.hasNext()){
System.out.println("################## 2 ");
@@ -750,13 +784,13 @@
}
public Vector getAllOutgoingMsgNumbers(String seqId){
- Vector msgNumbers;
+ Vector msgNumbers = new Vector();
ResponseSequenceHash rsh = (ResponseSequenceHash)
outgoingMap.get(seqId);
if (rsh == null) {
System.out.println("ERROR: SEQ IS NULL "+seqId);
- return null;
+ return msgNumbers;
}
synchronized (rsh) {
@@ -765,6 +799,30 @@
return msgNumbers;
}
+
+ public void setResponseReceived(RMMessageContext responseMsg)
+ {
+ String requestMsgID =
responseMsg.getAddressingHeaders().getRelatesTo().toString();
+
+ Iterator it = outgoingMap.keySet().iterator();
+
+ String key =null;
+ while(it.hasNext()){
+ key = (String) it.next();
+ Object obj=outgoingMap.get(key);
+ if(obj!=null){
+ ResponseSequenceHash hash = (ResponseSequenceHash)obj;
+ boolean hasMsg = hash.hasMessageWithId(requestMsgID);
+ if(!hasMsg)
+ //set the property response received
+ hash.setResponseReceived(requestMsgID);
+ }
+ }
+
+ }
+
+
+
/*public Vector getAllIncommingMsgNumbers(String seqId){
Vector msgNumbers = new Vector();