This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 7608acbc07 QPID-8680 - [Broker-J] Broker should interpret the value of qpid.port.heartbeatDelay as half of the actual idle timeout (#254) 7608acbc07 is described below commit 7608acbc078f25cab5d8d1cdbe65e6a6f220aea6 Author: Daniil Kirilyuk <daniel.kiril...@gmail.com> AuthorDate: Tue Feb 18 08:14:36 2025 +0100 QPID-8680 - [Broker-J] Broker should interpret the value of qpid.port.heartbeatDelay as half of the actual idle timeout (#254) --- .../protocol/v1_0/AMQPConnection_1_0Impl.java | 2 +- .../protocol/v1_0/AMQPConnection_1_0ImplTest.java | 56 +++++++++++++++++++++- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index 2bd689ef6f..d309048cd7 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -897,7 +897,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } else { - initialiseHeartbeating(_outgoingIdleTimeout / 2L, _incomingIdleTimeout); + initialiseHeartbeating(_outgoingIdleTimeout / 2L, _incomingIdleTimeout * 2); final NamedAddressSpace addressSpace = getPort().getAddressSpace(_localHostname); if (addressSpace == null) { diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java index 47e4975018..a67c76de66 100644 --- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java +++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java @@ -25,12 +25,16 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.lang.reflect.Field; import java.net.SocketAddress; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -41,7 +45,10 @@ import org.apache.qpid.server.model.BrokerTestHelper; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.transport.AggregateTicker; +import org.apache.qpid.server.transport.ByteBufferSender; import org.apache.qpid.server.transport.ServerNetworkConnection; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; @@ -67,7 +74,7 @@ class AMQPConnection_1_0ImplTest extends UnitTestBase when(_port.getModel()).thenReturn(model); when(_port.getTaskExecutor()).thenReturn(taskExecutor); when(_port.getChildExecutor()).thenReturn(taskExecutor); - _aggregateTicket = mock(AggregateTicker.class); + _aggregateTicket = new AggregateTicker(); _virtualHost = BrokerTestHelper.createVirtualHost("test", _broker, true, this); } @@ -176,4 +183,51 @@ class AMQPConnection_1_0ImplTest extends UnitTestBase assertEquals(0L, statisticsAfterReset.get("transactedMessagesIn")); assertEquals(0L, 
statisticsAfterReset.get("transactedMessagesOut")); } + + @Test + void heartbeat() throws Exception + { + final int writeDelay = 5000; + final int readDelay = 12; + final AtomicLong maxReadIdleMillis = new AtomicLong(); + final AtomicLong maxWriteIdleMillis = new AtomicLong(); + + when(_broker.getNetworkBufferSize()).thenReturn(1024); + + final ByteBufferSender sender = mock(ByteBufferSender.class); + when(sender.isDirectBufferPreferred()).thenReturn(true); + when(_network.getSender()).thenReturn(sender); + doAnswer(invocation -> + { + maxReadIdleMillis.set(invocation.getArgument(0)); + return null; + }).when(_network).setMaxReadIdleMillis(anyLong()); + doAnswer(invocation -> + { + maxWriteIdleMillis.set(invocation.getArgument(0)); + return null; + }).when(_network).setMaxWriteIdleMillis(anyLong()); + + when(_port.getNetworkBufferSize()).thenReturn(1024); + when(_port.getHeartbeatDelay()).thenReturn(readDelay); + when(_port.getAddressSpace("localhost")).thenReturn(_virtualHost); + + final AMQPConnection_1_0Impl connection = + new AMQPConnection_1_0Impl(_broker, _network, _port, Transport.TCP, 0, _aggregateTicket); + connection.setAddressSpace(_virtualHost); + + final Field field = AMQPConnection_1_0Impl.class.getDeclaredField("_connectionState"); + field.setAccessible(true); + field.set(connection, ConnectionState.AWAIT_OPEN); + + final Open open = mock(Open.class); + when(open.getContainerId()).thenReturn("container"); + when(open.getHostname()).thenReturn("localhost"); + when(open.getIdleTimeOut()).thenReturn(UnsignedInteger.valueOf(writeDelay)); + + connection.receiveOpen(1, open); + + assertEquals(readDelay * 2 * 1000, maxReadIdleMillis.get()); + assertEquals(writeDelay / 2, maxWriteIdleMillis.get()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org