This is an automated email from the ASF dual-hosted git repository.

petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a06522dce2d IGNITE-28459 Fixed unhandled NIO Session Requests during 
node stop (#12999)
a06522dce2d is described below

commit a06522dce2d39a9ce22b775157776e36c25476e4
Author: Mikhail Petrov <[email protected]>
AuthorDate: Thu May 7 00:27:52 2026 +0300

    IGNITE-28459 Fixed unhandled NIO Session Requests during node stop (#12999)
---
 .../ignite/internal/util/nio/GridNioServer.java    | 465 ++++++++++++---------
 .../ignite/internal/util/nio/GridNioWorker.java    |   5 +-
 .../util/nio/GridSelectorNioSessionImpl.java       |   6 +-
 3 files changed, 263 insertions(+), 213 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index e1268673c14..797290677db 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -585,7 +585,12 @@ public class GridNioServer<T> {
 
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, 
NioOperation.CLOSE);
 
-        impl.offerStateChange(fut);
+        try {
+            impl.offerStateChange(fut);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
 
         return fut;
     }
@@ -772,7 +777,12 @@ public class GridNioServer<T> {
             ses0.procWrite.set(true);
 
             // Wake up worker.
-            ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0);
+            try {
+                
ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to notify NIO Server while resending 
messages [rmtNode=" + recoveryDesc.node().id() + ']', e);
+            }
         }
     }
 
@@ -804,8 +814,13 @@ public class GridNioServer<T> {
 
         SessionMoveFuture fut = new SessionMoveFuture(ses0, to);
 
-        if (!ses0.offerMove(clientWorkers.get(from), fut))
-            fut.onDone(false);
+        try {
+            if (!ses0.offerMove(clientWorkers.get(from), fut))
+                fut.onDone(false);
+        }
+        catch (IgniteCheckedException e) {
+            fut.onDone(e);
+        }
     }
 
     /**
@@ -813,7 +828,7 @@ public class GridNioServer<T> {
      * @param op Operation.
      * @return Future for operation.
      */
-    private IgniteInternalFuture<?> pauseResumeReads(GridNioSession ses, 
NioOperation op) {
+    private IgniteInternalFuture<?> pauseResumeReads(GridNioSession ses, 
NioOperation op) throws IgniteCheckedException {
         assert ses instanceof GridSelectorNioSessionImpl;
         assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ;
 
@@ -874,9 +889,14 @@ public class GridNioServer<T> {
 
             opFut.msg = p;
 
-            clientWorkers.get(i).offer(opFut);
+            try {
+                clientWorkers.get(i).offer(opFut);
 
-            fut.add(opFut);
+                fut.add(opFut);
+            }
+            catch (IgniteCheckedException e) {
+                fut.add(new GridFinishedFuture<>(e));
+            }
         }
 
         fut.markInitialized();
@@ -919,9 +939,14 @@ public class GridNioServer<T> {
 
             opFut.msg = p;
 
-            clientWorkers.get(i).offer(opFut);
+            try {
+                clientWorkers.get(i).offer(opFut);
 
-            fut.add(opFut);
+                fut.add(opFut);
+            }
+            catch (IgniteCheckedException e) {
+                fut.add(new GridFinishedFuture<>(e));
+            }
         }
 
         fut.markInitialized();
@@ -967,7 +992,7 @@ public class GridNioServer<T> {
                 return new GridFinishedFuture<>(
                     new IgniteCheckedException("Failed to create session, 
server is stopped."));
         }
-        catch (IOException e) {
+        catch (IgniteCheckedException | IOException e) {
             return new GridFinishedFuture<>(e);
         }
     }
@@ -977,7 +1002,10 @@ public class GridNioServer<T> {
      * @param meta Session meta.
      */
     public IgniteInternalFuture<GridNioSession> cancelConnect(final 
SocketChannel ch, Map<Integer, ?> meta) {
-        if (!closed) {
+        if (closed)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Failed 
to cancel connection, server is stopped"));
+
+        try {
             NioOperationFuture<GridNioSession> req = new 
NioOperationFuture<>(ch, false, meta);
 
             req.op = NioOperation.CANCEL_CONNECT;
@@ -990,9 +1018,9 @@ public class GridNioServer<T> {
 
             return req;
         }
-        else
-            return new GridFinishedFuture<>(
-                new IgniteCheckedException("Failed to cancel connection, 
server is stopped."));
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Failed 
to cancel connection", e));
+        }
     }
 
     /**
@@ -1083,7 +1111,7 @@ public class GridNioServer<T> {
      * @param req Request to balance.
      * @param meta Session metadata.
      */
-    private synchronized void offerBalanced(NioOperationFuture req, @Nullable 
Map<Integer, Object> meta) {
+    private synchronized void offerBalanced(NioOperationFuture req, @Nullable 
Map<Integer, Object> meta) throws IgniteCheckedException {
         assert req.operation() == NioOperation.REGISTER || req.operation() == 
NioOperation.CONNECT : req;
         assert req.socketChannel() != null : req;
 
@@ -2038,7 +2066,10 @@ public class GridNioServer<T> {
          *
          * @param req Change request.
          */
-        @Override public void offer(SessionChangeRequest req) {
+        @Override public void offer(SessionChangeRequest req) throws 
IgniteCheckedException {
+            if (isCancelled())
+                throw new IgniteCheckedException("NIO client worker has been 
stopped");
+
             changeReqs.offer(req);
 
             if (select)
@@ -2046,7 +2077,10 @@ public class GridNioServer<T> {
         }
 
         /** {@inheritDoc} */
-        @Override public void offer(Collection<SessionChangeRequest> reqs) {
+        @Override public void offer(Collection<SessionChangeRequest> reqs) 
throws IgniteCheckedException {
+            if (isCancelled())
+                throw new IgniteCheckedException("NIO client worker has been 
stopped");
+
             for (SessionChangeRequest req : reqs)
                 changeReqs.offer(req);
 
@@ -2078,185 +2112,19 @@ public class GridNioServer<T> {
          *
          * @throws IgniteCheckedException If IOException occurred or thread 
was unable to add worker to workers pool.
          */
-        @SuppressWarnings("unchecked")
         private void bodyInternal() throws IgniteCheckedException, 
InterruptedException {
             try {
                 long lastIdleCheck = U.currentTimeMillis();
 
-                mainLoop:
-                while (!closed && selector.isOpen()) {
-                    SessionChangeRequest req0;
+                while (selector.isOpen() && !(isCancelled() && 
changeReqs.isEmpty())) {
+                    SessionChangeRequest req;
 
                     updateHeartbeat();
 
-                    while ((req0 = changeReqs.poll()) != null) {
+                    while ((req = changeReqs.poll()) != null) {
                         updateHeartbeat();
 
-                        switch (req0.operation()) {
-                            case CONNECT: {
-                                NioOperationFuture fut = 
(NioOperationFuture)req0;
-
-                                SocketChannel ch = fut.socketChannel();
-
-                                try {
-                                    ch.register(selector, 
SelectionKey.OP_CONNECT, fut);
-                                }
-                                catch (IOException e) {
-                                    fut.onDone(new 
IgniteCheckedException("Failed to register channel on selector", e));
-                                }
-
-                                break;
-                            }
-
-                            case CANCEL_CONNECT: {
-                                NioOperationFuture req = 
(NioOperationFuture)req0;
-
-                                SocketChannel ch = req.socketChannel();
-
-                                SelectionKey key = ch.keyFor(selector);
-
-                                if (key != null)
-                                    key.cancel();
-
-                                U.closeQuiet(ch);
-
-                                req.onDone();
-
-                                break;
-                            }
-
-                            case REGISTER: {
-                                register((NioOperationFuture)req0);
-
-                                break;
-                            }
-
-                            case MOVE: {
-                                SessionMoveFuture f = (SessionMoveFuture)req0;
-
-                                GridSelectorNioSessionImpl ses = f.session();
-
-                                if (idx == f.toIdx) {
-                                    assert f.movedSocketChannel() != null : f;
-
-                                    boolean add = workerSessions.add(ses);
-
-                                    assert add;
-
-                                    ses.finishMoveSession(this);
-
-                                    if (idx % 2 == 0)
-                                        readerMoveCnt.incrementAndGet();
-                                    else
-                                        writerMoveCnt.incrementAndGet();
-
-                                    SelectionKey key = 
f.movedSocketChannel().register(selector,
-                                        SelectionKey.OP_READ | 
SelectionKey.OP_WRITE,
-                                        ses);
-
-                                    ses.key(key);
-
-                                    ses.procWrite.set(true);
-
-                                    f.onDone(true);
-                                }
-                                else {
-                                    assert f.movedSocketChannel() == null : f;
-
-                                    if (workerSessions.remove(ses)) {
-                                        ses.startMoveSession(this);
-
-                                        SelectionKey key = ses.key();
-
-                                        assert key.channel() != null : key;
-
-                                        
f.movedSocketChannel((SocketChannel)key.channel());
-
-                                        key.cancel();
-                                        commitKeyCancellation();
-
-                                        
clientWorkers.get(f.toIndex()).offer(f);
-                                    }
-                                    else
-                                        f.onDone(false);
-                                }
-
-                                break;
-                            }
-
-                            case REQUIRE_WRITE: {
-                                
registerWrite((GridSelectorNioSessionImpl)req0.session());
-
-                                break;
-                            }
-
-                            case CLOSE: {
-                                NioOperationFuture req = 
(NioOperationFuture)req0;
-
-                                if (close(req.session(), null))
-                                    req.onDone(true);
-                                else
-                                    req.onDone(false);
-
-                                break;
-                            }
-
-                            case PAUSE_READ: {
-                                NioOperationFuture req = 
(NioOperationFuture)req0;
-
-                                SelectionKey key = req.session().key();
-
-                                if (key.isValid()) {
-                                    key.interestOps(key.interestOps() & 
(~SelectionKey.OP_READ));
-
-                                    GridSelectorNioSessionImpl ses = 
(GridSelectorNioSessionImpl)key.attachment();
-
-                                    ses.readsPaused(true);
-
-                                    req.onDone(true);
-                                }
-                                else
-                                    req.onDone(false);
-
-                                break;
-                            }
-
-                            case RESUME_READ: {
-                                NioOperationFuture req = 
(NioOperationFuture)req0;
-
-                                SelectionKey key = req.session().key();
-
-                                if (key.isValid()) {
-                                    key.interestOps(key.interestOps() | 
SelectionKey.OP_READ);
-
-                                    GridSelectorNioSessionImpl ses = 
(GridSelectorNioSessionImpl)key.attachment();
-
-                                    ses.readsPaused(false);
-
-                                    req.onDone(true);
-                                }
-                                else
-                                    req.onDone(false);
-
-                                break;
-                            }
-
-                            case DUMP_STATS: {
-                                NioOperationFuture req = 
(NioOperationFuture)req0;
-
-                                IgnitePredicate<GridNioSession> p =
-                                    req.msg instanceof IgnitePredicate ? 
(IgnitePredicate<GridNioSession>)req.msg : null;
-
-                                StringBuilder sb = new StringBuilder();
-
-                                try {
-                                    dumpStats(sb, p, p != null);
-                                }
-                                finally {
-                                    req.onDone(sb.toString());
-                                }
-                            }
-                        }
+                        processSessionChangedRequest(req);
                     }
 
                     for (long i = 0; i < selectorSpins && 
selector.selectedKeys().isEmpty(); i++) {
@@ -2277,7 +2145,7 @@ public class GridNioServer<T> {
                         }
 
                         if (!changeReqs.isEmpty())
-                            continue mainLoop;
+                            break;
 
                         // Just in case we do busy selects.
                         long now = U.currentTimeMillis();
@@ -2289,14 +2157,14 @@ public class GridNioServer<T> {
                         }
 
                         if (isCancelled())
-                            return;
+                            break;
                     }
 
                     // Falling to blocking select.
                     select = true;
 
                     try {
-                        if (!changeReqs.isEmpty())
+                        if (!changeReqs.isEmpty() || isCancelled())
                             continue;
 
                         blockingSectionBegin();
@@ -2370,6 +2238,180 @@ public class GridNioServer<T> {
             }
         }
 
+        /** */
+        private void processSessionChangedRequest(SessionChangeRequest req0) 
throws IgniteCheckedException, IOException {
+            switch (req0.operation()) {
+                case CONNECT: {
+                    NioOperationFuture fut = (NioOperationFuture)req0;
+
+                    SocketChannel ch = fut.socketChannel();
+
+                    try {
+                        ch.register(selector, SelectionKey.OP_CONNECT, fut);
+                    }
+                    catch (IOException e) {
+                        fut.onDone(new IgniteCheckedException("Failed to 
register channel on selector", e));
+                    }
+
+                    break;
+                }
+
+                case CANCEL_CONNECT: {
+                    NioOperationFuture req = (NioOperationFuture)req0;
+
+                    SocketChannel ch = req.socketChannel();
+
+                    SelectionKey key = ch.keyFor(selector);
+
+                    if (key != null)
+                        key.cancel();
+
+                    U.closeQuiet(ch);
+
+                    req.onDone();
+
+                    break;
+                }
+
+                case REGISTER: {
+                    register((NioOperationFuture)req0);
+
+                    break;
+                }
+
+                case MOVE: {
+                    SessionMoveFuture f = (SessionMoveFuture)req0;
+
+                    GridSelectorNioSessionImpl ses = f.session();
+
+                    if (idx == f.toIdx) {
+                        assert f.movedSocketChannel() != null : f;
+
+                        boolean add = workerSessions.add(ses);
+
+                        assert add;
+
+                        ses.finishMoveSession(this);
+
+                        if (idx % 2 == 0)
+                            readerMoveCnt.incrementAndGet();
+                        else
+                            writerMoveCnt.incrementAndGet();
+
+                        SelectionKey key = 
f.movedSocketChannel().register(selector,
+                            SelectionKey.OP_READ | SelectionKey.OP_WRITE,
+                            ses);
+
+                        ses.key(key);
+
+                        ses.procWrite.set(true);
+
+                        f.onDone(true);
+                    }
+                    else {
+                        assert f.movedSocketChannel() == null : f;
+
+                        if (workerSessions.remove(ses)) {
+                            ses.startMoveSession(this);
+
+                            SelectionKey key = ses.key();
+
+                            assert key.channel() != null : key;
+
+                            f.movedSocketChannel((SocketChannel)key.channel());
+
+                            key.cancel();
+                            commitKeyCancellation();
+
+                            clientWorkers.get(f.toIndex()).offer(f);
+                        }
+                        else
+                            f.onDone(false);
+                    }
+
+                    break;
+                }
+
+                case REQUIRE_WRITE: {
+                    registerWrite((GridSelectorNioSessionImpl)req0.session());
+
+                    break;
+                }
+
+                case CLOSE: {
+                    NioOperationFuture req = (NioOperationFuture)req0;
+
+                    if (close(req.session(), null))
+                        req.onDone(true);
+                    else
+                        req.onDone(false);
+
+                    break;
+                }
+
+                case PAUSE_READ: {
+                    NioOperationFuture req = (NioOperationFuture)req0;
+
+                    SelectionKey key = req.session().key();
+
+                    if (key.isValid()) {
+                        key.interestOps(key.interestOps() & 
(~SelectionKey.OP_READ));
+
+                        GridSelectorNioSessionImpl ses = 
(GridSelectorNioSessionImpl)key.attachment();
+
+                        ses.readsPaused(true);
+
+                        req.onDone(true);
+                    }
+                    else
+                        req.onDone(false);
+
+                    break;
+                }
+
+                case RESUME_READ: {
+                    NioOperationFuture req = (NioOperationFuture)req0;
+
+                    SelectionKey key = req.session().key();
+
+                    if (key.isValid()) {
+                        key.interestOps(key.interestOps() | 
SelectionKey.OP_READ);
+
+                        GridSelectorNioSessionImpl ses = 
(GridSelectorNioSessionImpl)key.attachment();
+
+                        ses.readsPaused(false);
+
+                        req.onDone(true);
+                    }
+                    else
+                        req.onDone(false);
+
+                    break;
+                }
+
+                case DUMP_STATS: {
+                    NioOperationFuture req = (NioOperationFuture)req0;
+
+                    IgnitePredicate<GridNioSession> p =
+                        req.msg instanceof IgnitePredicate ? 
(IgnitePredicate<GridNioSession>)req.msg : null;
+
+                    StringBuilder sb = new StringBuilder();
+
+                    try {
+                        dumpStats(sb, p, p != null);
+                    }
+                    finally {
+                        req.onDone(sb.toString());
+                    }
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isCancelled() {
+            return closed || super.isCancelled();
+        }
+
         /**
          * Makes sure that pending key cancellations are executed and the 
corresponding channels can be
          * re-registered with our selector without causing {@link 
java.nio.channels.CancelledKeyException}s.
@@ -2765,32 +2807,32 @@ public class GridNioServer<T> {
 
                 SelectionKey key;
 
-                if (!sockCh.isRegistered()) {
-                    assert fut.op == NioOperation.REGISTER : fut.op;
+                try {
+                    if (!sockCh.isRegistered()) {
+                        assert fut.op == NioOperation.REGISTER : fut.op;
 
-                    key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+                        key = sockCh.register(selector, SelectionKey.OP_READ, 
ses);
 
-                    ses.key(key);
+                        ses.key(key);
 
-                    resend(ses);
-                }
-                else {
-                    assert fut.op == NioOperation.CONNECT : fut.op;
+                        resend(ses);
+                    }
+                    else {
+                        assert fut.op == NioOperation.CONNECT : fut.op;
 
-                    key = sockCh.keyFor(selector);
+                        key = sockCh.keyFor(selector);
 
-                    key.attach(ses);
+                        key.attach(ses);
 
-                    key.interestOps(key.interestOps() & 
(~SelectionKey.OP_CONNECT));
-                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+                        key.interestOps(key.interestOps() & 
(~SelectionKey.OP_CONNECT));
+                        key.interestOps(key.interestOps() | 
SelectionKey.OP_READ);
 
-                    ses.key(key);
-                }
+                        ses.key(key);
+                    }
 
-                sessions.add(ses);
-                workerSessions.add(ses);
+                    sessions.add(ses);
+                    workerSessions.add(ses);
 
-                try {
                     filterChain.onSessionOpened(ses);
 
                     fut.onDone(ses);
@@ -3231,7 +3273,14 @@ public class GridNioServer<T> {
          * @param sockCh Socket channel to be registered on one of the 
selectors.
          */
         private void addRegistrationRequest(SocketChannel sockCh) {
-            offerBalanced(new NioOperationFuture<>(sockCh, true, null), null);
+            try {
+                offerBalanced(new NioOperationFuture<>(sockCh, true, null), 
null);
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Incoming connection was rejected [addr=" + 
sockCh.socket().getRemoteSocketAddress() + ']', e);
+
+                U.close(sockCh, log);
+            }
         }
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
index 3419b4cd307..a2f2d4334cd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio;
 
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -28,12 +29,12 @@ interface GridNioWorker {
     /**
      * @param req Change request.
      */
-    public void offer(GridNioServer.SessionChangeRequest req);
+    public void offer(GridNioServer.SessionChangeRequest req) throws 
IgniteCheckedException;
 
     /**
      * @param reqs Change requests.
      */
-    public void offer(Collection<GridNioServer.SessionChangeRequest> reqs);
+    public void offer(Collection<GridNioServer.SessionChangeRequest> reqs) 
throws IgniteCheckedException;
 
     /**
      * @param ses Session.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index d8adbfd84d1..561941903b6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -232,7 +232,7 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
      * @param fut Move future.
      * @return {@code True} if session move was scheduled.
      */
-    boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest 
fut) {
+    boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest 
fut) throws IgniteCheckedException {
         synchronized (this) {
             if (log.isDebugEnabled())
                 log.debug("Offered move [ses=" + this + ", fut=" + fut + ']');
@@ -251,7 +251,7 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
     /**
      * @param fut Future.
      */
-    void offerStateChange(GridNioServer.SessionChangeRequest fut) {
+    void offerStateChange(GridNioServer.SessionChangeRequest fut) throws 
IgniteCheckedException {
         synchronized (this) {
             if (log.isDebugEnabled())
                 log.debug("Offered move [ses=" + this + ", fut=" + fut + ']');
@@ -295,7 +295,7 @@ public class GridSelectorNioSessionImpl extends 
GridNioSessionImpl implements Gr
     /**
      * @param moveTo New session worker.
      */
-    void finishMoveSession(GridNioWorker moveTo) {
+    void finishMoveSession(GridNioWorker moveTo) throws IgniteCheckedException 
{
         synchronized (this) {
             assert worker == null;
 

Reply via email to