chamikara    2005/06/06 11:23:32

  Modified:    sandesha/src/org/apache/sandesha IStorageManager.java
               sandesha/src/org/apache/sandesha/client
                        ClientStorageManager.java
               sandesha/src/org/apache/sandesha/server
                        ServerStorageManager.java
               sandesha/src/org/apache/sandesha/storage/dao
                        ISandeshaDAO.java SandeshaDatabaseDAO.java
                        SandeshaQueueDAO.java
               sandesha/src/org/apache/sandesha/storage/queue
                        IncomingSequence.java SandeshaQueue.java
  Log:
  The queue classes were changed to send ack messages not only after an 
ackRequested or lastMessage, but also when a certain time period (the ack interval) 
has passed since the last message arrived.
  
  Revision  Changes    Path
  1.26      +2 -0      
ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java
  
  Index: IStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java,v
  retrieving revision 1.25
  retrieving revision 1.26
  diff -u -r1.25 -r1.26
  --- IStorageManager.java      25 May 2005 10:08:59 -0000      1.25
  +++ IStorageManager.java      6 Jun 2005 18:23:32 -0000       1.26
  @@ -112,4 +112,6 @@
       void clearStorage();
   
       boolean isSequenceComplete(String seqId);
  +    
  +    void sendAck(String sequenceId);
   }
  \ No newline at end of file
  
  
  
  1.42      +10 -0     
ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
  
  Index: ClientStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
  retrieving revision 1.41
  retrieving revision 1.42
  diff -u -r1.41 -r1.42
  --- ClientStorageManager.java 29 May 2005 11:22:04 -0000      1.41
  +++ ClientStorageManager.java 6 Jun 2005 18:23:32 -0000       1.42
  @@ -99,6 +99,10 @@
        * responses received from the server side.
        */
       public void addAcknowledgement(RMMessageContext rmMessageContext) {
  +        String sequenceID = rmMessageContext.getSequenceID();
  +        if(sequenceID!=null)
  +            accessor.removeAllAcks(sequenceID);
  +        
           addPriorityMessage(rmMessageContext);
       }
   
  @@ -254,6 +258,7 @@
               return;
           Long msgNo = new Long(messageNumber);
           accessor.addMessageToIncomingSequence(sequenceId, msgNo, 
rmMessageContext);
  +        accessor.updateFinalMessageArrivedTime(sequenceId);
       }
   
   
  @@ -400,6 +405,11 @@
           boolean incomingTerminateReceived = 
accessor.isIncommingTerminateReceived(seqId);
           return outTerminateSent && incomingTerminateReceived;
       }
  +    
  +    public void sendAck(String sequenceId) {
  +        String keyId = accessor.getKeyFromIncomingSequenceId(sequenceId);
  +        accessor.sendAck(keyId);
  +    }
   
   
   }
  \ No newline at end of file
  
  
  
  1.34      +11 -4     
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
  
  Index: ServerStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
  retrieving revision 1.33
  retrieving revision 1.34
  diff -u -r1.33 -r1.34
  --- ServerStorageManager.java 29 May 2005 11:22:04 -0000      1.33
  +++ ServerStorageManager.java 6 Jun 2005 18:23:32 -0000       1.34
  @@ -191,6 +191,10 @@
        * @see 
org.apache.sandesha.IStorageManager#addAcknowledgement(org.apache.sandesha.RMMessageContext)
        */
       public void addAcknowledgement(RMMessageContext rmMessageContext) {
  +        String sequenceID = rmMessageContext.getSequenceID();
  +        if(sequenceID!=null)
  +            accessor.removeAllAcks(sequenceID);
  +        
           addPriorityMessage(rmMessageContext);
       }
   
  @@ -259,6 +263,7 @@
   
           Long msgNo = new Long(messageNumber);
           accessor.addMessageToIncomingSequence(sequenceId, msgNo, 
rmMessageContext);
  +        accessor.updateFinalMessageArrivedTime(sequenceId);
   
       }
   
  @@ -367,11 +372,11 @@
       }
   
       public void addOffer(String msgID, String offerID) {
  -        //To change body of implemented methods use File | Settings | File 
Templates.
  +        
       }
   
       public String getOffer(String msgID) {
  -        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
  +        return null;  
       }
       
       public void clearStorage(){
  @@ -379,8 +384,10 @@
       }
   
       public boolean isSequenceComplete(String seqId) {
  -        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
  +        return false;  
       }
   
  -
  +    public void sendAck(String sequenceId) {
  +        accessor.sendAck(sequenceId);
  +    }
   }
  \ No newline at end of file
  
  
  
  1.15      +6 -0      
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java
  
  Index: ISandeshaDAO.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- ISandeshaDAO.java 25 May 2005 10:09:00 -0000      1.14
  +++ ISandeshaDAO.java 6 Jun 2005 18:23:32 -0000       1.15
  @@ -337,4 +337,10 @@
       public boolean isOutgoingTerminateSent(String seqId);
   
      public  boolean isIncommingTerminateReceived(String seqId);
  +   
  +   public void updateFinalMessageArrivedTime(String sequenceID);
  +   
  +   public void sendAck(String sequenceId);
  +   
  +   public void removeAllAcks(String sequenceID);
   }
  \ No newline at end of file
  
  
  
  1.15      +9 -0      
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java
  
  Index: SandeshaDatabaseDAO.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- SandeshaDatabaseDAO.java  24 May 2005 06:07:40 -0000      1.14
  +++ SandeshaDatabaseDAO.java  6 Jun 2005 18:23:32 -0000       1.15
  @@ -19,6 +19,7 @@
   import org.apache.axis.components.logger.LogFactory;
   import org.apache.commons.logging.Log;
   import org.apache.sandesha.RMMessageContext;
  +import org.apache.sandesha.storage.queue.SandeshaQueue;
   
   import java.util.Iterator;
   import java.util.Set;
  @@ -305,4 +306,12 @@
           return false;  //To change body of implemented methods use File | 
