Author: hiranya
Date: Sat Aug 3 00:37:18 2013
New Revision: 1509920
URL: http://svn.apache.org/r1509920
Log:
Adding a JMX MBean to monitor the synapse call back store - SYNAPSE-528
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java?rev=1509920&r1=1509919&r2=1509920&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/Axis2SynapseController.java
Sat Aug 3 00:37:18 2013
@@ -348,6 +348,9 @@ public class Axis2SynapseController impl
} catch (IOException e) {
log.error("Error while initializing SNMP", e);
}
+
+ SynapseCallbackReceiver.getInstance().init(serverContextInformation.getSynapseConfiguration(),
+ serverContextInformation);
}
/**
@@ -459,6 +462,8 @@ public class Axis2SynapseController impl
}
}
}
+
+ SynapseCallbackReceiver.getInstance().destroy();
} catch (AxisFault e) {
log.error("Error stopping the Axis2 Environment");
}
@@ -621,7 +626,7 @@ public class Axis2SynapseController impl
}
int pendingTransportThreads = pendingListenerThreads +
pendingSenderThreads;
- int pendingCallbacks = serverContextInformation.getCallbackCount();
+ int pendingCallbacks = SynapseCallbackReceiver.getInstance().getCallbackCount();
if (pendingCallbacks > 0) {
log.info("Waiting for: " + pendingCallbacks + " callbacks/replies..");
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java?rev=1509920&r1=1509919&r2=1509920&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/ServerContextInformation.java
Sat Aug 3 00:37:18 2013
@@ -38,8 +38,6 @@ public class ServerContextInformation {
private SynapseConfiguration synapseConfiguration;
/* Keeps the SynapseEnvironment */
private SynapseEnvironment synapseEnvironment;
- /** Callback receiver */
- private SynapseCallbackReceiver synapseCallbackReceiver;
/** State of the server */
private ServerState serverState = ServerState.UNDETERMINED;
/** Reference to the server configuration */
@@ -95,27 +93,7 @@ public class ServerContextInformation {
this.synapseEnvironment = synapseEnvironment;
}
- public SynapseCallbackReceiver getSynapseCallbackReceiver() {
- return synapseCallbackReceiver;
- }
-
- public void setSynapseCallbackReceiver(SynapseCallbackReceiver synapseCallbackReceiver) {
- this.synapseCallbackReceiver = synapseCallbackReceiver;
- }
-
public ServerConfigurationInformation getServerConfigurationInformation() {
return serverConfigurationInformation;
}
-
- /**
- * Returns the number of current callbacks.
- *
- * @return the number of current callbacks.
- */
- public int getCallbackCount() {
- if (synapseCallbackReceiver != null) {
- return synapseCallbackReceiver.getCallbackCount();
- }
- return 0;
- }
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java?rev=1509920&r1=1509919&r2=1509920&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/AnonymousServiceFactory.java
Sat Aug 3 00:37:18 2013
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
-import org.apache.synapse.ServerContextInformation;
import org.apache.synapse.config.SynapseConfiguration;
import javax.xml.namespace.QName;
@@ -50,8 +49,6 @@ public class AnonymousServiceFactory {
public static final String OUT_IN_OPERATION = "anonOutInOp";
public static final String OUT_ONLY_OPERATION = "anonOutonlyOp";
- private static SynapseCallbackReceiver synapseCallbackReceiver = null;
-
/**
* Creates an AxisService for the requested QoS for sending out messages
* Callers must guarantee that if wsRMon or wsSecOn is required, that
wsAddrOn is also set
@@ -161,7 +158,7 @@ public class AnonymousServiceFactory {
try {
DynamicAxisOperation dynamicOperation =
new DynamicAxisOperation(new QName(OUT_IN_OPERATION));
- dynamicOperation.setMessageReceiver(getCallbackReceiver(synCfg, axisCfg));
+ dynamicOperation.setMessageReceiver(SynapseCallbackReceiver.getInstance());
AxisMessage inMsg = new AxisMessage();
inMsg.setName("in-message");
inMsg.setParent(dynamicOperation);
@@ -173,7 +170,7 @@ public class AnonymousServiceFactory {
OutOnlyAxisOperation asyncOperation =
new OutOnlyAxisOperation(new QName(OUT_ONLY_OPERATION));
- asyncOperation.setMessageReceiver(getCallbackReceiver(synCfg, axisCfg));
+ asyncOperation.setMessageReceiver(SynapseCallbackReceiver.getInstance());
AxisMessage outOnlyMsg = new AxisMessage();
outOnlyMsg.setName("out-message");
outOnlyMsg.setParent(asyncOperation);
@@ -193,39 +190,9 @@ public class AnonymousServiceFactory {
} catch (AxisFault e) {
handleException(
- "Error occured while creating an anonymous service for QoS : " +
+ "Error occurred while creating an anonymous service for QoS : " +
serviceKey, e);
}
return null;
}
-
- /**
- * Create a single callback receiver if required, and return its reference
- * @param synCfg the Synapse configuration
- * @param axisCfg axis configuration
- * @return the callback receiver thats created or now exists
- */
- private static synchronized SynapseCallbackReceiver getCallbackReceiver(
- SynapseConfiguration synCfg, AxisConfiguration axisCfg) {
-
- if (synapseCallbackReceiver == null) {
- Parameter serverCtxParam =
- axisCfg.getParameter(
- SynapseConstants.SYNAPSE_SERVER_CTX_INFO);
- if (serverCtxParam == null ||
- !(serverCtxParam.getValue() instanceof ServerContextInformation)) {
- String msg = "ServerContextInformation not found";
- log.error(msg);
- throw new SynapseException(msg);
- }
-
- ServerContextInformation contextInformation =
- (ServerContextInformation) serverCtxParam.getValue();
-
- synapseCallbackReceiver = new SynapseCallbackReceiver(synCfg, contextInformation);
-
- contextInformation.setSynapseCallbackReceiver(synapseCallbackReceiver);
- }
- return synapseCallbackReceiver;
- }
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java?rev=1509920&r1=1509919&r2=1509920&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java
Sat Aug 3 00:37:18 2013
@@ -40,6 +40,7 @@ import org.apache.synapse.SynapseExcepti
import org.apache.synapse.ServerContextInformation;
import org.apache.synapse.aspects.statistics.ErrorLogFactory;
import org.apache.synapse.aspects.statistics.StatisticsReporter;
+import org.apache.synapse.commons.jmx.MBeanRegistrar;
import org.apache.synapse.config.SynapseConfigUtils;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.endpoints.Endpoint;
@@ -61,19 +62,47 @@ public class SynapseCallbackReceiver imp
private static final Log log =
LogFactory.getLog(SynapseCallbackReceiver.class);
+ private static final String CALLBACK_STORE_CATEGORY = "SynapseCallbackStore";
+ private static final String CALLBACK_STORE_NAME = "SynapseCallbackStore";
+
+ private static final SynapseCallbackReceiver instance = new SynapseCallbackReceiver();
+
/** This is the synchronized callbackStore that maps outgoing messageID's
to callback objects */
private final Map<String, AxisCallback> callbackStore; // will be made
thread safe in the constructor
+ private boolean initialized = false;
+
+ private SynapseCallbackReceiver() {
+ callbackStore = Collections.synchronizedMap(new HashMap<String, AxisCallback>());
+ }
+
+ /**
+ * Get the singleton SynapseCallbackReceiver instance
+ *
+ * @return A SynapseCallbackReceiver
+ */
+ public static SynapseCallbackReceiver getInstance() {
+ return instance;
+ }
+
/**
- * Create the *single* instance of this class that would be used by all anonymous services
+ * Initialize the singleton instance of this class that would be used by all anonymous services
* used for outgoing messaging.
+ *
* @param synCfg the Synapse configuration
* @param contextInformation server runtime information
*/
- public SynapseCallbackReceiver(SynapseConfiguration synCfg,
+ public void init(SynapseConfiguration synCfg,
ServerContextInformation
contextInformation) {
- callbackStore = Collections.synchronizedMap(new HashMap<String, AxisCallback>());
+ if (initialized) {
+ log.warn("Attempted to re-initialize SynapseCallbackReceiver");
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Initializing SynapseCallbackReceiver");
+ }
// create the Timer object and a TimeoutHandler task
TimeoutHandler timeoutHandler = new TimeoutHandler(callbackStore,
contextInformation);
@@ -83,12 +112,45 @@ public class SynapseCallbackReceiver imp
// schedule timeout handler to run every n seconds (n : specified or
defaults to 15s)
timeOutTimer.schedule(timeoutHandler, 0, timeoutHandlerInterval);
+
+ MBeanRegistrar.getInstance().registerMBean(new SynapseCallbackStoreView(this),
+ CALLBACK_STORE_CATEGORY, CALLBACK_STORE_NAME);
+ initialized = true;
+ }
+
+ /**
+ * Destroy and cleanup this callback receiver instance
+ */
+ public void destroy() {
+ if (!initialized) {
+ log.warn("Attempted to destroy uninitialized SynapseCallbackReceiver");
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Destroying SynapseCallbackReceiver");
+ }
+ MBeanRegistrar.getInstance().unRegisterMBean(CALLBACK_STORE_CATEGORY,
+ CALLBACK_STORE_NAME);
+ initialized = false;
}
public int getCallbackCount() {
return callbackStore.size();
}
+ public String[] getPendingCallbacks() {
+ Set<String> keys = callbackStore.keySet();
+ List<String> list = new ArrayList<String>();
+ synchronized (callbackStore) {
+ Iterator<String> iterator = keys.iterator();
+ while (iterator.hasNext()) {
+ list.add(iterator.next());
+ }
+ }
+ return list.toArray(new String[list.size()]);
+ }
+
public void addCallback(String MsgID, AxisCallback callback) {
callbackStore.put(MsgID, callback);
if (log.isDebugEnabled()) {
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java?rev=1509920&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreView.java
Sat Aug 3 00:37:18 2013
@@ -0,0 +1,18 @@
+package org.apache.synapse.core.axis2;
+
+public class SynapseCallbackStoreView implements SynapseCallbackStoreViewMBean {
+
+ private SynapseCallbackReceiver receiver;
+
+ public SynapseCallbackStoreView(SynapseCallbackReceiver receiver) {
+ this.receiver = receiver;
+ }
+
+ public int getCallbackCount() {
+ return receiver.getCallbackCount();
+ }
+
+ public String[] getPendingCallbacks() {
+ return receiver.getPendingCallbacks();
+ }
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java?rev=1509920&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackStoreViewMBean.java
Sat Aug 3 00:37:18 2013
@@ -0,0 +1,22 @@
+package org.apache.synapse.core.axis2;
+
+/**
+ * JMX MBean interface for monitoring the Synapse callback store.
+ */
+public interface SynapseCallbackStoreViewMBean {
+
+ /**
+ * Get the number of pending callbacks in Synapse callback store
+ *
+ * @return An integer
+ */
+ public int getCallbackCount();
+
+ /**
+ * Get the IDs (message IDs) of the pending callbacks in Synapse callback store
+ *
+ * @return An array of strings
+ */
+ public String[] getPendingCallbacks();
+
+}