chamikara 2005/05/08 12:10:52
Modified: sandesha/src/org/apache/sandesha IStorageManager.java
sandesha/src/org/apache/sandesha/client
ClientStorageManager.java
sandesha/src/org/apache/sandesha/server
ServerStorageManager.java
sandesha/src/org/apache/sandesha/storage/queue
SandeshaQueue.java
sandesha/src/org/apache/sandesha/ws/rm/providers
RMProvider.java
Log:
A concept of adding callbacks to StorageManager and RMProvider was added.
This will be helpful in monitoring the contro messages going out from and
coming in to the Sandesha system.
Revision Changes Path
1.21 +5 -0
ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java
Index: IStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- IStorageManager.java 7 May 2005 08:14:44 -0000 1.20
+++ IStorageManager.java 8 May 2005 19:10:51 -0000 1.21
@@ -19,6 +19,7 @@
import java.util.Iterator;
import java.util.Map;
+import org.apache.sandesha.storage.Callback;
import org.apache.sandesha.storage.queue.IncomingSequence;
/**
@@ -162,6 +163,10 @@
public String getAcksTo(String seqId);
+ public void setCallback(Callback callBack);
+
+ public void removeCallback();
+
void addOffer(String msgID, String offerID);
String getOffer(String msgID);
1.30 +35 -0
ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
Index: ClientStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -r1.29 -r1.30
--- ClientStorageManager.java 7 May 2005 08:14:44 -0000 1.29
+++ ClientStorageManager.java 8 May 2005 19:10:51 -0000 1.30
@@ -22,6 +22,8 @@
import org.apache.sandesha.Constants;
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMMessageContext;
+import org.apache.sandesha.storage.Callback;
+import org.apache.sandesha.storage.CallbackData;
import org.apache.sandesha.storage.dao.ISandeshaDAO;
import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
import org.apache.sandesha.storage.queue.IncomingSequence;
@@ -43,6 +45,7 @@
.getName());
private ISandeshaDAO accessor;
+ private static Callback callBack = null;
/*
* (non-Javadoc)
@@ -147,6 +150,10 @@
// checks whether all the request messages hv been acked
}
+ System.out.println("GET NEXT MSG TO SEND: invoking callback");
+ if(callBack != null && msg!=null)
+ informOutgoingMessage(msg);
+ System.out.println("GET NEXT MSG TO SEND: end callback");
return msg;
}
@@ -218,7 +225,10 @@
.getAddressingHeaders().getRelatesTo().get(0);
String messageId = relatesTo.getURI().toString();
String sequenceId = null;
+
+ SandeshaQueue.getInstance().displayOutgoingMap();
sequenceId = accessor.searchForSequenceId(messageId);
+
SandeshaQueue sq = SandeshaQueue.getInstance();
//sq.displayOutgoingMap();
boolean exists = accessor.isIncomingSequenceExists(sequenceId);
@@ -409,4 +419,29 @@
return accessor.getOffer(msgID);
}
+ public void setCallback(Callback cb){
+ callBack = cb;
+ }
+
+ public void removeCallback(){
+ callBack = null;
+ }
+
+ private void informOutgoingMessage(RMMessageContext rmMsgContext){
+
+ CallbackData cbData = new CallbackData ();
+
+ // setting callback data;
+ if(rmMsgContext!=null){
+ cbData.setSequenceId(rmMsgContext.getSequenceID());
+ cbData.setMessageId(rmMsgContext.getMessageID());
+ cbData.setMessageType(rmMsgContext.getMessageType());
+
+ }
+
+ if(callBack != null)
+ callBack.onOutgoingMessage(cbData);
+ }
+
+
}
\ No newline at end of file
1.27 +11 -0
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
Index: ServerStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -r1.26 -r1.27
--- ServerStorageManager.java 7 May 2005 08:14:44 -0000 1.26
+++ ServerStorageManager.java 8 May 2005 19:10:51 -0000 1.27
@@ -21,6 +21,7 @@
import org.apache.sandesha.Constants;
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMMessageContext;
+import org.apache.sandesha.storage.Callback;
import org.apache.sandesha.storage.dao.ISandeshaDAO;
import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
import org.apache.sandesha.storage.queue.SandeshaQueue;
@@ -51,6 +52,8 @@
private String tempSeqId = null; // used by getNextMessageToProcess();
ISandeshaDAO accessor;
+ private static Callback callBack = null;
+
public ServerStorageManager() {
accessor =
SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR);
}
@@ -413,6 +416,14 @@
return accessor.getAcksTo(seqId);
}
+ public void setCallback(Callback cb){
+ callBack = cb;
+ }
+
+ public void removeCallback(){
+ callBack = null;
+ }
+
public void addOffer(String msgID, String offerID) {
//To change body of implemented methods use File | Settings | File
Templates.
}
1.16 +6 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
Index: SandeshaQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- SandeshaQueue.java 7 May 2005 08:14:44 -0000 1.15
+++ SandeshaQueue.java 8 May 2005 19:10:51 -0000 1.16
@@ -636,11 +636,17 @@
if (obj != null) {
OutgoingSequence hash = (OutgoingSequence) obj;
boolean hasMsg = hash.hasMessageWithId(messageId);
+
if (!hasMsg)
key = null;
+
+// if(hasMsg){
+// break;
+// }
}
}
+
return key;
}
1.46 +36 -1
ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java
Index: RMProvider.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java,v
retrieving revision 1.45
retrieving revision 1.46
diff -u -r1.45 -r1.46
--- RMProvider.java 7 May 2005 08:14:44 -0000 1.45
+++ RMProvider.java 8 May 2005 19:10:52 -0000 1.46
@@ -21,6 +21,7 @@
import org.apache.axis.components.logger.LogFactory;
import org.apache.axis.handlers.soap.SOAPService;
import org.apache.axis.message.SOAPEnvelope;
+import org.apache.axis.message.addressing.Action;
import org.apache.axis.message.addressing.AddressingHeaders;
import org.apache.axis.providers.java.RPCProvider;
import org.apache.commons.logging.Log;
@@ -32,6 +33,9 @@
import org.apache.sandesha.server.RMMessageProcessorIdentifier;
import org.apache.sandesha.server.msgprocessors.FaultProcessor;
import org.apache.sandesha.server.msgprocessors.IRMMessageProcessor;
+import org.apache.sandesha.storage.Callback;
+import org.apache.sandesha.storage.CallbackData;
+import org.apache.sandesha.storage.dao.SandeshaQueueDAO;
import org.apache.sandesha.ws.rm.RMHeaders;
import java.util.ArrayList;
@@ -51,8 +55,15 @@
private boolean client;
private static final Log log =
LogFactory.getLog(RMProvider.class.getName());
+ private static Callback callback = null;
-
+ public static void setCallback(Callback cb){
+ callback = cb;
+ }
+
+ public static void removeCallback(){
+ callback = null;
+ }
public void processMessage(MessageContext msgContext, SOAPEnvelope
reqEnv, SOAPEnvelope resEnv, Object obj)
throws Exception {
@@ -101,6 +112,11 @@
IRMMessageProcessor rmMessageProcessor =
RMMessageProcessorIdentifier.getMessageProcessor(rmMessageContext,
storageManager);
+ if(callback != null){
+ CallbackData cbData =
getCallbackData(rmMessageContext);
+ callback.onIncomingMessage(cbData);
+ }
+
try {
if (!rmMessageProcessor.processMessage(rmMessageContext)) {
msgContext.setPastPivot(true);
@@ -157,4 +173,23 @@
return actionList;
}
+ //for callback
+
+ private CallbackData getCallbackData(RMMessageContext rmMsgContext){
+ CallbackData cbData = new CallbackData ();
+
+ // setting callback data;
+ cbData.setSequenceId( rmMsgContext.getSequenceID());
+ cbData.setMessageId(rmMsgContext.getMessageID());
+ cbData.setMessageType(rmMsgContext.getMessageType());
+
+ Action action = rmMsgContext.getAddressingHeaders().getAction();
+ if(action!=null){
+ cbData.setAction(action.toString());
+ }
+
+ return cbData;
+ }
+ //end callback
+
}
\ No newline at end of file