Hey folks

We added a feature to TomEE a little while back, where we could control MDB
listening states via JMX. This currently only works with ActiveMQ, so I'd
like to propose a patch to make it work and MDB/RAR. In essence, the logic
is exactly the same, but has shifted from ActiveMQResourceAdapter to
MdbContainer. Here's the diff for master showing what I had in mind.

Any thoughts / objections?

Thanks

Jon

diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
index a17f342bab..724effd72f 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
@@ -44,8 +44,22 @@
 import org.apache.xbean.recipe.ObjectRecipe;
 import org.apache.xbean.recipe.Option;

+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanConstructorInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanParameterInfo;
 import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.ReflectionException;
 import javax.naming.NamingException;
 import javax.resource.ResourceException;
 import javax.resource.spi.ActivationSpec;
@@ -63,7 +77,9 @@
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;

+import static javax.management.MBeanOperationInfo.ACTION;
 import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.afterInvoke;
 import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.createTransactionPolicy;
 import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.handleApplicationException;
@@ -73,9 +89,11 @@
     private static final Logger logger =
Logger.getInstance(LogCategory.OPENEJB,
"org.apache.openejb.util.resources");

     private static final ThreadLocal<BeanContext> CURRENT = new
ThreadLocal<>();
-
     private static final Object[] NO_ARGS = new Object[0];

+    private final Map<BeanContext, ObjectName> mbeanNames = new
ConcurrentHashMap<>();
+    private final Map<BeanContext, MdbActivationContext>
activationContexts = new ConcurrentHashMap<>();
+
     private final Object containerID;
     private final SecurityService securityService;
     private final ResourceAdapter resourceAdapter;
