This is an automated email from the ASF dual-hosted git repository. buhhunyx pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push: new 7225b3a cxf-rt-transports-jms: improve test stability 7225b3a is described below commit 7225b3a8c2a52218d2c2efdd0daa1556ffbf321b Author: amarkevich <amarkev...@talend.com> AuthorDate: Thu Dec 27 16:37:58 2018 +0300 cxf-rt-transports-jms: improve test stability --- .../cxf/transport/jms/AbstractJMSTester.java | 164 +++++++++++++-------- .../apache/cxf/transport/jms/JMSConduitTest.java | 11 +- .../cxf/transport/jms/JMSConfigFactoryTest.java | 4 +- .../cxf/transport/jms/JMSDestinationTest.java | 131 ++++------------ .../MessageIdAsCorrelationIdJMSConduitTest.java | 31 ++-- .../jms/PooledConnectionTempQueueTest.java | 29 ++-- .../cxf/transport/jms/RequestResponseTest.java | 54 +------ .../jms/uri/URIConfiguredConduitTest.java | 18 +-- .../transport/jms/util/MessageListenerTest.java | 40 ++--- 9 files changed, 190 insertions(+), 292 deletions(-) diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java index 9340ef9..37acc66 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java @@ -18,16 +18,16 @@ */ package org.apache.cxf.transport.jms; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Reader; -import java.io.StringReader; import java.io.Writer; import java.net.URL; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; import javax.xml.namespace.QName; import org.apache.activemq.ActiveMQConnectionFactory; @@ -39,36 +39,35 @@ import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.Message; +import org.apache.cxf.message.MessageImpl; import org.apache.cxf.service.Service; import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.testutil.common.TestUtil; import org.apache.cxf.transport.Conduit; import org.apache.cxf.transport.MessageObserver; -import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.apache.cxf.wsdl11.WSDLServiceFactory; import org.junit.AfterClass; import org.junit.BeforeClass; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public abstract class AbstractJMSTester { protected static final String WSDL = "/jms_test.wsdl"; protected static final String SERVICE_NS = "http://cxf.apache.org/hello_world_jms"; protected static final int MAX_RECEIVE_TIME = 10; - protected static final String MESSAGE_CONTENT = "HelloWorld"; protected static Bus bus; protected static ActiveMQConnectionFactory cf1; protected static ConnectionFactory cf; protected static BrokerService broker; + private static final String MESSAGE_CONTENT = "HelloWorld"; protected enum ExchangePattern { oneway, requestReply }; - protected EndpointReferenceType target; - protected Message inMessage; - protected Message destMessage; + private final AtomicReference<Message> inMessage = new AtomicReference<>(); + private final AtomicReference<Message> destMessage = new AtomicReference<>(); @BeforeClass public static void startSerices() throws Exception { @@ -92,12 +91,12 @@ public abstract class AbstractJMSTester { broker.stop(); } - protected EndpointInfo setupServiceInfo(String serviceName, String portName) { + protected static EndpointInfo setupServiceInfo(String serviceName, String portName) { return setupServiceInfo(SERVICE_NS, WSDL, serviceName, portName); } - protected EndpointInfo setupServiceInfo(String ns, String wsdl, String serviceName, String portName) { - URL wsdlUrl = getClass().getResource(wsdl); + protected static EndpointInfo setupServiceInfo(String ns, String wsdl, String serviceName, String portName) { + URL wsdlUrl = AbstractJMSTester.class.getResource(wsdl); if (wsdlUrl == null) { throw new IllegalArgumentException("Wsdl file not found on class path " + wsdl); } @@ -113,64 +112,72 @@ public abstract class AbstractJMSTester { protected MessageObserver createMessageObserver() { return new MessageObserver() { public void onMessage(Message m) { - Exchange exchange = new ExchangeImpl(); - exchange.setInMessage(m); - m.setExchange(exchange); - destMessage = m; +// Exchange exchange = new ExchangeImpl(); +// exchange.setInMessage(m); +// m.setExchange(exchange); + destMessage.set(m); + synchronized (destMessage) { + destMessage.notifyAll(); + } } }; } - protected void sendMessageAsync(Conduit conduit, Message message) throws IOException { + protected static void sendMessageAsync(Conduit conduit, Message message) throws IOException { sendoutMessage(conduit, message, false, false); } - protected void sendMessageSync(Conduit conduit, Message message) throws IOException { + protected static void sendMessageSync(Conduit conduit, Message message) throws IOException { sendoutMessage(conduit, message, false, true); } - protected void sendMessage(Conduit conduit, Message message, boolean synchronous) throws IOException { + protected static void sendMessage(Conduit conduit, Message message, boolean synchronous) throws IOException { sendoutMessage(conduit, message, false, synchronous); } - protected void sendOneWayMessage(Conduit conduit, Message message) throws IOException { + protected static void sendOneWayMessage(Conduit conduit, Message message) throws IOException { sendoutMessage(conduit, message, true, true); } - private void sendoutMessage(Conduit conduit, + private static void sendoutMessage(Conduit conduit, Message message, boolean isOneWay, boolean synchronous) throws IOException { - - Exchange exchange = new ExchangeImpl(); + final Exchange exchange = new ExchangeImpl(); exchange.setOneWay(isOneWay); exchange.setSynchronous(synchronous); message.setExchange(exchange); exchange.setOutMessage(message); conduit.prepare(message); - OutputStream os = message.getContent(OutputStream.class); - Writer writer = message.getContent(Writer.class); - assertTrue("The OutputStream and Writer should not both be null ", os != null || writer != null); - if (os != null) { - os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding - os.close(); - } else { - writer.write(MESSAGE_CONTENT); - writer.close(); + try (OutputStream os = message.getContent(OutputStream.class)) { + if (os != null) { + os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding + return; + } + } + try (Writer writer = message.getContent(Writer.class)) { + if (writer != null) { + writer.write(MESSAGE_CONTENT); + return; + } } + fail("The OutputStream and Writer should not both be null"); } - protected JMSConduit setupJMSConduit(EndpointInfo ei) throws IOException { + protected static JMSConduit setupJMSConduit(EndpointInfo ei) throws IOException { JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); jmsConfig.setConnectionFactory(cf); - return new JMSConduit(target, jmsConfig, bus); + return new JMSConduit(null, jmsConfig, bus); } protected JMSConduit setupJMSConduitWithObserver(EndpointInfo ei) throws IOException { JMSConduit jmsConduit = setupJMSConduit(ei); MessageObserver observer = new MessageObserver() { public void onMessage(Message m) { - inMessage = m; + inMessage.set(m); + synchronized (inMessage) { + inMessage.notifyAll(); + } } }; jmsConduit.setMessageObserver(observer); @@ -183,58 +190,85 @@ public abstract class AbstractJMSTester { return new JMSDestination(bus, ei, jmsConfig); } - protected String getContent(Message message) { - ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class); + protected static Message createMessage() { + return createMessage(null); + } + + protected static Message createMessage(String correlationId) { + Message outMessage = new MessageImpl(); + JMSMessageHeadersType header = new JMSMessageHeadersType(); + header.setJMSDeliveryMode(DeliveryMode.PERSISTENT); + header.setJMSPriority(1); + header.setTimeToLive(1000L); + outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header); + outMessage.put(Message.ENCODING, "US-ASCII"); + return outMessage; + } + + protected static void verifyReceivedMessage(Message message) { String response = "<not found>"; + InputStream bis = message.getContent(InputStream.class); if (bis != null) { - byte[] bytes = new byte[bis.available()]; try { + byte[] bytes = new byte[bis.available()]; bis.read(bytes); + response = IOUtils.newStringFromBytes(bytes); } catch (IOException ex) { - assertFalse("Read the Destination recieved Message error ", false); - ex.printStackTrace(); + fail("Read the Destination recieved Message error: " + ex.getMessage()); } - response = IOUtils.newStringFromBytes(bytes); } else { - StringReader reader = (StringReader)message.getContent(Reader.class); + Reader reader = message.getContent(Reader.class); char[] buffer = new char[5000]; try { int i = reader.read(buffer); response = new String(buffer, 0, i); } catch (IOException e) { - assertFalse("Read the Destination recieved Message error ", false); - e.printStackTrace(); + fail("Read the Destination recieved Message error: " + e.getMessage()); } } - return response; + assertEquals("The response content should be equal", MESSAGE_CONTENT, response); } - protected void waitForReceiveInMessage() { - int waitTime = 0; - while (inMessage == null && waitTime < MAX_RECEIVE_TIME * 10) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // do nothing here + protected static void verifyHeaders(Message msgIn, Message msgOut) { + JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut + .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); + + JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn + .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); + + verifyJmsHeaderEquality(outHeader, inHeader); + + } + + protected static void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) { + assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader + .getJMSPriority(), inHeader.getJMSPriority()); + assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader + .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode()); + assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader + .getJMSType(), inHeader.getJMSType()); + } + + + protected Message waitForReceiveInMessage() throws InterruptedException { + if (null == inMessage.get()) { + synchronized (inMessage) { + inMessage.wait(MAX_RECEIVE_TIME * 1000L); } - waitTime++; + assertNotNull("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds", inMessage.get()); } - assertTrue("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds", - inMessage != null); + return inMessage.getAndSet(null); } - protected void waitForReceiveDestMessage() { - int waitTime = 0; - while (destMessage == null && waitTime < MAX_RECEIVE_TIME * 10) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - // do nothing here + protected Message waitForReceiveDestMessage() throws InterruptedException { + if (null == destMessage.get()) { + synchronized (destMessage) { + destMessage.wait(MAX_RECEIVE_TIME * 1000L); } - waitTime++; + assertNotNull("Can't receive the Destination message in " + MAX_RECEIVE_TIME + " seconds", + destMessage.get()); } - assertNotNull("Can't receive the Destination message in " + MAX_RECEIVE_TIME - + " seconds", destMessage); + return destMessage.getAndSet(null); } } \ No newline at end of file diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java index 6a6ee45..bf34bd3 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java @@ -36,8 +36,7 @@ public class JMSConduitTest extends AbstractJMSTester { @Test public void testGetConfiguration() throws Exception { - EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL, - "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort"); + EndpointInfo ei = setupServiceInfo("HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort"); JMSConduit conduit = setupJMSConduit(ei); assertEquals("Can't get the right ClientReceiveTimeout", 500L, conduit.getJmsConfig() .getReceiveTimeout().longValue()); @@ -46,8 +45,7 @@ public class JMSConduitTest extends AbstractJMSTester { @Test public void testPrepareSend() throws Exception { - EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL, - "HelloWorldService", "HelloWorldPort"); + EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort"); JMSConduit conduit = setupJMSConduit(ei); Message message = new MessageImpl(); @@ -66,12 +64,11 @@ public class JMSConduitTest extends AbstractJMSTester { */ @Test public void testTimeoutOnReceive() throws Exception { - EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL, - "HelloWorldServiceLoop", "HelloWorldPortLoop"); + EndpointInfo ei = setupServiceInfo("HelloWorldServiceLoop", "HelloWorldPortLoop"); JMSConduit conduit = setupJMSConduitWithObserver(ei); // If the system is extremely fast. The message could still get through - conduit.getJmsConfig().setReceiveTimeout(Long.valueOf(1)); + conduit.getJmsConfig().setReceiveTimeout(1L); Message message = new MessageImpl(); try { sendMessageSync(conduit, message); diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java index c15940f..f785a0f 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java @@ -39,7 +39,7 @@ public class JMSConfigFactoryTest extends AbstractJMSTester { @Test public void testUsernameAndPassword() throws Exception { EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort"); - JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, target); + JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); Assert.assertEquals("User name does not match.", "testUser", config.getUserName()); Assert.assertEquals("Password does not match.", "testPassword", config.getPassword()); } @@ -92,7 +92,7 @@ public class JMSConfigFactoryTest extends AbstractJMSTester { @Test public void testMessageSelectorIsSet() { EndpointInfo ei = setupServiceInfo("HelloWorldSelectorService", "HelloWorldPort"); - JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, target); + JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); Assert.assertEquals("customJMSAttribute=helloWorld", config.getMessageSelector()); } } diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java index f216f6b..22a770c 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java @@ -19,12 +19,6 @@ package org.apache.cxf.transport.jms; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.io.StringReader; - import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -33,7 +27,6 @@ import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.Topic; -import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.Message; @@ -49,7 +42,6 @@ import org.junit.Ignore; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -68,17 +60,15 @@ public class JMSDestinationTest extends AbstractJMSTester { @Test public void testDurableSubscriber() throws Exception { - destMessage = null; EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); JMSConduit conduit = setupJMSConduitWithObserver(ei); - Message outMessage = new MessageImpl(); - setupMessageHeader(outMessage); + Message outMessage = createMessage(); JMSDestination destination = setupJMSDestination(ei); destination.setMessageObserver(createMessageObserver()); // The JMSBroker (ActiveMQ 5.x) need to take some time to setup the DurableSubscriber - Thread.sleep(500); + Thread.sleep(500L); sendOneWayMessage(conduit, outMessage); - waitForReceiveDestMessage(); + Message destMessage = waitForReceiveDestMessage(); assertNotNull("The destiantion should have got the message ", destMessage); verifyReceivedMessage(destMessage); @@ -86,7 +76,7 @@ public class JMSDestinationTest extends AbstractJMSTester { conduit.close(); destination.shutdown(); } - + @Test(expected = InvalidClientIDException.class) public void testDurableInvalidClientId() throws Throwable { Connection con = cf1.createConnection(); @@ -94,7 +84,6 @@ public class JMSDestinationTest extends AbstractJMSTester { try { con.setClientID("testClient"); con.start(); - destMessage = null; EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); jmsConfig.setDurableSubscriptionClientId("testClient"); @@ -117,12 +106,11 @@ public class JMSDestinationTest extends AbstractJMSTester { destination.setMessageObserver(createMessageObserver()); JMSConduit conduit = setupJMSConduitWithObserver(ei); - Message outMessage = new MessageImpl(); - setupMessageHeader(outMessage); + Message outMessage = createMessage(); sendOneWayMessage(conduit, outMessage); // wait for the message to be get from the destination - waitForReceiveDestMessage(); + Message destMessage = waitForReceiveDestMessage(); // just verify the Destination inMessage assertNotNull("The destiantion should have got the message ", destMessage); verifyReceivedMessage(destMessage); @@ -131,51 +119,17 @@ public class JMSDestinationTest extends AbstractJMSTester { destination.shutdown(); } - private void setupMessageHeader(Message outMessage, String correlationId, String replyTo) { + private static void setupMessageHeader(Message outMessage, String correlationId, String replyTo) { JMSMessageHeadersType header = new JMSMessageHeadersType(); header.setJMSCorrelationID(correlationId); header.setJMSDeliveryMode(DeliveryMode.PERSISTENT); header.setJMSPriority(1); - header.setTimeToLive(1000); - header.setJMSReplyTo(replyTo != null ? replyTo : null); + header.setTimeToLive(1000L); + header.setJMSReplyTo(replyTo); outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header); outMessage.put(Message.ENCODING, "US-ASCII"); } - private void setupMessageHeader(Message outMessage) { - setupMessageHeader(outMessage, "Destination test", null); - } - - private void setupMessageHeader(Message outMessage, String correlationId) { - setupMessageHeader(outMessage, correlationId, null); - } - - private void verifyReceivedMessage(Message message) { - ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class); - String response = "<not found>"; - if (bis != null) { - byte[] bytes = new byte[bis.available()]; - try { - bis.read(bytes); - } catch (IOException ex) { - assertFalse("Read the Destination recieved Message error ", false); - ex.printStackTrace(); - } - response = IOUtils.newStringFromBytes(bytes); - } else { - StringReader reader = (StringReader)message.getContent(Reader.class); - char[] buffer = new char[5000]; - try { - int i = reader.read(buffer); - response = new String(buffer, 0, i); - } catch (IOException e) { - assertFalse("Read the Destination recieved Message error ", false); - e.printStackTrace(); - } - } - assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response); - } - private void verifyRequestResponseHeaders(Message msgIn, Message msgOut) { JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); @@ -188,27 +142,6 @@ public class JMSDestinationTest extends AbstractJMSTester { .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); verifyJmsHeaderEquality(outHeader, inHeader); - - } - - private void verifyHeaders(Message msgIn, Message msgOut) { - JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut - .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); - - JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn - .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); - - verifyJmsHeaderEquality(outHeader, inHeader); - - } - - private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) { - assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader - .getJMSPriority(), inHeader.getJMSPriority()); - assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader - .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode()); - assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader - .getJMSType(), inHeader.getJMSType()); } @Test @@ -233,8 +166,7 @@ public class JMSDestinationTest extends AbstractJMSTester { JMSConduit conduit = setupJMSConduitWithObserver(ei); conduit.getJmsConfig().setCreateSecurityContext(createSecurityContext); - final Message outMessage = new MessageImpl(); - setupMessageHeader(outMessage, null); + final Message outMessage = createMessage(); final JMSDestination destination = setupJMSDestination(ei); @@ -263,19 +195,17 @@ public class JMSDestinationTest extends AbstractJMSTester { // wait for the message to be got from the destination, // create the thread to handler the Destination incoming message - waitForReceiveInMessage(); - verifyReceivedMessage(inMessage); + verifyReceivedMessage(waitForReceiveInMessage()); // wait for a while for the jms session recycling - inMessage = null; // Send a second message to check for an issue // Where the session was closed the second time sendMessageSync(conduit, outMessage); - waitForReceiveInMessage(); + Message inMessage = waitForReceiveInMessage(); verifyReceivedMessage(inMessage); // wait for a while for the jms session recycling - Thread.sleep(1000); +// Thread.sleep(1000L); conduit.close(); destination.shutdown(); @@ -289,8 +219,7 @@ public class JMSDestinationTest extends AbstractJMSTester { // set up the conduit send to be true JMSConduit conduit = setupJMSConduitWithObserver(ei); - final Message outMessage = new MessageImpl(); - setupMessageHeader(outMessage, null); + final Message outMessage = createMessage(); JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); @@ -325,7 +254,7 @@ public class JMSDestinationTest extends AbstractJMSTester { // wait for the message to be got from the destination, // create the thread to handler the Destination incoming message - waitForReceiveInMessage(); + Message inMessage = waitForReceiveInMessage(); verifyReceivedMessage(inMessage); verifyRequestResponseHeaders(inMessage, outMessage); @@ -336,7 +265,7 @@ public class JMSDestinationTest extends AbstractJMSTester { // TODO we need to check the SOAP JMS transport properties here // wait for a while for the jms session recycling - Thread.sleep(1000); +// Thread.sleep(1000L); conduit.close(); destination.shutdown(); } @@ -371,10 +300,9 @@ public class JMSDestinationTest extends AbstractJMSTester { destination.setMessageObserver(createMessageObserver()); // set up the conduit send to be true JMSConduit conduit = setupJMSConduitWithObserver(ei); - final Message outMessage = new MessageImpl(); - setupMessageHeader(outMessage, null); + final Message outMessage = createMessage(); sendOneWayMessage(conduit, outMessage); - waitForReceiveDestMessage(); + Message destMessage = waitForReceiveDestMessage(); SecurityContext securityContext = destMessage.get(SecurityContext.class); conduit.close(); @@ -390,30 +318,26 @@ public class JMSDestinationTest extends AbstractJMSTester { /* 1. Test that replyTo destination set in WSDL is NOT used * in spec compliant mode */ - destMessage = null; EndpointInfo ei = setupServiceInfo( "HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort"); JMSConduit conduit = setupJMSConduitWithObserver(ei); - Message outMessage = new MessageImpl(); - setupMessageHeader(outMessage); + Message outMessage = createMessage(); JMSDestination destination = setupJMSDestination(ei); destination.setMessageObserver(createMessageObserver()); sendOneWayMessage(conduit, outMessage); - waitForReceiveDestMessage(); + Message destMessage = waitForReceiveDestMessage(); // just verify the Destination inMessage assertNotNull("The destination should have got the message ", destMessage); verifyReplyToNotSet(destMessage); - destMessage = null; /* 2. Test that replyTo destination set in WSDL IS used * in spec non-compliant mode */ sendOneWayMessage(conduit, outMessage); - waitForReceiveDestMessage(); + destMessage = waitForReceiveDestMessage(); assertNotNull("The destination should have got the message ", destMessage); String exName = getQueueName(conduit.getJmsConfig().getReplyDestination()); verifyReplyToSet(destMessage, Queue.class, exName); - destMessage = null; /* 3. Test that replyTo destination provided via invocation context * overrides the value set in WSDL and IS used in spec non-compliant mode */ @@ -422,34 +346,31 @@ public class JMSDestinationTest extends AbstractJMSTester { exName += ".context"; setupMessageHeader(outMessage, "cidValue", contextReplyTo); sendOneWayMessage(conduit, outMessage); - waitForReceiveDestMessage(); + destMessage = waitForReceiveDestMessage(); assertNotNull("The destiantion should have got the message ", destMessage); verifyReplyToSet(destMessage, Queue.class, exName); - destMessage = null; /* 4. Test that replyTo destination provided via invocation context * and the value set in WSDL are NOT used in spec non-compliant mode * when JMSConstants.JMS_SET_REPLY_TO == false */ - setupMessageHeader(outMessage); + setupMessageHeader(outMessage, null, null); outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.FALSE); sendOneWayMessage(conduit, outMessage); - waitForReceiveDestMessage(); + destMessage = waitForReceiveDestMessage(); assertNotNull("The destiantion should have got the message ", destMessage); verifyReplyToNotSet(destMessage); - destMessage = null; /* 5. Test that replyTo destination set in WSDL IS used in spec non-compliant * mode when JMSConstants.JMS_SET_REPLY_TO == true */ - setupMessageHeader(outMessage); + setupMessageHeader(outMessage, null, null); outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.TRUE); sendOneWayMessage(conduit, outMessage); - waitForReceiveDestMessage(); + destMessage = waitForReceiveDestMessage(); assertNotNull("The destiantion should have got the message ", destMessage); exName = getQueueName(conduit.getJmsConfig().getReplyDestination()); verifyReplyToSet(destMessage, Queue.class, exName); - destMessage = null; conduit.close(); destination.shutdown(); diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java index 9d3e37a..3df49fb 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java @@ -21,7 +21,6 @@ package org.apache.cxf.transport.jms; import javax.jms.ConnectionFactory; import org.apache.activemq.pool.PooledConnectionFactory; -import org.apache.cxf.Bus; import org.apache.cxf.BusFactory; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; @@ -30,9 +29,11 @@ import org.apache.cxf.message.MessageImpl; import org.apache.cxf.transport.jms.util.TestReceiver; import org.apache.cxf.ws.addressing.EndpointReferenceType; -import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + /** * Checks if a CXF client works correlates requests and responses correctly if the server sets the message id * as correlation id on the response message @@ -40,7 +41,8 @@ import org.junit.Test; public class MessageIdAsCorrelationIdJMSConduitTest { private static final String SERVICE_QUEUE = "test"; private static final String BROKER_URI = "vm://localhost?broker.persistent=false"; - private ConnectionFactory connectionFactory; + + private ConnectionFactory connectionFactory = new PooledConnectionFactory(BROKER_URI); @Test public void testSendReceiveWithTempReplyQueue() throws Exception { @@ -52,13 +54,9 @@ public class MessageIdAsCorrelationIdJMSConduitTest { sendAndReceive(true, "testreply"); } - public void sendAndReceive(boolean synchronous, String replyDestination) throws Exception { - BusFactory bf = BusFactory.newInstance(); - Bus bus = bf.createBus(); - BusFactory.setDefaultBus(bus); + private void sendAndReceive(boolean synchronous, String replyDestination) throws InterruptedException { EndpointReferenceType target = new EndpointReferenceType(); - connectionFactory = new PooledConnectionFactory(BROKER_URI); TestReceiver receiver = new TestReceiver(connectionFactory, SERVICE_QUEUE, true); receiver.runAsync(); @@ -68,7 +66,7 @@ public class MessageIdAsCorrelationIdJMSConduitTest { jmsConfig.setUseConduitIdSelector(false); jmsConfig.setReplyDestination(replyDestination); - JMSConduit conduit = new JMSConduit(target, jmsConfig, bus); + JMSConduit conduit = new JMSConduit(target, jmsConfig, BusFactory.getDefaultBus()); Exchange exchange = new ExchangeImpl(); exchange.setSynchronous(synchronous); Message message = new MessageImpl(); @@ -76,21 +74,16 @@ public class MessageIdAsCorrelationIdJMSConduitTest { conduit.sendExchange(exchange, "Request"); waitForAsyncReply(exchange); receiver.close(); - if (exchange.getInMessage() == null) { - throw new RuntimeException("No reply received within 2 seconds"); - } + assertNotNull("No reply received within 2 seconds", exchange.getInMessage()); JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)exchange.getInMessage() .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); - Assert.assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID()); + assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID()); conduit.close(); - bus.shutdown(true); } - private void waitForAsyncReply(Exchange exchange) throws InterruptedException { - int count = 0; - while (exchange.getInMessage() == null && count <= 20) { - Thread.sleep(100); - count++; + private static void waitForAsyncReply(Exchange exchange) throws InterruptedException { + for (int count = 0; exchange.getInMessage() == null && count <= 20; count++) { + Thread.sleep(100L); } } diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java index a60ef7a..8cccc26 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java @@ -18,8 +18,6 @@ */ package org.apache.cxf.transport.jms; -import java.util.concurrent.Executors; - import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; @@ -32,16 +30,17 @@ import javax.jms.TextMessage; import org.apache.activemq.pool.PooledConnectionFactory; -import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertNotNull; + public class PooledConnectionTempQueueTest { protected static final String SERVICE_QUEUE = "queue1"; @Test public void testTempQueueIssue() throws JMSException, InterruptedException { - final PooledConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false"); + final ConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false"); Connection con1 = cf.createConnection(); con1.start(); @@ -49,20 +48,18 @@ public class PooledConnectionTempQueueTest { // This order seems to matter to reproduce the issue con1.close(); - Executors.newSingleThreadExecutor().execute(new Runnable() { - public void run() { - try { - receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE); - } catch (JMSException e) { - e.printStackTrace(); - } + new Thread(() -> { + try { + receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE); + } catch (Exception e) { + e.printStackTrace(); } - }); + }).start(); sendWithReplyToTemp(cf, SERVICE_QUEUE); } - private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, + private static void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, InterruptedException { Connection con = cf.createConnection(); con.start(); @@ -74,11 +71,11 @@ public class PooledConnectionTempQueueTest { producer.send(msg); // This sleep also seems to matter - Thread.sleep(500); + Thread.sleep(500L); MessageConsumer consumer = session.createConsumer(tempQueue); Message replyMsg = consumer.receive(); - Assert.assertNotNull(replyMsg); + assertNotNull(replyMsg); //System.out.println(replyMsg.getJMSCorrelationID()); consumer.close(); @@ -88,7 +85,7 @@ public class PooledConnectionTempQueueTest { con.close(); } - public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, + public static void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, String queueName) throws JMSException { Connection con = connectionFactory.createConnection(); Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java index ab77644..053e78e 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java @@ -21,8 +21,6 @@ package org.apache.cxf.transport.jms; import java.io.IOException; -import javax.jms.DeliveryMode; - import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.Message; @@ -33,36 +31,8 @@ import org.apache.cxf.transport.MessageObserver; import org.junit.Test; -import static org.junit.Assert.assertEquals; - public class RequestResponseTest extends AbstractJMSTester { - private void verifyReceivedMessage(Message message) { - String response = getContent(message); - assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response); - } - - private void verifyHeaders(Message msgIn, Message msgOut) { - JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut - .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); - - JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn - .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); - - verifyJmsHeaderEquality(outHeader, inHeader); - - } - - private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) { - assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader - .getJMSPriority(), inHeader.getJMSPriority()); - assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader - .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode()); - assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader - .getJMSType(), inHeader.getJMSType()); - } - - @Test public void testRequestQueueResponseTempQueue() throws Exception { EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl", @@ -94,19 +64,8 @@ public class RequestResponseTest extends AbstractJMSTester { sendAndReceiveMessages(ei, false); } - private Message createMessage() { - Message outMessage = new MessageImpl(); - JMSMessageHeadersType header = new JMSMessageHeadersType(); - header.setJMSDeliveryMode(DeliveryMode.PERSISTENT); - header.setJMSPriority(1); - header.setTimeToLive(1000); - outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header); - outMessage.put(Message.ENCODING, "US-ASCII"); - return outMessage; - } - - protected void sendAndReceiveMessages(EndpointInfo ei, boolean synchronous) throws IOException { - inMessage = null; + private void sendAndReceiveMessages(EndpointInfo ei, boolean synchronous) + throws IOException, InterruptedException { // set up the conduit send to be true JMSConduit conduit = setupJMSConduitWithObserver(ei); final Message outMessage = createMessage(); @@ -120,9 +79,8 @@ public class RequestResponseTest extends AbstractJMSTester { verifyReceivedMessage(m); verifyHeaders(m, outMessage); // setup the message for - Conduit backConduit; try { - backConduit = destination.getBackChannel(m); + Conduit backConduit = destination.getBackChannel(m); // wait for the message to be got from the conduit Message replyMessage = new MessageImpl(); sendOneWayMessage(backConduit, replyMessage); @@ -138,13 +96,11 @@ public class RequestResponseTest extends AbstractJMSTester { // wait for the message to be got from the destination, // create the thread to handler the Destination incoming message - waitForReceiveInMessage(); - verifyReceivedMessage(inMessage); + verifyReceivedMessage(waitForReceiveInMessage()); } finally { conduit.close(); destination.shutdown(); } } - -} \ No newline at end of file +} diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java index b97a265..d9f9e9c 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java @@ -36,10 +36,12 @@ import org.apache.cxf.transport.jms.JMSMessageHeadersType; import org.apache.cxf.transport.jms.util.TestReceiver; import org.apache.cxf.ws.addressing.EndpointReferenceType; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + /** * Checks if a CXF client works correlates requests and responses correctly if the server sets the message id * as correlation id on the response message @@ -102,20 +104,16 @@ public class URIConfiguredConduitTest { waitForAsyncReply(exchange); receiver.close(); - if (exchange.getInMessage() == null) { - throw new RuntimeException("No reply received within 2 seconds"); - } + assertNotNull("No reply received within 2 seconds", exchange.getInMessage()); JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)exchange.getInMessage() .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); - Assert.assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID()); + assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID()); conduit.close(); } - private void waitForAsyncReply(Exchange exchange) throws InterruptedException { - int count = 0; - while (exchange.getInMessage() == null && count <= 20) { - Thread.sleep(100); - count++; + private static void waitForAsyncReply(Exchange exchange) throws InterruptedException { + for (int count = 0; exchange.getInMessage() == null && count <= 20; count++) { + Thread.sleep(100L); } } diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index ccdb450..937f0b5 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -40,7 +40,6 @@ import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import org.awaitility.Awaitility; import org.easymock.Capture; -import org.junit.Assert; import org.junit.Test; @@ -50,6 +49,7 @@ import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.newCapture; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; public class MessageListenerTest { @@ -77,7 +77,7 @@ public class MessageListenerTest { Awaitility.await().until(() -> !container.isRunning()); verify(exListener); JMSException ex = captured.getValue(); - Assert.assertEquals("The connection is already closed", ex.getMessage()); + assertEquals("The connection is already closed", ex.getMessage()); } @Test @@ -106,7 +106,7 @@ public class MessageListenerTest { verify(exListener); JMSException ex = captured.getValue(); // Closing the pooled connection will result in a NPE when using it - Assert.assertEquals("Wrapped exception. null", ex.getMessage()); + assertEquals("Wrapped exception. null", ex.getMessage()); } @Test @@ -147,14 +147,14 @@ public class MessageListenerTest { container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); container.start(); - assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0); + assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0L); sendMessage(connection, dest, OK); - assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000); + assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000L); sendMessage(connection, dest, FAIL); assertNumMessagesInQueue("Even when an exception occurs the message should be committed", connection, - dest, 0, 1000); + dest, 0, 1000L); container.stop(); connection.close(); @@ -178,17 +178,17 @@ public class MessageListenerTest { private void testTransactionalBehaviour(Connection connection, Queue dest) throws JMSException, InterruptedException { Queue dlq = JMSUtil.createQueue(connection, "ActiveMQ.DLQ"); - assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0); - assertNumMessagesInQueue("At the start the DLQ should be empty", connection, dlq, 0, 0); + assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0L); + assertNumMessagesInQueue("At the start the DLQ should be empty", connection, dlq, 0, 0L); sendMessage(connection, dest, OK); - assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000); + assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000L); sendMessage(connection, dest, FAILFIRST); - assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000); + assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000L); sendMessage(connection, dest, FAIL); - assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 2500); + assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 2500L); } private Connection createConnection(String name) throws JMSException { @@ -227,25 +227,27 @@ public class MessageListenerTest { } consumer.close(); session.close(); - assertNumMessagesInQueue("", connection, dest, 0, 0); + assertNumMessagesInQueue("", connection, dest, 0, 0L); } - private void assertNumMessagesInQueue(String message, Connection connection, Queue queue, - int expectedNum, int timeout) throws JMSException, + private static void assertNumMessagesInQueue(String message, Connection connection, Queue queue, + int expectedNum, long timeout) throws JMSException, InterruptedException { long startTime = System.currentTimeMillis(); int actualNum; do { actualNum = JMSUtil.getNumMessages(connection, queue); - + if (actualNum == expectedNum) { + break; + } //System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum // + ", expecting: " + expectedNum); - Thread.sleep(100); + Thread.sleep(100L); } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum != actualNum); - Assert.assertEquals(message + " -> number of messages on queue", expectedNum, actualNum); + assertEquals(message + " -> number of messages on queue", expectedNum, actualNum); } - private void sendMessage(Connection connection, Destination dest, String content) throws JMSException, + private static void sendMessage(Connection connection, Destination dest, String content) throws JMSException, InterruptedException { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer prod = session.createProducer(dest); @@ -253,7 +255,7 @@ public class MessageListenerTest { prod.send(message); prod.close(); session.close(); - Thread.sleep(500); // Give receiver some time to process +// Thread.sleep(500L); // Give receiver some time to process } private static final class TestMessageListener implements MessageListener {