jaliya      2005/01/11 05:52:02

  Modified:    sandesha/src/org/apache/sandesha/server
                        AcknowledgementProcessor.java
                        CompositeProcessor.java RMInvoker.java Sender.java
                        ServerStorageManager.java
               sandesha/src/org/apache/sandesha/server/dao IServerDAO.java
                        ServerDatabaseDAO.java ServerQueueDAO.java
               sandesha/src/org/apache/sandesha/server/queue
                        ResponseSequenceHash.java ServerQueue.java
  Log:
  Added the functionality to send the TerminateSeq message from the client
  
  Revision  Changes    Path
  1.12      +1 -0      ws-fx/sandesha/src/org/apache/sandesha/server/AcknowledgementProcessor.java
  
  Index: AcknowledgementProcessor.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/AcknowledgementProcessor.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- AcknowledgementProcessor.java     12 Nov 2004 06:18:25 -0000      1.11
  +++ AcknowledgementProcessor.java     11 Jan 2005 13:51:57 -0000      1.12
  @@ -53,6 +53,7 @@
               AcknowledgementRange ackRange = (AcknowledgementRange) 
ite.next();
               long msgNumber = ackRange.getMinValue();
               while (ackRange.getMaxValue() >= msgNumber) {
  +                storageManger. setAckReceived(seqID, msgNumber);             
   
                   storageManger.setAcknowledged(seqID, msgNumber);
                   msgNumber++;
               }
  
  
  
  1.12      +1 -2      ws-fx/sandesha/src/org/apache/sandesha/server/CompositeProcessor.java
  
  Index: CompositeProcessor.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/CompositeProcessor.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- CompositeProcessor.java   2 Dec 2004 13:15:48 -0000       1.11
  +++ CompositeProcessor.java   11 Jan 2005 13:51:57 -0000      1.12
  @@ -98,8 +98,7 @@
                           //TODO: Add to log.
                       }
   
  -                    System.out
  -                            .println("INFO: Inserting the request message 
....\n");
  +                    System.out.println("INFO: Inserting the request message 
....\n");
                       //Insert the message to the INQUEUE
                       storageManger.insertIncomingMessage(rmMsgContext);
                   }
  
  
  
  1.8       +15 -1     ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java
  
  Index: RMInvoker.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- RMInvoker.java    10 Jan 2005 23:00:09 -0000      1.7
  +++ RMInvoker.java    11 Jan 2005 13:51:57 -0000      1.8
  @@ -74,7 +74,8 @@
                       
                       if(rmMessageContext.isLastMessage()){
                           //Insert Terminate Sequnce.
  -                        
//storageManager.insertTerminateSeqMessage(getTerminateSeqMessage(requestMesssageContext));
 
  +                        System.out.println("LAST MESSAGE IS THERE");
  +                        
storageManager.insertTerminateSeqMessage(getTerminateSeqMessage(rmMessageContext));
                       }
                       rpcProvider.invoke(rmMessageContext.getMsgContext());
   
  @@ -177,5 +178,18 @@
   
           }
   
  +    }
  +
  +      private RMMessageContext getTerminateSeqMessage(RMMessageContext 
rmMessageContext) {
  +        RMMessageContext terSeqRMMsgContext = new RMMessageContext();
  +        MessageContext terSeqMsgContext = new 
MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
  +        terSeqRMMsgContext.setSequenceID(rmMessageContext.getSequenceID());
  +        
terSeqRMMsgContext.setAddressingHeaders(rmMessageContext.getAddressingHeaders());
  +        //RMMessageContext.copyMessageContext(msgContext, messageContext);
  +        
terSeqRMMsgContext.setOutGoingAddress(rmMessageContext.getOutGoingAddress());
  +        terSeqRMMsgContext.setMsgContext(terSeqMsgContext);
  +        
terSeqRMMsgContext.setMessageType(Constants.MSG_TYPE_TERMINATE_SEQUENCE);
  +        // TODO Auto-generated method stub
  +        return terSeqRMMsgContext;
       }
   }
  
  
  
  1.18      +4 -5      ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java
  
  Index: Sender.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- Sender.java       10 Jan 2005 23:00:37 -0000      1.17
  +++ Sender.java       11 Jan 2005 13:51:57 -0000      1.18
  @@ -149,11 +149,10 @@
        * @param rmMessageContext
        */
       private void sendTerminateSequenceRequest(RMMessageContext 
rmMessageContext) {
  +       SOAPEnvelope 
terSeqEnv=EnvelopeCreator.createTerminatSeqMessage(rmMessageContext);
  +       Message terSeqMsg= new Message(terSeqEnv);       
  +       rmMessageContext.getMsgContext().setRequestMessage(terSeqMsg);
   
  -        if (rmMessageContext.getMsgContext().getRequestMessage() == null) {
  -            //The code should not come to this point.
  -            System.err.println("ERROR: NULL REQUEST MESSAGE");
  -        } else {
               Call call;
               try {
                   
rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
  @@ -183,7 +182,7 @@
                   // TODO Auto-generated catch block
                   e.printStackTrace();
               }
  -        }
  +
       
       }
   
  
  
  
  1.14      +8 -3      ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
  
  Index: ServerStorageManager.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- ServerStorageManager.java 11 Jan 2005 07:16:09 -0000      1.13
  +++ ServerStorageManager.java 11 Jan 2005 13:51:57 -0000      1.14
  @@ -109,7 +109,12 @@
           if (msg == null)
               msg = accessor.getNextOutgoingMsgContextToSend();
   
  +        //if(msg==null){
  +        //    msg = accessor.getNextLowPriorityMessageContextToSend();   // 
