This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 6685c0c347 QPID-8700 - [Broker-J]
NonBlockingConnection#shutdownFinalWrite() may loop infinitely (#299)
6685c0c347 is described below
commit 6685c0c347093b025adb074327752826f149c437
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Tue Jul 29 10:06:53 2025 +0200
QPID-8700 - [Broker-J] NonBlockingConnection#shutdownFinalWrite() may loop
infinitely (#299)
---
.../apache/qpid/server/model/port/AmqpPort.java | 12 +
.../server/transport/NonBlockingConnection.java | 15 +-
.../transport/NonBlockingConnectionTest.java | 308 +++++++++++++++++++++
.../server/transport/TCPandSSLTransportTest.java | 2 +
4 files changed, 335 insertions(+), 2 deletions(-)
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index b51f90d710..20afa83d11 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -159,6 +159,18 @@ public interface AmqpPort<X extends AmqpPort<X>> extends
Port<X>
description = "The connection property enrichers to apply to
connections created on this port.")
String DEFAULT_CONNECTION_PROTOCOL_ENRICHERS = "[ \"STANDARD\" ] ";
+ String FINAL_WRITE_THRESHOLD = "qpid.port.final_write_threshold";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = FINAL_WRITE_THRESHOLD, description =
"Threshold to check for final write timeout.")
+ int DEFAULT_FINAL_WRITE_THRESHOLD = 100;
+
+ String FINAL_WRITE_TIMEOUT = "qpid.port.final_write_timeout";
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = FINAL_WRITE_TIMEOUT,
+ description = "Maximum time allowed for a connection to be
closed." +
+ " If the connection does not close this time, it will be aborted.")
+ long DEFAULT_FINAL_WRITE_TIMEOUT = 1000L;
+
@ManagedAttribute( defaultValue = AmqpPort.DEFAULT_AMQP_TCP_NO_DELAY )
boolean isTcpNoDelay();
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index fdc99dd0eb..aab4806a2c 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -58,6 +58,8 @@ public class NonBlockingConnection implements
ServerNetworkConnection, ByteBuffe
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ProtocolEngine _protocolEngine;
private final Runnable _onTransportEncryptionAction;
+ private final int _finalWriteThreshold;
+ private final long _finalWriteTimeout;
private volatile boolean _fullyWritten = true;
@@ -110,7 +112,8 @@ public class NonBlockingConnection implements
ServerNetworkConnection, ByteBuffe
{
_delegate = new NonBlockingConnectionUndecidedDelegate(this);
}
-
+ _finalWriteThreshold = port.getContextValue(Integer.class,
AmqpPort.FINAL_WRITE_THRESHOLD);
+ _finalWriteTimeout = port.getContextValue(Long.class,
AmqpPort.FINAL_WRITE_TIMEOUT);
}
String getThreadName()
@@ -414,8 +417,16 @@ public class NonBlockingConnection implements
ServerNetworkConnection, ByteBuffe
{
try
{
- while(!doWrite())
+ final long startTime = System.currentTimeMillis();
+ int cnt = 0;
+ while (!doWrite())
{
+ if (cnt % _finalWriteThreshold == 0 &&
System.currentTimeMillis() - startTime > _finalWriteTimeout)
+ {
+ final long executionTime = System.currentTimeMillis() -
startTime;
+ throw new IOException("Failed to perform final write to
connection after " + executionTime + " ms timeout");
+ }
+ cnt++;
}
}
catch (IOException e)
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java
b/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java
new file mode 100644
index 0000000000..b18416a8fd
--- /dev/null
+++
b/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.transport;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import ch.qos.logback.core.Context;
+import ch.qos.logback.core.LogbackException;
+import ch.qos.logback.core.filter.Filter;
+import ch.qos.logback.core.spi.FilterReply;
+import ch.qos.logback.core.status.Status;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Broker;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.transport.network.TransportEncryption;
+
+class NonBlockingConnectionTest
+{
+ private static final TestAppender appender = new
TestAppender(NonBlockingConnection.class);
+ private static final ch.qos.logback.classic.Logger ROOT_LOGGER =
+ (ch.qos.logback.classic.Logger)
LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+
+ private final SelectorThread.SelectionTask selectionTask =
mock(SelectorThread.SelectionTask.class);
+
+ private NonBlockingConnection _nonBlockingConnection;
+
+ @BeforeAll
+ static void beforeAll()
+ {
+ ROOT_LOGGER.addAppender(appender);
+ }
+
+ @AfterAll
+ static void afterAll()
+ {
+ ROOT_LOGGER.detachAppender(appender.getName());
+ }
+
+ @BeforeEach
+ void beforeEach()
+ {
+ final SocketAddress localAddress = mock(SocketAddress.class);
+ when(localAddress.toString()).thenReturn("127.0.0.1:5672");
+ final Socket socket = mock(Socket.class);
+ when(socket.getRemoteSocketAddress()).thenReturn(new
InetSocketAddress("localhost", 1000));
+ when(socket.getLocalSocketAddress()).thenReturn(localAddress);
+ final SocketChannel socketChannel = mock(SocketChannel.class);
+ when(socketChannel.socket()).thenReturn(socket);
+ final ProtocolEngine protocolEngine = mock(ProtocolEngine.class);
+ final NetworkConnectionScheduler scheduler =
mock(NetworkConnectionScheduler.class);
+ final AmqpPort<?> port = mock(AmqpPort.class);
+ final EventLogger eventLogger = mock(EventLogger.class);
+ final Broker broker = mock(Broker.class);
+ when(broker.getEventLogger()).thenReturn(eventLogger);
+ when(port.getContextValue(Integer.class,
AmqpPort.FINAL_WRITE_THRESHOLD)).thenReturn(100);
+ when(port.getContextValue(Long.class,
AmqpPort.FINAL_WRITE_TIMEOUT)).thenReturn(100L);
+ when(port.getParent()).thenReturn(broker);
+ final Set<TransportEncryption> encryptionSet =
Set.of(TransportEncryption.NONE);
+
+ _nonBlockingConnection =
+ new NonBlockingConnection(socketChannel, protocolEngine,
encryptionSet, () -> {}, scheduler, port);
+
+ appender.clear();
+ }
+
+ /** Delegate always returns a WriteResult with complete = false, which
would otherwise cause an infinite loop in
+ * NonBlockingConnection#shutdownFinalWrite(); the final write timeout must
abort that loop */
+ @Test
+ void shutdownFinalWriteLooping() throws Exception
+ {
+ // construct delegate mock returning WriteResult[complete=false]
+ final NonBlockingConnectionPlainDelegate delegate =
mock(NonBlockingConnectionPlainDelegate.class);
+ when(delegate.doWrite(any())).thenReturn(new
NonBlockingConnectionDelegate.WriteResult(false, 0L));
+ injectDelegate(delegate);
+
+ // close the connection
+ _nonBlockingConnection.setSelectionTask(selectionTask);
+ _nonBlockingConnection.close();
+ _nonBlockingConnection.doWork();
+
+ // there should be only 2 log messages
+ assertEquals(2, appender.getEvents().size());
+
+ // first log message states that connection will be closed
+ final ILoggingEvent firstLogEntry = appender.getEvents().get(0);
+ assertEquals(Level.DEBUG, firstLogEntry.getLevel());
+ assertEquals("Closing localhost/127.0.0.1:1000",
firstLogEntry.getMessage());
+
+ // second log message reports the timeout that occurred during
shutdownFinalWrite()
+ final ILoggingEvent secondLogEntry = appender.getEvents().get(1);
+ assertEquals(Level.INFO, secondLogEntry.getLevel());
+ assertEquals("Exception performing final write/close for '{}': {}",
secondLogEntry.getMessage());
+ assertEquals("localhost/127.0.0.1:1000",
secondLogEntry.getArgumentArray()[0]);
+
assertTrue(String.valueOf(secondLogEntry.getArgumentArray()[1]).startsWith("Failed
to perform final write to connection after"));
+ }
+
+ /** Delegate immediately returns a WriteResult with complete = true,
so no timeout handling is involved */
+ @Test
+ void shutdownFinalWriteWithoutLooping() throws Exception
+ {
+ // construct delegate mock returning WriteResult[complete=true]
+ final NonBlockingConnectionPlainDelegate delegate =
mock(NonBlockingConnectionPlainDelegate.class);
+ when(delegate.doWrite(any())).thenReturn(new
NonBlockingConnectionDelegate.WriteResult(true, 0L));
+ injectDelegate(delegate);
+
+ // close the connection
+ _nonBlockingConnection.setSelectionTask(selectionTask);
+ _nonBlockingConnection.close();
+ _nonBlockingConnection.doWork();
+
+ // there should be only 1 log message (no timeout)
+ assertEquals(1, appender.getEvents().size());
+
+ // first log message states that connection will be closed
+ final ILoggingEvent firstLogEntry = appender.getEvents().get(0);
+ assertEquals(Level.DEBUG, firstLogEntry.getLevel());
+ assertEquals("Closing localhost/127.0.0.1:1000",
firstLogEntry.getMessage());
+ }
+
+ /** Inject delegate using reflection */
+ private void injectDelegate(final NonBlockingConnectionPlainDelegate
delegate) throws Exception
+ {
+ final Field delegateField =
NonBlockingConnection.class.getDeclaredField("_delegate");
+ delegateField.setAccessible(true);
+ delegateField.set(_nonBlockingConnection, delegate);
+ }
+
+ /** Logging appender that stores the logging events of a single logger in a list */
+ static class TestAppender implements Appender<ILoggingEvent>
+ {
+ private final String className;
+ private final List<ILoggingEvent> _events = new ArrayList<>();
+
+ TestAppender(final Class<?> clazz)
+ {
+ className = clazz.getCanonicalName();
+ }
+
+ @Override
+ public String getName()
+ {
+ return getClass().getSimpleName();
+ }
+
+ @Override
+ public void doAppend(final ILoggingEvent iLoggingEvent) throws
LogbackException
+ {
+ if (className.equals(iLoggingEvent.getLoggerName()))
+ {
+ _events.add(iLoggingEvent);
+ }
+ }
+
+ @Override
+ public void setName(final String s)
+ {
+
+ }
+
+ @Override
+ public void setContext(final Context context)
+ {
+
+ }
+
+ @Override
+ public Context getContext()
+ {
+ return null;
+ }
+
+ @Override
+ public void addStatus(final Status status)
+ {
+
+ }
+
+ @Override
+ public void addInfo(final String s)
+ {
+
+ }
+
+ @Override
+ public void addInfo(final String s, final Throwable throwable)
+ {
+
+ }
+
+ @Override
+ public void addWarn(final String s)
+ {
+
+ }
+
+ @Override
+ public void addWarn(final String s, final Throwable throwable)
+ {
+
+ }
+
+ @Override
+ public void addError(final String s)
+ {
+
+ }
+
+ @Override
+ public void addError(final String s, final Throwable throwable)
+ {
+
+ }
+
+ @Override
+ public void addFilter(final Filter<ILoggingEvent> filter)
+ {
+
+ }
+
+ @Override
+ public void clearAllFilters()
+ {
+
+ }
+
+ @Override
+ public List<Filter<ILoggingEvent>> getCopyOfAttachedFiltersList()
+ {
+ return List.of();
+ }
+
+ @Override
+ public FilterReply getFilterChainDecision(final ILoggingEvent
iLoggingEvent)
+ {
+ return null;
+ }
+
+ @Override
+ public void start()
+ {
+
+ }
+
+ @Override
+ public void stop()
+ {
+
+ }
+
+ @Override
+ public boolean isStarted()
+ {
+ return true;
+ }
+
+ public List<ILoggingEvent> getEvents()
+ {
+ return _events;
+ }
+
+ public void clear()
+ {
+ _events.clear();
+ }
+ }
+}
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
index 510177a91c..435607210a 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
@@ -213,6 +213,8 @@ public class TCPandSSLTransportTest extends UnitTestBase
when(port.getContextValue(Boolean.class,
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)).thenReturn(false);
when(port.getContextValue(Integer.class,
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)).thenReturn(1000);
when(port.getContextValue(Integer.class,
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)).thenReturn(1005);
+ when(port.getContextValue(Integer.class,
AmqpPort.FINAL_WRITE_THRESHOLD)).thenReturn(100);
+ when(port.getContextValue(Long.class,
AmqpPort.FINAL_WRITE_TIMEOUT)).thenReturn(100L);
final ObjectMapper mapper = new ObjectMapper();
final JavaType type =
mapper.getTypeFactory().constructCollectionType(List.class, String.class);
final List<String> allowList =
mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_ALLOW_LIST, type);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]