[ https://issues.apache.org/jira/browse/CXF-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342533#comment-16342533 ]
ASF GitHub Bot commented on CXF-7023: ------------------------------------- cschneider closed pull request #358: [CXF-7023] Add oneSessionPerConnection property to JMS transport URL: https://github.com/apache/cxf/pull/358 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java index a83984b8f99..5a9550181eb 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java @@ -54,14 +54,19 @@ private static final Logger LOG = LogUtils.getL7dLogger(BackChannelConduit.class); private JMSConfiguration jmsConfig; private Message inMessage; - private Connection connection; + private Connection persistentConnection; BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig, Connection connection) { super(EndpointReferenceUtils.getAnonymousEndpointReference()); this.inMessage = inMessage; this.jmsConfig = jmsConfig; - this.connection = connection; + this.persistentConnection = connection; } + + BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig) { + this(inMessage, jmsConfig, null); + } + @Override public void close(Message msg) throws IOException { MessageStreamUtil.closeStreams(msg); @@ -121,6 +126,14 @@ public void sendExchange(Exchange exchange, final Object replyObj) { private void send(final Message outMessage, final Object replyObj, ResourceCloser closer) throws JMSException { + Connection connection; + + if (persistentConnection == null) { + connection = closer.register(JMSFactory.createConnection(jmsConfig)); + } else { + connection = this.persistentConnection; + } + Session session = closer.register(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); JMSMessageHeadersType outProps = (JMSMessageHeadersType)outMessage.get(JMS_SERVER_RESPONSE_HEADERS); @@ -178,6 +191,7 @@ public static void initResponseMessageProperties(JMSMessageHeadersType messagePr messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode()); messageProperties.setJMSPriority(inMessageProperties.getJMSPriority()); messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI()); + messageProperties.setSOAPJMSSOAPAction(inMessageProperties.getSOAPJMSSOAPAction()); messageProperties.setSOAPJMSBindingVersion("1.0"); } @@ -220,4 +234,4 @@ public String determineCorrelationID(javax.jms.Message request) throws JMSExcept : request.getJMSCorrelationID(); } -} \ No newline at end of file +} diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java index 50db27d0875..d627c1621c0 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java @@ -47,10 +47,12 @@ import org.apache.cxf.message.MessageUtils; import org.apache.cxf.security.SecurityContext; import org.apache.cxf.transport.AbstractConduit; +import org.apache.cxf.transport.jms.util.AbstractMessageListenerContainer; import org.apache.cxf.transport.jms.util.JMSListenerContainer; import org.apache.cxf.transport.jms.util.JMSSender; import org.apache.cxf.transport.jms.util.JMSUtil; import org.apache.cxf.transport.jms.util.MessageListenerContainer; +import org.apache.cxf.transport.jms.util.PollingMessageListenerContainer; import org.apache.cxf.transport.jms.util.ResourceCloser; import org.apache.cxf.ws.addressing.EndpointReferenceType; @@ -157,8 +159,16 @@ public void sendExchange(final Exchange exchange, final Object request) { assertIsNotTextMessageAndMtom(outMessage); try (ResourceCloser closer = new ResourceCloser()) { - Connection c = getConnection(); - Session session = closer.register(c.createSession(false, + Connection c; + + if (jmsConfig.isOneSessionPerConnection()) { + c = closer.register(JMSFactory.createConnection(jmsConfig)); + c.start(); + } else { + c = getConnection(); + } + + Session session = closer.register(c.createSession(false, Session.AUTO_ACKNOWLEDGE)); if (exchange.isOneWay()) { @@ -168,9 +178,11 @@ public void sendExchange(final Exchange exchange, final Object request) { } } catch (JMSException e) { // Close connection so it will be refreshed on next try - ResourceCloser.close(connection); - this.connection = null; - jmsConfig.resetCachedReplyDestination(); + if (!jmsConfig.isOneSessionPerConnection()) { + ResourceCloser.close(connection); + this.connection = null; + jmsConfig.resetCachedReplyDestination(); + } this.staticReplyDestination = null; if (this.jmsListener != null) { this.jmsListener.shutdown(); @@ -192,14 +204,27 @@ private void setupReplyDestination(Session session) throws JMSException { staticReplyDestination = jmsConfig.getReplyDestination(session); String messageSelector = JMSFactory.getMessageSelector(jmsConfig, conduitId); + if (jmsConfig.getMessageSelector() != null) { + messageSelector += (messageSelector != null && !messageSelector.isEmpty() ? " AND " : "") + + jmsConfig.getMessageSelector(); + } if (messageSelector == null && !jmsConfig.isPubSubDomain()) { // Do not open listener without selector on a queue as we then can not share the queue. // An option for this might be a good idea for people who do not plan to share queues. return; } - MessageListenerContainer container = new MessageListenerContainer(getConnection(), - staticReplyDestination, - this); + + AbstractMessageListenerContainer container; + + if (jmsConfig.isOneSessionPerConnection()) { + container = new PollingMessageListenerContainer(jmsConfig, true, this); + } else { + container = new MessageListenerContainer(getConnection(), staticReplyDestination, this); + } + + container.setTransactionManager(jmsConfig.getTransactionManager()); + container.setTransacted(jmsConfig.isSessionTransacted()); + container.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); container.setMessageSelector(messageSelector); Object executor = bus.getProperty(JMSFactory.JMS_CONDUIT_EXECUTOR); if (executor instanceof Executor) { diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java index 48953f53f53..b7866cf1af2 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java @@ -86,6 +86,8 @@ public static JMSConfiguration createFromEndpoint(Bus bus, JMSEndpoint endpoint) jmsConfig.setUserName(endpoint.getUsername()); jmsConfig.setPassword(endpoint.getPassword()); jmsConfig.setConcurrentConsumers(endpoint.getConcurrentConsumers()); + jmsConfig.setOneSessionPerConnection(endpoint.isOneSessionPerConnection()); + jmsConfig.setMessageSelector(endpoint.getMessageSelector()); TransactionManager tm = getTransactionManager(bus, endpoint); jmsConfig.setTransactionManager(tm); diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java index 434cb0cd295..bead3e7746b 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java @@ -91,6 +91,7 @@ private boolean useConduitIdSelector = true; private String conduitSelectorPrefix; private boolean jmsProviderTibcoEms; + private boolean oneSessionPerConnection; private TransactionManager transactionManager; @@ -432,6 +433,14 @@ public void setJmsProviderTibcoEms(boolean jmsProviderTibcoEms) { this.jmsProviderTibcoEms = jmsProviderTibcoEms; } + public boolean isOneSessionPerConnection() { + return oneSessionPerConnection; + } + + public void setOneSessionPerConnection(boolean oneSessionPerConnection) { + this.oneSessionPerConnection = oneSessionPerConnection; + } + public static Destination resolveOrCreateDestination(final Session session, final DestinationResolver resolver, final String replyToDestinationName, diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index b85411e3f7c..c72931bcf5b 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -90,7 +90,13 @@ protected Conduit getInbuiltBackChannel(Message inMessage) { && !robust) { return null; } - return new BackChannelConduit(inMessage, jmsConfig, connection); + + if (jmsConfig.isOneSessionPerConnection()) { + return new BackChannelConduit(inMessage, jmsConfig); + } else { + return new BackChannelConduit(inMessage, jmsConfig, connection); + } + } /** @@ -105,14 +111,15 @@ public void activate() { if (e.getCause() != null && InvalidClientIDException.class.isInstance(e.getCause())) { throw e; } - // If first connect fails we will try to establish the connection in the background - new Thread(new Runnable() { - - @Override - public void run() { - restartConnection(); - } - }).start(); + if (!jmsConfig.isOneSessionPerConnection()) { + // If first connect fails we will try to establish the connection in the background + new Thread(new Runnable() { + @Override + public void run() { + restartConnection(); + } + }).start(); + } } } @@ -120,7 +127,6 @@ public void run() { private JMSListenerContainer createTargetDestinationListener() { Session session = null; try { - connection = JMSFactory.createConnection(jmsConfig); ExceptionListener exListener = new ExceptionListener() { public void onException(JMSException exception) { if (!shutdown) { @@ -129,12 +135,17 @@ public void onException(JMSException exception) { } } }; - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = jmsConfig.getTargetDestination(session); + + PollingMessageListenerContainer container; + if (!jmsConfig.isOneSessionPerConnection()) { + connection = JMSFactory.createConnection(jmsConfig); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = jmsConfig.getTargetDestination(session); + container = new PollingMessageListenerContainer(connection, destination, this, exListener); + } else { + container = new PollingMessageListenerContainer(jmsConfig, false, this); + } - PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, - destination, - this, exListener); container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers()); container.setTransactionManager(jmsConfig.getTransactionManager()); container.setMessageSelector(jmsConfig.getMessageSelector()); @@ -149,7 +160,10 @@ public void onException(JMSException exception) { container.setJndiEnvironment(jmsConfig.getJndiEnvironment()); container.start(); suspendedContinuations.setListenerContainer(container); - connection.start(); + + if (!jmsConfig.isOneSessionPerConnection()) { + connection.start(); + } return container; } catch (JMSException e) { ResourceCloser.close(connection); diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java index 89c12fb50bb..dd004b80011 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java @@ -88,6 +88,7 @@ private int concurrentConsumers = 1; private String messageSelector; private int retryInterval = 5000; + private boolean oneSessionPerConnection; /** * @param uri @@ -499,4 +500,16 @@ public void setRetryInterval(String retryInterval) { this.retryInterval = Integer.valueOf(retryInterval); } + public boolean isOneSessionPerConnection() { + return oneSessionPerConnection; + } + + public void setOneSessionPerConnection(String oneSessionPerConnection) { + this.oneSessionPerConnection = Boolean.valueOf(oneSessionPerConnection); + } + + public void setOneSessionPerConnection(boolean oneSessionPerConnection) { + this.oneSessionPerConnection = oneSessionPerConnection; + } + } diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index 9edd0dac136..de2c57fa255 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -34,11 +34,23 @@ import javax.transaction.Transaction; import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.transport.jms.JMSConfiguration; +import org.apache.cxf.transport.jms.JMSFactory; public class PollingMessageListenerContainer extends AbstractMessageListenerContainer { private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class); private ExceptionListener exceptionListener; + private JMSConfiguration jmsConfig; + private boolean reply; + + public PollingMessageListenerContainer(JMSConfiguration jmsConfig, boolean isReply, + MessageListener listenerHandler) { + this.jmsConfig = jmsConfig; + this.reply = isReply; + this.listenerHandler = listenerHandler; + } + public PollingMessageListenerContainer(Connection connection, Destination destination, MessageListener listenerHandler, ExceptionListener exceptionListener) { this.connection = connection; @@ -55,9 +67,16 @@ public void run() { while (running) { try (ResourceCloser closer = new ResourceCloser()) { closer.register(createInitialContext()); - // Create session early to optimize performance // In + Connection connection; + if (jmsConfig != null && jmsConfig.isOneSessionPerConnection()) { + connection = closer.register(createConnection()); + } else { + connection = PollingMessageListenerContainer.this.connection; + } + // Create session early to optimize performance session = closer.register(connection.createSession(transacted, acknowledgeMode)); - MessageConsumer consumer = closer.register(createConsumer(session)); + MessageConsumer consumer = closer.register(createConsumer(connection, session)); + while (running) { Message message = consumer.receive(1000); try { @@ -108,12 +127,20 @@ public void run() { throw new IllegalStateException("External transactions are not supported in XAPoller"); } transactionManager.begin(); + + Connection connection; + if (getConnection() == null) { + connection = closer.register(createConnection()); + } else { + connection = getConnection(); + } + /* * Create session inside transaction to give it the * chance to enlist itself as a resource */ Session session = closer.register(connection.createSession(transacted, acknowledgeMode)); - MessageConsumer consumer = closer.register(createConsumer(session)); + MessageConsumer consumer = closer.register(createConsumer(connection, session)); Message message = consumer.receive(1000); try { if (message != null) { @@ -122,7 +149,7 @@ public void run() { transactionManager.commit(); } catch (Throwable e) { LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e); - safeRollBack(session); + safeRollBack(); } } catch (Throwable e) { handleException(e); @@ -131,7 +158,7 @@ public void run() { } - protected void safeRollBack(Session session) { + private void safeRollBack() { try { transactionManager.rollback(); } catch (Throwable e) { @@ -141,7 +168,31 @@ protected void safeRollBack(Session session) { } + private MessageConsumer createConsumer(final Connection connection, final Session session) + throws JMSException { + final MessageConsumer consumer; + + if (jmsConfig != null && jmsConfig.isOneSessionPerConnection()) { + Destination destination; + if (!isReply()) { + destination = jmsConfig.getTargetDestination(session); + } else { + destination = jmsConfig.getReplyDestination(session); + } + consumer = createConsumer(destination, session); + connection.start(); + } else { + consumer = createConsumer(session); + } + + return consumer; + } + private MessageConsumer createConsumer(Session session) throws JMSException { + return createConsumer(this.destination, session); + } + + private MessageConsumer createConsumer(Destination destination, Session session) throws JMSException { if (durableSubscriptionName != null && destination instanceof Topic) { return session.createDurableSubscriber((Topic)destination, durableSubscriptionName, messageSelector, pubSubNoLocal); @@ -161,6 +212,19 @@ protected void handleException(Throwable e) { this.exceptionListener.onException(wrapped); } + private boolean isReply() { + return reply; + } + + private Connection createConnection() { + try { + return JMSFactory.createConnection(jmsConfig); + } catch (JMSException e) { + handleException(e); + throw JMSUtil.convertJmsException(e); + } + } + @Override public void start() { if (running) { diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java index 609ce2c75b4..3492f32df83 100644 --- a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java +++ b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java @@ -106,7 +106,7 @@ public void testWsdlExtensionSpecJMS() throws Exception { JMSMessageHeadersType responseHeader = (JMSMessageHeadersType)responseContext .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); Assert.assertEquals("1.0", responseHeader.getSOAPJMSBindingVersion()); - Assert.assertEquals(null, responseHeader.getSOAPJMSSOAPAction()); + Assert.assertEquals("\"test\"", responseHeader.getSOAPJMSSOAPAction()); Assert.assertEquals(DeliveryMode.PERSISTENT, responseHeader.getJMSDeliveryMode()); Assert.assertEquals(7, responseHeader.getJMSPriority()); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > SOAP over JMS transport does not use XA transactions with Websphere MQ > resource adapter > --------------------------------------------------------------------------------------- > > Key: CXF-7023 > URL: https://issues.apache.org/jira/browse/CXF-7023 > Project: CXF > Issue Type: Bug > Components: JMS > Affects Versions: 3.1.7 > Reporter: Nikolay Boklaschuk > Assignee: Christian Schneider > Priority: Major > > When using Websphere MQ resource adapter > Inbound one-way service does not uses XA transactions. > This is because WMQ adapter decides to use XA transaction when creates jms > connection, but connection opened in JMSDestination, and transaction started > in PollingMessageListnerContainer after connection created. > Futhermore WMQ adapter holds only one session per connection. > I have patched XAPoller to hold connection for each thread, it works, but may > be there are better way to provide support for WMQ adapter? -- This message was sent by Atlassian JIRA (v7.6.3#76005)