Repository: tomee Updated Branches: refs/heads/tomee-1.7.x 24181691b -> 5f6ea1d83
TOMEE-2172 Squashed commit of the following: commit 98141ca28da6411e8b12c54eeb084019f9716dce Author: Jonathan Gallimore <j...@jrg.me.uk> Date: Tue Feb 20 14:55:22 2018 +0000 Add test commit 628798d93a973708c720c531a0fc4eb2284392b4 Author: Jonathan Gallimore <j...@jrg.me.uk> Date: Tue Feb 20 14:40:07 2018 +0000 Ensure instances are returned to the pool / discarded correctly if the transaction times out Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/5f6ea1d8 Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/5f6ea1d8 Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/5f6ea1d8 Branch: refs/heads/tomee-1.7.x Commit: 5f6ea1d83b8a7320b9aba8bd9aaef8f9ccbf3849 Parents: 2418169 Author: Jonathan Gallimore <j...@jrg.me.uk> Authored: Tue Feb 20 15:15:33 2018 +0000 Committer: Jonathan Gallimore <j...@jrg.me.uk> Committed: Tue Feb 20 15:15:33 2018 +0000 ---------------------------------------------------------------------- .../openejb/core/mdb/MdbInstanceManager.java | 50 +++--- .../openejb/core/mdb/MdbPoolContainer.java | 8 +- .../mdb/TxTimeoutPoolEndpointHandlerTest.java | 167 +++++++++++++++++++ 3 files changed, 201 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/5f6ea1d8/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java index 2c6ee67..f7525b6 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbInstanceManager.java @@ -29,10 +29,7 @@ import org.apache.openejb.core.ThreadContext; import org.apache.openejb.core.interceptor.InterceptorData; import org.apache.openejb.core.interceptor.InterceptorStack; import org.apache.openejb.loader.Options; -import org.apache.openejb.monitoring.LocalMBeanServer; -import org.apache.openejb.monitoring.ManagedMBean; -import org.apache.openejb.monitoring.ObjectNameBuilder; -import org.apache.openejb.monitoring.StatsInterceptor; +import org.apache.openejb.monitoring.*; import org.apache.openejb.spi.SecurityService; import org.apache.openejb.util.DaemonThreadFactory; import org.apache.openejb.util.Duration; @@ -200,21 +197,21 @@ public class MdbInstanceManager { data.setBaseContext(mdbContext); beanContext.setContainerData(data); + final MBeanServer server = LocalMBeanServer.get(); + + final ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb.management"); + jmxName.set("J2EEServer", "openejb"); + jmxName.set("J2EEApplication", null); + jmxName.set("EJBModule", beanContext.getModuleID()); + jmxName.set("MessageDrivenBean", beanContext.getEjbName()); + jmxName.set("j2eeType", ""); + jmxName.set("name", beanContext.getEjbName()); + // Create stats interceptor if (StatsInterceptor.isStatsActivated()) { final StatsInterceptor stats = new StatsInterceptor(beanContext.getBeanClass()); beanContext.addFirstSystemInterceptor(stats); - final MBeanServer server = LocalMBeanServer.get(); - - final ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb.management"); - jmxName.set("J2EEServer", "openejb"); - jmxName.set("J2EEApplication", null); - jmxName.set("EJBModule", beanContext.getModuleID()); - jmxName.set("MessageDrivenBean", beanContext.getEjbName()); - jmxName.set("j2eeType", ""); - jmxName.set("name", beanContext.getEjbName()); - // register the invocation stats interceptor try { final ObjectName objectName = jmxName.set("j2eeType", "Invocations").build(); @@ -251,12 +248,12 @@ public class MdbInstanceManager { logger.info("Not auto-activating endpoint for " + beanContext.getDeploymentID()); } - String jmxName = beanContext.getActivationProperties().get("MdbJMXControl"); - if (jmxName == null) { - jmxName = "true"; + String jmxControlName = beanContext.getActivationProperties().get("MdbJMXControl"); + if (jmxControlName == null) { + jmxControlName = "true"; } - addJMxControl(beanContext, jmxName, activationContext); + addJMxControl(beanContext, jmxControlName, activationContext); } catch (final ResourceException e) { throw new OpenEJBException(e); @@ -273,10 +270,22 @@ public class MdbInstanceManager { try { es.awaitTermination(5, TimeUnit.MINUTES); } catch (final InterruptedException e) { - logger.error("can't fill the stateless pool", e); + logger.error("can't fill the message driven bean pool", e); } } + // register the pool + try { + final ObjectName objectName = jmxName.set("j2eeType", "Pool").build(); + if (server.isRegistered(objectName)) { + server.unregisterMBean(objectName); + } + server.registerMBean(new ManagedMBean(data.pool), objectName); + data.add(objectName); + } catch (final Exception e) { + logger.error("Unable to register MBean ", e); + } + data.getPool().start(); } @@ -492,7 +501,7 @@ public class MdbInstanceManager { instance.setPoolEntry(entry); } } catch (final TimeoutException e) { - final String msg = "No instances available in Session Bean pool. Waited " + data.getAccessTimeout().toString(); + final String msg = "No instances available in Message Driven Bean pool. Waited " + data.getAccessTimeout().toString(); final ConcurrentAccessTimeoutException timeoutException = new ConcurrentAccessTimeoutException(msg); timeoutException.fillInStackTrace(); throw new ApplicationException(timeoutException); @@ -683,5 +692,4 @@ public class MdbInstanceManager { this.baseContext = baseContext; } } - } http://git-wip-us.apache.org/repos/asf/tomee/blob/5f6ea1d8/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbPoolContainer.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbPoolContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbPoolContainer.java index bc0edb6..3a2ce12 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbPoolContainer.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbPoolContainer.java @@ -422,6 +422,10 @@ public class MdbPoolContainer implements RpcContainer, BaseMdbContainer { // invoke the tx after method try { afterInvoke(mdbCallContext.txPolicy, callContext); + } catch (final ApplicationException e) { + callContext.setDiscardInstance(true); + throw new SystemException("Should never get an Application exception", e); + } finally { if (instance != null) { if (callContext.isDiscardInstance()) { this.instanceManager.discardInstance(callContext, instance); @@ -434,9 +438,7 @@ public class MdbPoolContainer implements RpcContainer, BaseMdbContainer { } } - } catch (final ApplicationException e) { - throw new SystemException("Should never get an Application exception", e); - } finally { + ThreadContext.exit(mdbCallContext.oldCallContext); } } http://git-wip-us.apache.org/repos/asf/tomee/blob/5f6ea1d8/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/TxTimeoutPoolEndpointHandlerTest.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/TxTimeoutPoolEndpointHandlerTest.java b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/TxTimeoutPoolEndpointHandlerTest.java new file mode 100644 index 0000000..3badf9c --- /dev/null +++ b/container/openejb-core/src/test/java/org/apache/openejb/core/mdb/TxTimeoutPoolEndpointHandlerTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.core.mdb; + +import org.apache.openejb.core.mdb.connector.api.InboundListener; +import org.apache.openejb.core.mdb.connector.api.SampleConnection; +import org.apache.openejb.core.mdb.connector.api.SampleConnectionFactory; +import org.apache.openejb.core.mdb.connector.impl.SampleActivationSpec; +import org.apache.openejb.core.mdb.connector.impl.SampleManagedConnectionFactory; +import org.apache.openejb.core.mdb.connector.impl.SampleResourceAdapter; +import org.apache.openejb.jee.MessageDrivenBean; +import org.apache.openejb.junit.ApplicationComposer; +import org.apache.openejb.monitoring.LocalMBeanServer; +import org.apache.openejb.testing.Configuration; +import org.apache.openejb.testing.Module; +import org.apache.openejb.testng.PropertiesBuilder; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import javax.ejb.ActivationConfigProperty; +import javax.ejb.MessageDriven; +import javax.jms.Queue; +import javax.management.ObjectName; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; + +@RunWith(ApplicationComposer.class) +public class TxTimeoutPoolEndpointHandlerTest { + + private static final String TEXT = "foo"; + + @Configuration + public Properties config() { + return new PropertiesBuilder() + + .p("myTransactionManager", "new://TransactionManager?type=TransactionManager") + .p("myTransactionManager.defaultTransactionTimeout", "5 seconds") + + .p("sra", "new://Resource?class-name=" + SampleResourceAdapter.class.getName()) + + .p("mdbs", "new://Container?type=MESSAGE") + .p("mdbs.ResourceAdapter", "sra") + .p("mdbs.pool", "true") + .p("mdbs.maxSize", "2") // this is deliberately low + .p("mdbs.ActivationSpecClass", SampleActivationSpec.class.getName()) + .p("mdbs.MessageListenerInterface", InboundListener.class.getName()) + + .p("cf", "new://Resource?type=" + SampleConnectionFactory.class.getName() + "&class-name=" + SampleManagedConnectionFactory.class.getName()) + .p("cf.ResourceAdapter", "sra") + .p("cf.TransactionSupport", "none") + .build(); + } + + @Module + public MessageDrivenBean jar() { + return new MessageDrivenBean(Listener.class); + } + + @Resource(name = "target") + private Queue destination; + + @Resource(name = "cf") + private SampleConnectionFactory cf; + + @Before + public void resetLatch() { + Listener.reset(); + } + + @Test + public void shouldSendMessage() throws Exception { + assertNotNull(cf); + + for (int i = 0; i < 5; i++) { + final SampleConnection connection = cf.getConnection(); + try { + connection.sendMessage(TEXT); + } finally { + connection.close(); + } + } + + // start MDB delivery + setControl("start"); + + assertTrue(Listener.sync()); + assertEquals(5, Listener.COUNTER.get()); + } + + private void setControl(final String action) throws Exception { + LocalMBeanServer.get().invoke( + new ObjectName("default:type=test"), + action, new Object[0], new String[0]); + } + + @MessageDriven(activationConfig = { + @ActivationConfigProperty(propertyName = "DeliveryActive", propertyValue = "false"), + @ActivationConfigProperty(propertyName = "MdbJMXControl", propertyValue = "default:type=test") + }) + public static class Listener implements InboundListener { + public static CountDownLatch latch; + private static final List<Boolean> BOOLEANS = new CopyOnWriteArrayList<Boolean>(); + + static final AtomicLong COUNTER = new AtomicLong(); + + @PostConstruct + public void postConstruct() { + COUNTER.incrementAndGet(); + } + + public static void reset() { + latch = new CountDownLatch(100); + BOOLEANS.clear(); + } + + public static boolean sync() throws InterruptedException { + latch.await(1, TimeUnit.MINUTES); + for (boolean result : BOOLEANS) { + if(!result) { + return false; + } + } + return true; + } + + @Override + public void receiveMessage(String message) { + try { + boolean ok = TEXT.equals(message); + BOOLEANS.add(ok); + } finally { + latch.countDown(); + } + + // this should be long enough to make the transaction time out + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + +} \ No newline at end of file