This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 7608acbc07 QPID-8680 - [Broker-J] Broker should interpret the value of
qpid.port.heartbeatDelay as half of the actual idle timeout (#254)
7608acbc07 is described below
commit 7608acbc078f25cab5d8d1cdbe65e6a6f220aea6
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Tue Feb 18 08:14:36 2025 +0100
QPID-8680 - [Broker-J] Broker should interpret the value of
qpid.port.heartbeatDelay as half of the actual idle timeout (#254)
---
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 2 +-
.../protocol/v1_0/AMQPConnection_1_0ImplTest.java | 56 +++++++++++++++++++++-
2 files changed, 56 insertions(+), 2 deletions(-)
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 2bd689ef6f..d309048cd7 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -897,7 +897,7 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
}
else
{
- initialiseHeartbeating(_outgoingIdleTimeout / 2L,
_incomingIdleTimeout);
+ initialiseHeartbeating(_outgoingIdleTimeout / 2L,
_incomingIdleTimeout * 2);
final NamedAddressSpace addressSpace =
getPort().getAddressSpace(_localHostname);
if (addressSpace == null)
{
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
index 47e4975018..a67c76de66 100644
---
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0ImplTest.java
@@ -25,12 +25,16 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.lang.reflect.Field;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -41,7 +45,10 @@ import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -67,7 +74,7 @@ class AMQPConnection_1_0ImplTest extends UnitTestBase
when(_port.getModel()).thenReturn(model);
when(_port.getTaskExecutor()).thenReturn(taskExecutor);
when(_port.getChildExecutor()).thenReturn(taskExecutor);
- _aggregateTicket = mock(AggregateTicker.class);
+ _aggregateTicket = new AggregateTicker();
_virtualHost = BrokerTestHelper.createVirtualHost("test", _broker,
true, this);
}
@@ -176,4 +183,51 @@ class AMQPConnection_1_0ImplTest extends UnitTestBase
assertEquals(0L, statisticsAfterReset.get("transactedMessagesIn"));
assertEquals(0L, statisticsAfterReset.get("transactedMessagesOut"));
}
+
+ @Test
+ void heartbeat() throws Exception
+ {
+ final int writeDelay = 5000;
+ final int readDelay = 12;
+ final AtomicLong maxReadIdleMillis = new AtomicLong();
+ final AtomicLong maxWriteIdleMillis = new AtomicLong();
+
+ when(_broker.getNetworkBufferSize()).thenReturn(1024);
+
+ final ByteBufferSender sender = mock(ByteBufferSender.class);
+ when(sender.isDirectBufferPreferred()).thenReturn(true);
+ when(_network.getSender()).thenReturn(sender);
+ doAnswer(invocation ->
+ {
+ maxReadIdleMillis.set(invocation.getArgument(0));
+ return null;
+ }).when(_network).setMaxReadIdleMillis(anyLong());
+ doAnswer(invocation ->
+ {
+ maxWriteIdleMillis.set(invocation.getArgument(0));
+ return null;
+ }).when(_network).setMaxWriteIdleMillis(anyLong());
+
+ when(_port.getNetworkBufferSize()).thenReturn(1024);
+ when(_port.getHeartbeatDelay()).thenReturn(readDelay);
+ when(_port.getAddressSpace("localhost")).thenReturn(_virtualHost);
+
+ final AMQPConnection_1_0Impl connection =
+ new AMQPConnection_1_0Impl(_broker, _network, _port,
Transport.TCP, 0, _aggregateTicket);
+ connection.setAddressSpace(_virtualHost);
+
+ final Field field =
AMQPConnection_1_0Impl.class.getDeclaredField("_connectionState");
+ field.setAccessible(true);
+ field.set(connection, ConnectionState.AWAIT_OPEN);
+
+ final Open open = mock(Open.class);
+ when(open.getContainerId()).thenReturn("container");
+ when(open.getHostname()).thenReturn("localhost");
+
when(open.getIdleTimeOut()).thenReturn(UnsignedInteger.valueOf(writeDelay));
+
+ connection.receiveOpen(1, open);
+
+ assertEquals(readDelay * 2 * 1000, maxReadIdleMillis.get());
+ assertEquals(writeDelay / 2, maxWriteIdleMillis.get());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]