@@ -188,7 +206,18 @@ public void deploy(final BeanContext beanContext)
throws OpenEJBException {
         // activate the endpoint
         CURRENT.set(beanContext);
         try {
-            resourceAdapter.endpointActivation(endpointFactory,
activationSpec);
+
+            final MdbActivationContext activationContext = new
MdbActivationContext(resourceAdapter, endpointFactory,
activationSpec);
+            activationContexts.put(beanContext, activationContext);
+
+            final boolean activeOnStartup =
Boolean.parseBoolean(beanContext.getProperties().getProperty("MdbActiveOnStartup",
"true"));
+            if (activeOnStartup) {
+                activationContext.start();
+            }
+
+            final String jmxName =
beanContext.getProperties().getProperty("MdbJMXControl", "true");
+            addJMxControl(beanContext, jmxName, activationContext);
+
         } catch (final ResourceException e) {
             // activation failed... clean up
             beanContext.setContainer(null);
@@ -201,6 +230,10 @@ public void deploy(final BeanContext beanContext)
throws OpenEJBException {
         }
     }

+    private static String getOrDefault(final Map<String, String> map,
final String key, final String defaultValue) {
+        return map.get(key) != null ? map.get(key) : defaultValue;
+    }
+
     private ActivationSpec createActivationSpec(final BeanContext
beanContext) throws OpenEJBException {
         try {
             // initialize the object recipe
@@ -253,7 +286,6 @@ private ActivationSpec createActivationSpec(final
BeanContext beanContext) throw
                 logger.debug("No Validator bound to JNDI context");
             }

-
             // set the resource adapter into the activation spec
             activationSpec.setResourceAdapter(resourceAdapter);

@@ -284,7 +316,17 @@ public void undeploy(final BeanContext
beanContext) throws OpenEJBException {
             if (endpointFactory != null) {
                 CURRENT.set(beanContext);
                 try {
-
resourceAdapter.endpointDeactivation(endpointFactory,
endpointFactory.getActivationSpec());
+
+                    final ObjectName jmxBeanToRemove =
mbeanNames.remove(beanContext);
+                    if (jmxBeanToRemove != null) {
+                        LocalMBeanServer.unregisterSilently(jmxBeanToRemove);
+                        logger.info("Undeployed MDB control for " +
beanContext.getDeploymentID());
+                    }
+
+                    final MdbActivationContext activationContext =
activationContexts.remove(beanContext);
+                    if (activationContext != null &&
activationContext.isStarted()) {
+
resourceAdapter.endpointDeactivation(endpointFactory,
endpointFactory.getActivationSpec());
+                    }
                 } finally {
                     CURRENT.remove();
                 }
@@ -492,6 +534,30 @@ public void release(final BeanContext deployInfo,
final Object instance) {
         }
     }

+    private void addJMxControl(final BeanContext current, final
String name, final MdbActivationContext activationContext) throws
ResourceException {
+        if (name == null || "false".equalsIgnoreCase(name)) {
+            return;
+        }
+
+        final ObjectName jmxName;
+        try {
+            jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder()
+                    .set("J2EEServer", "openejb")
+                    .set("J2EEApplication", null)
+                    .set("EJBModule", current.getModuleID())
+                    .set("StatelessSessionBean", current.getEjbName())
+                    .set("j2eeType", "control")
+                    .set("name", current.getEjbName())
+                    .build() : new ObjectName(name);
+        } catch (final MalformedObjectNameException e) {
+            throw new IllegalArgumentException(e);
+        }
+        mbeanNames.put(current, jmxName);
+
+        LocalMBeanServer.registerSilently(new
MdbJmxControl(activationContext), jmxName);
+        logger.info("Deployed MDB control for " +
current.getDeploymentID() + " on " + jmxName);
+    }
+
     public static BeanContext current() {
         final BeanContext beanContext = CURRENT.get();
         if (beanContext == null) {
@@ -505,4 +571,115 @@ public static BeanContext current() {
         private TransactionPolicy txPolicy;
         private ThreadContext oldCallContext;
     }
+
+    private static class MdbActivationContext {
+        private final ResourceAdapter resourceAdapter;
+        private final EndpointFactory endpointFactory;
+        private final ActivationSpec activationSpec;
+
+        private AtomicBoolean started = new AtomicBoolean(false);
+
+        public MdbActivationContext(final ResourceAdapter
resourceAdapter, final EndpointFactory endpointFactory, final
ActivationSpec activationSpec) {
+            this.resourceAdapter = resourceAdapter;
+            this.endpointFactory = endpointFactory;
+            this.activationSpec = activationSpec;
+        }
+
+        public ResourceAdapter getResourceAdapter() {
+            return resourceAdapter;
+        }
+
+        public EndpointFactory getEndpointFactory() {
+            return endpointFactory;
+        }
+
+        public ActivationSpec getActivationSpec() {
+            return activationSpec;
+        }
+
+        public boolean isStarted() {
+            return started.get();
+        }
+
+        public void start() throws ResourceException {
+            if (started.get()) {
+                return;
+            }
+
+            resourceAdapter.endpointActivation(endpointFactory,
activationSpec);
+            started.set(true);
+        }
+
+        public void stop() {
+            if (! started.get()) {
+                return;
+            }
+
+            resourceAdapter.endpointDeactivation(endpointFactory,
activationSpec);
+            started.set(false);
+        }
+    }
+
+    public static final class MdbJmxControl implements DynamicMBean {
+        private static final AttributeList ATTRIBUTE_LIST = new
AttributeList();
+        private static final MBeanInfo INFO = new MBeanInfo(
+
"org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl",
+                "Allows to control a MDB (start/stop)",
+                new MBeanAttributeInfo[0],
+                new MBeanConstructorInfo[0],
+                new MBeanOperationInfo[]{
+                        new MBeanOperationInfo("start", "Ensure the
listener is active.", new MBeanParameterInfo[0], "void", ACTION),
+                        new MBeanOperationInfo("stop", "Ensure the
listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
+                },
+                new MBeanNotificationInfo[0]);
+
+        private final MdbActivationContext activationContext;
+
+        private MdbJmxControl(final MdbActivationContext activationContext) {
+            this.activationContext = activationContext;
+        }
+
+        @Override
+        public Object invoke(final String actionName, final Object[]
params, final String[] signature) throws MBeanException,
ReflectionException {
+            if (actionName.equals("stop")) {
+                activationContext.stop();
+
+            } else if (actionName.equals("start")) {
+                try {
+                    activationContext.start();
+                } catch (ResourceException e) {
+                    throw new MBeanException(new
IllegalStateException(e.getMessage()));
+                }
+
+            } else {
+                throw new MBeanException(new
IllegalStateException("unsupported operation: " + actionName));
+            }
+            return null;
+        }
+
+        @Override
+        public MBeanInfo getMBeanInfo() {
+            return INFO;
+        }
+
+        @Override
+        public Object getAttribute(final String attribute) throws
AttributeNotFoundException, MBeanException, ReflectionException {
+            throw new AttributeNotFoundException();
+        }
+
+        @Override
+        public void setAttribute(final Attribute attribute) throws
AttributeNotFoundException, InvalidAttributeValueException,
MBeanException, ReflectionException {
+            throw new AttributeNotFoundException();
+        }
+
+        @Override
+        public AttributeList getAttributes(final String[] attributes) {
+            return ATTRIBUTE_LIST;
+        }
+
+        @Override
+        public AttributeList setAttributes(final AttributeList attributes) {
+            return ATTRIBUTE_LIST;
+        }
+    }
 }
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
index cd44b79ba8..e5f201e3c5 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
@@ -29,8 +29,6 @@
 import org.apache.openejb.BeanContext;
 import org.apache.openejb.core.mdb.MdbContainer;
 import org.apache.openejb.loader.SystemInstance;
-import org.apache.openejb.monitoring.LocalMBeanServer;
-import org.apache.openejb.monitoring.ObjectNameBuilder;
 import org.apache.openejb.resource.AutoConnectionTracker;
 import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory;
 import org.apache.openejb.resource.activemq.jms2.TomEEManagedConnectionProxy;
@@ -44,28 +42,11 @@

 import javax.jms.Connection;
 import javax.jms.JMSException;
-import javax.management.Attribute;
-import javax.management.AttributeList;
-import javax.management.AttributeNotFoundException;
-import javax.management.DynamicMBean;
-import javax.management.InvalidAttributeValueException;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanConstructorInfo;
-import javax.management.MBeanException;
-import javax.management.MBeanInfo;
-import javax.management.MBeanNotificationInfo;
-import javax.management.MBeanOperationInfo;
-import javax.management.MBeanParameterInfo;
-import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-import javax.management.ReflectionException;
 import javax.naming.NamingException;
-import javax.resource.NotSupportedException;
 import javax.resource.ResourceException;
-import javax.resource.spi.ActivationSpec;
 import javax.resource.spi.BootstrapContext;
 import javax.resource.spi.ResourceAdapterInternalException;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
@@ -78,8 +59,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;

-import static javax.management.MBeanOperationInfo.ACTION;
-
 @SuppressWarnings("UnusedDeclaration")
 public class ActiveMQResourceAdapter extends
org.apache.activemq.ra.ActiveMQResourceAdapter {

@@ -190,65 +169,6 @@ private void createInternalBroker(final String
brokerXmlConfig, final Properties
         }
     }

-    @Override
-    public void endpointActivation(final MessageEndpointFactory
endpointFactory, final ActivationSpec activationSpec) throws
ResourceException {
-        final BeanContext current = MdbContainer.current();
-        if (current != null &&
"false".equalsIgnoreCase(current.getProperties().getProperty("MdbActiveOnStartup")))
{
-            if (!equals(activationSpec.getResourceAdapter())) {
-                throw new ResourceException("Activation spec not
initialized with this ResourceAdapter instance (" +
activationSpec.getResourceAdapter() + " != " + this + ")");
-            }
-            if (!(activationSpec instanceof MessageActivationSpec)) {
-                throw new NotSupportedException("That type of
ActivationSpec not supported: " + activationSpec.getClass());
-            }
-
-            final ActiveMQEndpointActivationKey key = new
ActiveMQEndpointActivationKey(endpointFactory,
MessageActivationSpec.class.cast(activationSpec));
-            Map.class.cast(Reflections.get(this,
"endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) {
-            });
-            // we dont want that worker.start();
-        } else {
-            super.endpointActivation(endpointFactory, activationSpec);
-        }
-
-        if (current != null) {
-            addJMxControl(current,
current.getProperties().getProperty("MdbJMXControl"));
-        }
-    }
-
-    private void addJMxControl(final BeanContext current, final
String name) throws ResourceException {
-        if (name == null || "false".equalsIgnoreCase(name)) {
-            return;
-        }
-
-        final ActiveMQEndpointWorker worker = getWorker(current);
-        final ObjectName jmxName;
-        try {
-            jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder()
-                    .set("J2EEServer", "openejb")
-                    .set("J2EEApplication", null)
-                    .set("EJBModule", current.getModuleID())
-                    .set("StatelessSessionBean", current.getEjbName())
-                    .set("j2eeType", "control")
-                    .set("name", current.getEjbName())
-                    .build() : new ObjectName(name);
-        } catch (final MalformedObjectNameException e) {
-            throw new IllegalArgumentException(e);
-        }
-        mbeanNames.put(current, jmxName);
-
-        LocalMBeanServer.registerSilently(new MdbJmxControl(worker), jmxName);
-        log.info("Deployed MDB control for " +
current.getDeploymentID() + " on " + jmxName);
-    }
-
-    @Override
-    public void endpointDeactivation(final MessageEndpointFactory
endpointFactory, final ActivationSpec activationSpec) {
-        final BeanContext current = MdbContainer.current();
-        if (current != null &&
"true".equalsIgnoreCase(current.getProperties().getProperty("MdbJMXControl")))
{
-            LocalMBeanServer.unregisterSilently(mbeanNames.remove(current));
-            log.info("Undeployed MDB control for " +
current.getDeploymentID());
-        }
-        super.endpointDeactivation(endpointFactory, activationSpec);
-    }
-
     private ActiveMQEndpointWorker getWorker(final BeanContext
beanContext) throws ResourceException {
         final Map<ActiveMQEndpointActivationKey,
ActiveMQEndpointWorker> workers = Map.class.cast(Reflections.get(

MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(),
"endpointWorkers"));
@@ -396,72 +316,4 @@ private static void stopScheduler() {
             //Ignore
         }
     }
-
-    public static final class MdbJmxControl implements DynamicMBean {
-        private static final AttributeList ATTRIBUTE_LIST = new
AttributeList();
-        private static final MBeanInfo INFO = new MBeanInfo(
-
"org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl",
-                "Allows to control a MDB (start/stop)",
-                new MBeanAttributeInfo[0],
-                new MBeanConstructorInfo[0],
-                new MBeanOperationInfo[]{
-                        new MBeanOperationInfo("start", "Ensure the
listener is active.", new MBeanParameterInfo[0], "void", ACTION),
-                        new MBeanOperationInfo("stop", "Ensure the
listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
-                },
-                new MBeanNotificationInfo[0]);
-
-        private final ActiveMQEndpointWorker worker;
-
-        private MdbJmxControl(final ActiveMQEndpointWorker worker) {
-            this.worker = worker;
-        }
-
-        @Override
-        public Object invoke(final String actionName, final Object[]
params, final String[] signature) throws MBeanException,
ReflectionException {
-            switch (actionName) {
-                case "stop":
-                    try {
-                        worker.stop();
-                    } catch (final InterruptedException e) {
-                        Thread.interrupted();
-                    }
-                    break;
-                case "start":
-                    try {
-                        worker.start();
-                    } catch (ResourceException e) {
-                        throw new MBeanException(new
IllegalStateException(e.getMessage()));
-                    }
-                    break;
-                default:
-                    throw new MBeanException(new
IllegalStateException("unsupported operation: " + actionName));
-            }
-            return null;
-        }
-
-        @Override
-        public MBeanInfo getMBeanInfo() {
-            return INFO;
-        }
-
-        @Override
-        public Object getAttribute(final String attribute) throws
AttributeNotFoundException, MBeanException, ReflectionException {
-            throw new AttributeNotFoundException();
-        }
-
-        @Override
-        public void setAttribute(final Attribute attribute) throws
AttributeNotFoundException, InvalidAttributeValueException,
MBeanException, ReflectionException {
-            throw new AttributeNotFoundException();
-        }
-
-        @Override
-        public AttributeList getAttributes(final String[] attributes) {
-            return ATTRIBUTE_LIST;
-        }
-
-        @Override
-        public AttributeList setAttributes(final AttributeList attributes) {
-            return ATTRIBUTE_LIST;
-        }
-    }
 }
diff --git 
a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java
similarity index 96%
rename from 
container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
rename to 
container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java
index 7940cd423d..3885dc51de 100644
--- 
a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
+++ 
b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/ResourceAdapterControlTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.openejb.resource.activemq;
+package org.apache.openejb.core.mdb;

 import org.apache.openejb.config.EjbModule;
 import org.apache.openejb.jee.EjbJar;
@@ -45,6 +45,7 @@
 import java.util.Properties;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -53,8 +54,9 @@
 import static org.junit.Assert.fail;

 @RunWith(ApplicationComposer.class)
-public class ActiveMQResourceAdapterControlTest {
-    @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb")
+public class ResourceAdapterControlTest {
+    private static final Logger logger =
Logger.getLogger(ResourceAdapterControlTest.class.getName());
+    @Resource(name = "ResourceAdapterControlTest/test/ejb/Mdb")
     private Queue queue;

     @Resource

Reply via email to