Settings | File Templates.
       }
   
  +    public void updateFinalMessageArrivedTime(String sequenceID) {
  +    }
  +    
  +    public void sendAck(String sequenceId) {
  +    }
  +    
  +    public void removeAllAcks(String sequenceID){
  +    }
   }
  \ No newline at end of file
  
  
  
  1.16      +15 -1     
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java
  
  Index: SandeshaQueueDAO.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- SandeshaQueueDAO.java     24 May 2005 06:07:40 -0000      1.15
  +++ SandeshaQueueDAO.java     6 Jun 2005 18:23:32 -0000       1.16
  @@ -421,5 +421,19 @@
            SandeshaQueue sq = SandeshaQueue.getInstance(this.endPoint);
           return sq.isIncommingTerminateReceived(seqId);
       }
  -
  +    
  +    public void updateFinalMessageArrivedTime(String sequenceID) {
  +        SandeshaQueue sq = SandeshaQueue.getInstance(this.endPoint);
  +        sq.updateFinalMessageArrivedTime(sequenceID);
  +    }
  +    
  +    public void sendAck(String sequenceId) {
  +        SandeshaQueue sq = SandeshaQueue.getInstance(this.endPoint);
  +        sq.sendAck(sequenceId);
  +    }
  +    
  +    public void removeAllAcks(String sequenceID){
  +        SandeshaQueue sq = SandeshaQueue.getInstance(this.endPoint);
  +        sq.removeAllAcks(sequenceID);
  +    }
   }
  
  
  
  1.9       +28 -1     
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java
  
  Index: IncomingSequence.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- IncomingSequence.java     16 May 2005 06:08:17 -0000      1.8
  +++ IncomingSequence.java     6 Jun 2005 18:23:32 -0000       1.9
  @@ -20,6 +20,7 @@
   import org.apache.axis.components.logger.LogFactory;
   import org.apache.axis.message.addressing.RelatesTo;
   import org.apache.commons.logging.Log;
  +import org.apache.sandesha.Constants;
   import org.apache.sandesha.RMMessageContext;
   
   import java.util.*;
  @@ -43,8 +44,34 @@
       private long lastMsgNo = -1;
       private String acksTo = null;
       private String offer;
  -    //private Set msgNumbers;
  +    private long finalMsgArrivedTime = 0;   //this is the time the latest 
application msg was arrived (
  +    private long finalAckedTime = 0;
  +    private boolean sendAck = false;
   
  +    public long getFinalAckedTime() {
  +        return finalAckedTime;
  +    }
  +
  +    public void setFinalAckedTime(long finalAckedTime) {
  +        this.finalAckedTime = finalAckedTime;
  +    }
  +    
  +    public long getFinalMsgArrivedTime() {
  +        return finalMsgArrivedTime;
  +    }
  +
  +    public void setFinalMsgArrivedTime(long finalMsgArrivedTime) {
  +        this.finalMsgArrivedTime = finalMsgArrivedTime;
  +    }
  +
  +    public boolean isSendAck() {
  +        return sendAck;
  +    }
  +
  +    public void setSendAck(boolean sendAck) {
  +        this.sendAck = sendAck;
  +    }
  +    
       public String getOffer() {
           return offer;
       }
  
  
  
  1.23      +83 -2     
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
  
  Index: SandeshaQueue.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
  retrieving revision 1.22
  retrieving revision 1.23
  diff -u -r1.22 -r1.23
  --- SandeshaQueue.java        24 May 2005 06:07:40 -0000      1.22
  +++ SandeshaQueue.java        6 Jun 2005 18:23:32 -0000       1.23
  @@ -23,9 +23,12 @@
   import org.apache.commons.logging.Log;
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.RMMessageContext;
  +import org.apache.sandesha.util.PolicyLoader;
   import org.apache.sandesha.ws.rm.AcknowledgementRange;
   import org.apache.sandesha.ws.rm.SequenceAcknowledgement;
   
  +import sun.nio.cs.HistoricallyNamedCharset;
  +
   import java.util.*;
   
   
  @@ -321,7 +324,44 @@
   
                                   }
                                   break;
  -
  +                            case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
  +                                
  +                                //Acks are sent in the following manner.
  +                                //If the system has asked to send an 
