I forgot to mention that the JIRA issue for the original feature is TOMEE-2021: https://issues.apache.org/jira/browse/TOMEE-2021
Thanks Jon On Mon, Jun 26, 2017 at 11:54 AM, Jonathan Gallimore < jonathan.gallim...@gmail.com> wrote: > Hey folks > > We added a feature to TomEE a little while back, where we could control > MDB listening states via JMX. This currently only works with ActiveMQ, so > I'd like to propose a patch to make it work with any MDB/RAR. In essence, the > logic is exactly the same, but has shifted from ActiveMQResourceAdapter to > MdbContainer. Here's the diff for master showing what I had in mind. > > Any thoughts / objections? > > Thanks > > Jon > > diff --git > a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java > > b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java > index a17f342bab..724effd72f 100644 > --- > a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java > +++ > b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java > @@ -44,8 +44,22 @@ > import org.apache.xbean.recipe.ObjectRecipe; > import org.apache.xbean.recipe.Option; > > +import javax.management.Attribute; > +import javax.management.AttributeList; > +import javax.management.AttributeNotFoundException; > +import javax.management.DynamicMBean; > +import javax.management.InvalidAttributeValueException; > +import javax.management.MBeanAttributeInfo; > +import javax.management.MBeanConstructorInfo; > +import javax.management.MBeanException; > +import javax.management.MBeanInfo; > +import javax.management.MBeanNotificationInfo; > +import javax.management.MBeanOperationInfo; > +import javax.management.MBeanParameterInfo; > import javax.management.MBeanServer; > +import javax.management.MalformedObjectNameException; > import javax.management.ObjectName; > +import javax.management.ReflectionException; > import javax.naming.NamingException; > import javax.resource.ResourceException; > import javax.resource.spi.ActivationSpec; > @@ -63,7 +77,9 @@ > import java.util.TreeSet; > import 
java.util.concurrent.ConcurrentHashMap; > import java.util.concurrent.ConcurrentMap; > +import java.util.concurrent.atomic.AtomicBoolean; > > +import static javax.management.MBeanOperationInfo.ACTION; > import static > org.apache.openejb.core.transaction.EjbTransactionUtil.afterInvoke; > import static > org.apache.openejb.core.transaction.EjbTransactionUtil.createTransactionPolicy; > import static > org.apache.openejb.core.transaction.EjbTransactionUtil.handleApplicationException; > @@ -73,9 +89,11 @@ > private static final Logger logger = > Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources"); > > private static final ThreadLocal<BeanContext> CURRENT = new > ThreadLocal<>(); > - > private static final Object[] NO_ARGS = new Object[0]; > > + private final Map<BeanContext, ObjectName> mbeanNames = new > ConcurrentHashMap<>(); > + private final Map<BeanContext, MdbActivationContext> activationContexts > = new ConcurrentHashMap<>(); > + > private final Object containerID; > private final SecurityService securityService; > private final ResourceAdapter resourceAdapter; > @@ -188,7 +206,18 @@ public void deploy(final BeanContext beanContext) throws > OpenEJBException { > // activate the endpoint > CURRENT.set(beanContext); > try { > - resourceAdapter.endpointActivation(endpointFactory, > activationSpec); > + > + final MdbActivationContext activationContext = new > MdbActivationContext(resourceAdapter, endpointFactory, activationSpec); > + activationContexts.put(beanContext, activationContext); > + > + final boolean activeOnStartup = > Boolean.parseBoolean(beanContext.getProperties().getProperty("MdbActiveOnStartup", > "true")); > + if (activeOnStartup) { > + activationContext.start(); > + } > + > + final String jmxName = > beanContext.getProperties().getProperty("MdbJMXControl", "true"); > + addJMxControl(beanContext, jmxName, activationContext); > + > } catch (final ResourceException e) { > // activation failed... 
clean up > beanContext.setContainer(null); > @@ -201,6 +230,10 @@ public void deploy(final BeanContext beanContext) throws > OpenEJBException { > } > } > > + private static String getOrDefault(final Map<String, String> map, final > String key, final String defaultValue) { > + return map.get(key) != null ? map.get(key) : defaultValue; > + } > + > private ActivationSpec createActivationSpec(final BeanContext > beanContext) throws OpenEJBException { > try { > // initialize the object recipe > @@ -253,7 +286,6 @@ private ActivationSpec createActivationSpec(final > BeanContext beanContext) throw > logger.debug("No Validator bound to JNDI context"); > } > > - > // set the resource adapter into the activation spec > activationSpec.setResourceAdapter(resourceAdapter); > > @@ -284,7 +316,17 @@ public void undeploy(final BeanContext beanContext) > throws OpenEJBException { > if (endpointFactory != null) { > CURRENT.set(beanContext); > try { > - resourceAdapter.endpointDeactivation(endpointFactory, > endpointFactory.getActivationSpec()); > + > + final ObjectName jmxBeanToRemove = > mbeanNames.remove(beanContext); > + if (jmxBeanToRemove != null) { > + LocalMBeanServer.unregisterSilently(jmxBeanToRemove); > + logger.info("Undeployed MDB control for " + > beanContext.getDeploymentID()); > + } > + > + final MdbActivationContext activationContext = > activationContexts.remove(beanContext); > + if (activationContext != null && > activationContext.isStarted()) { > + > resourceAdapter.endpointDeactivation(endpointFactory, > endpointFactory.getActivationSpec()); > + } > } finally { > CURRENT.remove(); > } > @@ -492,6 +534,30 @@ public void release(final BeanContext deployInfo, final > Object instance) { > } > } > > + private void addJMxControl(final BeanContext current, final String name, > final MdbActivationContext activationContext) throws ResourceException { > + if (name == null || "false".equalsIgnoreCase(name)) { > + return; > + } > + > + final ObjectName jmxName; > + try { > + 
jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder() > + .set("J2EEServer", "openejb") > + .set("J2EEApplication", null) > + .set("EJBModule", current.getModuleID()) > + .set("StatelessSessionBean", current.getEjbName()) > + .set("j2eeType", "control") > + .set("name", current.getEjbName()) > + .build() : new ObjectName(name); > + } catch (final MalformedObjectNameException e) { > + throw new IllegalArgumentException(e); > + } > + mbeanNames.put(current, jmxName); > + > + LocalMBeanServer.registerSilently(new > MdbJmxControl(activationContext), jmxName); > + logger.info("Deployed MDB control for " + current.getDeploymentID() > + " on " + jmxName); > + } > + > public static BeanContext current() { > final BeanContext beanContext = CURRENT.get(); > if (beanContext == null) { > @@ -505,4 +571,115 @@ public static BeanContext current() { > private TransactionPolicy txPolicy; > private ThreadContext oldCallContext; > } > + > + private static class MdbActivationContext { > + private final ResourceAdapter resourceAdapter; > + private final EndpointFactory endpointFactory; > + private final ActivationSpec activationSpec; > + > + private AtomicBoolean started = new AtomicBoolean(false); > + > + public MdbActivationContext(final ResourceAdapter resourceAdapter, > final EndpointFactory endpointFactory, final ActivationSpec activationSpec) { > + this.resourceAdapter = resourceAdapter; > + this.endpointFactory = endpointFactory; > + this.activationSpec = activationSpec; > + } > + > + public ResourceAdapter getResourceAdapter() { > + return resourceAdapter; > + } > + > + public EndpointFactory getEndpointFactory() { > + return endpointFactory; > + } > + > + public ActivationSpec getActivationSpec() { > + return activationSpec; > + } > + > + public boolean isStarted() { > + return started.get(); > + } > + > + public void start() throws ResourceException { > + if (started.get()) { > + return; > + } > + > + resourceAdapter.endpointActivation(endpointFactory, > 
activationSpec); > + started.set(true); > + } > + > + public void stop() { > + if (! started.get()) { > + return; > + } > + > + resourceAdapter.endpointDeactivation(endpointFactory, > activationSpec); > + started.set(false); > + } > + } > + > + public static final class MdbJmxControl implements DynamicMBean { > + private static final AttributeList ATTRIBUTE_LIST = new > AttributeList(); > + private static final MBeanInfo INFO = new MBeanInfo( > + > "org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl", > + "Allows to control a MDB (start/stop)", > + new MBeanAttributeInfo[0], > + new MBeanConstructorInfo[0], > + new MBeanOperationInfo[]{ > + new MBeanOperationInfo("start", "Ensure the listener > is active.", new MBeanParameterInfo[0], "void", ACTION), > + new MBeanOperationInfo("stop", "Ensure the listener > is not active.", new MBeanParameterInfo[0], "void", ACTION) > + }, > + new MBeanNotificationInfo[0]); > + > + private final MdbActivationContext activationContext; > + > + private MdbJmxControl(final MdbActivationContext activationContext) { > + this.activationContext = activationContext; > + } > + > + @Override > + public Object invoke(final String actionName, final Object[] params, > final String[] signature) throws MBeanException, ReflectionException { > + if (actionName.equals("stop")) { > + activationContext.stop(); > + > + } else if (actionName.equals("start")) { > + try { > + activationContext.start(); > + } catch (ResourceException e) { > + throw new MBeanException(new > IllegalStateException(e.getMessage())); > + } > + > + } else { > + throw new MBeanException(new > IllegalStateException("unsupported operation: " + actionName)); > + } > + return null; > + } > + > + @Override > + public MBeanInfo getMBeanInfo() { > + return INFO; > + } > + > + @Override > + public Object getAttribute(final String attribute) throws > AttributeNotFoundException, MBeanException, ReflectionException { > + throw new AttributeNotFoundException(); > + } 
> + > + @Override > + public void setAttribute(final Attribute attribute) throws > AttributeNotFoundException, InvalidAttributeValueException, MBeanException, > ReflectionException { > + throw new AttributeNotFoundException(); > + } > + > + @Override > + public AttributeList getAttributes(final String[] attributes) { > + return ATTRIBUTE_LIST; > + } > + > + @Override > + public AttributeList setAttributes(final AttributeList attributes) { > + return ATTRIBUTE_LIST; > + } > + } > } > diff --git > a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java > > b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java > index cd44b79ba8..e5f201e3c5 100644 > --- > a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java > +++ > b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java > @@ -29,8 +29,6 @@ > import org.apache.openejb.BeanContext; > import org.apache.openejb.core.mdb.MdbContainer; > import org.apache.openejb.loader.SystemInstance; > -import org.apache.openejb.monitoring.LocalMBeanServer; > -import org.apache.openejb.monitoring.ObjectNameBuilder; > import org.apache.openejb.resource.AutoConnectionTracker; > import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory; > import org.apache.openejb.resource.activemq.jms2.TomEEManagedConnectionProxy; > @@ -44,28 +42,11 @@ > > import javax.jms.Connection; > import javax.jms.JMSException; > -import javax.management.Attribute; > -import javax.management.AttributeList; > -import javax.management.AttributeNotFoundException; > -import javax.management.DynamicMBean; > -import javax.management.InvalidAttributeValueException; > -import javax.management.MBeanAttributeInfo; > -import javax.management.MBeanConstructorInfo; > -import javax.management.MBeanException; > -import javax.management.MBeanInfo; > -import 
javax.management.MBeanNotificationInfo; > -import javax.management.MBeanOperationInfo; > -import javax.management.MBeanParameterInfo; > -import javax.management.MalformedObjectNameException; > import javax.management.ObjectName; > -import javax.management.ReflectionException; > import javax.naming.NamingException; > -import javax.resource.NotSupportedException; > import javax.resource.ResourceException; > -import javax.resource.spi.ActivationSpec; > import javax.resource.spi.BootstrapContext; > import javax.resource.spi.ResourceAdapterInternalException; > -import javax.resource.spi.endpoint.MessageEndpointFactory; > import java.lang.reflect.InvocationHandler; > import java.lang.reflect.Method; > import java.lang.reflect.Proxy; > @@ -78,8 +59,6 @@ > import java.util.concurrent.ConcurrentHashMap; > import java.util.concurrent.TimeUnit; > > -import static javax.management.MBeanOperationInfo.ACTION; > - > @SuppressWarnings("UnusedDeclaration") > public class ActiveMQResourceAdapter extends > org.apache.activemq.ra.ActiveMQResourceAdapter { > > @@ -190,65 +169,6 @@ private void createInternalBroker(final String > brokerXmlConfig, final Properties > } > } > > - @Override > - public void endpointActivation(final MessageEndpointFactory > endpointFactory, final ActivationSpec activationSpec) throws > ResourceException { > - final BeanContext current = MdbContainer.current(); > - if (current != null && > "false".equalsIgnoreCase(current.getProperties().getProperty("MdbActiveOnStartup"))) > { > - if (!equals(activationSpec.getResourceAdapter())) { > - throw new ResourceException("Activation spec not initialized > with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + > " != " + this + ")"); > - } > - if (!(activationSpec instanceof MessageActivationSpec)) { > - throw new NotSupportedException("That type of ActivationSpec > not supported: " + activationSpec.getClass()); > - } > - > - final ActiveMQEndpointActivationKey key = new > 
ActiveMQEndpointActivationKey(endpointFactory, > MessageActivationSpec.class.cast(activationSpec)); > - Map.class.cast(Reflections.get(this, > "endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) { > - }); > - // we dont want that worker.start(); > - } else { > - super.endpointActivation(endpointFactory, activationSpec); > - } > - > - if (current != null) { > - addJMxControl(current, > current.getProperties().getProperty("MdbJMXControl")); > - } > - } > - > - private void addJMxControl(final BeanContext current, final String name) > throws ResourceException { > - if (name == null || "false".equalsIgnoreCase(name)) { > - return; > - } > - > - final ActiveMQEndpointWorker worker = getWorker(current); > - final ObjectName jmxName; > - try { > - jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder() > - .set("J2EEServer", "openejb") > - .set("J2EEApplication", null) > - .set("EJBModule", current.getModuleID()) > - .set("StatelessSessionBean", current.getEjbName()) > - .set("j2eeType", "control") > - .set("name", current.getEjbName()) > - .build() : new ObjectName(name); > - } catch (final MalformedObjectNameException e) { > - throw new IllegalArgumentException(e); > - } > - mbeanNames.put(current, jmxName); > - > - LocalMBeanServer.registerSilently(new MdbJmxControl(worker), > jmxName); > - log.info("Deployed MDB control for " + current.getDeploymentID() + " > on " + jmxName); > - } > - > - @Override > - public void endpointDeactivation(final MessageEndpointFactory > endpointFactory, final ActivationSpec activationSpec) { > - final BeanContext current = MdbContainer.current(); > - if (current != null && > "true".equalsIgnoreCase(current.getProperties().getProperty("MdbJMXControl"))) > { > - LocalMBeanServer.unregisterSilently(mbeanNames.remove(current)); > - log.info("Undeployed MDB control for " + > current.getDeploymentID()); > - } > - super.endpointDeactivation(endpointFactory, activationSpec); > - } > - > private ActiveMQEndpointWorker 
getWorker(final BeanContext beanContext) > throws ResourceException { > final Map<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> > workers = Map.class.cast(Reflections.get( > > MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(), > "endpointWorkers")); > @@ -396,72 +316,4 @@ private static void stopScheduler() { > //Ignore > } > } > - > - public static final class MdbJmxControl implements DynamicMBean { > - private static final AttributeList ATTRIBUTE_LIST = new > AttributeList(); > - private static final MBeanInfo INFO = new MBeanInfo( > - > "org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl", > - "Allows to control a MDB (start/stop)", > - new MBeanAttributeInfo[0], > - new MBeanConstructorInfo[0], > - new MBeanOperationInfo[]{ > - new MBeanOperationInfo("start", "Ensure the listener > is active.", new MBeanParameterInfo[0], "void", ACTION), > - new MBeanOperationInfo("stop", "Ensure the listener > is not active.", new MBeanParameterInfo[0], "void", ACTION) > - }, > - new MBeanNotificationInfo[0]); > - > - private final ActiveMQEndpointWorker worker; > - > - private MdbJmxControl(final ActiveMQEndpointWorker worker) { > - this.worker = worker; > - } > - > - @Override > - public Object invoke(final String actionName, final Object[] params, > final String[] signature) throws MBeanException, ReflectionException { > - switch (actionName) { > - case "stop": > - try { > - worker.stop(); > - } catch (final InterruptedException e) { > - Thread.interrupted(); > - } > - break; > - case "start": > - try { > - worker.start(); > - } catch (ResourceException e) { > - throw new MBeanException(new > IllegalStateException(e.getMessage())); > - } > - break; > - default: > - throw new MBeanException(new > IllegalStateException("unsupported operation: " + actionName)); > - } > - return null; > - } > - > - @Override > - public MBeanInfo getMBeanInfo() { > - return INFO; > - } > - > - @Override > - public Object getAttribute(final 
String attribute) throws > AttributeNotFoundException, MBeanException, ReflectionException { > - throw new AttributeNotFoundException(); > - } > - > - @Override > - public void setAttribute(final Attribute attribute) throws > AttributeNotFoundException, InvalidAttributeValueException, MBeanException, > ReflectionException { > - throw new AttributeNotFoundException(); > - } > - > - @Override > - public AttributeList getAttributes(final String[] attributes) { > - return ATTRIBUTE_LIST; > - } > - > - @Override > - public AttributeList setAttributes(final AttributeList attributes) { > - return ATTRIBUTE_LIST; > - } > - } > } > diff --git > a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java > > b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java > similarity index 96% > rename from > container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java > rename to > container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java > index 7940cd423d..3885dc51de 100644 > --- > a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java > +++ > b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java > @@ -14,7 +14,7 @@ > * See the License for the specific language governing permissions and > * limitations under the License. 
> */ > -package org.apache.openejb.resource.activemq; > +package org.apache.openejb.core.mdb; > > import org.apache.openejb.config.EjbModule; > import org.apache.openejb.jee.EjbJar; > @@ -45,6 +45,7 @@ > import java.util.Properties; > import java.util.concurrent.Semaphore; > import java.util.concurrent.TimeUnit; > +import java.util.logging.Logger; > > import static org.junit.Assert.assertEquals; > import static org.junit.Assert.assertFalse; > @@ -53,8 +54,9 @@ > import static org.junit.Assert.fail; > > @RunWith(ApplicationComposer.class) > -public class ActiveMQResourceAdapterControlTest { > - @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb") > +public class ResourceAdapterControlTest { > + private static final Logger logger = > Logger.getLogger(ResourceAdapterControlTest.class.getName()); > + @Resource(name = "ResourceAdapterControlTest/test/ejb/Mdb") > private Queue queue; > > @Resource > > > >