[ 
https://issues.apache.org/jira/browse/CXF-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16342533#comment-16342533
 ] 

ASF GitHub Bot commented on CXF-7023:
-------------------------------------

cschneider closed pull request #358: [CXF-7023] Add oneSessionPerConnection 
property to JMS transport
URL: https://github.com/apache/cxf/pull/358
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
index a83984b8f99..5a9550181eb 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
@@ -54,14 +54,19 @@
     private static final Logger LOG = 
LogUtils.getL7dLogger(BackChannelConduit.class);
     private JMSConfiguration jmsConfig;
     private Message inMessage;
-    private Connection connection;
+    private Connection persistentConnection;
 
     BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig, 
Connection connection) {
         super(EndpointReferenceUtils.getAnonymousEndpointReference());
         this.inMessage = inMessage;
         this.jmsConfig = jmsConfig;
-        this.connection = connection;
+        this.persistentConnection = connection;
     }
+
+    BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig) {
+        this(inMessage, jmsConfig, null);
+    }
+
     @Override
     public void close(Message msg) throws IOException {
         MessageStreamUtil.closeStreams(msg);
@@ -121,6 +126,14 @@ public void sendExchange(Exchange exchange, final Object 
replyObj) {
 
     private void send(final Message outMessage, final Object replyObj, 
ResourceCloser closer)
         throws JMSException {
+        Connection connection;
+
+        if (persistentConnection == null) {
+            connection = 
closer.register(JMSFactory.createConnection(jmsConfig));
+        } else {
+            connection = this.persistentConnection;
+        }
+
         Session session = closer.register(connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE));
 
         JMSMessageHeadersType outProps = 
(JMSMessageHeadersType)outMessage.get(JMS_SERVER_RESPONSE_HEADERS);
@@ -178,6 +191,7 @@ public static void 
initResponseMessageProperties(JMSMessageHeadersType messagePr
         
messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode());
         messageProperties.setJMSPriority(inMessageProperties.getJMSPriority());
         
messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI());
+        
messageProperties.setSOAPJMSSOAPAction(inMessageProperties.getSOAPJMSSOAPAction());
         messageProperties.setSOAPJMSBindingVersion("1.0");
     }
 
@@ -220,4 +234,4 @@ public String determineCorrelationID(javax.jms.Message 
request) throws JMSExcept
             : request.getJMSCorrelationID();
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index 50db27d0875..d627c1621c0 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -47,10 +47,12 @@
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.security.SecurityContext;
 import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.jms.util.AbstractMessageListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSSender;
 import org.apache.cxf.transport.jms.util.JMSUtil;
 import org.apache.cxf.transport.jms.util.MessageListenerContainer;
