Repository: cxf Updated Branches: refs/heads/master 728a647a2 -> 638f4e8df
CXF-5680 Support case when jms server is down at start Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/638f4e8d Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/638f4e8d Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/638f4e8d Branch: refs/heads/master Commit: 638f4e8df6ae21b40244366a18b1af236ead0f0e Parents: 728a647 Author: Christian Schneider <[email protected]> Authored: Thu Apr 10 13:45:42 2014 +0200 Committer: Christian Schneider <[email protected]> Committed: Thu Apr 10 13:45:42 2014 +0200 ---------------------------------------------------------------------- .../cxf/transport/jms/BackChannelConduit.java | 4 +- .../apache/cxf/transport/jms/JMSConduit.java | 5 +- .../cxf/transport/jms/JMSConfiguration.java | 11 ++ .../cxf/transport/jms/JMSDestination.java | 44 ++++-- .../util/AbstractMessageListenerContainer.java | 121 +++++++++++++++ .../cxf/transport/jms/util/JMSSender.java | 18 ++- .../jms/util/MessageListenerContainer.java | 90 +---------- .../util/PollingMessageListenerContainer.java | 62 +------- .../transport/jms/util/MessageListenerTest.java | 4 +- .../testcases/SOAPJMSTestSuiteTest.java | 2 +- .../jms/tx/GreeterImplWithTransaction.java | 27 +++- .../jms/tx/JMSTransactionClientServerTest.java | 149 ++++--------------- 12 files changed, 238 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java index 321eded..08bb4f3 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java @@ -114,7 +114,7 @@ class BackChannelConduit extends AbstractConduit implements JMSExchangeSender { ResourceCloser closer = new ResourceCloser(); try { Session session = closer - .register(connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE)); + .register(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS); @@ -160,7 +160,7 @@ class BackChannelConduit extends AbstractConduit implements JMSExchangeSender { correlationId, JMSConstants.JMS_SERVER_RESPONSE_HEADERS); JMSSender sender = JMSFactory.createJmsSender(jmsConfig, messageProperties); LOG.log(Level.FINE, "server sending reply: ", reply); - sender.sendMessage(closer, session, replyTo, reply); + sender.sendMessage(session, replyTo, reply); } catch (JMSException ex) { throw JMSUtil.convertJmsException(ex); } finally { http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java index 93771a2..291b608 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java @@ -148,7 +148,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me ResourceCloser closer = new ResourceCloser(); try { Connection c = getConnection(); - Session session = closer.register(c.createSession(jmsConfig.isSessionTransacted(), + Session session = closer.register(c.createSession(false, Session.AUTO_ACKNOWLEDGE)); if (exchange.isOneWay()) { @@ -203,7 +203,6 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me .getReplyToDestination(session, headers.getJMSReplyTo()); String jmsMessageID = sendMessage(request, outMessage, replyToDestination, correlationId, closer, session); - boolean useSyncReceive = ((correlationId == null || userCID != null) && !jmsConfig.isPubSubDomain()) || !replyToDestination.equals(staticReplyDestination); if (correlationId == null) { @@ -254,7 +253,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me JMSSender sender = JMSFactory.createJmsSender(jmsConfig, headers); Destination targetDest = jmsConfig.getTargetDestination(session); - sender.sendMessage(closer, session, targetDest, message); + sender.sendMessage(session, targetDest, message); String jmsMessageID = message.getJMSMessageID(); LOG.log(Level.FINE, "client sending request message " + jmsMessageID + " to " + targetDest); http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java index 84321c6..ecda3b0 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java @@ -25,6 +25,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; +import javax.transaction.TransactionManager; import org.apache.cxf.common.injection.NoJSR250Annotations; import org.apache.cxf.transport.jms.util.DestinationResolver; @@ -85,6 +86,8 @@ public class JMSConfiguration { private boolean useConduitIdSelector = true; private String conduitSelectorPrefix; private boolean jmsProviderTibcoEms; + + private TransactionManager transactionManager; // For jms spec. Do not configure manually private String targetService; @@ -413,4 +416,12 @@ public class JMSConfiguration { return destinationResolver.resolveDestinationName(session, replyToName, replyPubSubDomain); } + public TransactionManager getTransactionManager() { + return transactionManager; + } + + public void setTransactionManager(TransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index e9dc970..34398c3 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -46,9 +46,11 @@ import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.AbstractMultiplexDestination; import org.apache.cxf.transport.Conduit; import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider; +import org.apache.cxf.transport.jms.util.AbstractMessageListenerContainer; import org.apache.cxf.transport.jms.util.JMSListenerContainer; import org.apache.cxf.transport.jms.util.JMSUtil; import org.apache.cxf.transport.jms.util.MessageListenerContainer; +import org.apache.cxf.transport.jms.util.PollingMessageListenerContainer; import org.apache.cxf.transport.jms.util.ResourceCloser; public class JMSDestination extends AbstractMultiplexDestination implements MessageListener { @@ -93,7 +95,18 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess public void activate() { getLogger().log(Level.FINE, "JMSDestination activate().... "); jmsConfig.ensureProperlyConfigured(); - jmsListener = createTargetDestinationListener(); + try { + this.jmsListener = createTargetDestinationListener(); + } catch (Exception e) { + // If first connect fails we will try to establish the connection in the background + new Thread(new Runnable() { + + @Override + public void run() { + restartConnection(); + } + }).start(); + } } @@ -102,21 +115,25 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess try { connection = JMSFactory.createConnection(jmsConfig); connection.setExceptionListener(new ExceptionListener() { - - @Override public void onException(JMSException exception) { - restartConnection(exception); + LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", exception); + restartConnection(); } }); - connection.start(); - session = connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = jmsConfig.getTargetDestination(session); - MessageListenerContainer container = new MessageListenerContainer(connection, destination, this); + AbstractMessageListenerContainer container = jmsConfig.getTransactionManager() != null + ? new PollingMessageListenerContainer(connection, destination, this) + : new MessageListenerContainer(connection, destination, this); + container.setTransactionManager(jmsConfig.getTransactionManager()); container.setMessageSelector(jmsConfig.getMessageSelector()); + container.setTransacted(jmsConfig.isSessionTransacted()); + Executor executor = JMSFactory.createExecutor(bus, "jms-destination"); container.setExecutor(executor); container.start(); suspendedContinuations.setListenerContainer(container); + connection.start(); return container; } catch (JMSException e) { throw JMSUtil.convertJmsException(e); @@ -125,20 +142,19 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess } } - protected void restartConnection(JMSException e) { - LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", e); + protected void restartConnection() { int tries = 0; do { tries++; try { deactivate(); - activate(); - LOG.log(Level.INFO, "Reestablished JMS connection"); + this.jmsListener = createTargetDestinationListener(); + LOG.log(Level.INFO, "Established JMS connection"); } catch (Exception e1) { jmsListener = null; String message = "Exception on reconnect. Trying again, attempt num " + tries; if (LOG.isLoggable(Level.FINE)) { - LOG.log(Level.WARNING, message, e); + LOG.log(Level.WARNING, message, e1); } else { LOG.log(Level.WARNING, message); } @@ -155,6 +171,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess if (jmsListener != null) { jmsListener.shutdown(); } + ResourceCloser.close(connection); suspendedContinuations.setListenerContainer(null); connection = null; } @@ -198,9 +215,6 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess origBus = BusFactory.getAndSetThreadDefaultBus(bus); - // FIXME - // JCATransactionalMessageListenerContainer.setMessageEndpoint(inMessage); - // handle the incoming message incomingObserver.onMessage(inMessage); http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java new file mode 100644 index 0000000..f33b3c8 --- /dev/null +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.jms.util; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.transaction.TransactionManager; + +import org.apache.cxf.common.logging.LogUtils; + +public abstract class AbstractMessageListenerContainer implements JMSListenerContainer { + + protected static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class); + protected Connection connection; + protected Destination destination; + protected MessageListener listenerHandler; + protected boolean transacted; + protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; + protected String messageSelector; + protected boolean running; + protected MessageConsumer consumer; + protected Session session; + protected Executor executor; + protected String durableSubscriptionName; + protected boolean pubSubNoLocal; + protected TransactionManager transactionManager; + + public AbstractMessageListenerContainer() { + super(); + } + + public Connection getConnection() { + return connection; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + if (this.transacted) { + this.acknowledgeMode = Session.SESSION_TRANSACTED; + } + } + + public void setAcknowledgeMode(int acknowledgeMode) { + this.acknowledgeMode = acknowledgeMode; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + protected Executor getExecutor() { + if (executor == null) { + executor = Executors.newFixedThreadPool(10); + } + return executor; + } + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + public void setDurableSubscriptionName(String durableSubscriptionName) { + this.durableSubscriptionName = durableSubscriptionName; + } + + public void setPubSubNoLocal(boolean pubSubNoLocal) { + this.pubSubNoLocal = pubSubNoLocal; + } + + @Override + public boolean isRunning() { + return running; + } + + public void setTransactionManager(TransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + + + + + /* + protected TransactionManager getTransactionManager() { + if (this.transactionManager == null) { + try { + InitialContext ctx = new InitialContext(); + this.transactionManager = (TransactionManager)ctx + .lookup("javax.transaction.TransactionManager"); + } catch (NamingException e) { + // Ignore + } + } + return this.transactionManager; + } + */ + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java index a5d8424..5a0d099 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java @@ -45,13 +45,19 @@ public class JMSSender { this.timeToLive = timeToLive; } - public void sendMessage(ResourceCloser closer, Session session, Destination targetDest, + public void sendMessage(Session session, Destination targetDest, javax.jms.Message message) throws JMSException { - MessageProducer producer = closer.register(session.createProducer(targetDest)); - if (explicitQosEnabled) { - producer.send(message, deliveryMode, priority, timeToLive); - } else { - producer.send(message); + MessageProducer producer = null; + try { + producer = session.createProducer(targetDest); + if (explicitQosEnabled) { + producer.send(message, deliveryMode, priority, timeToLive); + } else { + producer.send(message); + } + } finally { + ResourceCloser.close(producer); } + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java index 106934c..3a7b2e3 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java @@ -19,25 +19,18 @@ package org.apache.cxf.transport.jms.util; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.logging.Level; -import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import javax.jms.XASession; -import javax.naming.InitialContext; -import javax.naming.NamingException; import javax.transaction.TransactionManager; -import org.apache.cxf.common.logging.LogUtils; - /** * Listen for messages on a queue or topic asynchronously by registering a * MessageListener. @@ -45,74 +38,14 @@ import org.apache.cxf.common.logging.LogUtils; * Warning: This class does not refresh connections when the server goes away * This has to be handled outside. */ -public class MessageListenerContainer implements JMSListenerContainer { - private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class); - - private Connection connection; - private Destination destination; - private MessageListener listenerHandler; - private boolean transacted; - private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; - private String messageSelector; - private boolean running; - private MessageConsumer consumer; - private Session session; - private Executor executor; - private String durableSubscriptionName; - private boolean pubSubNoLocal; - private TransactionManager transactionManager; - +public class MessageListenerContainer extends AbstractMessageListenerContainer { public MessageListenerContainer(Connection connection, Destination destination, MessageListener listenerHandler) { this.connection = connection; this.destination = destination; this.listenerHandler = listenerHandler; } - - public Connection getConnection() { - return connection; - } - - public void setTransacted(boolean transacted) { - this.transacted = transacted; - } - - public void setAcknowledgeMode(int acknowledgeMode) { - this.acknowledgeMode = acknowledgeMode; - } - - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - protected Executor getExecutor() { - if (executor == null) { - executor = Executors.newFixedThreadPool(10); - } - return executor; - } - - public void setExecutor(Executor executor) { - this.executor = executor; - } - - public void setDurableSubscriptionName(String durableSubscriptionName) { - this.durableSubscriptionName = durableSubscriptionName; - } - - public void setPubSubNoLocal(boolean pubSubNoLocal) { - this.pubSubNoLocal = pubSubNoLocal; - } - - @Override - public boolean isRunning() { - return running; - } - - public void setTransactionManager(TransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - + @Override public void start() { try { @@ -124,9 +57,7 @@ public class MessageListenerContainer implements JMSListenerContainer { consumer = session.createConsumer(destination, messageSelector); } - MessageListener intListener = (transactionManager != null) - ? new XATransactionalMessageListener(transactionManager, session, listenerHandler) - : new LocalTransactionalMessageListener(session, listenerHandler); + MessageListener intListener = new LocalTransactionalMessageListener(session, listenerHandler); // new DispachingListener(getExecutor(), listenerHandler); consumer.setMessageListener(intListener); @@ -144,26 +75,13 @@ public class MessageListenerContainer implements JMSListenerContainer { consumer = null; session = null; } - + @Override public void shutdown() { stop(); ResourceCloser.close(connection); } - protected TransactionManager getTransactionManager() { - if (this.transactionManager == null) { - try { - InitialContext ctx = new InitialContext(); - this.transactionManager = (TransactionManager)ctx - .lookup("javax.transaction.TransactionManager"); - } catch (NamingException e) { - // Ignore - } - } - return this.transactionManager; - } - static class DispachingListener implements MessageListener { private Executor executor; private MessageListener listenerHandler; http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index b7c725d..1d81a7e 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -18,7 +18,6 @@ */ package org.apache.cxf.transport.jms.util; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -31,27 +30,12 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; -import javax.transaction.TransactionManager; import org.apache.cxf.common.logging.LogUtils; -public class PollingMessageListenerContainer implements JMSListenerContainer { +public class PollingMessageListenerContainer extends AbstractMessageListenerContainer { private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class); - private Connection connection; - private Destination destination; - private MessageListener listenerHandler; - private boolean transacted; - private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; - private String messageSelector; - private boolean running; - private Executor executor; - @SuppressWarnings("unused") - private String durableSubscriptionName; - @SuppressWarnings("unused") - private boolean pubSubNoLocal; - private TransactionManager transactionManager; - private ExecutorService pollers; private int numListenerThreads = 1; @@ -63,50 +47,6 @@ public class PollingMessageListenerContainer implements JMSListenerContainer { this.listenerHandler = listenerHandler; } - public Connection getConnection() { - return connection; - } - - public void setTransacted(boolean transacted) { - this.transacted = transacted; - } - - public void setAcknowledgeMode(int acknowledgeMode) { - this.acknowledgeMode = acknowledgeMode; - } - - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - protected Executor getExecutor() { - if (executor == null) { - executor = Executors.newFixedThreadPool(10); - } - return executor; - } - - public void setExecutor(Executor executor) { - this.executor = executor; - } - - public void setDurableSubscriptionName(String durableSubscriptionName) { - this.durableSubscriptionName = durableSubscriptionName; - } - - public void setPubSubNoLocal(boolean pubSubNoLocal) { - this.pubSubNoLocal = pubSubNoLocal; - } - - @Override - public boolean isRunning() { - return running; - } - - public void setTransactionManager(TransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - class Poller implements Runnable { @Override http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index c1bf86a..8c05076 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -75,7 +75,7 @@ public class MessageListenerTest { Queue dest = createQueue(connection, "test"); MessageListener listenerHandler = new TestMessageListener(); - PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest, + AbstractMessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); container.setTransacted(false); container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); @@ -99,7 +99,7 @@ public class MessageListenerTest { Connection connection = createConnection("brokerLocalTransaction"); Queue dest = createQueue(connection, "test"); MessageListener listenerHandler = new TestMessageListener(); - MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); + AbstractMessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler); container.setTransacted(true); container.setAcknowledgeMode(Session.SESSION_TRANSACTED); container.start(); http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java ---------------------------------------------------------------------- diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java index dd0f628..1b8939a 100644 --- a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java +++ b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java @@ -647,7 +647,7 @@ public class SOAPJMSTestSuiteTest extends AbstractBusClientServerTestBase { Destination replyToDestination = jmsConfig.getReplyToDestination(session, null); JMSSender sender = JMSFactory.createJmsSender(jmsConfig, null); Message jmsMessage = JMSTestUtil.buildJMSMessageFromTestCase(testcase, session, replyToDestination); - sender.sendMessage(closer, session, targetDest, jmsMessage); + sender.sendMessage(session, targetDest, jmsMessage); Message replyMessage = JMSUtil.receive(session, replyToDestination, jmsMessage.getJMSMessageID(), 10000, true); checkReplyMessage(replyMessage, testcase); http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java ---------------------------------------------------------------------- diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java b/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java index 3536f86..e2a16a2 100644 --- a/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java +++ b/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/GreeterImplWithTransaction.java @@ -22,15 +22,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.jws.WebService; - -import org.apache.cxf.systest.jms.GreeterImplDocBase; +import org.apache.hello_world_doc_lit.Greeter; +import org.apache.hello_world_doc_lit.PingMeFault; @WebService(endpointInterface = "org.apache.hello_world_doc_lit.Greeter") -public class GreeterImplWithTransaction extends GreeterImplDocBase { +public class GreeterImplWithTransaction implements Greeter { private AtomicBoolean flag = new AtomicBoolean(true); public String greetMe(String requestType) { - //System.out.println("Reached here :" + requestType); + System.out.println("Reached here :" + requestType); if ("Bad guy".equals(requestType)) { if (flag.getAndSet(false)) { //System.out.println("Throw exception here :" + requestType); @@ -42,5 +42,24 @@ public class GreeterImplWithTransaction extends GreeterImplDocBase { } return "Hello " + requestType; } + + @Override + public void greetMeOneWay(String name) { + if ("Bad guy".equals(name)) { + throw new RuntimeException("Got a bad guy call for greetMe"); + } + } + + @Override + public void pingMe() throws PingMeFault { + // TODO Auto-generated method stub + + } + + @Override + public String sayHi() { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/638f4e8d/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java ---------------------------------------------------------------------- diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java b/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java index fb40aa4..1b262e8 100644 --- a/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java +++ b/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java @@ -18,145 +18,56 @@ */ package org.apache.cxf.systest.jms.tx; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.URL; +import java.util.Collections; -import javax.jms.ConnectionFactory; -import javax.xml.namespace.QName; - -import org.apache.activemq.pool.PooledConnectionFactory; -import org.apache.cxf.Bus; -import org.apache.cxf.BusFactory; -import org.apache.cxf.bus.spring.SpringBusFactory; import org.apache.cxf.jaxws.EndpointImpl; import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; -import org.apache.cxf.testutil.common.AbstractBusTestServerBase; -import org.apache.cxf.testutil.common.EmbeddedJMSBrokerLauncher; -import org.apache.cxf.transport.jms.JMSConfigFeature; -import org.apache.cxf.transport.jms.JMSConfiguration; +import org.apache.cxf.systest.jms.AbstractVmJMSTest; +import org.apache.cxf.transport.jms.spec.JMSSpecConstants; import org.apache.hello_world_doc_lit.Greeter; -import org.apache.hello_world_doc_lit.PingMeFault; -import org.apache.hello_world_doc_lit.SOAPService2; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.springframework.context.support.ClassPathXmlApplicationContext; -/** - * Test transactions based on spring transactions. - * These will not be supported anymore in cxf >= 3 - */ @Ignore -public class JMSTransactionClientServerTest extends AbstractBusClientServerTestBase { - private static final String BROKER_URI = "vm://JMSTransactionClientServerTest?broker.persistent=false"; - private static EmbeddedJMSBrokerLauncher broker; - - public static class Server extends AbstractBusTestServerBase { - ClassPathXmlApplicationContext context; - EndpointImpl endpoint; - protected void run() { - SpringBusFactory bf = new SpringBusFactory(); - Bus bus = bf.createBus("org/apache/cxf/systest/jms/tx/jms_server_config.xml"); - BusFactory.setDefaultBus(bus); - endpoint = new EndpointImpl(bus, new GreeterImplWithTransaction()); - endpoint.setAddress("jms:queue:greeter.queue.noaop?sessionTransacted=true"); - endpoint.publish(); - } - public void tearDown() { - endpoint.stop(); - context.close(); - } - } +public class JMSTransactionClientServerTest extends AbstractVmJMSTest { + private static final String SERVICE_ADDRESS = + "jms:queue:greeter.queue.tx?receivetTimeOut=5000&sessionTransacted=true"; + private static EndpointImpl endpoint; @BeforeClass public static void startServers() throws Exception { - broker = new EmbeddedJMSBrokerLauncher(BROKER_URI); - System.setProperty("EmbeddedBrokerURL", broker.getBrokerURL()); - launchServer(broker); - launchServer(new Server()); - createStaticBus(); + startBusAndJMS(JMSTransactionClientServerTest.class); + + endpoint = new EndpointImpl(bus, new GreeterImplWithTransaction()); + endpoint.setAddress(SERVICE_ADDRESS); + endpoint.setFeatures(Collections.singletonList(cff)); + endpoint.publish(); } + @AfterClass public static void clearProperty() { - System.clearProperty("EmbeddedBrokerURL"); - } - public URL getWSDLURL(String s) throws Exception { - return getClass().getResource(s); - } - public QName getServiceName(QName q) { - return q; + endpoint.stop(); } - public QName getPortName(QName q) { - return q; - } - - @Ignore - @Test - public void testDocBasicConnection() throws Exception { - QName serviceName = getServiceName(new QName("http://apache.org/hello_world_doc_lit", - "SOAPService2")); - QName portName = getPortName(new QName("http://apache.org/hello_world_doc_lit", "SoapPort2")); - URL wsdl = getWSDLURL("/wsdl/hello_world_doc_lit.wsdl"); - assertNotNull(wsdl); - String wsdlString = wsdl.toString(); - SOAPService2 service = new SOAPService2(wsdl, serviceName); - broker.updateWsdl(getBus(), wsdlString); - assertNotNull(service); - Greeter greeter = service.getPort(portName, Greeter.class); - doService(greeter, true); - } - - @Ignore @Test - public void testNonAopTransaction() throws Exception { - JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); - factory.setServiceClass(Greeter.class); - factory.setAddress("jms://"); - - JMSConfiguration jmsConfig = new JMSConfiguration(); - ConnectionFactory connectionFactory - = new PooledConnectionFactory(broker.getBrokerURL()); - jmsConfig.setConnectionFactory(connectionFactory); - jmsConfig.setTargetDestination("greeter.queue.noaop"); - jmsConfig.setPubSubDomain(false); - - JMSConfigFeature jmsConfigFeature = new JMSConfigFeature(); - jmsConfigFeature.setJmsConfig(jmsConfig); - factory.getFeatures().add(jmsConfigFeature); - - Greeter greeter = (Greeter)factory.create(); - doService(greeter, false); - } - public void doService(Greeter greeter, boolean doEx) throws Exception { - - String response1 = new String("Hello "); + public void testTransaction() throws Exception { + Greeter greeter = createGreeterProxy(); + // Should be processed normally + greeter.greetMeOneWay("Good guy"); - try { - - String greeting = greeter.greetMe("Good guy"); - assertNotNull("No response received from service", greeting); - String exResponse = response1 + "Good guy"; - assertEquals("Get unexcpeted result", exResponse, greeting); - - greeting = greeter.greetMe("Bad guy"); - assertNotNull("No response received from service", greeting); - exResponse = response1 + "[Bad guy]"; - assertEquals("Get unexcpeted result", exResponse, greeting); - - if (doEx) { - try { - greeter.pingMe(); - fail("Should have thrown FaultException"); - } catch (PingMeFault ex) { - assertNotNull(ex.getFaultInfo()); - } - } - } catch (UndeclaredThrowableException ex) { - throw (Exception)ex.getCause(); - } + // Should cause rollback, redelivery and in the end the message should go to the dead letter queue + greeter.greetMe("Bad guy"); } + private Greeter createGreeterProxy() throws Exception { + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setBus(bus); + factory.getFeatures().add(cff); + factory.setTransportId(JMSSpecConstants.SOAP_JMS_SPECIFICATION_TRANSPORTID); + factory.setServiceClass(Greeter.class); + factory.setAddress(SERVICE_ADDRESS); + return (Greeter)markForClose(factory.create()); + } }
