jaliya      2005/01/09 21:43:31

  Modified:    sandesha/src/org/apache/sandesha/server/dao
                        ServerQueueDAO.java ServerDatabaseDAO.java
               sandesha/src/org/apache/sandesha/server/queue
                        ServerQueue.java ResponseSequenceHash.java
  Log:
  resolve some inconsistancies in the code
  
  Revision  Changes    Path
  1.10      +25 -0     
ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerQueueDAO.java
  
  Index: ServerQueueDAO.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerQueueDAO.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- ServerQueueDAO.java       10 Jan 2005 05:35:35 -0000      1.9
  +++ ServerQueueDAO.java       10 Jan 2005 05:43:31 -0000      1.10
  @@ -388,4 +388,29 @@
           ServerQueue sq = ServerQueue.getInstance();
           sq.markOutgoingMessageToDelete(seqId,msgNo); 
       }
  +    
  +    
  +    /* (non-Javadoc)
  +     * @see 
org.apache.sandesha.server.dao.IServerDAO#isAckComplete(java.lang.String)
  +     */
  +    public boolean compareAcksWithSequence(String sequenceId){
  +        ServerQueue sq = ServerQueue.getInstance();
  +        Vector acks = sq.getAllAckedMsgNumbers(sequenceId);
  +        Vector outGoingMsgs = sq.getAllOutgoingMsgNumbers(sequenceId);
  +        
  +        if(acks.size()< outGoingMsgs.size()) //Size must be equal (number of 
msgs=number of acks)
  +            return false;
  +        
  +        boolean result = true;
  +        for(int i=0;i<outGoingMsgs.size();i++){
  +            if(!acks.contains(outGoingMsgs.get(i))){
  +                
  +                //System.out.println("result false "+outGoingMsgs.get(i));
  +                result = false;
  +                break;
  +            }     
  +        }
  +        
  +        return result;
  +    }
   }
  
  
  
  1.10      +12 -0     
ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerDatabaseDAO.java
  
  Index: ServerDatabaseDAO.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerDatabaseDAO.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- ServerDatabaseDAO.java    10 Jan 2005 05:35:35 -0000      1.9
  +++ ServerDatabaseDAO.java    10 Jan 2005 05:43:31 -0000      1.10
  @@ -272,4 +272,16 @@
       public String searchForSequenceId(String messageId){
           return null;
       }
  +    
  +    
  +
  +    
  +
  +    /* (non-Javadoc)
  +     * @see 
org.apache.sandesha.server.dao.IServerDAO#compareAcksWithSequence(java.lang.String)
  +     */
  +    public boolean compareAcksWithSequence(String sequenceId) {
  +        // TODO Auto-generated method stub
  +        return false;
  +    }
   }
  
  
  
  1.10      +89 -2     
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
  
  Index: ServerQueue.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- ServerQueue.java  24 Dec 2004 17:59:04 -0000      1.9
  +++ ServerQueue.java  10 Jan 2005 05:43:31 -0000      1.10
  @@ -17,15 +17,20 @@
   
   package org.apache.sandesha.server.queue;
   
  +
   import java.util.ArrayList;
   import java.util.Date;
   import java.util.HashMap;
   import java.util.Iterator;
  +import java.util.List;
   import java.util.Set;
   import java.util.Vector;
   
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.RMMessageContext;
  +import org.apache.sandesha.ws.rm.AcknowledgementRange;
  +import org.apache.sandesha.ws.rm.MessageNumber;
  +import org.apache.sandesha.ws.rm.SequenceAcknowledgement;
   
   /*
    * Created on Aug 4, 2004 at 4:49:49 PM
  @@ -445,7 +450,6 @@
               return false;
       }
   
  -    //To set the out sequence of the given sequence
       public void setOutSequence(String seqId, String outSeqId) {
           ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                   .get(seqId);
  @@ -473,7 +477,6 @@
           }
       }
   
  -    
       public String getSequenceOfOutSequence(String outSequence) {
   
           if (outSequence == null) {
  @@ -587,6 +590,26 @@
   
           }
       }
  +    
  +    public void markOutgoingMessageToDelete(String sequenceId,Long 
messageNo){
  +        String sequence = getSequenceOfOutSequence(sequenceId);
  +        ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
  +                .get(sequence);
  +
  +        if (rsh == null) {
  +            System.out.println("ERROR: RESPONSE SEQ IS NULL "+sequence);
  +            return;
  +        }
  +        
  +        synchronized (rsh) {
  +            //Deleting retuns the deleted message.
  +            rsh.markMessageDeleted(messageNo);
  +            //If we jave already deleted then no message to return.    
  +            }
  +
  +        }
  +        
  +
   
       public void movePriorityMsgToBin(String messageId) {
           synchronized (priorityQueue) {
  @@ -686,5 +709,69 @@
           
           return key;      
       }
  +    
  +    public Vector getAllAckedMsgNumbers(String seqId){
  +        Vector msgNumbers = new Vector();
  +  
  +        Iterator it = priorityQueue.iterator();
  +        System.out.println("################## 1");
  +        while(it.hasNext()){
  +            System.out.println("################## 2 ");
  +            RMMessageContext msg = (RMMessageContext) it.next();
  +            System.out.println("################## 2 -"+msg.getMessageID() );
  +            if(msg.getMessageType()!=Constants.MSG_TYPE_ACKNOWLEDGEMENT)
  +                continue;
  +            
  +            SequenceAcknowledgement seqAck = 
msg.getRMHeaders().getSequenceAcknowledgement();
  +            String sId = seqAck.getIdentifier().getIdentifier();
  +            if(seqId!=sId)    //Sorry. Wrong sequence.
  +                continue;
  +            
  +            List ackList = seqAck.getAckRanges();
  +            
  +            Iterator ackIt = ackList.iterator();
  +            
  +            while(ackIt.hasNext()){
  +                AcknowledgementRange ackRng = (AcknowledgementRange) 
ackIt.next();
  +                long min = ackRng.getMinValue();
  +                long temp=min;
  +                System.out.println("################## min -"+min);
  +                while(temp<=ackRng.getMaxValue()){
  +                    System.out.println("################## temp -"+temp);
  +                    Long lng = new Long(temp);
  +                    if(!msgNumbers.contains(lng))  //vector cant hv 
duplicate entries.
  +                        msgNumbers.add(new Long(temp));
  +                    
  +                    temp++;
  +                }  
  +            }  
  +        }
  +        return msgNumbers;
  +    }
  +    
  +    public Vector getAllOutgoingMsgNumbers(String seqId){
  +        Vector msgNumbers;
  +        
  +        ResponseSequenceHash rsh = (ResponseSequenceHash) 
outgoingMap.get(seqId);
  +        
  +        if (rsh == null) {
  +            System.out.println("ERROR: SEQ IS NULL "+seqId);
  +            return null;
  +        } 
  +        
  +        synchronized (rsh) {
  +            msgNumbers = rsh.getReceivedMsgNumbers();  
  +        }
  +        
  +        return msgNumbers;
  +    }
  +    
  +    /*public Vector getAllIncommingMsgNumbers(String seqId){
  +        Vector msgNumbers = new Vector();
  +        incomingMap.get(seqId);
  +        
  +        //Not implemented yet.
  +        return msgNumbers;
  +    }*/
   }
   
  
  
  
  1.9       +17 -0     
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java
  
  Index: ResponseSequenceHash.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- ResponseSequenceHash.java 3 Dec 2004 09:03:58 -0000       1.8
  +++ ResponseSequenceHash.java 10 Jan 2005 05:43:31 -0000      1.9
  @@ -23,6 +23,8 @@
   import java.util.Set;
   import java.util.Vector;
   
  +import java_cup.version;
  +
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.RMMessageContext;
   
  @@ -301,6 +303,21 @@
             
           }
           return false;
  +    }
  +    
  +    public Vector getReceivedMsgNumbers(){
  +        
  +        Vector result = new Vector();
  +        Iterator it = hash.keySet().iterator();
  +        
  +        while(it.hasNext()){
  +            Object key = it.next();
  +            RMMessageContext msg = (RMMessageContext) hash.get(key);
  +            long l = msg.getMsgNumber();
  +            result.add(new Long(l));
  +        }
  +        
  +        return result;
       }
       
   }
  
  
  

Reply via email to