This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new d40f53e  CXF-8591: Temporary queues are never deleted when they are 
used. Added a test case
d40f53e is described below

commit d40f53ea97eea476f0c6af30f352b3e9f0bb5903
Author: Andriy Redko <[email protected]>
AuthorDate: Thu Sep 2 11:13:04 2021 -0400

    CXF-8591: Temporary queues are never deleted when they are used. Added a 
test case
---
 .../cxf/transport/jms/JMSDestinationTest.java      | 73 ++++++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 597d1bd..e92a4a4 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -22,6 +22,7 @@ package org.apache.cxf.transport.jms;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import javax.jms.Connection;
@@ -38,6 +39,8 @@ import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.activemq.EnhancedConnection;
+import org.apache.activemq.advisory.DestinationSource;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
@@ -53,9 +56,12 @@ import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class JMSDestinationTest extends AbstractJMSTester {
@@ -383,6 +389,73 @@ public class JMSDestinationTest extends AbstractJMSTester {
         conduit.close();
         destination.shutdown();
     }
+    
+    @Test
+    public void testTemporaryQueueDeletionUponReset() throws Exception {
+        EndpointInfo ei = setupServiceInfo("HelloWorldService", 
"HelloWorldPort");
+
+        // set up the conduit send to be true
+        JMSConduit conduit = setupJMSConduitWithObserver(ei);
+        assertNull(conduit.getJmsConfig().getReplyDestination());
+
+        // Store the connection so we could check temporary queues
+        final AtomicReference<DestinationSource> destinationSource = new 
AtomicReference<>();
+        final Message outMessage = createMessage();
+        
+        // Capture the DestinationSource instance associated with the 
connection
+        final JMSDestination destination = setupJMSDestination(ei, c -> new 
ConnectionFactory() {
+            @Override
+            public Connection createConnection() throws JMSException {
+                final Connection connection = c.createConnection();
+                
destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+                return connection;
+            }
+
+            @Override
+            public Connection createConnection(String userName, String 
password) throws JMSException {
+                final Connection connection = c.createConnection(userName, 
password);
+                
destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+                return connection;
+            }
+        });
+
+        // set up MessageObserver for handling the conduit message
+        final MessageObserver observer = new MessageObserver() {
+            public void onMessage(Message m) {
+                final Exchange exchange = new ExchangeImpl();
+                exchange.setInMessage(m);
+                m.setExchange(exchange);
+
+                try {
+                    final Conduit backConduit = destination.getBackChannel(m);
+                    sendOneWayMessage(backConduit, new MessageImpl());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+        
+        destination.setMessageObserver(observer);
+        sendMessageSync(conduit, outMessage);
+        // wait for the message to be received from the destination,
+        // create the thread to handle the Destination incoming message
+
+        Message inMessage = waitForReceiveInMessage();
+        verifyReceivedMessage(inMessage);
+
+        final DestinationSource ds = destinationSource.get();
+        assertThat(ds.getTemporaryQueues().size(), equalTo(1));
+        
+        // Force manual temporary queue deletion by resetting the reply 
destination
+        conduit.getJmsConfig().resetCachedReplyDestination();
+        // The queue deletion events (as well as others) are propagated 
asynchronously
+        await()
+            .atMost(1, TimeUnit.SECONDS)
+            .untilAsserted(() -> assertThat(ds.getTemporaryQueues().size(), 
equalTo(0)));
+        
+        conduit.close();
+        destination.shutdown();
+    }
 
     @Test
     public void testIsMultiplexCapable() throws Exception {

Reply via email to