chamikara    2005/01/10 15:03:14

  Modified:    sandesha/src/org/apache/sandesha/server/queue
                        ServerQueue.java
  Log:
  A new queue was added for low-priority messages.
  
  Revision  Changes    Path
  1.11      +78 -20    
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
  
  Index: ServerQueue.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- ServerQueue.java  10 Jan 2005 05:43:31 -0000      1.10
  +++ ServerQueue.java  10 Jan 2005 23:03:14 -0000      1.11
  @@ -26,6 +26,7 @@
   import java.util.Set;
   import java.util.Vector;
   
  +import org.apache.axis.message.addressing.MessageID;
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.RMMessageContext;
   import org.apache.sandesha.ws.rm.AcknowledgementRange;
  @@ -49,9 +50,11 @@
   
       HashMap outgoingMap; //Response messages
   
  -    ArrayList priorityQueue; // Acks and create seq. responses.
  +    ArrayList highPriorityQueue; // Acks and create seq. responses.
   
       HashMap queueBin; // Messaged processed from out queue will be moved
  +    
  +    ArrayList lowPriorityQueue;
                                 // to this.
   
       // to this.
  @@ -59,8 +62,9 @@
       private ServerQueue() {
           incomingMap = new HashMap();
           outgoingMap = new HashMap();
  -        priorityQueue = new ArrayList();
  +        highPriorityQueue = new ArrayList();
           queueBin = new HashMap();
  +        lowPriorityQueue = new ArrayList();
       }
   
       public static ServerQueue getInstance() {
  @@ -256,31 +260,45 @@
        */
       public void addPriorityMessage(RMMessageContext msg) throws 
QueueException {
   
  -        synchronized (priorityQueue) {
  +        synchronized (highPriorityQueue) {
   
               if (msg == null)
                   throw new QueueException("Message is null");
   
  -            priorityQueue.add(msg);
  +            highPriorityQueue.add(msg);
           }
       }
  +    
  +    
  +    
  +    public void addLowPriorityMessage(RMMessageContext msg) throws 
QueueException {
  +
  +        synchronized (lowPriorityQueue) {
  +
  +            if (msg == null)
  +                throw new QueueException("Message is null");
  +
  +            lowPriorityQueue.add(msg);
  +        }
  +    }   
  +    
   
       public RMMessageContext nextPriorityMessageToSend() throws 
QueueException {
   
  -        synchronized (priorityQueue) {
  +        synchronized (highPriorityQueue) {
   
  -            if (priorityQueue.size() <= 0)
  +            if (highPriorityQueue.size() <= 0)
                   return null;
   
  -            //RMMessageContext msg = (RMMessageContext) priorityQueue.get(0);
  +            //RMMessageContext msg = (RMMessageContext) 
highPriorityQueue.get(0);
               RMMessageContext msg = null;
  -            int size = priorityQueue.size();
  +            int size = highPriorityQueue.size();
   
  -            synchronized (priorityQueue) {
  +            synchronized (highPriorityQueue) {
   
                   forLoop: //Label
                   for (int i = 0; i < size; i++) {
  -                    RMMessageContext tempMsg = (RMMessageContext) 
priorityQueue
  +                    RMMessageContext tempMsg = (RMMessageContext) 
highPriorityQueue
                               .get(i);
                       if (tempMsg != null) {
   
  @@ -302,7 +320,7 @@
                           //These include CreareSeqResponses and
                           // Acknowledgements.
                           default:
  -                            priorityQueue.remove(i);
  +                            highPriorityQueue.remove(i);
                               queueBin.put(tempMsg.getMessageID(), tempMsg);
                               msg = tempMsg;
                               break forLoop;
  @@ -316,6 +334,22 @@
   
           }
       }
  +    
  +    
  +    public RMMessageContext nextLowPriorityMessageToSend() throws 
QueueException {
  +       
  +        synchronized (lowPriorityQueue)
  +        {
  +            if(lowPriorityQueue.size() > 0){
  +                RMMessageContext msg = (RMMessageContext) 
lowPriorityQueue.get(0);
  +                lowPriorityQueue.remove(0);
  +                return msg;
  +            }    
  +        }
  +        
  +        return null;
  +        
  +    }
   
       /*
        * public RMMessageContext getNextToProcessIfHasNew(String sequenceId){
  @@ -381,7 +415,7 @@
               return;
   
           incomingMap.clear();
  -        priorityQueue.clear();
  +        highPriorityQueue.clear();
           outgoingMap.clear();
           queueBin.clear();
       }
  @@ -552,7 +586,7 @@
           System.out.println("       DISPLAYING PRIORITY QUEUE");
           System.out.println("------------------------------------");
   
  -        Iterator it = priorityQueue.iterator();
  +        Iterator it = highPriorityQueue.iterator();
           while (it.hasNext()) {
               RMMessageContext msg = (RMMessageContext) it.next();
               String id = msg.getMessageID();
  @@ -612,14 +646,14 @@
   
   
       public void movePriorityMsgToBin(String messageId) {
  -        synchronized (priorityQueue) {
  -            int size = priorityQueue.size();
  +        synchronized (highPriorityQueue) {
  +            int size = highPriorityQueue.size();
               for (int i = 0; i < size; i++) {
  -                RMMessageContext msg = (RMMessageContext) 
priorityQueue.get(i);
  +                RMMessageContext msg = (RMMessageContext) 
highPriorityQueue.get(i);
   
                   if (msg.getMessageID().equals(messageId)) {
   
  -                    priorityQueue.remove(i);
  +                    highPriorityQueue.remove(i);
                       queueBin.put(messageId, msg);
                       return;
                   }
  @@ -713,7 +747,7 @@
       public Vector getAllAckedMsgNumbers(String seqId){
           Vector msgNumbers = new Vector();
     
  -        Iterator it = priorityQueue.iterator();
  +        Iterator it = highPriorityQueue.iterator();
           System.out.println("################## 1");
           while(it.hasNext()){
               System.out.println("################## 2 ");
  @@ -750,13 +784,13 @@
       }
       
       public Vector getAllOutgoingMsgNumbers(String seqId){
  -        Vector msgNumbers;
  +        Vector msgNumbers = new Vector();
           
           ResponseSequenceHash rsh = (ResponseSequenceHash) 
outgoingMap.get(seqId);
           
           if (rsh == null) {
               System.out.println("ERROR: SEQ IS NULL "+seqId);
  -            return null;
  +            return msgNumbers;
           } 
           
           synchronized (rsh) {
  @@ -765,6 +799,30 @@
           
           return msgNumbers;
       }
  +    
  +    public void setResponseReceived(RMMessageContext responseMsg)
  +    {
  +        String requestMsgID = 
responseMsg.getAddressingHeaders().getRelatesTo().toString();
  +        
  +        Iterator it = outgoingMap.keySet().iterator();
  +        
  +        String key =null;
  +        while(it.hasNext()){
  +            key = (String) it.next();
  +            Object obj=outgoingMap.get(key);
  +           if(obj!=null){
  +                ResponseSequenceHash hash = (ResponseSequenceHash)obj;
  +            boolean hasMsg = hash.hasMessageWithId(requestMsgID);
  +            if(!hasMsg)
  +                //set the property response received
  +                hash.setResponseReceived(requestMsgID);
  +            }
  +        }
  +                
  +    }
  +    
  +    
  +    
       
       /*public Vector getAllIncommingMsgNumbers(String seqId){
           Vector msgNumbers = new Vector();
  
  
  

Reply via email to