This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 803ccf7 ARTEMIS-2743 Synchronize JMS connection methods
new 38de1f7 This closes #3106
803ccf7 is described below
commit 803ccf72292d7ad6202f500023845adb7aad503b
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Apr 29 13:45:36 2020 -0500
ARTEMIS-2743 Synchronize JMS connection methods
---
.../artemis/jms/client/ActiveMQConnection.java | 14 +++---
.../artemis/jms/client/ActiveMQXAConnection.java | 6 +--
.../integration/jms/client/ConnectionTest.java | 58 ++++++++++++++++++++++
3 files changed, 68 insertions(+), 10 deletions(-)
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
index 9241ddf..b3026ad 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
@@ -192,7 +192,7 @@ public class ActiveMQConnection extends
ActiveMQConnectionForContextImpl impleme
* For that reason we have this method to force that nonXASession, since
the JMS Javadoc
* mandates createSession to return a XASession.
*/
- public Session createNonXASession(final boolean transacted, final int
acknowledgeMode) throws JMSException {
+ public synchronized Session createNonXASession(final boolean transacted,
final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, acknowledgeMode,
ActiveMQConnection.TYPE_GENERIC_CONNECTION);
@@ -206,7 +206,7 @@ public class ActiveMQConnection extends
ActiveMQConnectionForContextImpl impleme
* For that reason we have this method to force that nonXASession, since
the JMS Javadoc
* mandates createSession to return a XASession.
*/
- public Session createNonXATopicSession(final boolean transacted, final int
acknowledgeMode) throws JMSException {
+ public synchronized Session createNonXATopicSession(final boolean
transacted, final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, acknowledgeMode,
ActiveMQConnection.TYPE_TOPIC_CONNECTION);
@@ -220,7 +220,7 @@ public class ActiveMQConnection extends
ActiveMQConnectionForContextImpl impleme
* For that reason we have this method to force that nonXASession, since
the JMS Javadoc
* mandates createSession to return a XASession.
*/
- public Session createNonXAQueueSession(final boolean transacted, final int
acknowledgeMode) throws JMSException {
+ public synchronized Session createNonXAQueueSession(final boolean
transacted, final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, acknowledgeMode,
ActiveMQConnection.TYPE_QUEUE_CONNECTION);
@@ -432,14 +432,14 @@ public class ActiveMQConnection extends
ActiveMQConnectionForContextImpl impleme
}
@Override
- public Session createSession(int sessionMode) throws JMSException {
+ public synchronized Session createSession(int sessionMode) throws
JMSException {
checkClosed();
return createSessionInternal(false, sessionMode ==
Session.SESSION_TRANSACTED, sessionMode, ActiveMQSession.TYPE_GENERIC_SESSION);
}
@Override
- public Session createSession() throws JMSException {
+ public synchronized Session createSession() throws JMSException {
checkClosed();
return createSessionInternal(false, false, Session.AUTO_ACKNOWLEDGE,
ActiveMQSession.TYPE_GENERIC_SESSION);
}
@@ -447,7 +447,7 @@ public class ActiveMQConnection extends
ActiveMQConnectionForContextImpl impleme
// QueueConnection implementation
---------------------------------------------------------------
@Override
- public QueueSession createQueueSession(final boolean transacted, int
acknowledgeMode) throws JMSException {
+ public synchronized QueueSession createQueueSession(final boolean
transacted, int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, checkAck(transacted,
acknowledgeMode), ActiveMQSession.TYPE_QUEUE_SESSION);
}
@@ -477,7 +477,7 @@ public class ActiveMQConnection extends
ActiveMQConnectionForContextImpl impleme
// TopicConnection implementation
---------------------------------------------------------------
@Override
- public TopicSession createTopicSession(final boolean transacted, final int
acknowledgeMode) throws JMSException {
+ public synchronized TopicSession createTopicSession(final boolean
transacted, final int acknowledgeMode) throws JMSException {
checkClosed();
return createSessionInternal(false, transacted, checkAck(transacted,
acknowledgeMode), ActiveMQSession.TYPE_TOPIC_SESSION);
}
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java
index 0d6158e..c5eea98 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQXAConnection.java
@@ -48,20 +48,20 @@ public final class ActiveMQXAConnection extends
ActiveMQConnection implements XA
}
@Override
- public XASession createXASession() throws JMSException {
+ public synchronized XASession createXASession() throws JMSException {
checkClosed();
return (XASession) createSessionInternal(isXA(), true,
Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_GENERIC_SESSION);
}
@Override
- public XAQueueSession createXAQueueSession() throws JMSException {
+ public synchronized XAQueueSession createXAQueueSession() throws
JMSException {
checkClosed();
return (XAQueueSession) createSessionInternal(isXA(), true,
Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_QUEUE_SESSION);
}
@Override
- public XATopicSession createXATopicSession() throws JMSException {
+ public synchronized XATopicSession createXATopicSession() throws
JMSException {
checkClosed();
return (XATopicSession) createSessionInternal(isXA(), true,
Session.SESSION_TRANSACTED, ActiveMQSession.TYPE_TOPIC_SESSION);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java
index 259162e..d1ed5ba 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConnectionTest.java
@@ -20,6 +20,7 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSContext;
+import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
@@ -31,15 +32,23 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class ConnectionTest extends JMSTestBase {
+ private static final Logger log = Logger.getLogger(ConnectionTest.class);
+
private Connection conn2;
@Test
@@ -248,6 +257,55 @@ public class ConnectionTest extends JMSTestBase {
}
}
+ @Test
+ public void testCreateSessionAndCloseConnectionConcurrently() throws Exception {
+ final int ATTEMPTS = 10;
+ final int THREAD_COUNT = 50;
+ final int SESSION_COUNT = 10;
+ final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+ try {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
+
+ for (int i = 0; i < ATTEMPTS; i++) {
+ final CountDownLatch lineUp = new CountDownLatch(THREAD_COUNT);
+ final AtomicBoolean error = new AtomicBoolean(false);
+ final Connection connection = cf.createConnection();
+
+ for (int j = 0; j < THREAD_COUNT; ++j) {
+ executor.execute(() -> {
+ for (int k = 0; k < SESSION_COUNT; k++) {
+ try {
+ connection.createSession().close();
+ if (k == 0) {
+ lineUp.countDown();
+ }
+ } catch (javax.jms.IllegalStateException e) {
+ // ignore
+ break;
+ } catch (JMSException e) {
+ // ignore
+ break;
+ } catch (Throwable t) {
+ log.warn(t.getMessage(), t);
+ error.set(true);
+ break;
+ }
+ }
+ });
+ }
+
+ // wait until all the threads have created & closed at least 1 session
+ assertTrue(lineUp.await(10, TimeUnit.SECONDS));
+ connection.close();
+ if (error.get()) {
+ assertFalse(error.get());
+ }
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
@Override
@After
public void tearDown() throws Exception {