chamikara    2005/05/08 12:10:52

  Modified:    sandesha/src/org/apache/sandesha IStorageManager.java
               sandesha/src/org/apache/sandesha/client
                        ClientStorageManager.java
               sandesha/src/org/apache/sandesha/server
                        ServerStorageManager.java
               sandesha/src/org/apache/sandesha/storage/queue
                        SandeshaQueue.java
               sandesha/src/org/apache/sandesha/ws/rm/providers
                        RMProvider.java
  Log:
  A concept of adding callbacks to StorageManager and RMProvider was added. 
This will be helpful in monitoring the contro messages going out from and 
coming in to the Sandesha system.
  
  Revision  Changes    Path
  1.21      +5 -0      
ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java
  
  Index: IStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- IStorageManager.java      7 May 2005 08:14:44 -0000       1.20
  +++ IStorageManager.java      8 May 2005 19:10:51 -0000       1.21
  @@ -19,6 +19,7 @@
   import java.util.Iterator;
   import java.util.Map;
   
  +import org.apache.sandesha.storage.Callback;
   import org.apache.sandesha.storage.queue.IncomingSequence;
   
   /**
  @@ -162,6 +163,10 @@
       
       public String getAcksTo(String seqId);
   
  +     public void setCallback(Callback callBack);
  +     
  +     public void removeCallback();
  +
       void addOffer(String msgID, String offerID);
   
       String getOffer(String msgID);
  
  
  
  1.30      +35 -0     
ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
  
  Index: ClientStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
  retrieving revision 1.29
  retrieving revision 1.30
  diff -u -r1.29 -r1.30
  --- ClientStorageManager.java 7 May 2005 08:14:44 -0000       1.29
  +++ ClientStorageManager.java 8 May 2005 19:10:51 -0000       1.30
  @@ -22,6 +22,8 @@
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.IStorageManager;
   import org.apache.sandesha.RMMessageContext;
  +import org.apache.sandesha.storage.Callback;
  +import org.apache.sandesha.storage.CallbackData;
   import org.apache.sandesha.storage.dao.ISandeshaDAO;
   import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
   import org.apache.sandesha.storage.queue.IncomingSequence;
  @@ -43,6 +45,7 @@
               .getName());
   
       private ISandeshaDAO accessor;
  +     private static Callback callBack = null;
   
       /*
        * (non-Javadoc)
  @@ -147,6 +150,10 @@
   
               // checks whether all the request messages hv been acked
           }
  +        System.out.println("GET NEXT MSG TO SEND: invoking callback");
  +        if(callBack != null && msg!=null)
  +             informOutgoingMessage(msg);
  +        System.out.println("GET NEXT MSG TO SEND: end callback");
           return msg;
       }
   
  @@ -218,7 +225,10 @@
                   .getAddressingHeaders().getRelatesTo().get(0);
           String messageId = relatesTo.getURI().toString();
           String sequenceId = null;
  +        
  +        SandeshaQueue.getInstance().displayOutgoingMap();
           sequenceId = accessor.searchForSequenceId(messageId);
  +        
           SandeshaQueue sq = SandeshaQueue.getInstance();
           //sq.displayOutgoingMap();
           boolean exists = accessor.isIncomingSequenceExists(sequenceId);
  @@ -409,4 +419,29 @@
          return accessor.getOffer(msgID);
       }
   
  +     public  void setCallback(Callback cb){
  +                     callBack = cb;
  +     }
  +
  +     public void removeCallback(){
  +             callBack = null;
  +     }
  +
  +     private void informOutgoingMessage(RMMessageContext rmMsgContext){
  +     
  +             CallbackData cbData = new CallbackData ();
  +             
  +             //  setting callback data;
  +             if(rmMsgContext!=null){
  +                     cbData.setSequenceId(rmMsgContext.getSequenceID());
  +                     cbData.setMessageId(rmMsgContext.getMessageID());
  +                     cbData.setMessageType(rmMsgContext.getMessageType());   
 
  +             
  +             }
  +             
  +             if(callBack != null)
  +                     callBack.onOutgoingMessage(cbData);
  +     }
  +
  +
   }
  \ No newline at end of file
  
  
  
  1.27      +11 -0     
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
  
  Index: ServerStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
  retrieving revision 1.26
  retrieving revision 1.27
  diff -u -r1.26 -r1.27
  --- ServerStorageManager.java 7 May 2005 08:14:44 -0000       1.26
  +++ ServerStorageManager.java 8 May 2005 19:10:51 -0000       1.27
  @@ -21,6 +21,7 @@
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.IStorageManager;
   import org.apache.sandesha.RMMessageContext;
  +import org.apache.sandesha.storage.Callback;
   import org.apache.sandesha.storage.dao.ISandeshaDAO;
   import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
   import org.apache.sandesha.storage.queue.SandeshaQueue;
  @@ -51,6 +52,8 @@
       private String tempSeqId = null; // used by getNextMessageToProcess();
       ISandeshaDAO accessor;
   
  +     private static Callback callBack = null;
  +     
       public ServerStorageManager() {
           accessor = 
SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
       }
  @@ -413,6 +416,14 @@
           return accessor.getAcksTo(seqId);
       }
   
  +    public void setCallback(Callback cb){
  +     callBack = cb;
  +    }
  +     
  +    public void removeCallback(){
  +      callBack = null;
  +    }
  +
       public void addOffer(String msgID, String offerID) {
           //To change body of implemented methods use File | Settings | File 
Templates.
       }
  
  
  
  1.16      +6 -0      
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
  
  Index: SandeshaQueue.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- SandeshaQueue.java        7 May 2005 08:14:44 -0000       1.15
  +++ SandeshaQueue.java        8 May 2005 19:10:51 -0000       1.16
  @@ -636,11 +636,17 @@
               if (obj != null) {
                   OutgoingSequence hash = (OutgoingSequence) obj;
                   boolean hasMsg = hash.hasMessageWithId(messageId);
  +
                   if (!hasMsg)
                       key = null;
  +                
  +//                if(hasMsg){
  +//                    break;
  +//                }
               }
   
           }
  +
           return key;
       }
   
  
  
  
  1.46      +36 -1     
ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java
  
  Index: RMProvider.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java,v
  retrieving revision 1.45
  retrieving revision 1.46
  diff -u -r1.45 -r1.46
  --- RMProvider.java   7 May 2005 08:14:44 -0000       1.45
  +++ RMProvider.java   8 May 2005 19:10:52 -0000       1.46
  @@ -21,6 +21,7 @@
   import org.apache.axis.components.logger.LogFactory;
   import org.apache.axis.handlers.soap.SOAPService;
   import org.apache.axis.message.SOAPEnvelope;
  +import org.apache.axis.message.addressing.Action;
   import org.apache.axis.message.addressing.AddressingHeaders;
   import org.apache.axis.providers.java.RPCProvider;
   import org.apache.commons.logging.Log;
  @@ -32,6 +33,9 @@
   import org.apache.sandesha.server.RMMessageProcessorIdentifier;
   import org.apache.sandesha.server.msgprocessors.FaultProcessor;
   import org.apache.sandesha.server.msgprocessors.IRMMessageProcessor;
  +import org.apache.sandesha.storage.Callback;
  +import org.apache.sandesha.storage.CallbackData;
  +import org.apache.sandesha.storage.dao.SandeshaQueueDAO;
   import org.apache.sandesha.ws.rm.RMHeaders;
   
   import java.util.ArrayList;
  @@ -51,8 +55,15 @@
   
       private boolean client;
       private static final Log log = 
LogFactory.getLog(RMProvider.class.getName());
  +     private static Callback callback = null;
   
  -
  +     public static void setCallback(Callback cb){
  +             callback = cb;
  +     }
  +
  +     public static void removeCallback(){
  +             callback = null;
  +     }       
       public void processMessage(MessageContext msgContext, SOAPEnvelope 
reqEnv, SOAPEnvelope resEnv, Object obj)
               throws Exception {
   
  @@ -101,6 +112,11 @@
   
               IRMMessageProcessor rmMessageProcessor = 
RMMessageProcessorIdentifier.getMessageProcessor(rmMessageContext, 
storageManager);
   
  +                     if(callback != null){
  +                             CallbackData cbData = 
getCallbackData(rmMessageContext);
  +                             callback.onIncomingMessage(cbData);
  +                     }
  +
               try {
                   if (!rmMessageProcessor.processMessage(rmMessageContext)) {
                       msgContext.setPastPivot(true);
  @@ -157,4 +173,23 @@
           return actionList;
       }
   
  +    //for callback
  +    
  +    private CallbackData getCallbackData(RMMessageContext rmMsgContext){
  +     CallbackData cbData = new CallbackData ();
  +     
  +     //  setting callback data;
  +     cbData.setSequenceId( rmMsgContext.getSequenceID());
  +     cbData.setMessageId(rmMsgContext.getMessageID());
  +     cbData.setMessageType(rmMsgContext.getMessageType());
  +     
  +     Action action = rmMsgContext.getAddressingHeaders().getAction();
  +     if(action!=null){
  +             cbData.setAction(action.toString());
  +     }
  +     
  +     return cbData;
  +    }
  +    //end callback
  +
   }
  \ No newline at end of file
  
  
  

Reply via email to