checks whether all the request messages hv been acked
  +        //}
  +
           return msg;
  +
       }
   
       /**
  @@ -307,7 +312,7 @@
        * @see 
org.apache.sandesha.IStorageManager#insertTerminateSeqMessage(org.apache.sandesha.RMMessageContext)
        */
       public void insertTerminateSeqMessage(RMMessageContext 
terminateSeqMessage) {            
  -             accessor.addPriorityMessage(terminateSeqMessage);
  +             accessor.addLowPriorityMessage(terminateSeqMessage);
       }
   
       /* (non-Javadoc)
  @@ -335,8 +340,8 @@
   
       }
       
  -    public void setAckReceived(RMMessageContext responseMsg) {
  -        accessor.setAckReceived(responseMsg);
  +    public void setAckReceived(String seqId,long msgNo) {
  +        accessor.setAckReceived(seqId,msgNo);
   
       }
   }
  
  
  
  1.12      +1 -1      ws-fx/sandesha/src/org/apache/sandesha/server/dao/IServerDAO.java
  
  Index: IServerDAO.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/IServerDAO.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- IServerDAO.java   11 Jan 2005 07:18:54 -0000      1.11
  +++ IServerDAO.java   11 Jan 2005 13:51:57 -0000      1.12
  @@ -87,7 +87,7 @@
       
       public void setResponseReceived(RMMessageContext msg);
       
  -    public void setAckReceived(RMMessageContext responseMsg);
  +    public void setAckReceived(String seqId,long msgNo);
       
       public void addLowPriorityMessage(RMMessageContext msg);
       
  
  
  
  1.13      +1 -1      ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerDatabaseDAO.java
  
  Index: ServerDatabaseDAO.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerDatabaseDAO.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- ServerDatabaseDAO.java    11 Jan 2005 07:20:14 -0000      1.12
  +++ ServerDatabaseDAO.java    11 Jan 2005 13:51:57 -0000      1.13
  @@ -27,7 +27,7 @@
   
   public class ServerDatabaseDAO implements IServerDAO {
   
  -    public void setAckReceived(RMMessageContext responseMsg) {
  +    public void setAckReceived(String seqId,long msgNo) {
           // TODO Auto-generated method stub
   
       }
  
  
  
  1.13      +3 -4      ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerQueueDAO.java
  
  Index: ServerQueueDAO.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/dao/ServerQueueDAO.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- ServerQueueDAO.java       11 Jan 2005 07:19:44 -0000      1.12
  +++ ServerQueueDAO.java       11 Jan 2005 13:51:57 -0000      1.13
  @@ -443,7 +443,7 @@
           
           ServerQueue sq = ServerQueue.getInstance();
           try{
  -            return sq.nextLowPriorityMessageToSend();
  +            return sq.getLowPriorityMessageIfAcked();
           }catch(Exception e){
               log.error(e);
           }
  @@ -451,9 +451,8 @@
           return null;
       }
       
  -    public void setAckReceived(RMMessageContext responseMsg) {
  +    public void setAckReceived(String seqId,long msgNo) {
           ServerQueue sq = ServerQueue.getInstance();
  -        sq.setAckReceived(responseMsg);
  -
  +        sq.setAckReceived(seqId,msgNo);
       }
   }
  
  
  
  1.13      +35 -15    ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java
  
  Index: ResponseSequenceHash.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ResponseSequenceHash.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- ResponseSequenceHash.java 11 Jan 2005 07:21:02 -0000      1.12
  +++ ResponseSequenceHash.java 11 Jan 2005 13:52:01 -0000      1.13
  @@ -269,7 +269,6 @@
       }
       
       public boolean markMessageDeleted(Long messageNo){
  -        System.out.println("********* MARKING AS DELETE");
           if(hash.containsKey(messageNo)){
               markedAsDelete.add(messageNo);
               String msgId = ((RMMessageContext) 
hash.get(messageNo)).getMessageID();
  @@ -339,34 +338,55 @@
                   msg.setAckReceived(true);
           } 
       }
  -    
   
  -    
  +    public void setAckReceived(long msgNo){
  +
  +        RMMessageContext msg = (RMMessageContext) hash.get(new Long(msgNo));
  +        if(msg!=null){
  +            msg.setAckReceived(true);
  +        }else
  +            System.out.println("ERROR: MESSAGE IS NULL IN ResponseSeqHash");
  +
  +    }
  +
       public boolean isAckComplete(){
  -        if(!hasLastMessage())
  +         try{
  +        long lastMsgNo = getLastMessage();
  +        if(lastMsgNo<=0){
               return false;
  -        
  +        }
           Iterator it = hash.keySet().iterator();
  -        
  +        for(long i=1;i<lastMsgNo;i++){
  +            if(!hasMessage(new Long(i))){
  +                return false;
  +            }
  +        }
  +
  +        it = hash.keySet().iterator();
           while(it.hasNext()){
  -            RMMessageContext msg = (RMMessageContext) it.next();
  -            if(!msg.isAckReceived())
  +            RMMessageContext msg = (RMMessageContext) hash.get(it.next());
  +             if(!msg.isAckReceived()){
                   return false;
  +             }
           }
  -        
  +
           return true;
  +         }catch(Exception e){
  +             e.printStackTrace();
  +             return false;
  +         }
       }
       
  -    private boolean hasLastMessage(){
  +    private long getLastMessage(){
           Iterator it = hash.keySet().iterator();
  -        
           while(it.hasNext()){
  -            RMMessageContext msg = (RMMessageContext) it.next();
  -            if(msg.isLastMessage())
  -                return true;
  +            RMMessageContext msg = (RMMessageContext) hash.get(it.next());
  +            if(msg.isLastMessage()){
  +                return msg.getMsgNumber();
  +            }
           }
           
  -        return false;
  +        return -1;
       }
       
   }
  
  
  
  1.13      +38 -48    ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
  
  Index: ServerQueue.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- ServerQueue.java  11 Jan 2005 07:22:28 -0000      1.12
  +++ ServerQueue.java  11 Jan 2005 13:52:01 -0000      1.13
  @@ -77,8 +77,7 @@
       /**
        * This will not replace messages automatically.
        */
  -    public boolean addMessageToIncomingSequence(String seqId, Long messageNo,
  -            RMMessageContext msgCon) throws QueueException {
  +    public boolean addMessageToIncomingSequence(String seqId, Long 
messageNo,RMMessageContext msgCon) throws QueueException {
           boolean successful = false;
   
           if (seqId == null || msgCon == null)
  @@ -336,20 +335,6 @@
       }
       
       
  -    public RMMessageContext nextLowPriorityMessageToSend() throws 
QueueException {
  -       
  -        synchronized (lowPriorityQueue)
  -        {
  -            if(lowPriorityQueue.size() > 0){
  -                RMMessageContext msg = (RMMessageContext) 
lowPriorityQueue.get(0);
  -                lowPriorityQueue.remove(0);
  -                return msg;
  -            }    
  -        }
  -        
  -        return null;
  -        
  -    }
   
       /*
        * public RMMessageContext getNextToProcessIfHasNew(String sequenceId){
  @@ -708,7 +693,6 @@
       }
       
       public boolean isRequestMsgPresent(String seqId,String messageId){
  -        System.out.println("******************* "+messageId);
           ResponseSequenceHash rsh = (ResponseSequenceHash) 
outgoingMap.get(seqId);
               
           if (rsh == null) {
  @@ -748,12 +732,9 @@
           Vector msgNumbers = new Vector();
     
           Iterator it = highPriorityQueue.iterator();
  -        System.out.println("################## 1");
  -        while(it.hasNext()){
  -            System.out.println("################## 2 ");
  -            RMMessageContext msg = (RMMessageContext) it.next();
  -            System.out.println("################## 2 -"+msg.getMessageID() );
  -            if(msg.getMessageType()!=Constants.MSG_TYPE_ACKNOWLEDGEMENT)
  +         while(it.hasNext()){
  +           RMMessageContext msg = (RMMessageContext) it.next();
  +                if(msg.getMessageType()!=Constants.MSG_TYPE_ACKNOWLEDGEMENT)
                   continue;
               
               SequenceAcknowledgement seqAck = 
msg.getRMHeaders().getSequenceAcknowledgement();
  @@ -769,9 +750,7 @@
                   AcknowledgementRange ackRng = (AcknowledgementRange) 
ackIt.next();
                   long min = ackRng.getMinValue();
                   long temp=min;
  -                System.out.println("################## min -"+min);
                   while(temp<=ackRng.getMaxValue()){
  -                    System.out.println("################## temp -"+temp);
                       Long lng = new Long(temp);
                       if(!msgNumbers.contains(lng))  //vector cant hv 
duplicate entries.
                           msgNumbers.add(new Long(temp));
  @@ -821,25 +800,27 @@
                   
       }
       
  -    public void setAckReceived(RMMessageContext responseMsg)
  +    public void setAckReceived(String seqId,long msgNo)
       {
  -        String requestMsgID = 
responseMsg.getAddressingHeaders().getRelatesTo().toString();
  -        
  +
           Iterator it = outgoingMap.keySet().iterator();
           
           String key =null;
           while(it.hasNext()){
               key = (String) it.next();
               Object obj=outgoingMap.get(key);
  -           if(obj!=null){
  -                ResponseSequenceHash hash = (ResponseSequenceHash)obj;
  -            boolean hasMsg = hash.hasMessageWithId(requestMsgID);
  -            if(!hasMsg)
  -                //set the property response received
  -                hash.setAckReceived(requestMsgID);
  -            }
  +
  +
  +            if(obj!=null){
  +               ResponseSequenceHash hash = (ResponseSequenceHash)obj;
  +               //System.out.println("************** HASH SEQ IS " + 
hash.getSequenceId() + "      SEQ IS " + seqId + " OUT SEQ IS  " + 
hash.getOutSequenceId());
  +               if(hash.getOutSequenceId().equals(seqId)){
  +
  +                    hash.setAckReceived(msgNo);
  +               }
  +           }
           }
  -                
  +
       }
       
       
  @@ -854,31 +835,40 @@
       }
       
       public RMMessageContext getLowPriorityMessageIfAcked(){
  +
           int size = lowPriorityQueue.size();
  -        
  -        
  +
           RMMessageContext terminateMsg = null;
           for(int i=0;i<size;i++) {
  +
               RMMessageContext temp;
               temp = (RMMessageContext) lowPriorityQueue.get(i);
               String seqId = temp.getSequenceID();
  -            
  -            boolean foundSeq = false;
  -            ResponseSequenceHash hash = null; 
  -            Iterator it1 = outgoingMap.keySet().iterator();
  +           // System.out.println(" HASH NOT FOUND SEQ ID " + seqId);
  +
  +            ResponseSequenceHash hash = null;
  +            hash = (ResponseSequenceHash) outgoingMap.get(seqId);
  +            if(hash==null){
  +                System.out.println("ServerQueue: ERROR: HASH NOT FOUND SEQ 
ID " + seqId);
  +            }
  +
  +            /*Iterator it1 = outgoingMap.keySet().iterator();
               while(it1.hasNext()){
                   hash = (ResponseSequenceHash) it1.next();
  -                if(hash.getSequenceId()==seqId){
  +                if(hash.getOutSequenceId().equals(seqId)){
  +                    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
FOUND SEQ "+ seqId);
                       foundSeq = true;
                       break;
                   }
  -            }
  -            
  -            if(foundSeq && hash!=null){
  +            } */
  +
  +            if(hash!=null){
                   boolean complete = hash.isAckComplete();
  -                if(complete)
  -                    lowPriorityQueue.remove(i);
  +                 if(complete)
  +                    //lowPriorityQueue.remove(i);
                       terminateMsg = temp;
  +                    terminateMsg.setSequenceID(hash.getOutSequenceId());
  +                    lowPriorityQueue.remove(i);
                       break;
               }
               
  
  
  

Reply via email to