+import org.apache.cxf.transport.jms.util.PollingMessageListenerContainer;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
@@ -157,8 +159,16 @@ public void sendExchange(final Exchange exchange, final 
Object request) {
         assertIsNotTextMessageAndMtom(outMessage);
 
         try (ResourceCloser closer = new ResourceCloser()) {
-            Connection c = getConnection();
-            Session session = closer.register(c.createSession(false,
+            Connection c;
+
+            if (jmsConfig.isOneSessionPerConnection()) {
+                c = closer.register(JMSFactory.createConnection(jmsConfig));
+                c.start();
+            } else {
+                c = getConnection();
+            }
+
+            Session session = closer.register(c.createSession(false, 
                                                               
Session.AUTO_ACKNOWLEDGE));
 
             if (exchange.isOneWay()) {
@@ -168,9 +178,11 @@ public void sendExchange(final Exchange exchange, final 
Object request) {
             }
         } catch (JMSException e) {
             // Close connection so it will be refreshed on next try
-            ResourceCloser.close(connection);
-            this.connection = null;
-            jmsConfig.resetCachedReplyDestination();
+            if (!jmsConfig.isOneSessionPerConnection()) {
+                ResourceCloser.close(connection);
+                this.connection = null;
+                jmsConfig.resetCachedReplyDestination();
+            }
             this.staticReplyDestination = null;
             if (this.jmsListener != null) {
                 this.jmsListener.shutdown();
@@ -192,14 +204,27 @@ private void setupReplyDestination(Session session) 
throws JMSException {
                     staticReplyDestination = 
jmsConfig.getReplyDestination(session);
 
                     String messageSelector = 
JMSFactory.getMessageSelector(jmsConfig, conduitId);
+                    if (jmsConfig.getMessageSelector() != null) {
+                        messageSelector += (messageSelector != null && 
!messageSelector.isEmpty() ? " AND " : "")
+                                + jmsConfig.getMessageSelector();
+                    }
                     if (messageSelector == null && 
!jmsConfig.isPubSubDomain()) {
                         // Do not open listener without selector on a queue as 
we then can not share the queue.
                         // An option for this might be a good idea for people 
who do not plan to share queues.
                         return;
                     }
-                    MessageListenerContainer container = new 
MessageListenerContainer(getConnection(),
-                                                                               
       staticReplyDestination,
-                                                                               
       this);
+
+                    AbstractMessageListenerContainer container;
+
+                    if (jmsConfig.isOneSessionPerConnection()) {
+                        container = new 
PollingMessageListenerContainer(jmsConfig, true, this);
+                    } else {
+                        container = new 
MessageListenerContainer(getConnection(), staticReplyDestination, this);
+                    }
+
+                    
container.setTransactionManager(jmsConfig.getTransactionManager());
+                    container.setTransacted(jmsConfig.isSessionTransacted());
+                    
container.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
                     container.setMessageSelector(messageSelector);
                     Object executor = 
bus.getProperty(JMSFactory.JMS_CONDUIT_EXECUTOR);
                     if (executor instanceof Executor) {
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
index 48953f53f53..b7866cf1af2 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
@@ -86,6 +86,8 @@ public static JMSConfiguration createFromEndpoint(Bus bus, 
JMSEndpoint endpoint)
         jmsConfig.setUserName(endpoint.getUsername());
         jmsConfig.setPassword(endpoint.getPassword());
         jmsConfig.setConcurrentConsumers(endpoint.getConcurrentConsumers());
+        
jmsConfig.setOneSessionPerConnection(endpoint.isOneSessionPerConnection());
+        jmsConfig.setMessageSelector(endpoint.getMessageSelector());
 
         TransactionManager tm = getTransactionManager(bus, endpoint);
         jmsConfig.setTransactionManager(tm);
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index 434cb0cd295..bead3e7746b 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -91,6 +91,7 @@
     private boolean useConduitIdSelector = true;
     private String conduitSelectorPrefix;
     private boolean jmsProviderTibcoEms;
+    private boolean oneSessionPerConnection;
 
     private TransactionManager transactionManager;
 
@@ -432,6 +433,14 @@ public void setJmsProviderTibcoEms(boolean 
jmsProviderTibcoEms) {
         this.jmsProviderTibcoEms = jmsProviderTibcoEms;
     }
 
+    public boolean isOneSessionPerConnection() {
+        return oneSessionPerConnection;
+    }
+
+    public void setOneSessionPerConnection(boolean oneSessionPerConnection) {
+        this.oneSessionPerConnection = oneSessionPerConnection;
+    }
+
     public static Destination resolveOrCreateDestination(final Session session,
                                                          final 
DestinationResolver resolver,
                                                          final String 
replyToDestinationName,
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index b85411e3f7c..c72931bcf5b 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -90,7 +90,13 @@ protected Conduit getInbuiltBackChannel(Message inMessage) {
             && !robust) {
             return null;
         }
-        return new BackChannelConduit(inMessage, jmsConfig, connection);
+
+        if (jmsConfig.isOneSessionPerConnection()) {
+            return new BackChannelConduit(inMessage, jmsConfig);
+        } else {
+            return new BackChannelConduit(inMessage, jmsConfig, connection);
+        }
+
     }
 
     /**
@@ -105,14 +111,15 @@ public void activate() {
             if (e.getCause() != null && 
InvalidClientIDException.class.isInstance(e.getCause())) {
                 throw e;
             }
-            // If first connect fails we will try to establish the connection 
in the background 
-            new Thread(new Runnable() {
-
-                @Override
-                public void run() {
-                    restartConnection();
-                }
-            }).start();
+            if (!jmsConfig.isOneSessionPerConnection()) {
+                // If first connect fails we will try to establish the 
connection in the background
+                new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        restartConnection();
+                    }
+                }).start();
+            }
         }
     }
 
@@ -120,7 +127,6 @@ public void run() {
     private JMSListenerContainer createTargetDestinationListener() {
         Session session = null;
         try {
-            connection = JMSFactory.createConnection(jmsConfig);
             ExceptionListener exListener = new ExceptionListener() {
                 public void onException(JMSException exception) {
                     if (!shutdown) {
@@ -129,12 +135,17 @@ public void onException(JMSException exception) {
                     }
                 }
             };
-            session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            Destination destination = jmsConfig.getTargetDestination(session);
+            
+            PollingMessageListenerContainer container;
+            if (!jmsConfig.isOneSessionPerConnection()) {
+                connection = JMSFactory.createConnection(jmsConfig);
+                session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                Destination destination = 
jmsConfig.getTargetDestination(session);
+                container = new PollingMessageListenerContainer(connection, 
destination, this, exListener);
+            } else {
+                container = new PollingMessageListenerContainer(jmsConfig, 
false, this);
+            }
 
-            PollingMessageListenerContainer container = new 
PollingMessageListenerContainer(connection,
-                                                                               
             destination, 
-                                                                               
             this, exListener);
             
container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
             container.setTransactionManager(jmsConfig.getTransactionManager());
             container.setMessageSelector(jmsConfig.getMessageSelector());
@@ -149,7 +160,10 @@ public void onException(JMSException exception) {
             container.setJndiEnvironment(jmsConfig.getJndiEnvironment());
             container.start();
             suspendedContinuations.setListenerContainer(container);
-            connection.start();
+
+            if (!jmsConfig.isOneSessionPerConnection()) {
+                connection.start();
+            }
             return container;
         } catch (JMSException e) {
             ResourceCloser.close(connection);
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
index 89c12fb50bb..dd004b80011 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
@@ -88,6 +88,7 @@
     private int concurrentConsumers = 1;
     private String messageSelector;
     private int retryInterval = 5000;
+    private boolean oneSessionPerConnection;
 
     /**
      * @param uri
@@ -499,4 +500,16 @@ public void setRetryInterval(String retryInterval) {
         this.retryInterval = Integer.valueOf(retryInterval);
     }
     
+    public boolean isOneSessionPerConnection() {
+        return oneSessionPerConnection;
+    }
+    
+    public void setOneSessionPerConnection(String oneSessionPerConnection) {
+        this.oneSessionPerConnection = 
Boolean.valueOf(oneSessionPerConnection);
+    }
+    
+    public void setOneSessionPerConnection(boolean oneSessionPerConnection) {
+        this.oneSessionPerConnection = oneSessionPerConnection;
+    }
+
 }
diff --git 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 9edd0dac136..de2c57fa255 100644
--- 
a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ 
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -34,11 +34,23 @@
 import javax.transaction.Transaction;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.transport.jms.JMSConfiguration;
+import org.apache.cxf.transport.jms.JMSFactory;
 
 public class PollingMessageListenerContainer extends 
AbstractMessageListenerContainer {
     private static final Logger LOG = 
LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
     private ExceptionListener exceptionListener;
 
+    private JMSConfiguration jmsConfig;
+    private boolean reply;
+
+    public PollingMessageListenerContainer(JMSConfiguration jmsConfig, boolean 
isReply,
+                                           MessageListener listenerHandler) {
+        this.jmsConfig = jmsConfig;
+        this.reply = isReply;
+        this.listenerHandler = listenerHandler;
+    }
+
     public PollingMessageListenerContainer(Connection connection, Destination 
destination,
                                            MessageListener listenerHandler, 
ExceptionListener exceptionListener) {
         this.connection = connection;
@@ -55,9 +67,16 @@ public void run() {
             while (running) {
                 try (ResourceCloser closer = new ResourceCloser()) {
                     closer.register(createInitialContext());
-                    // Create session early to optimize performance            
    // In
+                    Connection connection;
+                    if (jmsConfig != null && 
jmsConfig.isOneSessionPerConnection()) {
+                        connection = closer.register(createConnection());
+                    } else {
+                        connection = 
PollingMessageListenerContainer.this.connection;
+                    }
+                    // Create session early to optimize performance
                     session = 
closer.register(connection.createSession(transacted, acknowledgeMode));
-                    MessageConsumer consumer = 
closer.register(createConsumer(session));
+                    MessageConsumer consumer = 
closer.register(createConsumer(connection, session));
+
                     while (running) {
                         Message message = consumer.receive(1000);
                         try {
@@ -108,12 +127,20 @@ public void run() {
                         throw new IllegalStateException("External transactions 
are not supported in XAPoller");
                     }
                     transactionManager.begin();
+
+                    Connection connection;
+                    if (getConnection() == null) {
+                        connection = closer.register(createConnection());
+                    } else {
+                        connection = getConnection();
+                    }
+
                     /*
                      * Create session inside transaction to give it the
                      * chance to enlist itself as a resource
                      */
                     Session session = 
closer.register(connection.createSession(transacted, acknowledgeMode));
-                    MessageConsumer consumer = 
closer.register(createConsumer(session));
+                    MessageConsumer consumer = 
closer.register(createConsumer(connection, session));
                     Message message = consumer.receive(1000);
                     try {
                         if (message != null) {
@@ -122,7 +149,7 @@ public void run() {
                         transactionManager.commit();
                     } catch (Throwable e) {
                         LOG.log(Level.WARNING, "Exception while processing jms 
message in cxf. Rolling back", e);
-                        safeRollBack(session);
+                        safeRollBack();
                     }
                 } catch (Throwable e) {
                     handleException(e);
@@ -131,7 +158,7 @@ public void run() {
 
         }
 
-        protected void safeRollBack(Session session) {
+        private void safeRollBack() {
             try {
                 transactionManager.rollback();
             } catch (Throwable e) {
@@ -141,7 +168,31 @@ protected void safeRollBack(Session session) {
 
     }
 
+    private MessageConsumer createConsumer(final Connection connection, final 
Session session)
+            throws JMSException {
+        final MessageConsumer consumer;
+        
+        if (jmsConfig != null && jmsConfig.isOneSessionPerConnection()) {
+            Destination destination;
+            if (!isReply()) {
+                destination = jmsConfig.getTargetDestination(session);
+            } else {
+                destination = jmsConfig.getReplyDestination(session);
+            }
+            consumer = createConsumer(destination, session);
+            connection.start();
+        } else {
+            consumer = createConsumer(session);
+        }
+        
+        return consumer;
+    }
+
     private MessageConsumer createConsumer(Session session) throws 
JMSException {
+        return createConsumer(this.destination, session);
+    }
+
+    private MessageConsumer createConsumer(Destination destination, Session 
session) throws JMSException {
         if (durableSubscriptionName != null && destination instanceof Topic) {
             return session.createDurableSubscriber((Topic)destination, 
durableSubscriptionName,
                                                    messageSelector, 
pubSubNoLocal);
@@ -161,6 +212,19 @@ protected void handleException(Throwable e) {
         this.exceptionListener.onException(wrapped);
     }
 
+    private boolean isReply() {
+        return reply;
+    }
+
+    private Connection createConnection() {
+        try {
+            return JMSFactory.createConnection(jmsConfig);
+        } catch (JMSException e) {
+            handleException(e);
+            throw JMSUtil.convertJmsException(e);
+        }
+    }
+
     @Override
     public void start() {
         if (running) {
diff --git 
a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java
 
b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java
index 609ce2c75b4..3492f32df83 100644
--- 
a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java
+++ 
b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SoapJmsSpecTest.java
@@ -106,7 +106,7 @@ public void testWsdlExtensionSpecJMS() throws Exception {
         JMSMessageHeadersType responseHeader = 
(JMSMessageHeadersType)responseContext
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
         Assert.assertEquals("1.0", responseHeader.getSOAPJMSBindingVersion());
-        Assert.assertEquals(null, responseHeader.getSOAPJMSSOAPAction());
+        Assert.assertEquals("\"test\"", responseHeader.getSOAPJMSSOAPAction());
         Assert.assertEquals(DeliveryMode.PERSISTENT, 
responseHeader.getJMSDeliveryMode());
         Assert.assertEquals(7, responseHeader.getJMSPriority());
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SOAP over JMS transport does not use XA transactions with Websphere MQ 
> resource adapter
> ---------------------------------------------------------------------------------------
>
>                 Key: CXF-7023
>                 URL: https://issues.apache.org/jira/browse/CXF-7023
>             Project: CXF
>          Issue Type: Bug
>          Components: JMS
>    Affects Versions: 3.1.7
>            Reporter: Nikolay Boklaschuk
>            Assignee: Christian Schneider
>            Priority: Major
>
> When using the WebSphere MQ resource adapter, an inbound one-way service 
> does not use XA transactions.
> This is because the WMQ adapter decides whether to use an XA transaction 
> when it creates the JMS connection, but the connection is opened in 
> JMSDestination, and the transaction is started in 
> PollingMessageListenerContainer after the connection has been created.
> Furthermore, the WMQ adapter holds only one session per connection.
> I have patched XAPoller to hold a connection for each thread; it works, but 
> maybe there is a better way to provide support for the WMQ adapter?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to