This is an automated email from the ASF dual-hosted git repository.
petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a06522dce2d IGNITE-28459 Fixed unhandled NIO Session Requests during node stop (#12999)
a06522dce2d is described below
commit a06522dce2d39a9ce22b775157776e36c25476e4
Author: Mikhail Petrov <[email protected]>
AuthorDate: Thu May 7 00:27:52 2026 +0300
IGNITE-28459 Fixed unhandled NIO Session Requests during node stop (#12999)
---
.../ignite/internal/util/nio/GridNioServer.java | 465 ++++++++++++---------
.../ignite/internal/util/nio/GridNioWorker.java | 5 +-
.../util/nio/GridSelectorNioSessionImpl.java | 6 +-
3 files changed, 263 insertions(+), 213 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index e1268673c14..797290677db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -585,7 +585,12 @@ public class GridNioServer<T> {
NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl,
NioOperation.CLOSE);
- impl.offerStateChange(fut);
+ try {
+ impl.offerStateChange(fut);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
return fut;
}
@@ -772,7 +777,12 @@ public class GridNioServer<T> {
ses0.procWrite.set(true);
// Wake up worker.
- ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0);
+ try {
+
ses0.offerStateChange((GridNioServer.SessionChangeRequest)fut0);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to notify NIO Server while resending
messages [rmtNode=" + recoveryDesc.node().id() + ']', e);
+ }
}
}
@@ -804,8 +814,13 @@ public class GridNioServer<T> {
SessionMoveFuture fut = new SessionMoveFuture(ses0, to);
- if (!ses0.offerMove(clientWorkers.get(from), fut))
- fut.onDone(false);
+ try {
+ if (!ses0.offerMove(clientWorkers.get(from), fut))
+ fut.onDone(false);
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
}
/**
@@ -813,7 +828,7 @@ public class GridNioServer<T> {
* @param op Operation.
* @return Future for operation.
*/
- private IgniteInternalFuture<?> pauseResumeReads(GridNioSession ses,
NioOperation op) {
+ private IgniteInternalFuture<?> pauseResumeReads(GridNioSession ses,
NioOperation op) throws IgniteCheckedException {
assert ses instanceof GridSelectorNioSessionImpl;
assert op == NioOperation.PAUSE_READ || op == NioOperation.RESUME_READ;
@@ -874,9 +889,14 @@ public class GridNioServer<T> {
opFut.msg = p;
- clientWorkers.get(i).offer(opFut);
+ try {
+ clientWorkers.get(i).offer(opFut);
- fut.add(opFut);
+ fut.add(opFut);
+ }
+ catch (IgniteCheckedException e) {
+ fut.add(new GridFinishedFuture<>(e));
+ }
}
fut.markInitialized();
@@ -919,9 +939,14 @@ public class GridNioServer<T> {
opFut.msg = p;
- clientWorkers.get(i).offer(opFut);
+ try {
+ clientWorkers.get(i).offer(opFut);
- fut.add(opFut);
+ fut.add(opFut);
+ }
+ catch (IgniteCheckedException e) {
+ fut.add(new GridFinishedFuture<>(e));
+ }
}
fut.markInitialized();
@@ -967,7 +992,7 @@ public class GridNioServer<T> {
return new GridFinishedFuture<>(
new IgniteCheckedException("Failed to create session,
server is stopped."));
}
- catch (IOException e) {
+ catch (IgniteCheckedException | IOException e) {
return new GridFinishedFuture<>(e);
}
}
@@ -977,7 +1002,10 @@ public class GridNioServer<T> {
* @param meta Session meta.
*/
public IgniteInternalFuture<GridNioSession> cancelConnect(final
SocketChannel ch, Map<Integer, ?> meta) {
- if (!closed) {
+ if (closed)
+ return new GridFinishedFuture<>(new IgniteCheckedException("Failed
to cancel connection, server is stopped"));
+
+ try {
NioOperationFuture<GridNioSession> req = new
NioOperationFuture<>(ch, false, meta);
req.op = NioOperation.CANCEL_CONNECT;
@@ -990,9 +1018,9 @@ public class GridNioServer<T> {
return req;
}
- else
- return new GridFinishedFuture<>(
- new IgniteCheckedException("Failed to cancel connection,
server is stopped."));
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(new IgniteCheckedException("Failed
to cancel connection", e));
+ }
}
/**
@@ -1083,7 +1111,7 @@ public class GridNioServer<T> {
* @param req Request to balance.
* @param meta Session metadata.
*/
- private synchronized void offerBalanced(NioOperationFuture req, @Nullable
Map<Integer, Object> meta) {
+ private synchronized void offerBalanced(NioOperationFuture req, @Nullable
Map<Integer, Object> meta) throws IgniteCheckedException {
assert req.operation() == NioOperation.REGISTER || req.operation() ==
NioOperation.CONNECT : req;
assert req.socketChannel() != null : req;
@@ -2038,7 +2066,10 @@ public class GridNioServer<T> {
*
* @param req Change request.
*/
- @Override public void offer(SessionChangeRequest req) {
+ @Override public void offer(SessionChangeRequest req) throws
IgniteCheckedException {
+ if (isCancelled())
+ throw new IgniteCheckedException("NIO client worker has been
stopped");
+
changeReqs.offer(req);
if (select)
@@ -2046,7 +2077,10 @@ public class GridNioServer<T> {
}
/** {@inheritDoc} */
- @Override public void offer(Collection<SessionChangeRequest> reqs) {
+ @Override public void offer(Collection<SessionChangeRequest> reqs)
throws IgniteCheckedException {
+ if (isCancelled())
+ throw new IgniteCheckedException("NIO client worker has been
stopped");
+
for (SessionChangeRequest req : reqs)
changeReqs.offer(req);
@@ -2078,185 +2112,19 @@ public class GridNioServer<T> {
*
* @throws IgniteCheckedException If IOException occurred or thread
was unable to add worker to workers pool.
*/
- @SuppressWarnings("unchecked")
private void bodyInternal() throws IgniteCheckedException,
InterruptedException {
try {
long lastIdleCheck = U.currentTimeMillis();
- mainLoop:
- while (!closed && selector.isOpen()) {
- SessionChangeRequest req0;
+ while (selector.isOpen() && !(isCancelled() &&
changeReqs.isEmpty())) {
+ SessionChangeRequest req;
updateHeartbeat();
- while ((req0 = changeReqs.poll()) != null) {
+ while ((req = changeReqs.poll()) != null) {
updateHeartbeat();
- switch (req0.operation()) {
- case CONNECT: {
- NioOperationFuture fut =
(NioOperationFuture)req0;
-
- SocketChannel ch = fut.socketChannel();
-
- try {
- ch.register(selector,
SelectionKey.OP_CONNECT, fut);
- }
- catch (IOException e) {
- fut.onDone(new
IgniteCheckedException("Failed to register channel on selector", e));
- }
-
- break;
- }
-
- case CANCEL_CONNECT: {
- NioOperationFuture req =
(NioOperationFuture)req0;
-
- SocketChannel ch = req.socketChannel();
-
- SelectionKey key = ch.keyFor(selector);
-
- if (key != null)
- key.cancel();
-
- U.closeQuiet(ch);
-
- req.onDone();
-
- break;
- }
-
- case REGISTER: {
- register((NioOperationFuture)req0);
-
- break;
- }
-
- case MOVE: {
- SessionMoveFuture f = (SessionMoveFuture)req0;
-
- GridSelectorNioSessionImpl ses = f.session();
-
- if (idx == f.toIdx) {
- assert f.movedSocketChannel() != null : f;
-
- boolean add = workerSessions.add(ses);
-
- assert add;
-
- ses.finishMoveSession(this);
-
- if (idx % 2 == 0)
- readerMoveCnt.incrementAndGet();
- else
- writerMoveCnt.incrementAndGet();
-
- SelectionKey key =
f.movedSocketChannel().register(selector,
- SelectionKey.OP_READ |
SelectionKey.OP_WRITE,
- ses);
-
- ses.key(key);
-
- ses.procWrite.set(true);
-
- f.onDone(true);
- }
- else {
- assert f.movedSocketChannel() == null : f;
-
- if (workerSessions.remove(ses)) {
- ses.startMoveSession(this);
-
- SelectionKey key = ses.key();
-
- assert key.channel() != null : key;
-
-
f.movedSocketChannel((SocketChannel)key.channel());
-
- key.cancel();
- commitKeyCancellation();
-
-
clientWorkers.get(f.toIndex()).offer(f);
- }
- else
- f.onDone(false);
- }
-
- break;
- }
-
- case REQUIRE_WRITE: {
-
registerWrite((GridSelectorNioSessionImpl)req0.session());
-
- break;
- }
-
- case CLOSE: {
- NioOperationFuture req =
(NioOperationFuture)req0;
-
- if (close(req.session(), null))
- req.onDone(true);
- else
- req.onDone(false);
-
- break;
- }
-
- case PAUSE_READ: {
- NioOperationFuture req =
(NioOperationFuture)req0;
-
- SelectionKey key = req.session().key();
-
- if (key.isValid()) {
- key.interestOps(key.interestOps() &
(~SelectionKey.OP_READ));
-
- GridSelectorNioSessionImpl ses =
(GridSelectorNioSessionImpl)key.attachment();
-
- ses.readsPaused(true);
-
- req.onDone(true);
- }
- else
- req.onDone(false);
-
- break;
- }
-
- case RESUME_READ: {
- NioOperationFuture req =
(NioOperationFuture)req0;
-
- SelectionKey key = req.session().key();
-
- if (key.isValid()) {
- key.interestOps(key.interestOps() |
SelectionKey.OP_READ);
-
- GridSelectorNioSessionImpl ses =
(GridSelectorNioSessionImpl)key.attachment();
-
- ses.readsPaused(false);
-
- req.onDone(true);
- }
- else
- req.onDone(false);
-
- break;
- }
-
- case DUMP_STATS: {
- NioOperationFuture req =
(NioOperationFuture)req0;
-
- IgnitePredicate<GridNioSession> p =
- req.msg instanceof IgnitePredicate ?
(IgnitePredicate<GridNioSession>)req.msg : null;
-
- StringBuilder sb = new StringBuilder();
-
- try {
- dumpStats(sb, p, p != null);
- }
- finally {
- req.onDone(sb.toString());
- }
- }
- }
+ processSessionChangedRequest(req);
}
for (long i = 0; i < selectorSpins &&
selector.selectedKeys().isEmpty(); i++) {
@@ -2277,7 +2145,7 @@ public class GridNioServer<T> {
}
if (!changeReqs.isEmpty())
- continue mainLoop;
+ break;
// Just in case we do busy selects.
long now = U.currentTimeMillis();
@@ -2289,14 +2157,14 @@ public class GridNioServer<T> {
}
if (isCancelled())
- return;
+ break;
}
// Falling to blocking select.
select = true;
try {
- if (!changeReqs.isEmpty())
+ if (!changeReqs.isEmpty() || isCancelled())
continue;
blockingSectionBegin();
@@ -2370,6 +2238,180 @@ public class GridNioServer<T> {
}
}
+ /** */
+ private void processSessionChangedRequest(SessionChangeRequest req0)
throws IgniteCheckedException, IOException {
+ switch (req0.operation()) {
+ case CONNECT: {
+ NioOperationFuture fut = (NioOperationFuture)req0;
+
+ SocketChannel ch = fut.socketChannel();
+
+ try {
+ ch.register(selector, SelectionKey.OP_CONNECT, fut);
+ }
+ catch (IOException e) {
+ fut.onDone(new IgniteCheckedException("Failed to
register channel on selector", e));
+ }
+
+ break;
+ }
+
+ case CANCEL_CONNECT: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ SocketChannel ch = req.socketChannel();
+
+ SelectionKey key = ch.keyFor(selector);
+
+ if (key != null)
+ key.cancel();
+
+ U.closeQuiet(ch);
+
+ req.onDone();
+
+ break;
+ }
+
+ case REGISTER: {
+ register((NioOperationFuture)req0);
+
+ break;
+ }
+
+ case MOVE: {
+ SessionMoveFuture f = (SessionMoveFuture)req0;
+
+ GridSelectorNioSessionImpl ses = f.session();
+
+ if (idx == f.toIdx) {
+ assert f.movedSocketChannel() != null : f;
+
+ boolean add = workerSessions.add(ses);
+
+ assert add;
+
+ ses.finishMoveSession(this);
+
+ if (idx % 2 == 0)
+ readerMoveCnt.incrementAndGet();
+ else
+ writerMoveCnt.incrementAndGet();
+
+ SelectionKey key =
f.movedSocketChannel().register(selector,
+ SelectionKey.OP_READ | SelectionKey.OP_WRITE,
+ ses);
+
+ ses.key(key);
+
+ ses.procWrite.set(true);
+
+ f.onDone(true);
+ }
+ else {
+ assert f.movedSocketChannel() == null : f;
+
+ if (workerSessions.remove(ses)) {
+ ses.startMoveSession(this);
+
+ SelectionKey key = ses.key();
+
+ assert key.channel() != null : key;
+
+ f.movedSocketChannel((SocketChannel)key.channel());
+
+ key.cancel();
+ commitKeyCancellation();
+
+ clientWorkers.get(f.toIndex()).offer(f);
+ }
+ else
+ f.onDone(false);
+ }
+
+ break;
+ }
+
+ case REQUIRE_WRITE: {
+ registerWrite((GridSelectorNioSessionImpl)req0.session());
+
+ break;
+ }
+
+ case CLOSE: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ if (close(req.session(), null))
+ req.onDone(true);
+ else
+ req.onDone(false);
+
+ break;
+ }
+
+ case PAUSE_READ: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ SelectionKey key = req.session().key();
+
+ if (key.isValid()) {
+ key.interestOps(key.interestOps() &
(~SelectionKey.OP_READ));
+
+ GridSelectorNioSessionImpl ses =
(GridSelectorNioSessionImpl)key.attachment();
+
+ ses.readsPaused(true);
+
+ req.onDone(true);
+ }
+ else
+ req.onDone(false);
+
+ break;
+ }
+
+ case RESUME_READ: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ SelectionKey key = req.session().key();
+
+ if (key.isValid()) {
+ key.interestOps(key.interestOps() |
SelectionKey.OP_READ);
+
+ GridSelectorNioSessionImpl ses =
(GridSelectorNioSessionImpl)key.attachment();
+
+ ses.readsPaused(false);
+
+ req.onDone(true);
+ }
+ else
+ req.onDone(false);
+
+ break;
+ }
+
+ case DUMP_STATS: {
+ NioOperationFuture req = (NioOperationFuture)req0;
+
+ IgnitePredicate<GridNioSession> p =
+ req.msg instanceof IgnitePredicate ?
(IgnitePredicate<GridNioSession>)req.msg : null;
+
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ dumpStats(sb, p, p != null);
+ }
+ finally {
+ req.onDone(sb.toString());
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isCancelled() {
+ return closed || super.isCancelled();
+ }
+
/**
* Makes sure that pending key cancellations are executed and the
corresponding channels can be
* re-registered with our selector without causing {@link
java.nio.channels.CancelledKeyException}s.
@@ -2765,32 +2807,32 @@ public class GridNioServer<T> {
SelectionKey key;
- if (!sockCh.isRegistered()) {
- assert fut.op == NioOperation.REGISTER : fut.op;
+ try {
+ if (!sockCh.isRegistered()) {
+ assert fut.op == NioOperation.REGISTER : fut.op;
- key = sockCh.register(selector, SelectionKey.OP_READ, ses);
+ key = sockCh.register(selector, SelectionKey.OP_READ,
ses);
- ses.key(key);
+ ses.key(key);
- resend(ses);
- }
- else {
- assert fut.op == NioOperation.CONNECT : fut.op;
+ resend(ses);
+ }
+ else {
+ assert fut.op == NioOperation.CONNECT : fut.op;
- key = sockCh.keyFor(selector);
+ key = sockCh.keyFor(selector);
- key.attach(ses);
+ key.attach(ses);
- key.interestOps(key.interestOps() &
(~SelectionKey.OP_CONNECT));
- key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+ key.interestOps(key.interestOps() &
(~SelectionKey.OP_CONNECT));
+ key.interestOps(key.interestOps() |
SelectionKey.OP_READ);
- ses.key(key);
- }
+ ses.key(key);
+ }
- sessions.add(ses);
- workerSessions.add(ses);
+ sessions.add(ses);
+ workerSessions.add(ses);
- try {
filterChain.onSessionOpened(ses);
fut.onDone(ses);
@@ -3231,7 +3273,14 @@ public class GridNioServer<T> {
* @param sockCh Socket channel to be registered on one of the
selectors.
*/
private void addRegistrationRequest(SocketChannel sockCh) {
- offerBalanced(new NioOperationFuture<>(sockCh, true, null), null);
+ try {
+ offerBalanced(new NioOperationFuture<>(sockCh, true, null),
null);
+ }
+ catch (IgniteCheckedException e) {
+ U.warn(log, "Incoming connection was rejected [addr=" +
sockCh.socket().getRemoteSocketAddress() + ']', e);
+
+ U.close(sockCh, log);
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
index 3419b4cd307..a2f2d4334cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioWorker.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
import org.jetbrains.annotations.Nullable;
/**
@@ -28,12 +29,12 @@ interface GridNioWorker {
/**
* @param req Change request.
*/
- public void offer(GridNioServer.SessionChangeRequest req);
+ public void offer(GridNioServer.SessionChangeRequest req) throws
IgniteCheckedException;
/**
* @param reqs Change requests.
*/
- public void offer(Collection<GridNioServer.SessionChangeRequest> reqs);
+ public void offer(Collection<GridNioServer.SessionChangeRequest> reqs)
throws IgniteCheckedException;
/**
* @param ses Session.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index d8adbfd84d1..561941903b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -232,7 +232,7 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr
* @param fut Move future.
* @return {@code True} if session move was scheduled.
*/
- boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest
fut) {
+ boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest
fut) throws IgniteCheckedException {
synchronized (this) {
if (log.isDebugEnabled())
log.debug("Offered move [ses=" + this + ", fut=" + fut + ']');
@@ -251,7 +251,7 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr
/**
* @param fut Future.
*/
- void offerStateChange(GridNioServer.SessionChangeRequest fut) {
+ void offerStateChange(GridNioServer.SessionChangeRequest fut) throws
IgniteCheckedException {
synchronized (this) {
if (log.isDebugEnabled())
log.debug("Offered move [ses=" + this + ", fut=" + fut + ']');
@@ -295,7 +295,7 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr
/**
* @param moveTo New session worker.
*/
- void finishMoveSession(GridNioWorker moveTo) {
+ void finishMoveSession(GridNioWorker moveTo) throws IgniteCheckedException
{
synchronized (this) {
assert worker == null;