ack (sequence.sendAck==true)
  +                                //then send it immediately.
  +                                //Also send an ack when an interval 
(ACKNOWLEDGEMENT_INTERVAL) has passed
  +                                //since the last message arrived.
  +                                
  +                                String sequenceId = tempMsg.getSequenceID();
  +                             if(sequenceId==null)
  +                                 continue;
  +                             
  +                             String key = 
getKeyFromIncomingSequenceId(sequenceId);
  +                             IncomingSequence sequence = (IncomingSequence) 
incomingMap.get(key);
  +                             if(sequence==null)
  +                                 continue;
  +                             
  +                             d = new Date ();
  +                             currentTime = d.getTime();
  +                             
  +                             if(sequence.isSendAck()){
  +                                 
  +                                    tempMsg.setLastSentTime(currentTime);
  +                                    msg = tempMsg;
  +                                    sequence.setSendAck(false);
  +                                    sequence.setFinalAckedTime(currentTime);
  +                                    break forLoop;
  +                                    
  +                             }else{
  +                                     long ackInterval = 
PolicyLoader.getInstance().getAcknowledgementInterval();
  +                                     long finalAckedTime = 
sequence.getFinalAckedTime();
  +                                     long finalMsgArrivedTime = 
sequence.getFinalMsgArrivedTime();
  +                                      
  +                                     if((finalMsgArrivedTime>finalAckedTime) 
&& (currentTime>finalMsgArrivedTime+ackInterval))
  +                                         sequence.setSendAck(true);
  +                             }
  +                             
  +                                break;
                               default:
                                   highPriorityQueue.remove(i);
                                   queueBin.put(tempMsg.getMessageID(), 
tempMsg);
  @@ -576,11 +616,18 @@
       }
   
       public void movePriorityMsgToBin(String messageId) {
  +      
           synchronized (highPriorityQueue) {
               int size = highPriorityQueue.size();
               for (int i = 0; i < size; i++) {
                   RMMessageContext msg = (RMMessageContext) 
highPriorityQueue.get(i);
  -                String tempMsgId = (String) msg.getMessageIdList().get(0);
  +                
  +                String tempMsgId;
  +                try{
  +                    tempMsgId = (String) msg.getMessageIdList().get(0);
  +                }catch(Exception ex){
  +                     tempMsgId = msg.getMessageID();
  +                }
                   if (tempMsgId.equals(messageId)) {
                       highPriorityQueue.remove(i);
                       queueBin.put(messageId, msg);
  @@ -1068,6 +1115,40 @@
           }
   
       }
  +    
  +    public void updateFinalMessageArrivedTime(String sequenceId){
  +        synchronized (incomingMap) {
  +            IncomingSequence ics = (IncomingSequence) 
incomingMap.get(sequenceId);
  +            if(ics==null)
  +                return;
  +            
  +            Date d = new Date();
  +            long time = d.getTime();
  +            ics.setFinalMsgArrivedTime(time);
  +        }
  +    }
  +    
  +    public void sendAck(String sequenceId){
  +        synchronized (incomingMap) {
  +            IncomingSequence ics = (IncomingSequence) 
incomingMap.get(sequenceId);
  +            if(ics==null)
  +                return;
  +            
  +           ics.setSendAck(true);
  +        }
  +    }
  +    
  +    public void removeAllAcks (String sequenceID){
  +        synchronized (highPriorityQueue){
  +            int size = highPriorityQueue.size();
  +            
  +            for(int i=0;i<size;i++){
  +                RMMessageContext msg = (RMMessageContext) 
highPriorityQueue.get(i);
  +                if(msg.getSequenceID().equals(sequenceID) && 
msg.getMessageType()==Constants.MSG_TYPE_ACKNOWLEDGEMENT)
  +                    highPriorityQueue.remove(i);
  +            }
  +        }
  +    }
   
   
   }
  
  
  

Reply via email to