This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push: new 144ff8c [CXF-7023] Add oneSessionPerConnection property to JMS transport 144ff8c is described below commit 144ff8c9fa7a4495ad4825ba47cef74af563a299 Author: Simon Marti <simon.ma...@inventage.com> AuthorDate: Thu Dec 14 14:44:07 2017 +0100 [CXF-7023] Add oneSessionPerConnection property to JMS transport --- .../cxf/transport/jms/BackChannelConduit.java | 20 +++++- .../org/apache/cxf/transport/jms/JMSConduit.java | 41 +++++++++--- .../apache/cxf/transport/jms/JMSConfigFactory.java | 2 + .../apache/cxf/transport/jms/JMSConfiguration.java | 9 +++ .../apache/cxf/transport/jms/JMSDestination.java | 46 +++++++++----- .../apache/cxf/transport/jms/uri/JMSEndpoint.java | 13 ++++ .../jms/util/PollingMessageListenerContainer.java | 74 ++++++++++++++++++++-- .../jms/testsuite/testcases/SoapJmsSpecTest.java | 2 +- 8 files changed, 174 insertions(+), 33 deletions(-) diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java index a83984b..5a95501 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java @@ -54,14 +54,19 @@ class BackChannelConduit extends AbstractConduit implements JMSExchangeSender { private static final Logger LOG = LogUtils.getL7dLogger(BackChannelConduit.class); private JMSConfiguration jmsConfig; private Message inMessage; - private Connection connection; + private Connection persistentConnection; BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig, Connection connection) { super(EndpointReferenceUtils.getAnonymousEndpointReference()); this.inMessage = inMessage; this.jmsConfig = jmsConfig; - this.connection = connection; + this.persistentConnection = connection; } + + BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig) { + this(inMessage, jmsConfig, null); + } + @Override public void close(Message msg) throws IOException { MessageStreamUtil.closeStreams(msg); @@ -121,6 +126,14 @@ class BackChannelConduit extends AbstractConduit implements JMSExchangeSender { private void send(final Message outMessage, final Object replyObj, ResourceCloser closer) throws JMSException { + Connection connection; + + if (persistentConnection == null) { + connection = closer.register(JMSFactory.createConnection(jmsConfig)); + } else { + connection = this.persistentConnection; + } + Session session = closer.register(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); JMSMessageHeadersType outProps = (JMSMessageHeadersType)outMessage.get(JMS_SERVER_RESPONSE_HEADERS); @@ -178,6 +191,7 @@ class BackChannelConduit extends AbstractConduit implements JMSExchangeSender { messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode()); messageProperties.setJMSPriority(inMessageProperties.getJMSPriority()); messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI()); + messageProperties.setSOAPJMSSOAPAction(inMessageProperties.getSOAPJMSSOAPAction()); messageProperties.setSOAPJMSBindingVersion("1.0"); } @@ -220,4 +234,4 @@ class BackChannelConduit extends AbstractConduit implements JMSExchangeSender { : request.getJMSCorrelationID(); } -} \ No newline at end of file +} diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java index 50db27d..d627c16 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java @@ -47,10 +47,12 @@ import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; import org.apache.cxf.security.SecurityContext; import org.apache.cxf.transport.AbstractConduit; +import org.apache.cxf.transport.jms.util.AbstractMessageListenerContainer; import org.apache.cxf.transport.jms.util.JMSListenerContainer; import org.apache.cxf.transport.jms.util.JMSSender; import org.apache.cxf.transport.jms.util.JMSUtil; import org.apache.cxf.transport.jms.util.MessageListenerContainer; +import org.apache.cxf.transport.jms.util.PollingMessageListenerContainer; import org.apache.cxf.transport.jms.util.ResourceCloser; import org.apache.cxf.ws.addressing.EndpointReferenceType; @@ -157,8 +159,16 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me assertIsNotTextMessageAndMtom(outMessage); try (ResourceCloser closer = new ResourceCloser()) { - Connection c = getConnection(); - Session session = closer.register(c.createSession(false, + Connection c; + + if (jmsConfig.isOneSessionPerConnection()) { + c = closer.register(JMSFactory.createConnection(jmsConfig)); + c.start(); + } else { + c = getConnection(); + } + + Session session = closer.register(c.createSession(false, Session.AUTO_ACKNOWLEDGE)); if (exchange.isOneWay()) { @@ -168,9 +178,11 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me } } catch (JMSException e) { // Close connection so it will be refreshed on next try - ResourceCloser.close(connection); - this.connection = null; - jmsConfig.resetCachedReplyDestination(); + if (!jmsConfig.isOneSessionPerConnection()) { + ResourceCloser.close(connection); + this.connection = null; + jmsConfig.resetCachedReplyDestination(); + } this.staticReplyDestination = null; if (this.jmsListener != null) { this.jmsListener.shutdown(); @@ -192,14 +204,27 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me staticReplyDestination = jmsConfig.getReplyDestination(session); String messageSelector = JMSFactory.getMessageSelector(jmsConfig, conduitId); + if (jmsConfig.getMessageSelector() != null) { + messageSelector += (messageSelector != null && !messageSelector.isEmpty() ? " AND " : "") + + jmsConfig.getMessageSelector(); + } if (messageSelector == null && !jmsConfig.isPubSubDomain()) { // Do not open listener without selector on a queue as we then can not share the queue. // An option for this might be a good idea for people who do not plan to share queues. return; } - MessageListenerContainer container = new MessageListenerContainer(getConnection(), - staticReplyDestination, - this); + + AbstractMessageListenerContainer container; + + if (jmsConfig.isOneSessionPerConnection()) { + container = new PollingMessageListenerContainer(jmsConfig, true, this); + } else { + container = new MessageListenerContainer(getConnection(), staticReplyDestination, this); + } + + container.setTransactionManager(jmsConfig.getTransactionManager()); + container.setTransacted(jmsConfig.isSessionTransacted()); + container.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); container.setMessageSelector(messageSelector); Object executor = bus.getProperty(JMSFactory.JMS_CONDUIT_EXECUTOR); if (executor instanceof Executor) { diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java index 48953f5..b7866cf 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java @@ -86,6 +86,8 @@ public final class JMSConfigFactory { jmsConfig.setUserName(endpoint.getUsername()); jmsConfig.setPassword(endpoint.getPassword()); jmsConfig.setConcurrentConsumers(endpoint.getConcurrentConsumers()); + jmsConfig.setOneSessionPerConnection(endpoint.isOneSessionPerConnection()); + jmsConfig.setMessageSelector(endpoint.getMessageSelector()); TransactionManager tm = getTransactionManager(bus, endpoint); jmsConfig.setTransactionManager(tm); diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java index 434cb0c..bead3e7 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java @@ -91,6 +91,7 @@ public class JMSConfiguration { private boolean useConduitIdSelector = true; private String conduitSelectorPrefix; private boolean jmsProviderTibcoEms; + private boolean oneSessionPerConnection; private TransactionManager transactionManager; @@ -432,6 +433,14 @@ public class JMSConfiguration { this.jmsProviderTibcoEms = jmsProviderTibcoEms; } + public boolean isOneSessionPerConnection() { + return oneSessionPerConnection; + } + + public void setOneSessionPerConnection(boolean oneSessionPerConnection) { + this.oneSessionPerConnection = oneSessionPerConnection; + } + public static Destination resolveOrCreateDestination(final Session session, final DestinationResolver resolver, final String replyToDestinationName, diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index b85411e..c72931b 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -90,7 +90,13 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess && !robust) { return null; } - return new BackChannelConduit(inMessage, jmsConfig, connection); + + if (jmsConfig.isOneSessionPerConnection()) { + return new BackChannelConduit(inMessage, jmsConfig); + } else { + return new BackChannelConduit(inMessage, jmsConfig, connection); + } + } /** @@ -105,14 +111,15 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess if (e.getCause() != null && InvalidClientIDException.class.isInstance(e.getCause())) { throw e; } - // If first connect fails we will try to establish the connection in the background - new Thread(new Runnable() { - - @Override - public void run() { - restartConnection(); - } - }).start(); + if (!jmsConfig.isOneSessionPerConnection()) { + // If first connect fails we will try to establish the connection in the background + new Thread(new Runnable() { + @Override + public void run() { + restartConnection(); + } + }).start(); + } } } @@ -120,7 +127,6 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess private JMSListenerContainer createTargetDestinationListener() { Session session = null; try { - connection = JMSFactory.createConnection(jmsConfig); ExceptionListener exListener = new ExceptionListener() { public void onException(JMSException exception) { if (!shutdown) { @@ -129,12 +135,17 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess } } }; - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = jmsConfig.getTargetDestination(session); + + PollingMessageListenerContainer container; + if (!jmsConfig.isOneSessionPerConnection()) { + connection = JMSFactory.createConnection(jmsConfig); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = jmsConfig.getTargetDestination(session); + container = new PollingMessageListenerContainer(connection, destination, this, exListener); + } else { + container = new PollingMessageListenerContainer(jmsConfig, false, this); + } - PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, - destination, - this, exListener); container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers()); container.setTransactionManager(jmsConfig.getTransactionManager()); container.setMessageSelector(jmsConfig.getMessageSelector()); @@ -149,7 +160,10 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess container.setJndiEnvironment(jmsConfig.getJndiEnvironment()); container.start(); suspendedContinuations.setListenerContainer(container); - connection.start(); + + if (!jmsConfig.isOneSessionPerConnection()) { + connection.start(); + } return container; } catch (JMSException e) { ResourceCloser.close(connection); diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java index 89c12fb..dd004b8 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java @@ -88,6 +88,7 @@ public class JMSEndpoint { private int concurrentConsumers = 1; private String messageSelector; private int retryInterval = 5000; + private boolean oneSessionPerConnection; /** * @param uri @@ -499,4 +500,16 @@ public class JMSEndpoint { this.retryInterval = Integer.valueOf(retryInterval); } + public boolean isOneSessionPerConnection() { + return oneSessionPerConnection; + } + + public void setOneSessionPerConnection(String oneSessionPerConnection) { + this.oneSessionPerConnection = Boolean.valueOf(oneSessionPerConnection); + } + + public void setOneSessionPerConnection(boolean oneSessionPerConnection) { + this.oneSessionPerConnection = oneSessionPerConnection; + } + } diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index 9edd0da..de2c57f 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -34,11 +34,23 @@ import javax.transaction.Status; import javax.transaction.Transaction; import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.transport.jms.JMSConfiguration; +import org.apache.cxf.transport.jms.JMSFactory; public class PollingMessageListenerContainer extends AbstractMessageListenerContainer { private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class); private ExceptionListener exceptionListener; + private JMSConfiguration jmsConfig; + private boolean reply; + + public PollingMessageListenerContainer(JMSConfiguration jmsConfig, boolean isReply, + MessageListener listenerHandler) { + this.jmsConfig = jmsConfig; + this.reply = isReply; + this.listenerHandler = listenerHandler; + } + public PollingMessageListenerContainer(Connection connection, Destination destination, MessageListener listenerHandler, ExceptionListener exceptionListener) { this.connection = connection; @@ -55,9 +67,16 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont while (running) { try (ResourceCloser closer = new ResourceCloser()) { closer.register(createInitialContext()); - // Create session early to optimize performance // In + Connection connection; + if (jmsConfig != null && jmsConfig.isOneSessionPerConnection()) { + connection = closer.register(createConnection()); + } else { + connection = PollingMessageListenerContainer.this.connection; + } + // Create session early to optimize performance session = closer.register(connection.createSession(transacted, acknowledgeMode)); - MessageConsumer consumer = closer.register(createConsumer(session)); + MessageConsumer consumer = closer.register(createConsumer(connection, session)); + while (running) { Message message = consumer.receive(1000); try { @@ -108,12 +127,20 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont throw new IllegalStateException("External transactions are not supported in XAPoller"); } transactionManager.begin(); + + Connection connection; + if (getConnection() == null) { + connection = closer.register(createConnection()); + } else { + connection = getConnection(); + } + /* * Create session inside transaction to give it the * chance to enlist itself as a resource */ Session session = closer.register(connection.createSession(transacted, acknowledgeMode)); - MessageConsumer consumer = closer.register(createConsumer(session)); + MessageConsumer consumer = closer.register(createConsumer(connection, session)); Message message = consumer.receive(1000); try { if (message != null) { @@ -122,7 +149,7 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont transactionManager.commit(); } catch (Throwable e) { LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e); - safeRollBack(session); + safeRollBack(); } } catch (Throwable e) { handleException(e); @@ -131,7 +158,7 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } - protected void safeRollBack(Session session) { + private void safeRollBack() { try { transactionManager.rollback(); } catch (Throwable e) { @@ -141,7 +168,31 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } + private MessageConsumer createConsumer(final Connection connection, final Session session) + throws JMSException { + final MessageConsumer consumer; + + if (jmsConfig != null && jmsConfig.isOneSessionPerConnection()) { + Destination destination; + if (!isReply()) { + destination = jmsConfig.getTargetDestination(session); + } else { + destination = jmsConfig.getReplyDestination(session); + } + consumer = createConsumer(destination, session); + connection.start(); + } else { + consumer = createConsumer(session); + } + + return consumer; + } + private MessageConsumer createConsumer(Session session) throws JMSException { + return createConsumer(this.destination, session); + } + + private MessageConsumer createConsumer(Destination destination, Session session) throws JMSException { if (durableSubscriptionName != null && destination instanceof Topic) { return session.createDurableSubscriber((Topic)destination, durableSubscriptionName, messageSelector, pubSubNoLocal); @@ -161,6 +212,19 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont this.exceptionListener.onException(wrapped); } + private boolean isReply() { + return reply; + } + + private Connection createConnection() { + try { + return JMSFactory.createConnection(jmsConfig); + } catch (JMSException e) { + handleException(e); + throw JMSUtil.convertJmsException(e); + } + } + @Override public void start() { if (running) { diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java index 12beef5..7c78b6d 100644 --- a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java +++ b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java @@ -107,7 +107,7 @@ public class SoapJmsSpecTest extends AbstractVmJMSTest { JMSMessageHeadersType responseHeader = (JMSMessageHeadersType)responseContext .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); Assert.assertEquals("1.0", responseHeader.getSOAPJMSBindingVersion()); - Assert.assertEquals(null, responseHeader.getSOAPJMSSOAPAction()); + Assert.assertEquals("\"test\"", responseHeader.getSOAPJMSSOAPAction()); Assert.assertEquals(DeliveryMode.PERSISTENT, responseHeader.getJMSDeliveryMode()); Assert.assertEquals(7, responseHeader.getJMSPriority()); } -- To stop receiving notification emails like this one, please contact cschnei...@apache.org.