This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f3c334b95 QPID-8640: [Broker-J] Remove producer on link destruction (#179)
9f3c334b95 is described below

commit 9f3c334b9551fe2edc43b6bb387843f8d6c88e5c
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Tue May 9 11:37:14 2023 +0200

    QPID-8640: [Broker-J] Remove producer on link destruction (#179)
---
 .../v1_0/StandardReceivingLinkEndpoint.java        |  3 +-
 .../v1_0/StandardReceivingLinkEndpointTest.java    | 78 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 73b2c86dc2..a17385f0eb 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -518,8 +518,9 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
     public void destroy()
     {
         super.destroy();
-        if(_receivingDestination != null && 
_receivingDestination.getMessageDestination() != null)
+        if (_receivingDestination != null && 
_receivingDestination.getMessageDestination() != null)
         {
+            getSession().removeProducer(_publishingLink);
             
_receivingDestination.getMessageDestination().linkRemoved(_messageSender, 
_publishingLink);
             _receivingDestination = null;
         }
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
new file mode 100644
index 0000000000..751b13b031
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
+import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+@SuppressWarnings({"unchecked"})
+public class StandardReceivingLinkEndpointTest extends UnitTestBase
+{
+    @Test
+    public void linkAddedAndRemoved()
+    {
+        final SectionDecoderRegistry sectionDecoderRegistry = 
mock(SectionDecoderRegistry.class);
+
+        final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry = 
mock(AMQPDescribedTypeRegistry.class);
+        
doReturn(sectionDecoderRegistry).when(amqpDescribedTypeRegistry).getSectionDecoderRegistry();
+
+        final AMQPConnection_1_0<?> connection = 
mock(AMQPConnection_1_0.class);
+        
doReturn(amqpDescribedTypeRegistry).when(connection).getDescribedTypeRegistry();
+
+        final Session_1_0 session = mock(Session_1_0.class);
+        doReturn(connection).when(session).getConnection();
+
+        final Link_1_0<Source, Target> link = mock(Link_1_0.class);
+        doReturn("test-link").when(link).getName();
+
+        final StandardReceivingLinkEndpoint standardReceivingLinkEndpoint =
+                new StandardReceivingLinkEndpoint(session, link);
+
+        final Exchange<?> exchange = mock(Exchange.class);
+
+        final ReceivingDestination receivingDestination = 
mock(ReceivingDestination.class);
+        doReturn(exchange).when(receivingDestination).getMessageDestination();
+
+        standardReceivingLinkEndpoint.setDestination(receivingDestination);
+
+        verify(session).addProducer(any(PublishingLink.class), eq(exchange));
+        verify(exchange).linkAdded(any(MessageSender.class), 
any(PublishingLink.class));
+
+        standardReceivingLinkEndpoint.destroy();
+
+        verify(session).removeProducer(any(PublishingLink.class));
+        verify(exchange).linkRemoved(any(MessageSender.class), 
any(PublishingLink.class));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to