This is an automated email from the ASF dual-hosted git repository. jgallimore pushed a commit to branch tomee-7.0.x in repository https://gitbox.apache.org/repos/asf/tomee.git
The following commit(s) were added to refs/heads/tomee-7.0.x by this push: new dcc15d0 TOMEE-2859 set instancelimit correctly on the container dcc15d0 is described below commit dcc15d09238cf585ec707e45eea944e5b553ba3b Author: Jonathan Gallimore <j...@jrg.me.uk> AuthorDate: Mon Jul 6 20:37:46 2020 +0100 TOMEE-2859 set instancelimit correctly on the container --- .../org/apache/openejb/core/mdb/MdbContainer.java | 51 +++++++- .../openejb/core/mdb/MdbContainerFactory.java | 2 +- .../core/mdb/MaxInstanceEndpointHandlerTest.java | 142 +++++++++++++++++++++ 3 files changed, 188 insertions(+), 7 deletions(-) diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java index bfbc65a..d5e182f 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java @@ -33,10 +33,7 @@ import org.apache.openejb.core.timer.EjbTimerService; import org.apache.openejb.core.transaction.TransactionPolicy; import org.apache.openejb.loader.Options; import org.apache.openejb.loader.SystemInstance; -import org.apache.openejb.monitoring.LocalMBeanServer; -import org.apache.openejb.monitoring.ManagedMBean; -import org.apache.openejb.monitoring.ObjectNameBuilder; -import org.apache.openejb.monitoring.StatsInterceptor; +import org.apache.openejb.monitoring.*; import org.apache.openejb.resource.XAResourceWrapper; import org.apache.openejb.spi.SecurityService; import org.apache.openejb.util.LogCategory; @@ -182,18 +179,19 @@ public class MdbContainer implements RpcContainer, BaseMdbContainer { beanContext.setContainerData(endpointFactory); deployments.put(deploymentId, beanContext); + final MBeanServer server = LocalMBeanServer.get(); + // Create stats interceptor if (StatsInterceptor.isStatsActivated()) { final StatsInterceptor stats = new StatsInterceptor(beanContext.getBeanClass()); beanContext.addFirstSystemInterceptor(stats); - final MBeanServer server = LocalMBeanServer.get(); final ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb.management"); jmxName.set("J2EEServer", "openejb"); jmxName.set("J2EEApplication", null); jmxName.set("EJBModule", beanContext.getModuleID()); - jmxName.set("StatelessSessionBean", beanContext.getEjbName()); + jmxName.set("MessageDrivenBean", beanContext.getEjbName()); jmxName.set("j2eeType", ""); jmxName.set("name", beanContext.getEjbName()); @@ -210,6 +208,29 @@ public class MdbContainer implements RpcContainer, BaseMdbContainer { } } + // Expose InstanceLimit/InstanceCount stats through JMX + { + final ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb.management"); + jmxName.set("J2EEServer", "openejb"); + jmxName.set("J2EEApplication", null); + jmxName.set("EJBModule", beanContext.getModuleID()); + jmxName.set("MessageDrivenBean", beanContext.getEjbName()); + jmxName.set("j2eeType", ""); + jmxName.set("name", beanContext.getEjbName()); + + try { + final ObjectName objectName = jmxName.set("j2eeType", "Instances").build(); + if (server.isRegistered(objectName)) { + server.unregisterMBean(objectName); + } + server.registerMBean(new ManagedMBean(new InstanceMonitor(instanceFactory)), objectName); + endpointFactory.jmxNames.add(objectName); + } catch (final Exception e) { + logger.error("Unable to register MBean ", e); + } + } + + // activate the endpoint CURRENT.set(beanContext); try { @@ -731,4 +752,22 @@ public class MdbContainer implements RpcContainer, BaseMdbContainer { return ATTRIBUTE_LIST; } } + + public static class InstanceMonitor { + private final MdbInstanceFactory instanceFactory; + + public InstanceMonitor(MdbInstanceFactory instanceFactory) { + this.instanceFactory = instanceFactory; + } + + @Managed + public int getInstanceLimit() { + return instanceFactory.getInstanceLimit(); + } + + @Managed + public int getInstanceCount() { + return instanceFactory.getInstanceCount(); + } + } } diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainerFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainerFactory.java index 86b6ce9..f6413f9 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainerFactory.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainerFactory.java @@ -97,7 +97,7 @@ public class MdbContainerFactory { } public void setMaxSize(final int max) { - this.instanceLimit = instanceLimit; + this.instanceLimit = max; this.poolBuilder.setPoolSize(max); } diff --git a/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MaxInstanceEndpointHandlerTest.java b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MaxInstanceEndpointHandlerTest.java new file mode 100644 index 0000000..708038e --- /dev/null +++ b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/MaxInstanceEndpointHandlerTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.core.mdb; + +import org.apache.openejb.jee.MessageDrivenBean; +import org.apache.openejb.junit.ApplicationComposer; +import org.apache.openejb.monitoring.LocalMBeanServer; +import org.apache.openejb.testing.Configuration; +import org.apache.openejb.testing.Module; +import org.apache.openejb.testng.PropertiesBuilder; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import javax.ejb.ActivationConfigProperty; +import javax.ejb.MessageDriven; +import javax.jms.*; +import javax.management.ObjectName; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +@RunWith(ApplicationComposer.class) +public class MaxInstanceEndpointHandlerTest { + + private static final String TEXT = "foo"; + + @Configuration + public Properties config() { + return new PropertiesBuilder() + + .p("sra", "new://Resource?type=ActiveMQResourceAdapter") + .p("sra.threadPoolSize", "100") + + .p("mdbs", "new://Container?type=MESSAGE") + .p("mdbs.ResourceAdapter", "sra") + .p("mdbs.pool", "false") + .p("mdbs.InstanceLimit", "30") + .p("mdbs.activation.maxSessions", "50") + + .p("cf", "new://Resource?type=javax.jms.ConnectionFactory") + .p("cf.ResourceAdapter", "sra") + .p("cf.TransactionSupport", "none") + .p("cf.ConnectionMaxWaitTime", "30 seconds") + .p("cf.MaxThreadPoolSize", "40") + .build(); + } + + @Module + public MessageDrivenBean jar() { + return new MessageDrivenBean(Listener.class); + } + + @Resource(name = "target") + private Queue destination; + + @Resource(name = "cf") + private ConnectionFactory cf; + + @Before + public void resetLatch() { + Listener.reset(); + } + + @Test + public void shouldSendMessage() throws Exception { + assertNotNull(cf); + + for (int i = 0; i < 100; i++) { + try (final Connection connection = cf.createConnection(); final Session session = connection.createSession()) { + connection.start(); + final TextMessage textMessage = session.createTextMessage(TEXT); + session.createProducer(destination).send(textMessage); + } + } + + // start MDB delivery + setControl("start"); + + assertTrue(Listener.sync()); + assertEquals(30, Listener.COUNTER.get()); + } + + private void setControl(final String action) throws Exception { + LocalMBeanServer.get().invoke( + new ObjectName("default:type=test"), + action, new Object[0], new String[0]); + } + + @MessageDriven(activationConfig = { + @ActivationConfigProperty(propertyName = "DeliveryActive", propertyValue = "false"), + @ActivationConfigProperty(propertyName = "MdbJMXControl", propertyValue = "default:type=test"), + @ActivationConfigProperty(propertyName = "destination", propertyValue = "target"), + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") + }) + public static class Listener implements MessageListener { + public static CountDownLatch latch; + + static final AtomicLong COUNTER = new AtomicLong(); + + @PostConstruct + public void postConstruct() { + COUNTER.incrementAndGet(); + } + + public static void reset() { + latch = new CountDownLatch(100); + } + + public static boolean sync() throws InterruptedException { + latch.await(1, TimeUnit.MINUTES); + return true; + } + + @Override + public void onMessage(Message message) { + latch.countDown(); + } + } + +} \ No newline at end of file