Repository: cxf Updated Branches: refs/heads/master 638f4e8df -> 3bd4e8442
Remove some printlns in the tests Cleanup some of the synchronized blocks for slightly better performance Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/3bd4e844 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/3bd4e844 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/3bd4e844 Branch: refs/heads/master Commit: 3bd4e84427598b7dfd18ccfcdea72512bc6cda9e Parents: 638f4e8 Author: Daniel Kulp <[email protected]> Authored: Thu Apr 10 13:04:21 2014 -0400 Committer: Daniel Kulp <[email protected]> Committed: Thu Apr 10 13:04:21 2014 -0400 ---------------------------------------------------------------------- .../apache/cxf/transport/jms/JMSConduit.java | 65 +++++++++++--------- .../cxf/transport/jms/JMSConfiguration.java | 38 ++++++++---- .../cxf/transport/jms/util/TestReceiver.java | 6 +- .../jms/PooledConnectionTempQueueTest.java | 11 ++-- .../transport/jms/util/MessageListenerTest.java | 9 +-- 5 files changed, 78 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/3bd4e844/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java index 291b608..bff0dd9 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java @@ -73,8 +73,8 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me private final AtomicLong messageCount = new AtomicLong(0); private JMSBusLifeCycleListener listener; private Bus bus; - private Connection connection; - private Destination staticReplyDestination; + private volatile Connection connection; + private volatile Destination staticReplyDestination; public JMSConduit(EndpointReferenceType target, JMSConfiguration jmsConfig, @@ -100,30 +100,19 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me MessageStreamUtil.closeStreams(msg); super.close(msg); } - private synchronized Connection getConnection() throws JMSException { - if (connection == null) { - connection = JMSFactory.createConnection(jmsConfig); - connection.start(); - } - return connection; - } - private synchronized void getJMSListener(Destination replyTo) throws JMSException { - if (jmsListener != null) { - return; - } - String messageSelector = JMSFactory.getMessageSelector(jmsConfig, conduitId); - if (messageSelector == null && !jmsConfig.isPubSubDomain()) { - // Do not open listener without selector on a queue as we then can not share the queue. - // An option for this might be a good idea for people who do not plan to share queues. - return; + private Connection getConnection() throws JMSException { + Connection result = connection; + if (result == null) { + synchronized (this) { + result = connection; + if (result == null) { + result = JMSFactory.createConnection(jmsConfig); + result.start(); + connection = result; + } + } } - MessageListenerContainer container = new MessageListenerContainer(getConnection(), replyTo, this); - container.setMessageSelector(messageSelector); - Executor executor = JMSFactory.createExecutor(bus, "jms-conduit"); - container.setExecutor(executor); - container.start(); - jmsListener = container; - addBusListener(); + return result; } /** @@ -176,10 +165,29 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me } } - private synchronized void setupReplyDestination(Session session) throws JMSException { + private void setupReplyDestination(Session session) throws JMSException { if (staticReplyDestination == null) { - staticReplyDestination = jmsConfig.getReplyDestination(session); - getJMSListener(staticReplyDestination); + synchronized (this) { + if (staticReplyDestination == null) { + staticReplyDestination = jmsConfig.getReplyDestination(session); + + String messageSelector = JMSFactory.getMessageSelector(jmsConfig, conduitId); + if (messageSelector == null && !jmsConfig.isPubSubDomain()) { + // Do not open listener without selector on a queue as we then can not share the queue. + // An option for this might be a good idea for people who do not plan to share queues. + return; + } + MessageListenerContainer container = new MessageListenerContainer(getConnection(), + staticReplyDestination, + this); + container.setMessageSelector(messageSelector); + Executor executor = JMSFactory.createExecutor(bus, "jms-conduit"); + container.setExecutor(executor); + container.start(); + jmsListener = container; + addBusListener(); + } + } } } @@ -418,6 +426,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me jmsListener.stop(); jmsListener.shutdown(); jmsListener = null; + staticReplyDestination = null; } } public synchronized void close() { http://git-wip-us.apache.org/repos/asf/cxf/blob/3bd4e844/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java index ecda3b0..f424ee5 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java @@ -38,7 +38,7 @@ public class JMSConfiguration { */ public static final int DEFAULT_VALUE = -1; - private ConnectionFactory connectionFactory; + private volatile ConnectionFactory connectionFactory; private Properties jndiEnvironment; private String connectionFactoryName; private String userName; @@ -68,7 +68,7 @@ public class JMSConfiguration { * Destination name to listen on for reply messages */ private String replyDestination; - private Destination replyDestinationDest; + private volatile Destination replyDestinationDest; /** * Destination name to send out as replyTo address in the message @@ -338,11 +338,18 @@ public class JMSConfiguration { this.reconnectOnException = reconnectOnException; } - public synchronized ConnectionFactory getConnectionFactory() { - if (connectionFactory == null) { - connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this); + public ConnectionFactory getConnectionFactory() { + ConnectionFactory factory = connectionFactory; + if (factory == null) { + synchronized (this) { + factory = connectionFactory; + if (factory == null) { + factory = JMSFactory.getConnectionFactoryFromJndi(this); + connectionFactory = factory; + } + } } - return connectionFactory; + return factory; } public String getDurableSubscriptionClientId() { @@ -399,13 +406,20 @@ public class JMSConfiguration { return destinationResolver.resolveDestinationName(session, userDestination, replyPubSubDomain); } - public synchronized Destination getReplyDestination(Session session) throws JMSException { - if (replyDestinationDest == null) { - replyDestinationDest = replyDestination == null - ? session.createTemporaryQueue() - : destinationResolver.resolveDestinationName(session, replyDestination, replyPubSubDomain); + public Destination getReplyDestination(Session session) throws JMSException { + Destination result = replyDestinationDest; + if (result == null) { + synchronized (this) { + result = replyDestinationDest; + if (result == null) { + result = replyDestination == null + ? session.createTemporaryQueue() + : destinationResolver.resolveDestinationName(session, replyDestination, replyPubSubDomain); + replyDestinationDest = result; + } + } } - return replyDestinationDest; + return result; } public Destination getTargetDestination(Session session) throws JMSException { http://git-wip-us.apache.org/repos/asf/cxf/blob/3bd4e844/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java index fed9b29..da09c0d 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java @@ -92,11 +92,11 @@ public class TestReceiver { .createQueue(receiveQueueName))); final javax.jms.Message inMessage = consumer.receive(10000); if (inMessage == null) { - System.out.println("TestReceiver timed out"); + //System.out.println("TestReceiver timed out"); throw new RuntimeException("No message received on destination " + receiveQueueName); } requestMessageId = inMessage.getJMSMessageID(); - System.out.println("Received message " + requestMessageId); + //System.out.println("Received message " + requestMessageId); final TextMessage replyMessage = session.createTextMessage("Result"); String correlationId = (forceMessageIdAsCorrelationId || inMessage.getJMSCorrelationID() == null) ? inMessage.getJMSMessageID() : inMessage.getJMSCorrelationID(); @@ -106,7 +106,7 @@ public class TestReceiver { if (replyDest != null) { final MessageProducer producer = closer .register(session.createProducer(replyDest)); - System.out.println("Sending reply with correlation id " + correlationId + " to " + replyDest); + //System.out.println("Sending reply with correlation id " + correlationId + " to " + replyDest); producer.send(replyMessage); } } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/cxf/blob/3bd4e844/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java index 34a889c..a299043 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java @@ -31,6 +31,8 @@ import javax.jms.TemporaryQueue; import javax.jms.TextMessage; import org.apache.activemq.pool.PooledConnectionFactory; + +import org.junit.Assert; import org.junit.Test; public class PooledConnectionTempQueueTest { @@ -76,7 +78,8 @@ public class PooledConnectionTempQueueTest { MessageConsumer consumer = session.createConsumer(tempQueue); Message replyMsg = consumer.receive(); - System.out.println(replyMsg.getJMSCorrelationID()); + Assert.assertNotNull(replyMsg); + //System.out.println(replyMsg.getJMSCorrelationID()); consumer.close(); @@ -92,12 +95,12 @@ public class PooledConnectionTempQueueTest { MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); final javax.jms.Message inMessage = consumer.receive(); - String requestMessageId = inMessage.getJMSMessageID(); - System.out.println("Received message " + requestMessageId); + //String requestMessageId = inMessage.getJMSMessageID(); + //System.out.println("Received message " + requestMessageId); final TextMessage replyMessage = session.createTextMessage("Result"); replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID()); final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo()); - System.out.println("Sending reply to " + inMessage.getJMSReplyTo()); + //System.out.println("Sending reply to " + inMessage.getJMSReplyTo()); producer.send(replyMessage); producer.close(); http://git-wip-us.apache.org/repos/asf/cxf/blob/3bd4e844/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index 8c05076..a474810 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -171,8 +171,9 @@ public class MessageListenerTest { int actualNum; do { actualNum = getNumMessages(connection, queue); - System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum - + ", expecting: " + expectedNum); + + //System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum + // + ", expecting: " + expectedNum); Thread.sleep(100); } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum != actualNum); Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum); @@ -221,12 +222,12 @@ public class MessageListenerTest { try { String text = textMessage.getText(); if (OK.equals(text)) { - System.out.println("Simulating Processing successful"); + //System.out.println("Simulating Processing successful"); } else if (FAIL.equals(text)) { throw new RuntimeException("Simulating something went wrong. Expecting rollback"); } else if (FAILFIRST.equals(text)) { if (message.getJMSRedelivered()) { - System.out.println("Simulating processing worked on second try"); + //System.out.println("Simulating processing worked on second try"); } else { throw new RuntimeException("Simulating something went wrong. Expecting rollback"); }
