This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new ecb77fe40a QPID-8676 - [Broker-J] NPE when detaching endpoint (#246)
ecb77fe40a is described below

commit ecb77fe40a8365a225b9d0aad7471bf74247a34e
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Mon Sep 30 09:59:20 2024 +0200

    QPID-8676 - [Broker-J] NPE when detaching endpoint (#246)
---
 .../apache/qpid/server/util/CollectionUtils.java   |  50 +++
 .../v1_0/TxnCoordinatorReceivingLinkEndpoint.java  |   4 +-
 .../TxnCoordinatorReceivingLinkEndpointTest.java   | 339 +++++++++++++++++++++
 3 files changed, 391 insertions(+), 2 deletions(-)

diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/util/CollectionUtils.java 
b/broker-core/src/main/java/org/apache/qpid/server/util/CollectionUtils.java
new file mode 100644
index 0000000000..fb96b5fb53
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/util/CollectionUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Collection utilities
+ */
+public final class CollectionUtils
+{
+    /** Utility class shouldn't be instantiated directly */
+    private CollectionUtils()
+    {
+
+    }
+
+    /**
+     * Returns a fixed-size list backed by the specified array. When array is 
null, returns an empty list
+     * @param array Array of elements
+     * @return List of elements
+     * @param <T> Element type
+     */
+    public static <T> List<T> nullSafeList(final T[] array)
+    {
+        if (array == null)
+        {
+            return new ArrayList<>();
+        }
+        return Arrays.asList(array);
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 307201023b..c87b00a389 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -19,7 +19,6 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +49,7 @@ import 
org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.CollectionUtils;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
@@ -121,7 +121,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEn
                             {
                                 outcome = new Accepted();
                             }
-                            else if 
(Arrays.asList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL))
+                            else if 
(CollectionUtils.nullSafeList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL))
                             {
                                 final Rejected rejected = new Rejected();
                                 rejected.setError(error);
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpointTest.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpointTest.java
new file mode 100644
index 0000000000..b305824a39
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpointTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations;
+import 
org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import 
org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
+class TxnCoordinatorReceivingLinkEndpointTest
+{
+    private final AMQPConnection_1_0<?> _connection = 
mock(AMQPConnection_1_0.class);
+    private final Session_1_0 _session = mock(Session_1_0.class);
+    private final ServerTransaction serverTransaction = 
mock(ServerTransaction.class);
+    private final IdentifiedTransaction identifiedTransaction = 
mock(IdentifiedTransaction.class);
+
+    private final AMQPDescribedTypeRegistry _amqpDescribedTypeRegistry = 
AMQPDescribedTypeRegistry.newInstance()
+            .registerTransportLayer()
+            .registerMessagingLayer()
+            .registerTransactionLayer()
+            .registerSecurityLayer()
+            .registerExtensionSoleconnLayer();
+
+    @BeforeEach
+    void beforeEach()
+    {
+        when(identifiedTransaction.getId()).thenReturn(1);
+        
when(identifiedTransaction.getServerTransaction()).thenReturn(serverTransaction);
+
+        
when(_connection.getDescribedTypeRegistry()).thenReturn(_amqpDescribedTypeRegistry);
+        
when(_connection.createIdentifiedTransaction()).thenReturn(identifiedTransaction);
+
+        doReturn(_connection).when(_session).getConnection();
+        when(_session.getContextValue(Long.class, 
Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD))
+                .thenReturn(30_000L);
+    }
+
+    @Test
+    void receiveDeliveryDischargeOutcomeAccepted()
+    {
+        final Binary declareDeliveryTag = new Binary(new byte[] { (byte) 0 });
+        final QpidByteBuffer declareQpidByteBuffer = declareMessage();
+        final Delivery declareDelivery = mock(Delivery.class);
+        when(declareDelivery.getDeliveryTag()).thenReturn(declareDeliveryTag);
+        when(declareDelivery.getPayload()).thenReturn(declareQpidByteBuffer);
+
+        final Binary dischargeDeliveryTag = new Binary(new byte[] { (byte) 1 
});
+        final QpidByteBuffer dischargeQpidByteBuffer = dischargeMessage();
+        final Delivery dischargeDelivery = mock(Delivery.class);
+        
when(dischargeDelivery.getDeliveryTag()).thenReturn(dischargeDeliveryTag);
+        
when(dischargeDelivery.getPayload()).thenReturn(dischargeQpidByteBuffer);
+
+        final Source source = mock(Source.class);
+        when(source.getOutcomes()).thenReturn(null);
+
+        final Link_1_0<Source, Coordinator> link = mock(Link_1_0.class);
+        when(link.getSource()).thenReturn(source);
+
+        final TxnCoordinatorReceivingLinkEndpoint 
txnCoordinatorReceivingLinkEndpoint =
+                spy(new TxnCoordinatorReceivingLinkEndpoint(_session, link));
+        txnCoordinatorReceivingLinkEndpoint.start();
+
+        final Error declareError = 
txnCoordinatorReceivingLinkEndpoint.receiveDelivery(declareDelivery);
+        final Error dischargeError = 
txnCoordinatorReceivingLinkEndpoint.receiveDelivery(dischargeDelivery);
+
+        verify(txnCoordinatorReceivingLinkEndpoint, times(1))
+                .updateDispositions(anySet(), any(Declared.class), 
anyBoolean());
+        verify(txnCoordinatorReceivingLinkEndpoint, times(1))
+                .updateDispositions(anySet(), any(Accepted.class), 
anyBoolean());
+        assertNull(declareError);
+        assertNull(dischargeError);
+    }
+
+    @Test
+    void receiveDeliveryDischargeOutcomeNull()
+    {
+        final QpidByteBuffer qpidByteBuffer = dischargeMessage();
+        final Delivery delivery = mock(Delivery.class);
+        when(delivery.getPayload()).thenReturn(qpidByteBuffer);
+
+        final Source source = mock(Source.class);
+        when(source.getOutcomes()).thenReturn(null);
+
+        final Link_1_0<Source, Coordinator> link = mock(Link_1_0.class);
+        when(link.getSource()).thenReturn(source);
+
+        final TxnCoordinatorReceivingLinkEndpoint 
txnCoordinatorReceivingLinkEndpoint =
+                spy(new TxnCoordinatorReceivingLinkEndpoint(_session, link));
+        txnCoordinatorReceivingLinkEndpoint.start();
+
+        final Error error = 
txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery);
+
+        verify(txnCoordinatorReceivingLinkEndpoint, times(0))
+                .updateDispositions(anySet(), any(), anyBoolean());
+        assertEquals("unknown-id", error.getCondition().toString());
+    }
+
+    @Test
+    void receiveDeliveryDischargeOutcomeRejected()
+    {
+        final QpidByteBuffer qpidByteBuffer = dischargeMessage();
+        final Delivery delivery = mock(Delivery.class);
+        when(delivery.getPayload()).thenReturn(qpidByteBuffer);
+        when(delivery.getDeliveryTag()).thenReturn(new 
Binary("1".getBytes(StandardCharsets.UTF_8)));
+
+        final Source source = mock(Source.class);
+        when(source.getOutcomes()).thenReturn(new Symbol[] { 
Rejected.REJECTED_SYMBOL });
+
+        final Link_1_0<Source, Coordinator> link = mock(Link_1_0.class);
+        when(link.getSource()).thenReturn(source);
+
+        final TxnCoordinatorReceivingLinkEndpoint 
txnCoordinatorReceivingLinkEndpoint =
+                spy(new TxnCoordinatorReceivingLinkEndpoint(_session, link));
+        txnCoordinatorReceivingLinkEndpoint.start();
+
+        final Error error = 
txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery);
+
+        verify(txnCoordinatorReceivingLinkEndpoint, times(1))
+                .updateDispositions(anySet(), any(Rejected.class), 
anyBoolean());
+        verify(link, times(1)).getSource();
+        verify(source, times(1)).getOutcomes();
+
+        assertNull(error);
+    }
+
+    @Test
+    void amqpValueSectionNotFound()
+    {
+        final QpidByteBuffer qpidByteBuffer = emptyMessage();
+        final Delivery delivery = mock(Delivery.class);
+        when(delivery.getPayload()).thenReturn(qpidByteBuffer);
+
+        final Source source = mock(Source.class);
+        when(source.getOutcomes()).thenReturn(new Symbol[] { 
Rejected.REJECTED_SYMBOL });
+
+        final Link_1_0<Source, Coordinator> link = mock(Link_1_0.class);
+        when(link.getSource()).thenReturn(source);
+
+        final TxnCoordinatorReceivingLinkEndpoint 
txnCoordinatorReceivingLinkEndpoint =
+                new TxnCoordinatorReceivingLinkEndpoint(_session, link);
+        txnCoordinatorReceivingLinkEndpoint.start();
+
+        final ConnectionScopedRuntimeException exception = 
assertThrows(ConnectionScopedRuntimeException.class,
+                () -> 
txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery));
+
+        assertEquals("Received no AmqpValue section", exception.getMessage());
+    }
+
+    @Test
+    void invalidMessage()
+    {
+        final QpidByteBuffer qpidByteBuffer = 
QpidByteBuffer.allocateDirect(1000);
+        final Delivery delivery = mock(Delivery.class);
+        when(delivery.getPayload()).thenReturn(qpidByteBuffer);
+
+        final Source source = mock(Source.class);
+        when(source.getOutcomes()).thenReturn(new Symbol[] { 
Rejected.REJECTED_SYMBOL });
+
+        final Link_1_0<Source, Coordinator> link = mock(Link_1_0.class);
+        when(link.getSource()).thenReturn(source);
+
+        final TxnCoordinatorReceivingLinkEndpoint 
txnCoordinatorReceivingLinkEndpoint =
+                new TxnCoordinatorReceivingLinkEndpoint(_session, link);
+        txnCoordinatorReceivingLinkEndpoint.start();
+
+        final Error error = 
txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery);
+
+        assertEquals("decode-error", error.getCondition().toString());
+    }
+
+    @Test
+    void unknownCommand()
+    {
+        final QpidByteBuffer qpidByteBuffer = coordinatorMessage();
+        final Delivery delivery = mock(Delivery.class);
+        when(delivery.getPayload()).thenReturn(qpidByteBuffer);
+
+        final Source source = mock(Source.class);
+        when(source.getOutcomes()).thenReturn(new Symbol[] { 
Rejected.REJECTED_SYMBOL });
+
+        final Link_1_0<Source, Coordinator> link = mock(Link_1_0.class);
+        when(link.getSource()).thenReturn(source);
+
+        final TxnCoordinatorReceivingLinkEndpoint 
txnCoordinatorReceivingLinkEndpoint =
+                new TxnCoordinatorReceivingLinkEndpoint(_session, link);
+        txnCoordinatorReceivingLinkEndpoint.start();
+
+        final ConnectionScopedRuntimeException exception = 
assertThrows(ConnectionScopedRuntimeException.class,
+                () -> 
txnCoordinatorReceivingLinkEndpoint.receiveDelivery(delivery));
+
+        assertEquals("Received unknown command 'Coordinator'", 
exception.getMessage());
+    }
+
+    private HeaderSection header()
+    {
+        final Header header = new Header();
+        header.setTtl(UnsignedInteger.valueOf(10000L));
+        return header.createEncodingRetainingSection();
+    }
+
+    private DeliveryAnnotationsSection deliveryAnnotations()
+    {
+        final Map<Symbol, Object> annotationMap = 
Map.of(Symbol.valueOf("foo"), "bar");
+        final DeliveryAnnotations annotations = new 
DeliveryAnnotations(annotationMap);
+        return annotations.createEncodingRetainingSection();
+    }
+
+    private QpidByteBuffer declareMessage()
+    {
+        final List<QpidByteBuffer> payloads = new ArrayList<>();
+        try
+        {
+            add(payloads, header());
+            add(payloads, deliveryAnnotations());
+            add(payloads, new AmqpValue(new 
Declare()).createEncodingRetainingSection());
+            return QpidByteBuffer.concatenate(payloads);
+        }
+        finally
+        {
+            payloads.forEach(QpidByteBuffer::dispose);
+        }
+    }
+
+    private QpidByteBuffer dischargeMessage()
+    {
+        final List<QpidByteBuffer> payloads = new ArrayList<>();
+        try
+        {
+            add(payloads, header());
+            add(payloads, deliveryAnnotations());
+            final Discharge discharge = new Discharge();
+            discharge.setTxnId(new Binary(new byte[] { (byte) 1 }));
+            add(payloads, new 
AmqpValue(discharge).createEncodingRetainingSection());
+            return QpidByteBuffer.concatenate(payloads);
+        }
+        finally
+        {
+            payloads.forEach(QpidByteBuffer::dispose);
+        }
+    }
+
+    private QpidByteBuffer emptyMessage()
+    {
+        final List<QpidByteBuffer> payloads = new ArrayList<>();
+        try
+        {
+            add(payloads, header());
+            add(payloads, deliveryAnnotations());
+            return QpidByteBuffer.concatenate(payloads);
+        }
+        finally
+        {
+            payloads.forEach(QpidByteBuffer::dispose);
+        }
+    }
+
+    private QpidByteBuffer coordinatorMessage()
+    {
+        final List<QpidByteBuffer> payloads = new ArrayList<>();
+        try
+        {
+            add(payloads, header());
+            add(payloads, deliveryAnnotations());
+            add(payloads, new AmqpValue(new 
Coordinator()).createEncodingRetainingSection());
+            return QpidByteBuffer.concatenate(payloads);
+        }
+        finally
+        {
+            payloads.forEach(QpidByteBuffer::dispose);
+        }
+    }
+
+    private void add(final List<QpidByteBuffer> payloads, final 
EncodingRetainingSection<?> section)
+    {
+        try
+        {
+            payloads.add(section.getEncodedForm());
+        }
+        finally
+        {
+            section.dispose();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to