rpuch commented on code in PR #6370:
URL: https://github.com/apache/ignite-3/pull/6370#discussion_r2263141987
##########
modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java:
##########
@@ -176,6 +175,7 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
             @Override
             public void onDisappeared(ClusterNode member) {
                 connectionMgr.handleNodeLeft(member.id());
+                nettyBootstrapFactory.handshakeEventLoopSwitcher().nodeLeftTopology(member);
Review Comment:
We need to make sure that the switcher's `nodeLeftTopology()` is called only
after all channels are closed in `connectionMgr.handleNodeLeft(member.id())`.
The latter happens asynchronously, so we also need to:
1. Make `ConnectionManager#handleNodeLeft()` return a future
2. Have that future complete when all channels that
`ConnectionManager#handleNodeLeft()` closes have finished closing
Then we'll be able to call
`nettyBootstrapFactory.handshakeEventLoopSwitcher().nodeLeftTopology(member)`
after the future completes.
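
To illustrate the ordering requested above, here is a minimal sketch, not the actual Ignite code. `FakeChannel` and `FakeConnectionManager` are hypothetical stand-ins; the only assumption is that each channel close can be exposed as a `CompletableFuture`, so that `handleNodeLeft()` can return their combination via `CompletableFuture.allOf()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NodeLeftSketch {
    /** Hypothetical channel whose close completes asynchronously. */
    static final class FakeChannel {
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        CompletableFuture<Void> close() {
            return closeFuture;
        }
    }

    /** Hypothetical stand-in for ConnectionManager. */
    static final class FakeConnectionManager {
        private final List<FakeChannel> channels;

        FakeConnectionManager(List<FakeChannel> channels) {
            this.channels = channels;
        }

        /** Returns a future that completes once every channel has finished closing. */
        CompletableFuture<Void> handleNodeLeft(String nodeId) {
            CompletableFuture<?>[] closes = channels.stream()
                    .map(FakeChannel::close)
                    .toArray(CompletableFuture[]::new);

            return CompletableFuture.allOf(closes);
        }
    }

    /** Records the order of events; the topology callback must come last. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        FakeChannel a = new FakeChannel();
        FakeChannel b = new FakeChannel();
        FakeConnectionManager mgr = new FakeConnectionManager(List.of(a, b));

        // Stand-in for the switcher's nodeLeftTopology(member) call.
        mgr.handleNodeLeft("node-1").thenRun(() -> events.add("nodeLeftTopology"));

        events.add("a closed");
        a.closeFuture.complete(null);
        events.add("b closed");
        b.closeFuture.complete(null);

        return events;
    }
}
```

In this sketch the callback runs only when the last close future completes, which is the guarantee the switcher needs. Error handling for a failed close is left out on purpose.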
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java:
##########
@@ -465,22 +465,25 @@ private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker
             return;
         }
-        handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channelHandlerContext.channel(),
-                () -> authenticationManager.authenticateAsync(createAuthenticationRequest(clientHandshakeExtensions))
-                        .handleAsync((user, err) -> {
-                            if (err != null) {
-                                handshakeError(ctx, packer, err);
-                            } else {
-                                handshakeSuccess(ctx, packer, user, clientFeatures, clientVer, clientCode);
-                            }
-
-                            return null;
-                        }, ctx.executor()))
-                .exceptionally(t -> {
-                    handshakeError(ctx, packer, t);
-
-                    return null;
-                });
+        AuthenticationRequest<?, ?> authReq = createAuthenticationRequest(clientHandshakeExtensions);
+
+        handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channelHandlerContext.channel()).whenComplete((unused, throwable) -> {
+            if (throwable != null) {
+                handshakeError(ctx, packer, throwable);
+
+                return;
+            }
+
+            authenticationManager.authenticateAsync(authReq).handleAsync((user, err) -> {
+                if (err != null) {
+                    handshakeError(ctx, packer, err);
+                } else {
+                    handshakeSuccess(ctx, packer, user, clientFeatures, clientVer, clientCode);
+                }
+
+                return null;
+            }, ctx.executor());
+        });
Review Comment:
```suggestion
        handshakeEventLoopSwitcher.switchEventLoopIfNeeded(channelHandlerContext.channel())
                .thenCompose(unused -> authenticationManager.authenticateAsync(authReq))
                .handleAsync((user, err) -> {
                    if (err != null) {
                        handshakeError(ctx, packer, err);
                    } else {
                        handshakeSuccess(ctx, packer, user, clientFeatures, clientVer, clientCode);
                    }
                    return null;
                }, ctx.executor());
```
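
For context on why the flat chain is sufficient, here is a small self-contained sketch using only `java.util.concurrent`. The class and method names are hypothetical, and strings stand in for `handshakeError`/`handshakeSuccess`. It shows that with `thenCompose` a failure of the first stage skips the authentication step and reaches the single `handleAsync` callback, so a separate error branch for the switch step is not needed. One caveat, visible in the sketch: the error may arrive wrapped in a `CompletionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

public class HandshakeChainSketch {
    /**
     * Chains "switch event loop" and "authenticate" and handles both outcomes in one place.
     * The returned string stands in for the handshakeError / handshakeSuccess calls.
     */
    static CompletableFuture<String> handshake(
            CompletableFuture<Void> switched,
            Supplier<CompletableFuture<String>> authenticate,
            Executor executor
    ) {
        return switched
                // Not invoked if the switch stage failed.
                .thenCompose(unused -> authenticate.get())
                // Sees a failure from either stage, or the authenticated user.
                .handleAsync((user, err) -> err != null
                        ? "error: " + unwrap(err).getMessage()
                        : "success: " + user, executor);
    }

    /** Dependent stages report failures wrapped in CompletionException; peel that off. */
    static Throwable unwrap(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }

        return t;
    }
}
```

If `handshakeError` inspects the exception type, it may need a similar unwrapping step; whether it already does so is not visible in this diff.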