This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b152feeec2 IGNITE-27639 Close accepted data channels explicitly on stop (#7458)
5b152feeec2 is described below

commit 5b152feeec2c35b0a66b481dfc29714f18298630
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jan 23 16:08:25 2026 +0400

    IGNITE-27639 Close accepted data channels explicitly on stop (#7458)
---
 .../ignite/internal/network/netty/NettyServer.java | 101 +++++++++++++++------
 .../internal/network/netty/NettyServerTest.java    |  26 ++++++
 2 files changed, 100 insertions(+), 27 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 17013239321..624b72f685d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static java.util.Objects.requireNonNullElse;
+import static org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Network.BIND_ERR;
@@ -24,13 +26,17 @@ import static org.apache.ignite.lang.ErrorGroups.Network.BIND_ERR;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslContext;
 import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
@@ -75,11 +81,11 @@ public class NettyServer {
 
     /** Server socket channel. */
     @Nullable
-    private volatile ServerChannel channel;
+    private volatile ServerChannel serverChannel;
 
     /** Server close future. */
     @Nullable
-    private CompletableFuture<Void> serverCloseFuture;
+    private CompletableFuture<Void> serverChannelCloseFuture;
 
     /** Flag indicating if {@link #stop()} has been called. */
     private boolean stopped;
@@ -87,6 +93,9 @@ public class NettyServer {
     /** {@code null} if SSL is not {@link SslConfigurationSchema#enabled}. */
     private final @Nullable SslContext sslContext;
 
+    /** Guarded by {@link #startStopLock}. */
+    private final Set<SocketChannel> acceptedChannels = new HashSet<>();
+
     /**
      * Constructor.
      *
@@ -131,20 +140,22 @@ public class NettyServer {
             ServerBootstrap bootstrap = bootstrapFactory.createServerBootstrap();
 
             bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
-                        @Override
-                        public void initChannel(SocketChannel ch) {
-                            var sessionSerializationService = new PerSessionSerializationService(serializationService);
+                @Override
+                public void initChannel(SocketChannel ch) {
+                    registerAcceptedChannelOrCloseIfStopped(ch);
 
-                            // Get handshake manager for the new channel.
-                            HandshakeManager manager = handshakeManager.get();
+                    var sessionSerializationService = new PerSessionSerializationService(serializationService);
 
-                            if (sslContext != null) {
-                                PipelineUtils.setup(ch.pipeline(), sessionSerializationService, manager, messageListener, sslContext);
-                            } else {
-                                PipelineUtils.setup(ch.pipeline(), sessionSerializationService, manager, messageListener);
-                            }
-                        }
-                    });
+                    // Get handshake manager for the new channel.
+                    HandshakeManager manager = handshakeManager.get();
+
+                    if (sslContext != null) {
+                        PipelineUtils.setup(ch.pipeline(), sessionSerializationService, manager, messageListener, sslContext);
+                    } else {
+                        PipelineUtils.setup(ch.pipeline(), sessionSerializationService, manager, messageListener);
+                    }
+                }
+            });
 
             int port = configuration.port();
             String[] addresses = configuration.listenAddresses();
@@ -179,10 +190,10 @@ public class NettyServer {
                     .handle((channel, err) -> {
                         synchronized (startStopLock) {
                             if (channel != null) {
-                                serverCloseFuture = NettyUtils.toCompletableFuture(channel.closeFuture());
+                                serverChannelCloseFuture = toCompletableFuture(channel.closeFuture());
                             }
 
-                            this.channel = (ServerChannel) channel;
+                            this.serverChannel = (ServerChannel) channel;
 
                             if (err != null || stopped) {
                                 Throwable stopErr = err != null ? err : new CancellationException("Server was stopped");
@@ -199,13 +210,29 @@ public class NettyServer {
         }
     }
 
+    private void registerAcceptedChannelOrCloseIfStopped(SocketChannel ch) {
+        synchronized (startStopLock) {
+            if (stopped) {
+                ch.close();
+            } else {
+                acceptedChannels.add(ch);
+
+                ch.closeFuture().addListener((ChannelFutureListener) future -> {
+                    synchronized (startStopLock) {
+                        acceptedChannels.remove(ch);
+                    }
+                });
+            }
+        }
+    }
+
     /**
      * Returns address to which the server is bound (might be an 'any local'/wildcard address if bound to all interfaces).
      *
      * @return Gets the local address of the server.
      */
     public SocketAddress address() {
-        return Objects.requireNonNull(channel, "Not started yet").localAddress();
+        return Objects.requireNonNull(serverChannel, "Not started yet").localAddress();
     }
 
     /**
@@ -225,16 +252,29 @@ public class NettyServer {
                 return nullCompletedFuture();
             }
 
-            return serverStartFuture.handle((unused, throwable) -> {
-                synchronized (startStopLock) {
-                    ServerChannel localChannel = channel;
-                    if (localChannel != null) {
-                        localChannel.close();
-                    }
+            return serverStartFuture
+                    .handle((unused, throwable) -> {
+                        synchronized (startStopLock) {
+                            ServerChannel localServerChannel = serverChannel;
+                            if (localServerChannel != null) {
+                                localServerChannel.close();
+                            }
 
-                    return serverCloseFuture == null ? CompletableFutures.<Void>nullCompletedFuture() : serverCloseFuture;
-                }
-            }).thenCompose(Function.identity());
+                            return requireNonNullElse(serverChannelCloseFuture, CompletableFutures.<Void>nullCompletedFuture());
+                        }
+                    })
+                    .thenCompose(Function.identity())
+                    .thenCompose(unused -> {
+                        synchronized (startStopLock) {
+                            List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
+
+                            for (Channel acceptedChannel : new HashSet<>(acceptedChannels)) {
+                                closeFutures.add(toCompletableFuture(acceptedChannel.close()));
+                            }
+
+                            return CompletableFutures.allOf(closeFutures);
+                        }
+                    });
         }
     }
 
@@ -245,8 +285,15 @@ public class NettyServer {
      */
     @TestOnly
     public boolean isRunning() {
-        var channel0 = channel;
+        ServerChannel channel0 = serverChannel;
 
         return channel0 != null && channel0.isOpen();
     }
+
+    @TestOnly
+    boolean hasAcceptedChannels() {
+        synchronized (startStopLock) {
+            return !acceptedChannels.isEmpty();
+        }
+    }
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index d4aa2679182..b71bd4c93fa 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -20,11 +20,13 @@ package org.apache.ignite.internal.network.netty;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -43,8 +45,11 @@ import io.netty.channel.ServerChannel;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.Socket;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -267,6 +272,27 @@ public class NettyServerTest extends BaseIgniteAbstractTest {
         order.verify(handshakeManager, timeout()).onMessage(any());
     }
 
+    @Test
+    public void acceptedChannelsGetClosedOnStop() throws Exception {
+        server = getServer(true);
+
+        try (
+                var socket = new Socket("localhost", 3344);
+                OutputStream out = socket.getOutputStream()
+        ) {
+            out.write(2);
+            out.flush();
+
+            await().until(server::hasAcceptedChannels);
+
+            assertThat(server.stop(), willCompleteSuccessfully());
+
+            assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
+                assertThrows(IOException.class, () -> socket.getInputStream().read());
+            });
+        }
+    }
+
     private HandshakeManager mockHandshakeManager() {
         HandshakeManager handshakeManager = mock(HandshakeManager.class);
 

Reply via email to