This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5b152feeec2 IGNITE-27639 Close accepted data channels explicitly on stop (#7458)
5b152feeec2 is described below
commit 5b152feeec2c35b0a66b481dfc29714f18298630
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jan 23 16:08:25 2026 +0400
IGNITE-27639 Close accepted data channels explicitly on stop (#7458)
---
.../ignite/internal/network/netty/NettyServer.java | 101 +++++++++++++++------
.../internal/network/netty/NettyServerTest.java | 26 ++++++
2 files changed, 100 insertions(+), 27 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 17013239321..624b72f685d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.network.netty;
+import static java.util.Objects.requireNonNullElse;
+import static
org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Network.BIND_ERR;
@@ -24,13 +26,17 @@ import static org.apache.ignite.lang.ErrorGroups.Network.BIND_ERR;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -75,11 +81,11 @@ public class NettyServer {
/** Server socket channel. */
@Nullable
- private volatile ServerChannel channel;
+ private volatile ServerChannel serverChannel;
/** Server close future. */
@Nullable
- private CompletableFuture<Void> serverCloseFuture;
+ private CompletableFuture<Void> serverChannelCloseFuture;
/** Flag indicating if {@link #stop()} has been called. */
private boolean stopped;
@@ -87,6 +93,9 @@ public class NettyServer {
/** {@code null} if SSL is not {@link SslConfigurationSchema#enabled}. */
private final @Nullable SslContext sslContext;
+ /** Guarded by {@link #startStopLock}. */
+ private final Set<SocketChannel> acceptedChannels = new HashSet<>();
+
/**
* Constructor.
*
@@ -131,20 +140,22 @@ public class NettyServer {
ServerBootstrap bootstrap =
bootstrapFactory.createServerBootstrap();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) {
- var sessionSerializationService = new
PerSessionSerializationService(serializationService);
+ @Override
+ public void initChannel(SocketChannel ch) {
+ registerAcceptedChannelOrCloseIfStopped(ch);
- // Get handshake manager for the new channel.
- HandshakeManager manager = handshakeManager.get();
+ var sessionSerializationService = new
PerSessionSerializationService(serializationService);
- if (sslContext != null) {
- PipelineUtils.setup(ch.pipeline(),
sessionSerializationService, manager, messageListener, sslContext);
- } else {
- PipelineUtils.setup(ch.pipeline(),
sessionSerializationService, manager, messageListener);
- }
- }
- });
+ // Get handshake manager for the new channel.
+ HandshakeManager manager = handshakeManager.get();
+
+ if (sslContext != null) {
+ PipelineUtils.setup(ch.pipeline(),
sessionSerializationService, manager, messageListener, sslContext);
+ } else {
+ PipelineUtils.setup(ch.pipeline(),
sessionSerializationService, manager, messageListener);
+ }
+ }
+ });
int port = configuration.port();
String[] addresses = configuration.listenAddresses();
@@ -179,10 +190,10 @@ public class NettyServer {
.handle((channel, err) -> {
synchronized (startStopLock) {
if (channel != null) {
- serverCloseFuture =
NettyUtils.toCompletableFuture(channel.closeFuture());
+ serverChannelCloseFuture =
toCompletableFuture(channel.closeFuture());
}
- this.channel = (ServerChannel) channel;
+ this.serverChannel = (ServerChannel) channel;
if (err != null || stopped) {
Throwable stopErr = err != null ? err : new
CancellationException("Server was stopped");
@@ -199,13 +210,29 @@ public class NettyServer {
}
}
+ private void registerAcceptedChannelOrCloseIfStopped(SocketChannel ch) {
+ synchronized (startStopLock) {
+ if (stopped) {
+ ch.close();
+ } else {
+ acceptedChannels.add(ch);
+
+ ch.closeFuture().addListener((ChannelFutureListener) future ->
{
+ synchronized (startStopLock) {
+ acceptedChannels.remove(ch);
+ }
+ });
+ }
+ }
+ }
+
/**
* Returns address to which the server is bound (might be an 'any
local'/wildcard address if bound to all interfaces).
*
* @return Gets the local address of the server.
*/
public SocketAddress address() {
- return Objects.requireNonNull(channel, "Not started
yet").localAddress();
+ return Objects.requireNonNull(serverChannel, "Not started
yet").localAddress();
}
/**
@@ -225,16 +252,29 @@ public class NettyServer {
return nullCompletedFuture();
}
- return serverStartFuture.handle((unused, throwable) -> {
- synchronized (startStopLock) {
- ServerChannel localChannel = channel;
- if (localChannel != null) {
- localChannel.close();
- }
+ return serverStartFuture
+ .handle((unused, throwable) -> {
+ synchronized (startStopLock) {
+ ServerChannel localServerChannel = serverChannel;
+ if (localServerChannel != null) {
+ localServerChannel.close();
+ }
- return serverCloseFuture == null ?
CompletableFutures.<Void>nullCompletedFuture() : serverCloseFuture;
- }
- }).thenCompose(Function.identity());
+ return
requireNonNullElse(serverChannelCloseFuture,
CompletableFutures.<Void>nullCompletedFuture());
+ }
+ })
+ .thenCompose(Function.identity())
+ .thenCompose(unused -> {
+ synchronized (startStopLock) {
+ List<CompletableFuture<Void>> closeFutures = new
ArrayList<>();
+
+ for (Channel acceptedChannel : new
HashSet<>(acceptedChannels)) {
+
closeFutures.add(toCompletableFuture(acceptedChannel.close()));
+ }
+
+ return CompletableFutures.allOf(closeFutures);
+ }
+ });
}
}
@@ -245,8 +285,15 @@ public class NettyServer {
*/
@TestOnly
public boolean isRunning() {
- var channel0 = channel;
+ ServerChannel channel0 = serverChannel;
return channel0 != null && channel0.isOpen();
}
+
+ @TestOnly
+ boolean hasAcceptedChannels() {
+ synchronized (startStopLock) {
+ return !acceptedChannels.isEmpty();
+ }
+ }
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index d4aa2679182..b71bd4c93fa 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -20,11 +20,13 @@ package org.apache.ignite.internal.network.netty;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -43,8 +45,11 @@ import io.netty.channel.ServerChannel;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
+import java.io.IOException;
+import java.io.OutputStream;
import java.net.ConnectException;
import java.net.Socket;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -267,6 +272,27 @@ public class NettyServerTest extends BaseIgniteAbstractTest {
order.verify(handshakeManager, timeout()).onMessage(any());
}
+ @Test
+ public void acceptedChannelsGetClosedOnStop() throws Exception {
+ server = getServer(true);
+
+ try (
+ var socket = new Socket("localhost", 3344);
+ OutputStream out = socket.getOutputStream()
+ ) {
+ out.write(2);
+ out.flush();
+
+ await().until(server::hasAcceptedChannels);
+
+ assertThat(server.stop(), willCompleteSuccessfully());
+
+ assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
+ assertThrows(IOException.class, () ->
socket.getInputStream().read());
+ });
+ }
+ }
+
private HandshakeManager mockHandshakeManager() {
HandshakeManager handshakeManager = mock(HandshakeManager.class);