Hey folks
We added a feature to TomEE a little while back that lets us control MDB
listening states via JMX. This currently only works with ActiveMQ, so I'd
like to propose a patch to make it work with any MDB/RAR. In essence, the logic
is exactly the same, but has shifted from ActiveMQResourceAdapter to
MdbContainer. Here's the diff for master showing what I had in mind.
Any thoughts / objections?
Thanks
Jon
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
index a17f342bab..724effd72f 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
@@ -44,8 +44,22 @@
import org.apache.xbean.recipe.ObjectRecipe;
import org.apache.xbean.recipe.Option;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanConstructorInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanParameterInfo;
import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.management.ReflectionException;
import javax.naming.NamingException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
@@ -63,7 +77,9 @@
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static javax.management.MBeanOperationInfo.ACTION;
import static
org.apache.openejb.core.transaction.EjbTransactionUtil.afterInvoke;
import static
org.apache.openejb.core.transaction.EjbTransactionUtil.createTransactionPolicy;
import static
org.apache.openejb.core.transaction.EjbTransactionUtil.handleApplicationException;
@@ -73,9 +89,11 @@
private static final Logger logger =
Logger.getInstance(LogCategory.OPENEJB,
"org.apache.openejb.util.resources");
private static final ThreadLocal<BeanContext> CURRENT = new
ThreadLocal<>();
-
private static final Object[] NO_ARGS = new Object[0];
+ private final Map<BeanContext, ObjectName> mbeanNames = new
ConcurrentHashMap<>();
+ private final Map<BeanContext, MdbActivationContext>
activationContexts = new ConcurrentHashMap<>();
+
private final Object containerID;
private final SecurityService securityService;
private final ResourceAdapter resourceAdapter;
@@ -188,7 +206,18 @@ public void deploy(final BeanContext beanContext)
throws OpenEJBException {
// activate the endpoint
CURRENT.set(beanContext);
try {
- resourceAdapter.endpointActivation(endpointFactory,
activationSpec);
+
+ final MdbActivationContext activationContext = new
MdbActivationContext(resourceAdapter, endpointFactory,
activationSpec);
+ activationContexts.put(beanContext, activationContext);
+
+ final boolean activeOnStartup =
Boolean.parseBoolean(beanContext.getProperties().getProperty("MdbActiveOnStartup",
"true"));
+ if (activeOnStartup) {
+ activationContext.start();
+ }
+
+ final String jmxName =
beanContext.getProperties().getProperty("MdbJMXControl", "true");
+ addJMxControl(beanContext, jmxName, activationContext);
+
} catch (final ResourceException e) {
// activation failed... clean up
beanContext.setContainer(null);
@@ -201,6 +230,10 @@ public void deploy(final BeanContext beanContext)
throws OpenEJBException {
}
}
+ private static String getOrDefault(final Map<String, String> map,
final String key, final String defaultValue) {
+ return map.get(key) != null ? map.get(key) : defaultValue;
+ }
+
private ActivationSpec createActivationSpec(final BeanContext
beanContext) throws OpenEJBException {
try {
// initialize the object recipe
@@ -253,7 +286,6 @@ private ActivationSpec createActivationSpec(final
BeanContext beanContext) throw
logger.debug("No Validator bound to JNDI context");
}
-
// set the resource adapter into the activation spec
activationSpec.setResourceAdapter(resourceAdapter);
@@ -284,7 +316,17 @@ public void undeploy(final BeanContext
beanContext) throws OpenEJBException {
if (endpointFactory != null) {
CURRENT.set(beanContext);
try {
-
resourceAdapter.endpointDeactivation(endpointFactory,
endpointFactory.getActivationSpec());
+
+ final ObjectName jmxBeanToRemove =
mbeanNames.remove(beanContext);
+ if (jmxBeanToRemove != null) {
+ LocalMBeanServer.unregisterSilently(jmxBeanToRemove);
+ logger.info("Undeployed MDB control for " +
beanContext.getDeploymentID());
+ }
+
+ final MdbActivationContext activationContext =
activationContexts.remove(beanContext);
+ if (activationContext != null &&
activationContext.isStarted()) {
+
resourceAdapter.endpointDeactivation(endpointFactory,
endpointFactory.getActivationSpec());
+ }
} finally {
CURRENT.remove();
}
@@ -492,6 +534,30 @@ public void release(final BeanContext deployInfo,
final Object instance) {
}
}
+ private void addJMxControl(final BeanContext current, final
String name, final MdbActivationContext activationContext) throws
ResourceException {
+ if (name == null || "false".equalsIgnoreCase(name)) {
+ return;
+ }
+
+ final ObjectName jmxName;
+ try {
+ jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder()
+ .set("J2EEServer", "openejb")
+ .set("J2EEApplication", null)
+ .set("EJBModule", current.getModuleID())
+ .set("StatelessSessionBean", current.getEjbName())
+ .set("j2eeType", "control")
+ .set("name", current.getEjbName())
+ .build() : new ObjectName(name);
+ } catch (final MalformedObjectNameException e) {
+ throw new IllegalArgumentException(e);
+ }
+ mbeanNames.put(current, jmxName);
+
+ LocalMBeanServer.registerSilently(new
MdbJmxControl(activationContext), jmxName);
+ logger.info("Deployed MDB control for " +
current.getDeploymentID() + " on " + jmxName);
+ }
+
public static BeanContext current() {
final BeanContext beanContext = CURRENT.get();
if (beanContext == null) {
@@ -505,4 +571,115 @@ public static BeanContext current() {
private TransactionPolicy txPolicy;
private ThreadContext oldCallContext;
}
+
+ private static class MdbActivationContext {
+ private final ResourceAdapter resourceAdapter;
+ private final EndpointFactory endpointFactory;
+ private final ActivationSpec activationSpec;
+
+ private AtomicBoolean started = new AtomicBoolean(false);
+
+ public MdbActivationContext(final ResourceAdapter
resourceAdapter, final EndpointFactory endpointFactory, final
ActivationSpec activationSpec) {
+ this.resourceAdapter = resourceAdapter;
+ this.endpointFactory = endpointFactory;
+ this.activationSpec = activationSpec;
+ }
+
+ public ResourceAdapter getResourceAdapter() {
+ return resourceAdapter;
+ }
+
+ public EndpointFactory getEndpointFactory() {
+ return endpointFactory;
+ }
+
+ public ActivationSpec getActivationSpec() {
+ return activationSpec;
+ }
+
+ public boolean isStarted() {
+ return started.get();
+ }
+
+ public void start() throws ResourceException {
+ if (started.get()) {
+ return;
+ }
+
+ resourceAdapter.endpointActivation(endpointFactory,
activationSpec);
+ started.set(true);
+ }
+
+ public void stop() {
+ if (! started.get()) {
+ return;
+ }
+
+ resourceAdapter.endpointDeactivation(endpointFactory,
activationSpec);
+ started.set(false);
+ }
+ }
+
+ public static final class MdbJmxControl implements DynamicMBean {
+ private static final AttributeList ATTRIBUTE_LIST = new
AttributeList();
+ private static final MBeanInfo INFO = new MBeanInfo(
+
"org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl",
+ "Allows to control a MDB (start/stop)",
+ new MBeanAttributeInfo[0],
+ new MBeanConstructorInfo[0],
+ new MBeanOperationInfo[]{
+ new MBeanOperationInfo("start", "Ensure the
listener is active.", new MBeanParameterInfo[0], "void", ACTION),
+ new MBeanOperationInfo("stop", "Ensure the
listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
+ },
+ new MBeanNotificationInfo[0]);
+
+ private final MdbActivationContext activationContext;
+
+ private MdbJmxControl(final MdbActivationContext activationContext) {
+ this.activationContext = activationContext;
+ }
+
+ @Override
+ public Object invoke(final String actionName, final Object[]
params, final String[] signature) throws MBeanException,
ReflectionException {
+ if (actionName.equals("stop")) {
+ activationContext.stop();
+
+ } else if (actionName.equals("start")) {
+ try {
+ activationContext.start();
+ } catch (ResourceException e) {
+ throw new MBeanException(new
IllegalStateException(e.getMessage()));
+ }
+
+ } else {
+ throw new MBeanException(new
IllegalStateException("unsupported operation: " + actionName));
+ }
+ return null;
+ }
+
+ @Override
+ public MBeanInfo getMBeanInfo() {
+ return INFO;
+ }
+
+ @Override
+ public Object getAttribute(final String attribute) throws
AttributeNotFoundException, MBeanException, ReflectionException {
+ throw new AttributeNotFoundException();
+ }
+
+ @Override
+ public void setAttribute(final Attribute attribute) throws
AttributeNotFoundException, InvalidAttributeValueException,
MBeanException, ReflectionException {
+ throw new AttributeNotFoundException();
+ }
+
+ @Override
+ public AttributeList getAttributes(final String[] attributes) {
+ return ATTRIBUTE_LIST;
+ }
+
+ @Override
+ public AttributeList setAttributes(final AttributeList attributes) {
+ return ATTRIBUTE_LIST;
+ }
+ }
}
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
index cd44b79ba8..e5f201e3c5 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
@@ -29,8 +29,6 @@
import org.apache.openejb.BeanContext;
import org.apache.openejb.core.mdb.MdbContainer;
import org.apache.openejb.loader.SystemInstance;
-import org.apache.openejb.monitoring.LocalMBeanServer;
-import org.apache.openejb.monitoring.ObjectNameBuilder;
import org.apache.openejb.resource.AutoConnectionTracker;
import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory;
import org.apache.openejb.resource.activemq.jms2.TomEEManagedConnectionProxy;
@@ -44,28 +42,11 @@
import javax.jms.Connection;
import javax.jms.JMSException;
-import javax.management.Attribute;
-import javax.management.AttributeList;
-import javax.management.AttributeNotFoundException;
-import javax.management.DynamicMBean;
-import javax.management.InvalidAttributeValueException;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanConstructorInfo;
-import javax.management.MBeanException;
-import javax.management.MBeanInfo;
-import javax.management.MBeanNotificationInfo;
-import javax.management.MBeanOperationInfo;
-import javax.management.MBeanParameterInfo;
-import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import javax.management.ReflectionException;
import javax.naming.NamingException;
-import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
-import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
@@ -78,8 +59,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import static javax.management.MBeanOperationInfo.ACTION;
-
@SuppressWarnings("UnusedDeclaration")
public class ActiveMQResourceAdapter extends
org.apache.activemq.ra.ActiveMQResourceAdapter {
@@ -190,65 +169,6 @@ private void createInternalBroker(final String
brokerXmlConfig, final Properties
}
}
- @Override
- public void endpointActivation(final MessageEndpointFactory
endpointFactory, final ActivationSpec activationSpec) throws
ResourceException {
- final BeanContext current = MdbContainer.current();
- if (current != null &&
"false".equalsIgnoreCase(current.getProperties().getProperty("MdbActiveOnStartup")))
{
- if (!equals(activationSpec.getResourceAdapter())) {
- throw new ResourceException("Activation spec not
initialized with this ResourceAdapter instance (" +
activationSpec.getResourceAdapter() + " != " + this + ")");
- }
- if (!(activationSpec instanceof MessageActivationSpec)) {
- throw new NotSupportedException("That type of
ActivationSpec not supported: " + activationSpec.getClass());
- }
-
- final ActiveMQEndpointActivationKey key = new
ActiveMQEndpointActivationKey(endpointFactory,
MessageActivationSpec.class.cast(activationSpec));
- Map.class.cast(Reflections.get(this,
"endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) {
- });
- // we dont want that worker.start();
- } else {
- super.endpointActivation(endpointFactory, activationSpec);
- }
-
- if (current != null) {
- addJMxControl(current,
current.getProperties().getProperty("MdbJMXControl"));
- }
- }
-
- private void addJMxControl(final BeanContext current, final
String name) throws ResourceException {
- if (name == null || "false".equalsIgnoreCase(name)) {
- return;
- }
-
- final ActiveMQEndpointWorker worker = getWorker(current);
- final ObjectName jmxName;
- try {
- jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder()
- .set("J2EEServer", "openejb")
- .set("J2EEApplication", null)
- .set("EJBModule", current.getModuleID())
- .set("StatelessSessionBean", current.getEjbName())
- .set("j2eeType", "control")
- .set("name", current.getEjbName())
- .build() : new ObjectName(name);
- } catch (final MalformedObjectNameException e) {
- throw new IllegalArgumentException(e);
- }
- mbeanNames.put(current, jmxName);
-
- LocalMBeanServer.registerSilently(new MdbJmxControl(worker), jmxName);
- log.info("Deployed MDB control for " +
current.getDeploymentID() + " on " + jmxName);
- }
-
- @Override
- public void endpointDeactivation(final MessageEndpointFactory
endpointFactory, final ActivationSpec activationSpec) {
- final BeanContext current = MdbContainer.current();
- if (current != null &&
"true".equalsIgnoreCase(current.getProperties().getProperty("MdbJMXControl")))
{
- LocalMBeanServer.unregisterSilently(mbeanNames.remove(current));
- log.info("Undeployed MDB control for " +
current.getDeploymentID());
- }
- super.endpointDeactivation(endpointFactory, activationSpec);
- }
-
private ActiveMQEndpointWorker getWorker(final BeanContext
beanContext) throws ResourceException {
final Map<ActiveMQEndpointActivationKey,
ActiveMQEndpointWorker> workers = Map.class.cast(Reflections.get(
MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(),
"endpointWorkers"));
@@ -396,72 +316,4 @@ private static void stopScheduler() {
//Ignore
}
}
-
- public static final class MdbJmxControl implements DynamicMBean {
- private static final AttributeList ATTRIBUTE_LIST = new
AttributeList();
- private static final MBeanInfo INFO = new MBeanInfo(
-
"org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl",
- "Allows to control a MDB (start/stop)",
- new MBeanAttributeInfo[0],
- new MBeanConstructorInfo[0],
- new MBeanOperationInfo[]{
- new MBeanOperationInfo("start", "Ensure the
listener is active.", new MBeanParameterInfo[0], "void", ACTION),
- new MBeanOperationInfo("stop", "Ensure the
listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
- },
- new MBeanNotificationInfo[0]);
-
- private final ActiveMQEndpointWorker worker;
-
- private MdbJmxControl(final ActiveMQEndpointWorker worker) {
- this.worker = worker;
- }
-
- @Override
- public Object invoke(final String actionName, final Object[]
params, final String[] signature) throws MBeanException,
ReflectionException {
- switch (actionName) {
- case "stop":
- try {
- worker.stop();
- } catch (final InterruptedException e) {
- Thread.interrupted();
- }
- break;
- case "start":
- try {
- worker.start();
- } catch (ResourceException e) {
- throw new MBeanException(new
IllegalStateException(e.getMessage()));
- }
- break;
- default:
- throw new MBeanException(new
IllegalStateException("unsupported operation: " + actionName));
- }
- return null;
- }
-
- @Override
- public MBeanInfo getMBeanInfo() {
- return INFO;
- }
-
- @Override
- public Object getAttribute(final String attribute) throws
AttributeNotFoundException, MBeanException, ReflectionException {
- throw new AttributeNotFoundException();
- }
-
- @Override
- public void setAttribute(final Attribute attribute) throws
AttributeNotFoundException, InvalidAttributeValueException,
MBeanException, ReflectionException {
- throw new AttributeNotFoundException();
- }
-
- @Override
- public AttributeList getAttributes(final String[] attributes) {
- return ATTRIBUTE_LIST;
- }
-
- @Override
- public AttributeList setAttributes(final AttributeList attributes) {
- return ATTRIBUTE_LIST;
- }
- }
}
diff --git
a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java
similarity index 96%
rename from
container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
rename to
container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java
index 7940cd423d..3885dc51de 100644
---
a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
+++
b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.openejb.resource.activemq;
+package org.apache.openejb.core.mdb;
import org.apache.openejb.config.EjbModule;
import org.apache.openejb.jee.EjbJar;
@@ -45,6 +45,7 @@
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -53,8 +54,9 @@
import static org.junit.Assert.fail;
@RunWith(ApplicationComposer.class)
-public class ActiveMQResourceAdapterControlTest {
- @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb")
+public class ResourceAdapterControlTest {
+ private static final Logger logger =
Logger.getLogger(ResourceAdapterControlTest.class.getName());
+ @Resource(name = "ResourceAdapterControlTest/test/ejb/Mdb")
private Queue queue;
@Resource