This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 7608acbc07 QPID-8680 - [Broker-J] Broker should interpret the value of 
qpid.port.heartbeatDelay as half of the actual idle timeout (#254)
7608acbc07 is described below

commit 7608acbc078f25cab5d8d1cdbe65e6a6f220aea6
Author: Daniil Kirilyuk <daniel.kiril...@gmail.com>
AuthorDate: Tue Feb 18 08:14:36 2025 +0100

    QPID-8680 - [Broker-J] Broker should interpret the value of 
qpid.port.heartbeatDelay as half of the actual idle timeout (#254)
---
 .../protocol/v1_0/AMQPConnection_1_0Impl.java      |  2 +-
 .../protocol/v1_0/AMQPConnection_1_0ImplTest.java  | 56 +++++++++++++++++++++-
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 2bd689ef6f..d309048cd7 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -897,7 +897,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         }
         else
         {
-            initialiseHeartbeating(_outgoingIdleTimeout / 2L, 
_incomingIdleTimeout);
+            initialiseHeartbeating(_outgoingIdleTimeout / 2L, 
_incomingIdleTimeout * 2);
             final NamedAddressSpace addressSpace = 
getPort().getAddressSpace(_localHostname);
             if (addressSpace == null)
             {
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
index 47e4975018..a67c76de66 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
@@ -25,12 +25,16 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.lang.reflect.Field;
 import java.net.SocketAddress;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -41,7 +45,10 @@ import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Model;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -67,7 +74,7 @@ class AMQPConnection_1_0ImplTest extends UnitTestBase
         when(_port.getModel()).thenReturn(model);
         when(_port.getTaskExecutor()).thenReturn(taskExecutor);
         when(_port.getChildExecutor()).thenReturn(taskExecutor);
-        _aggregateTicket = mock(AggregateTicker.class);
+        _aggregateTicket = new AggregateTicker();
         _virtualHost = BrokerTestHelper.createVirtualHost("test", _broker, 
true, this);
     }
 
@@ -176,4 +183,51 @@ class AMQPConnection_1_0ImplTest extends UnitTestBase
         assertEquals(0L, statisticsAfterReset.get("transactedMessagesIn"));
         assertEquals(0L, statisticsAfterReset.get("transactedMessagesOut"));
     }
+
+    @Test
+    void heartbeat() throws Exception
+    {
+        final int writeDelay = 5000;
+        final int readDelay = 12;
+        final AtomicLong maxReadIdleMillis = new AtomicLong();
+        final AtomicLong maxWriteIdleMillis = new AtomicLong();
+
+        when(_broker.getNetworkBufferSize()).thenReturn(1024);
+
+        final ByteBufferSender sender = mock(ByteBufferSender.class);
+        when(sender.isDirectBufferPreferred()).thenReturn(true);
+        when(_network.getSender()).thenReturn(sender);
+        doAnswer(invocation ->
+        {
+            maxReadIdleMillis.set(invocation.getArgument(0));
+            return null;
+        }).when(_network).setMaxReadIdleMillis(anyLong());
+        doAnswer(invocation ->
+        {
+            maxWriteIdleMillis.set(invocation.getArgument(0));
+            return null;
+        }).when(_network).setMaxWriteIdleMillis(anyLong());
+
+        when(_port.getNetworkBufferSize()).thenReturn(1024);
+        when(_port.getHeartbeatDelay()).thenReturn(readDelay);
+        when(_port.getAddressSpace("localhost")).thenReturn(_virtualHost);
+
+        final AMQPConnection_1_0Impl connection =
+                new AMQPConnection_1_0Impl(_broker, _network, _port, 
Transport.TCP, 0, _aggregateTicket);
+        connection.setAddressSpace(_virtualHost);
+
+        final Field field = 
AMQPConnection_1_0Impl.class.getDeclaredField("_connectionState");
+        field.setAccessible(true);
+        field.set(connection, ConnectionState.AWAIT_OPEN);
+
+        final Open open = mock(Open.class);
+        when(open.getContainerId()).thenReturn("container");
+        when(open.getHostname()).thenReturn("localhost");
+        
when(open.getIdleTimeOut()).thenReturn(UnsignedInteger.valueOf(writeDelay));
+
+        connection.receiveOpen(1, open);
+
+        assertEquals(readDelay * 2 * 1000, maxReadIdleMillis.get());
+        assertEquals(writeDelay / 2, maxWriteIdleMillis.get());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to