This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 93c3636213e IGNITE-27154 Extend topology-aware RAFT client (#7152)
93c3636213e is described below

commit 93c3636213eb926d828b67cef714951b2cbd0925
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Dec 5 19:10:23 2025 +0300

    IGNITE-27154 Extend topology-aware RAFT client (#7152)
---
 .../internal/raft/service/RaftGroupService.java    |   5 +-
 modules/replicator/build.gradle                    |   3 +
 .../PhysicalTopologyAwareRaftGroupService.java     | 511 +++++++++++++++++++++
 .../raft/client/TopologyAwareRaftGroupService.java |   1 +
 ...PhysicalTopologyAwareRaftGroupServiceTest.java} | 275 ++++-------
 .../AbstractTopologyAwareGroupServiceTest.java     |   1 +
 6 files changed, 614 insertions(+), 182 deletions(-)

diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index 72b939f75a6..2dfd5e87a0b 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -57,10 +57,9 @@ public interface RaftGroupService extends RaftCommandRunner {
     @Nullable Peer leader();
 
     /**
-     * Returns a list of voting peers or {@code null} if it has not been yet 
initialized. The order is corresponding to the time of joining
-     *      to the replication group.
+     * Returns a list of voting peers. The order is corresponding to the time 
of joining to the replication group.
      */
-    @Nullable List<Peer> peers();
+    List<Peer> peers();
 
     /**
      * Returns a list of leaners or {@code null} if it has not been yet 
initialized. The order is corresponding to the time of joining to
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index 3b6c111ae8b..f37aa4a4207 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -58,10 +58,13 @@ dependencies {
     integrationTestImplementation testFixtures(project(':ignite-metrics'))
     integrationTestImplementation testFixtures(project(':ignite-raft'))
 
+    testImplementation libs.awaitility
     testImplementation testFixtures(project(':ignite-core'))
     testImplementation testFixtures(project(':ignite-placement-driver-api'))
     testImplementation testFixtures(project(':ignite-failure-handler'))
     testImplementation testFixtures(project(':ignite-raft'))
+    testImplementation testFixtures(project(':ignite-configuration'))
+    testImplementation testFixtures(project(':ignite-network'))
 
     testFixturesAnnotationProcessor 
project(':ignite-network-annotation-processor')
     testFixturesImplementation project(':ignite-network-api')
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
new file mode 100644
index 00000000000..4a8512cf659
--- /dev/null
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.network.TopologyEventHandler;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ExceptionFactory;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeerUnavailableException;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
+import org.apache.ignite.internal.raft.ThrottlingContextHolder;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import 
org.apache.ignite.raft.jraft.rpc.CliRequests.SubscriptionLeaderChangeRequest;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The RAFT group service is based on the cluster physical topology. This service is able to subscribe to RAFT group leader updates.
+ */
+public class PhysicalTopologyAwareRaftGroupService implements RaftGroupService 
{
+    /** The logger. It is bound to this class so that log records are attributed to the physical topology aware client. */
+    private static final IgniteLogger LOG = Loggers.forClass(PhysicalTopologyAwareRaftGroupService.class);
+
+    /** Raft message factory. */
+    private static final RaftMessagesFactory MESSAGES_FACTORY = Loza.FACTORY;
+
+    /** General leader election listener. */
+    private final ServerEventHandler generalLeaderElectionListener;
+
+    /** Failure manager. */
+    private final FailureManager failureManager;
+
+    /** Cluster service. */
+    private final ClusterService clusterService;
+
+    /** RPC RAFT client. */
+    private final RaftGroupService raftClient;
+
+    /** Executor to invoke RPC requests. */
+    private final Executor executor;
+
+    /** RAFT configuration. */
+    private final RaftConfiguration raftConfiguration;
+
+    /**
+     * Constructor. Registers the general leader election listener for the group, starts listening to the physical topology
+     * in order to subscribe on nodes that appear later, sends a subscription request to every current topology member that
+     * hosts a peer of the group and, after these requests are processed, requests the current leader.
+     *
+     * @param failureManager Failure manager.
+     * @param clusterService Cluster service.
+     * @param executor Executor to invoke RPC requests and notify listeners.
+     * @param raftConfiguration RAFT configuration.
+     * @param raftClient RPC RAFT client.
+     * @param eventsClientListener Events client listener.
+     */
+    private PhysicalTopologyAwareRaftGroupService(
+            FailureManager failureManager,
+            ClusterService clusterService,
+            Executor executor,
+            RaftConfiguration raftConfiguration,
+            RaftGroupService raftClient,
+            RaftGroupEventsClientListener eventsClientListener
+    ) {
+        this.failureManager = failureManager;
+        this.clusterService = clusterService;
+        this.executor = executor;
+        this.raftConfiguration = raftConfiguration;
+        this.raftClient = raftClient;
+
+        this.generalLeaderElectionListener = new ServerEventHandler(executor);
+
+        // Leader election events received for this group are routed to the general listener.
+        eventsClientListener.addLeaderElectionListener(raftClient.groupId(), 
generalLeaderElectionListener);
+
+        // A node that appears in the physical topology is subscribed as well (if it hosts a peer of the group),
+        // then the leader is requested again.
+        // NOTE(review): no removal of this handler is visible in this class, confirm that it must not be removed on shutdown.
+        // NOTE(review): the handler invokes instance methods and is registered before the constructor returns.
+        clusterService.topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onAppeared(InternalClusterNode member) {
+                CompletableFuture<Boolean> fut = 
changeNodeSubscriptionIfNeed(member, true);
+
+                requestLeaderManually(clusterService, executor, raftClient, 
fut);
+            }
+        });
+
+        // Subscription requests to the nodes that are already in the topology.
+        ArrayList<CompletableFuture<?>> futures = new ArrayList<>();
+
+        for (InternalClusterNode member : 
clusterService.topologyService().allMembers()) {
+            futures.add(changeNodeSubscriptionIfNeed(member, true));
+        }
+
+        // The leader is requested only after all the subscription requests are completed.
+        requestLeaderManually(clusterService, executor, raftClient, 
CompletableFutures.allOf(futures));
+    }
+
+    /**
+     * Requests the leader information for the RAFT group after the given subscriptions are processed. If the leader is known
+     * and its host is present in the physical topology, the general leader election listener is notified about it.
+     * If the request fails or the leader is not known, nothing is notified.
+     * TODO: IGNITE-27256 Remove the method after implementing a notification after subscription.
+     *
+     * @param clusterService Cluster service to retrieve topology information.
+     * @param executor Executor to run asynchronous tasks.
+     * @param raftClient RAFT client to interact with the RAFT group.
+     * @param subscriptionsFut Future representing the completion of subscription updates.
+     */
+    private void requestLeaderManually(
+            ClusterService clusterService,
+            Executor executor,
+            RaftGroupService raftClient,
+            CompletableFuture<?> subscriptionsFut
+    ) {
+        subscriptionsFut.thenRunAsync(
+                () -> raftClient.refreshAndGetLeaderWithTerm().whenCompleteAsync(
+                        (leaderWithTerm, throwable) -> {
+                            if (throwable != null) {
+                                LOG.warn("Could not refresh and get leader with term [grp={}].", throwable, groupId());
+
+                                // The result is null when the request has failed, so there is nothing to notify about.
+                                return;
+                            }
+
+                            Peer leader = leaderWithTerm == null ? null : leaderWithTerm.leader();
+
+                            if (leader == null) {
+                                // The leader is not known, the listener will be notified when a leader is elected.
+                                return;
+                            }
+
+                            InternalClusterNode leaderHost = clusterService.topologyService()
+                                    .getByConsistentId(leader.consistentId());
+
+                            if (leaderHost != null) {
+                                generalLeaderElectionListener.onLeaderElected(
+                                        leaderHost,
+                                        leaderWithTerm.term()
+                                );
+                            } else {
+                                LOG.warn("Leader host occurred to leave the topology [nodeId = {}].",
+                                        leader.consistentId());
+                            }
+                        }, executor),
+                executor);
+    }
+
+    /**
+     * Sends a request to subscribe to (or unsubscribe from) leader updates to the given node, if the node hosts a peer
+     * of the group. A failure of the request is passed to the failure manager.
+     *
+     * @param member Node of the physical topology.
+     * @param subscribe {@code true} to subscribe, {@code false} to unsubscribe.
+     * @return Future with the result of {@link #sendMessage}, or a future completed with {@code false} if the node does not
+     *         host a peer of the group.
+     */
+    private CompletableFuture<Boolean> changeNodeSubscriptionIfNeed(InternalClusterNode member, boolean subscribe) {
+        Peer peer = new Peer(member.name());
+
+        if (!peers().contains(peer)) {
+            return CompletableFutures.booleanCompletedFuture(false);
+        }
+
+        SubscriptionLeaderChangeRequest msg = subscriptionLeaderChangeRequest(subscribe);
+
+        return sendMessage(member, msg).whenComplete((isSent, err) -> {
+            if (err != null) {
+                failureManager.process(new FailureContext(err, "Could not change subscription to leader updates [grp="
+                        + groupId() + "]."));
+
+                // The status has not changed, do not report it as changed.
+                return;
+            }
+
+            LOG.info("Subscription status changed for the peer [grp={}, consistentId={}, subscribe={}, isSent={}].",
+                    groupId(), member.name(), subscribe, isSent);
+        });
+    }
+
+    /**
+     * Starts a new instance of the PhysicalTopologyAwareRaftGroupService.
+     *
+     * @param groupId The ID of the RAFT group.
+     * @param cluster Cluster service for communication.
+     * @param factory Factory for creating RAFT messages.
+     * @param raftConfiguration Configuration for the RAFT group.
+     * @param configuration Peers and learners configuration.
+     * @param executor Executor for asynchronous tasks.
+     * @param eventsClientListener Listener for RAFT group events.
+     * @param cmdMarshaller Marshaller for RAFT commands.
+     * @param stoppingExceptionFactory Factory for creating stopping 
exceptions.
+     * @param throttlingContextHolder Context holder for throttling.
+     * @param failureManager Manager for handling failures.
+     * @return A new instance of PhysicalTopologyAwareRaftGroupService.
+     */
+    public static PhysicalTopologyAwareRaftGroupService start(
+            ReplicationGroupId groupId,
+            ClusterService cluster,
+            RaftMessagesFactory factory,
+            RaftConfiguration raftConfiguration,
+            PeersAndLearners configuration,
+            ScheduledExecutorService executor,
+            RaftGroupEventsClientListener eventsClientListener,
+            Marshaller cmdMarshaller,
+            ExceptionFactory stoppingExceptionFactory,
+            ThrottlingContextHolder throttlingContextHolder,
+            FailureManager failureManager
+    ) {
+        return new PhysicalTopologyAwareRaftGroupService(
+                failureManager,
+                cluster,
+                executor,
+                raftConfiguration,
+                RaftGroupServiceImpl.start(
+                        groupId,
+                        cluster,
+                        factory,
+                        raftConfiguration,
+                        configuration,
+                        executor,
+                        cmdMarshaller,
+                        stoppingExceptionFactory,
+                        throttlingContextHolder
+                ),
+                eventsClientListener
+        );
+    }
+
+    private void finishSubscriptions() {
+        for (InternalClusterNode member : 
clusterService.topologyService().allMembers()) {
+            changeNodeSubscriptionIfNeed(member, false);
+        }
+    }
+
+    /**
+     * Registers a callback for leader election notifications. If a leader is already known, the callback is asynchronously
+     * notified about it.
+     *
+     * @param callback Callback to register.
+     */
+    public void subscribeLeader(LeaderElectionListener callback) {
+        generalLeaderElectionListener.addCallbackAndNotify(callback);
+    }
+
+    /**
+     * Removes a callback that was registered for leader election notifications.
+     *
+     * @param callback Callback to remove.
+     */
+    public void unsubscribeLeader(LeaderElectionListener callback) {
+        generalLeaderElectionListener.removeCallbackAndNotify(callback);
+    }
+
+    private SubscriptionLeaderChangeRequest 
subscriptionLeaderChangeRequest(boolean subscribe) {
+        return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
+                .groupId(groupId())
+                .subscribe(subscribe)
+                .build();
+    }
+
+    private CompletableFuture<Boolean> sendMessage(InternalClusterNode node, 
SubscriptionLeaderChangeRequest msg) {
+        var msgSendFut = new CompletableFuture<Boolean>();
+
+        sendWithRetry(node, msg, msgSendFut);
+
+        return msgSendFut;
+    }
+
+    /**
+     * Sends the message to the node and completes the given future with the outcome. A subscription request is sent again
+     * after a recoverable failure, an unsubscription request is never retried.
+     *
+     * @param node Recipient.
+     * @param msg Subscription change request.
+     * @param msgSendFut Future to complete: {@code true} if the message was delivered, {@code false} if the delivery was given up
+     *         (the recipient left, the node is stopping or an unsubscription failed), exceptionally otherwise.
+     */
+    private void sendWithRetry(InternalClusterNode node, SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
+        long responseTimeout = raftConfiguration.responseTimeoutMillis().value();
+
+        clusterService.messagingService().invoke(node, msg, responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
+            if (invokeThrowable == null) {
+                msgSendFut.complete(true);
+
+                return;
+            }
+
+            Throwable invokeCause = unwrapCause(invokeThrowable);
+
+            if (!msg.subscribe()) {
+                // We don't want to propagate exceptions when unsubscribing (if it's not an Error!).
+                if (invokeCause instanceof Error) {
+                    msgSendFut.completeExceptionally(invokeThrowable);
+                } else {
+                    LOG.debug("An exception while trying to unsubscribe.", invokeThrowable);
+
+                    msgSendFut.complete(false);
+                }
+            } else if (invokeCause instanceof RecipientLeftException) {
+                // This check must precede the recoverable one: RecipientLeftException is also treated as recoverable,
+                // and a request to a node that has left would be retried over and over.
+                LOG.info(
+                        "Could not subscribe to leader update from a specific node, because the node had left the cluster: [node={}].",
+                        node
+                );
+
+                msgSendFut.complete(false);
+            } else if (invokeCause instanceof NodeStoppingException) {
+                msgSendFut.complete(false);
+            } else if (recoverable(invokeCause)) {
+                sendWithRetry(node, msg, msgSendFut);
+            } else {
+                LOG.error("Could not send the subscribe message to the node: [node={}, msg={}].", invokeThrowable, node, msg);
+
+                msgSendFut.completeExceptionally(invokeThrowable);
+            }
+        }, executor);
+    }
+
+    private static boolean recoverable(Throwable t) {
+        t = unwrapCause(t);
+
+        return t instanceof TimeoutException
+                || t instanceof IOException
+                || t instanceof PeerUnavailableException
+                || t instanceof RecipientLeftException;
+    }
+
+    @Override
+    public ReplicationGroupId groupId() {
+        return raftClient.groupId();
+    }
+
+    @Override
+    public @Nullable Peer leader() {
+        return raftClient.leader();
+    }
+
+    @Override
+    public List<Peer> peers() {
+        return raftClient.peers();
+    }
+
+    @Override
+    public @Nullable List<Peer> learners() {
+        return raftClient.learners();
+    }
+
+    @Override
+    public CompletableFuture<Void> refreshLeader() {
+        return raftClient.refreshLeader();
+    }
+
+    @Override
+    public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
+        return raftClient.refreshAndGetLeaderWithTerm();
+    }
+
+    @Override
+    public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
+        return raftClient.refreshMembers(onlyAlive);
+    }
+
+    @Override
+    public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken) {
+        return raftClient.addPeer(peer, sequenceToken);
+    }
+
+    @Override
+    public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken) {
+        return raftClient.removePeer(peer, sequenceToken);
+    }
+
+    @Override
+    public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners 
peersAndLearners, long term, long sequenceToken) {
+        return raftClient.changePeersAndLearners(peersAndLearners, term, 
sequenceToken);
+    }
+
+    @Override
+    public CompletableFuture<Void> 
changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term, long 
sequenceToken) {
+        return raftClient.changePeersAndLearnersAsync(peersAndLearners, term, 
sequenceToken);
+    }
+
+    @Override
+    public CompletableFuture<Void> addLearners(Collection<Peer> learners, long 
sequenceToken) {
+        return raftClient.addLearners(learners, sequenceToken);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeLearners(Collection<Peer> learners, 
long sequenceToken) {
+        return raftClient.removeLearners(learners, sequenceToken);
+    }
+
+    @Override
+    public CompletableFuture<Void> resetLearners(Collection<Peer> learners, 
long sequenceToken) {
+        return raftClient.resetLearners(learners, sequenceToken);
+    }
+
+    @Override
+    public CompletableFuture<Void> snapshot(Peer peer, boolean forced) {
+        return raftClient.snapshot(peer, forced);
+    }
+
+    @Override
+    public CompletableFuture<Void> transferLeadership(Peer newLeader) {
+        return raftClient.transferLeadership(newLeader);
+    }
+
+    @Override
+    public void shutdown() {
+        finishSubscriptions();
+
+        raftClient.shutdown();
+    }
+
+    @Override
+    public CompletableFuture<Long> readIndex() {
+        return raftClient.readIndex();
+    }
+
+    @Override
+    public ClusterService clusterService() {
+        return raftClient.clusterService();
+    }
+
+    @Override
+    public void updateConfiguration(PeersAndLearners configuration) {
+        raftClient.updateConfiguration(configuration);
+    }
+
+    @Override
+    public <R> CompletableFuture<R> run(Command cmd) {
+        return raftClient.run(cmd);
+    }
+
+    @Override
+    public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
+        return raftClient.run(cmd, timeoutMillis);
+    }
+
+    /**
+     * Leader election handler. Remembers the last elected leader with its term and delivers notifications to the registered
+     * callbacks on the executor, one notification after another.
+     */
+    private static class ServerEventHandler implements LeaderElectionListener {
+        /** Executor to run the callbacks on. */
+        private final Executor executor;
+
+        /** Tail of the notification chain. Guarded by {@code this}. */
+        private CompletableFuture<Void> fut = CompletableFutures.nullCompletedFuture();
+
+        /** Leader elected callbacks. Guarded by {@code this}. */
+        private final List<LeaderElectionListener> callbacks = new ArrayList<>();
+
+        /** Last elected leader. Guarded by {@code this}. */
+        private InternalClusterNode leaderNode = null;
+
+        /** Term of the last elected leader, {@code -1} if no leader is known yet. Guarded by {@code this}. */
+        private long leaderTerm = -1;
+
+        /**
+         * Constructor.
+         *
+         * @param executor Executor.
+         */
+        ServerEventHandler(Executor executor) {
+            this.executor = executor;
+        }
+
+        /**
+         * Remembers the elected leader and notifies the registered callbacks about it, if the term is greater than the last
+         * known one. Events with the same or a smaller term are ignored.
+         *
+         * @param node Node.
+         * @param term Term.
+         */
+        @Override
+        public synchronized void onLeaderElected(InternalClusterNode node, long term) {
+            if (term <= leaderTerm) {
+                return;
+            }
+
+            leaderTerm = term;
+            leaderNode = node;
+
+            if (callbacks.isEmpty()) {
+                return;
+            }
+
+            // The snapshot is taken under the monitor, the callbacks themselves are invoked outside of it.
+            List<LeaderElectionListener> listeners = new ArrayList<>(callbacks);
+
+            enqueueNotification(() -> {
+                for (LeaderElectionListener listener : listeners) {
+                    listener.onLeaderElected(node, term);
+                }
+            });
+        }
+
+        /**
+         * Registers the callback and, if a leader is already known, asynchronously notifies the callback about it.
+         *
+         * @param callback Callback to register.
+         */
+        synchronized void addCallbackAndNotify(LeaderElectionListener callback) {
+            callbacks.add(callback);
+
+            if (leaderTerm != -1) {
+                long finalLeaderTerm = this.leaderTerm;
+                InternalClusterNode finalLeaderNode = this.leaderNode;
+
+                // Notify about the current leader outside of the synchronized block.
+                enqueueNotification(() -> callback.onLeaderElected(finalLeaderNode, finalLeaderTerm));
+            }
+        }
+
+        /**
+         * Removes the callback. Nothing is notified on removal.
+         *
+         * @param callback Callback to remove.
+         */
+        synchronized void removeCallbackAndNotify(LeaderElectionListener callback) {
+            callbacks.remove(callback);
+        }
+
+        /**
+         * Appends a notification to the chain. Must be called under the monitor of this object.
+         *
+         * <p>The notification is executed regardless of how the previous one has finished, so a callback that throws
+         * an exception does not cancel the notifications queued after it.
+         *
+         * @param notification Notification to run on the executor.
+         */
+        private void enqueueNotification(Runnable notification) {
+            fut = fut.handleAsync((unused, previousError) -> {
+                notification.run();
+
+                return null;
+            }, executor);
+        }
+    }
+}
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 2699db88ecd..aeab8e01454 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -61,6 +61,7 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * RAFT client aware of a logical topology to handle distributed events.
+ * TODO: IGNITE-27257 Refactor the class to make it more readable and 
maintainable.
  */
 public class TopologyAwareRaftGroupService implements RaftGroupService {
 
diff --git 
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceTest.java
similarity index 64%
copy from 
modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
copy to 
modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceTest.java
index 589cac5bc5c..5b529d27e02 100644
--- 
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceTest.java
@@ -21,12 +21,11 @@ import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static 
org.apache.ignite.internal.raft.TestThrottlingContextHolder.throttlingContextHolder;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Path;
@@ -41,15 +40,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
-import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.StaticNodeFinder;
-import org.apache.ignite.internal.raft.LeaderElectionListener;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftNodeId;
@@ -64,8 +63,8 @@ import 
org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
 import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
-import org.apache.ignite.internal.topology.TestLogicalTopologyService;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.NetworkAddress;
@@ -81,23 +80,22 @@ import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
- * Abstract class containing test scenarios for {@link 
TopologyAwareRaftGroupService} related test classes.
+ * Tests for {@link PhysicalTopologyAwareRaftGroupService}.
  */
 @ExtendWith(ConfigurationExtension.class)
-public abstract class AbstractTopologyAwareGroupServiceTest extends 
IgniteAbstractTest {
+public class PhysicalTopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
     /** RAFT message factory. */
     private static final RaftMessagesFactory FACTORY = new 
RaftMessagesFactory();
 
     /** Base node port. */
     private static final int PORT_BASE = 1234;
 
-    /** Wait timeout, in milliseconds. */
-    protected static final int WAIT_TIMEOUT_MILLIS = 10_000;
+    private static final TestReplicationGroupId GROUP_ID = new 
TestReplicationGroupId("group_1");
 
-    protected static final TestReplicationGroupId GROUP_ID = new 
TestReplicationGroupId("group_1");
+    private static final FailureManager NOOP_FAILURE_PROCESSOR = new 
FailureManager(new NoOpFailureHandler());
 
     /** RPC executor. */
-    protected final ScheduledExecutorService executor = new 
ScheduledThreadPoolExecutor(
+    private final ScheduledExecutorService executor = new 
ScheduledThreadPoolExecutor(
             20,
             IgniteThreadFactory.create("Test", "Raft-Group-Client", log)
     );
@@ -108,63 +106,23 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
 
     private final Map<NetworkAddress, LogStorageFactory> logStorageFactories = 
new HashMap<>();
 
-    private final List<TopologyAwareRaftGroupService> raftClients = new 
ArrayList<>();
+    private final List<PhysicalTopologyAwareRaftGroupService> raftClients = 
new ArrayList<>();
 
     @InjectConfiguration
-    protected RaftConfiguration raftConfiguration;
+    private RaftConfiguration raftConfiguration;
 
     @AfterEach
-    protected void tearDown() throws Exception {
+    public void afterTest() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
 
         stopCluster();
     }
 
-    /**
-     * The method is called after every node of the cluster starts.
-     *
-     * @param nodeName Node name.
-     * @param clusterService Cluster service.
-     * @param dataPath Data path for raft node.
-     * @param peersAndLearners Peers and learners.
-     * @param eventsClientListener Raft events listener for client.
-     * @param logicalTopologyService Logical topology service.
-     */
-    protected abstract void afterNodeStart(
-            String nodeName,
-            ClusterService clusterService,
-            Path dataPath,
-            PeersAndLearners peersAndLearners,
-            RaftGroupEventsClientListener eventsClientListener,
-            LogicalTopologyService logicalTopologyService
-    );
-
-    /**
-     * Checks the condition after cluster and raft clients initialization.
-     *
-     * @param leaderName Current leader name.
-     */
-    protected abstract void afterClusterInit(String leaderName) throws 
InterruptedException;
-
-    /**
-     * Checks the condition after leader change.
-     *
-     * @param leaderName Current leader name.
-     */
-    protected abstract void afterLeaderChange(String leaderName) throws 
InterruptedException;
-
-    /**
-     * The method is called after every node of the cluster stops.
-     *
-     * @param nodeName Node name.
-     */
-    protected abstract void afterNodeStop(String nodeName) throws Exception;
-
     @Test
     public void testOneNodeReplicationGroup(TestInfo testInfo) throws 
Exception {
         int nodes = 2;
 
-        TopologyAwareRaftGroupService raftClient = startCluster(
+        PhysicalTopologyAwareRaftGroupService raftClient = startCluster(
                 testInfo,
                 addr -> true,
                 nodes,
@@ -175,21 +133,24 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
 
         CompletableFuture<InternalClusterNode> leaderFut = new 
CompletableFuture<>();
 
-        subscribeLeader(raftClient, (node, term) -> leaderFut.complete(node), 
"New leader: {}");
+        raftClient.subscribeLeader((node, term) -> {
+            log.info("Received leader node: {}", node);
 
-        InternalClusterNode leader = leaderFut.get(WAIT_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS);
+            leaderFut.complete(node);
+        });
 
-        assertNotNull(leader);
+        assertThat(leaderFut, willCompleteSuccessfully());
+
+        InternalClusterNode leader = leaderFut.get();
 
-        afterClusterInit(leader.name());
+        assertNotNull(leader);
 
         // Below we check that leaderElectionCallback is called once only.
         AtomicInteger leaderElectionCallbackCallsCounter = new 
AtomicInteger(0);
         raftClient.subscribeLeader((leader0, term) -> 
leaderElectionCallbackCallsCounter.incrementAndGet());
 
-        // Leader election callback triggering is asynchronous, thus it's 
required to give it some time to be called. With 1 second await
-        // interval the test will fail 100/100 if there's no fix.
-        Thread.sleep(1_000);
+        // Leader election callback triggering is asynchronous, thus it's 
required to give it some time to be called.
+        await().until(() -> leaderElectionCallbackCallsCounter.get() == 1);
 
         assertEquals(1, leaderElectionCallbackCallsCounter.get());
     }
@@ -205,11 +166,11 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
      *     client.
      * @return Raft clients.
      */
-    private IgniteBiTuple<TopologyAwareRaftGroupService, 
TopologyAwareRaftGroupService> 
startClusterWithClientsAndSubscribeToLeaderChange(
+    private IgniteBiTuple<PhysicalTopologyAwareRaftGroupService, 
PhysicalTopologyAwareRaftGroupService> startClusterWithClientsAndSubscribe(
             TestInfo testInfo,
             AtomicReference<InternalClusterNode> leaderRef,
             AtomicReference<InternalClusterNode> leaderRefNoInitialNotify
-    ) throws Exception {
+    ) {
         int nodes = 3;
 
         assertTrue(clusterServices.isEmpty());
@@ -218,16 +179,16 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
         Predicate<NetworkAddress> isServerAddress = addr -> true;
 
         // Start cluster and the first topology aware client.
-        TopologyAwareRaftGroupService raftClient = startCluster(
+        PhysicalTopologyAwareRaftGroupService firstRaftClient = startCluster(
                 testInfo,
                 isServerAddress,
                 nodes,
                 PORT_BASE
         );
 
-        assertNotNull(raftClient);
+        assertNotNull(firstRaftClient);
 
-        raftClient.refreshLeader().get();
+        assertThat(firstRaftClient.refreshLeader(), 
willCompleteSuccessfully());
 
         // Start client service for the second client.
         int clientPort = PORT_BASE + nodes + 1;
@@ -236,18 +197,12 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
         assertThat(clientClusterService.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
         // Start the second topology aware client, that should not get the 
initial leader notification.
-        TopologyAwareRaftGroupService raftClientNoInitialNotify = 
startTopologyAwareClient(
+        PhysicalTopologyAwareRaftGroupService secondRaftClient = 
startTopologyAwareClient(
                 clientClusterService,
-                clusterServices,
-                isServerAddress,
-                nodes,
-                null,
-                new TestLogicalTopologyService(clientClusterService),
-                false
+                peersAndLearners(testInfo, isServerAddress, nodes),
+                null
         );
 
-        raftClientNoInitialNotify.refreshLeader().get();
-
         List<NetworkAddress> clientAddress = findLocalAddresses(clientPort, 
clientPort + 1);
         assertEquals(1, clientAddress.size());
         clusterServices.put(clientAddress.get(0), clientClusterService);
@@ -255,52 +210,49 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
         AtomicInteger callsCount = new AtomicInteger();
 
         // Subscribing clients.
-        subscribeLeader(raftClient, (node, term) -> leaderRef.set(node), "New 
leader: {}");
+        firstRaftClient.subscribeLeader((leader, term) -> {
+            log.info("First client got notification [node={}, term={}].", 
leader, term);
 
-        for (int i = 0; i < 2; i++) {
-            unsubscribeLeader(raftClientNoInitialNotify);
+            leaderRef.set(leader);
+        });
 
-            subscribeLeader(raftClientNoInitialNotify, (node, term) -> {
-                callsCount.incrementAndGet();
-                leaderRefNoInitialNotify.set(node);
-            }, "New leader (client without initial notification): {}");
-        }
+        secondRaftClient.subscribeLeader((node, term) -> {
+            log.info("Second client got notification [node={}, term={}].", 
node, term);
 
-        // Checking invariants.
-        assertTrue(callsCount.get() <= 1);
+            callsCount.incrementAndGet();
+            leaderRefNoInitialNotify.set(node);
+        });
 
-        assertTrue(waitForCondition(() -> leaderRef.get() != null, 
WAIT_TIMEOUT_MILLIS));
+        await().until(() -> leaderRef.get() != null);
 
         InternalClusterNode leader = leaderRef.get();
 
-        assertNotNull(leader);
-
         log.info("Leader: " + leader);
 
-        afterClusterInit(leader.name());
+        // Checking invariants.
+        await().until(() -> 
leaderRef.get().equals(leaderRefNoInitialNotify.get()));
+        assertEquals(1, callsCount.get());
 
-        raftClients.add(raftClientNoInitialNotify);
+        raftClients.add(secondRaftClient);
 
-        return new IgniteBiTuple<>(raftClient, raftClientNoInitialNotify);
+        return new IgniteBiTuple<>(firstRaftClient, secondRaftClient);
     }
 
     @Test
     public void testChangeLeaderWhenActualLeft(TestInfo testInfo) throws 
Exception {
-        AtomicReference<InternalClusterNode> leaderRef = new 
AtomicReference<>();
-        AtomicReference<InternalClusterNode> leaderRefNoInitialNotify = new 
AtomicReference<>();
-
-        IgniteBiTuple<TopologyAwareRaftGroupService, 
TopologyAwareRaftGroupService> raftClients =
-                startClusterWithClientsAndSubscribeToLeaderChange(
-                    testInfo,
-                    leaderRef,
-                    leaderRefNoInitialNotify
-        );
+        AtomicReference<InternalClusterNode> firstLeaderRef = new 
AtomicReference<>();
+        AtomicReference<InternalClusterNode> secondLeaderRef = new 
AtomicReference<>();
 
-        TopologyAwareRaftGroupService raftClientNoInitialNotify = 
raftClients.get2();
+        IgniteBiTuple<PhysicalTopologyAwareRaftGroupService, 
PhysicalTopologyAwareRaftGroupService> raftClients =
+                startClusterWithClientsAndSubscribe(
+                        testInfo,
+                        firstLeaderRef,
+                        secondLeaderRef
+                );
 
-        InternalClusterNode leader = leaderRef.get();
+        PhysicalTopologyAwareRaftGroupService secondRaftClient = 
raftClients.get2();
 
-        assertNull(leaderRefNoInitialNotify.get());
+        InternalClusterNode leader = firstLeaderRef.get();
 
         // Forcing the leader change by stopping the actual leader.
         var raftServerToStop = raftServers.remove(new 
NetworkAddress("localhost", leader.address().port()));
@@ -308,8 +260,6 @@ public abstract class AbstractTopologyAwareGroupServiceTest 
extends IgniteAbstra
         ComponentContext componentContext = new ComponentContext();
         assertThat(raftServerToStop.stopAsync(componentContext), 
willCompleteSuccessfully());
 
-        afterNodeStop(leader.name());
-
         var logStorageToStop = logStorageFactories.remove(new 
NetworkAddress("localhost", leader.address().port()));
         assertThat(logStorageToStop.stopAsync(componentContext), 
willCompleteSuccessfully());
 
@@ -322,40 +272,34 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
         if (leader.address().port() != PORT_BASE) {
             // leaderRef is updated through raftClient hosted on PORT_BASE, 
thus if corresponding node was stopped (and it will be stopped
             // if it occurred to be a leader) leaderRef won't be updated.
-            assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()), 
WAIT_TIMEOUT_MILLIS));
+            await().until(() -> !leader.equals(firstLeaderRef.get()));
         }
-        assertTrue(waitForCondition(
-                () -> leaderRefNoInitialNotify.get() != null && 
!leader.equals(leaderRefNoInitialNotify.get()),
-                WAIT_TIMEOUT_MILLIS)
-        );
 
-        log.info("New Leader: " + leaderRefNoInitialNotify.get());
+        await().until(() -> secondLeaderRef.get() != null && 
!leader.equals(secondLeaderRef.get()));
 
-        afterLeaderChange(leaderRefNoInitialNotify.get().name());
+        log.info("New Leader: " + secondLeaderRef.get());
 
-        raftClientNoInitialNotify.refreshLeader().get();
+        secondRaftClient.refreshLeader().get();
 
-        assertEquals(raftClientNoInitialNotify.leader().consistentId(), 
leaderRefNoInitialNotify.get().name());
+        assertEquals(secondRaftClient.leader().consistentId(), 
secondLeaderRef.get().name());
     }
 
     @Test
     public void testChangeLeaderForce(TestInfo testInfo) throws Exception {
         AtomicReference<InternalClusterNode> leaderRef = new 
AtomicReference<>();
-        AtomicReference<InternalClusterNode> leaderRefNoInitialNotify = new 
AtomicReference<>();
+        AtomicReference<InternalClusterNode> secondLeaderRef = new 
AtomicReference<>();
 
-        IgniteBiTuple<TopologyAwareRaftGroupService, 
TopologyAwareRaftGroupService> raftClients =
-                startClusterWithClientsAndSubscribeToLeaderChange(
+        IgniteBiTuple<PhysicalTopologyAwareRaftGroupService, 
PhysicalTopologyAwareRaftGroupService> raftClients =
+                startClusterWithClientsAndSubscribe(
                         testInfo,
                         leaderRef,
-                        leaderRefNoInitialNotify
+                        secondLeaderRef
                 );
 
-        TopologyAwareRaftGroupService raftClient = raftClients.get1();
+        PhysicalTopologyAwareRaftGroupService raftClient = raftClients.get1();
 
         InternalClusterNode leader = leaderRef.get();
 
-        assertNull(leaderRefNoInitialNotify.get());
-
         // Forcing the leader change by transferring leadership.
         Peer newLeaderPeer = raftClient.peers().stream().filter(peer -> 
!leader.name().equals(peer.consistentId())).findAny().get();
 
@@ -366,16 +310,11 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
         String leaderId = newLeaderPeer.consistentId();
 
         // Waiting for the notifications to check.
-        assertTrue(waitForCondition(() -> 
leaderId.equals(leaderRef.get().name()), WAIT_TIMEOUT_MILLIS));
-        assertTrue(waitForCondition(
-                () -> leaderRefNoInitialNotify.get() != null && 
leaderId.equals(leaderRefNoInitialNotify.get().name()),
-                WAIT_TIMEOUT_MILLIS
-        ));
+        await().until(() -> leaderId.equals(leaderRef.get().name()));
+        await().until(() -> secondLeaderRef.get() != null && 
leaderId.equals(secondLeaderRef.get().name()));
 
         log.info("New Leader: " + leaderRef.get());
 
-        afterLeaderChange(leaderRef.get().name());
-
         raftClient.refreshLeader().get();
 
         assertEquals(raftClient.leader().consistentId(), 
leaderRef.get().name());
@@ -388,7 +327,7 @@ public abstract class AbstractTopologyAwareGroupServiceTest 
extends IgniteAbstra
      */
     private void stopCluster() throws Exception {
         if (!CollectionUtils.nullOrEmpty(raftClients)) {
-            raftClients.forEach(TopologyAwareRaftGroupService::shutdown);
+            
raftClients.forEach(PhysicalTopologyAwareRaftGroupService::shutdown);
 
             raftClients.clear();
         }
@@ -421,7 +360,7 @@ public abstract class AbstractTopologyAwareGroupServiceTest 
extends IgniteAbstra
      * @param clientPort      Port of node where a client will start.
      * @return Topology aware client.
      */
-    private @Nullable TopologyAwareRaftGroupService startCluster(
+    private @Nullable PhysicalTopologyAwareRaftGroupService startCluster(
             TestInfo testInfo,
             Predicate<NetworkAddress> isServerAddress,
             int nodes,
@@ -431,24 +370,35 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
 
         var nodeFinder = new StaticNodeFinder(addresses);
 
-        TopologyAwareRaftGroupService raftClient = null;
+        PeersAndLearners peersAndLearners = peersAndLearners(testInfo, 
isServerAddress, nodes);
+
+        PhysicalTopologyAwareRaftGroupService raftClient = null;
+
+        RaftGroupEventsClientListener clientRaftListener = null;
 
         for (NetworkAddress addr : addresses) {
             var cluster = clusterService(testInfo, addr.port(), nodeFinder);
 
             assertThat(cluster.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
+            // Starting the topology-aware client earlier than the other 
components to provoke a situation where the client starts earlier
+            // than the other nodes but works anyway.
+            if (addr.port() == clientPort) {
+                clientRaftListener = new RaftGroupEventsClientListener();
+
+                raftClient = startTopologyAwareClient(cluster, 
peersAndLearners, clientRaftListener);
+
+                raftClients.add(raftClient);
+            }
+
             clusterServices.put(addr, cluster);
         }
 
-        PeersAndLearners peersAndLearners = peersAndLearners(clusterServices, 
isServerAddress, nodes);
-
         for (NetworkAddress addr : addresses) {
             ClusterService cluster = clusterServices.get(addr);
 
-            LogicalTopologyService logicalTopologyService = new 
TestLogicalTopologyService(cluster);
-
-            RaftGroupEventsClientListener eventsClientListener = new 
RaftGroupEventsClientListener();
+            RaftGroupEventsClientListener eventsClientListener =
+                    addr.port() == clientPort ? clientRaftListener : new 
RaftGroupEventsClientListener();
 
             if (isServerAddress.test(addr)) { // RAFT server node
                 var localPeer = peersAndLearners.peers().stream()
@@ -490,31 +440,16 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
                 );
 
                 raftServers.put(addr, raftServer);
-
-                afterNodeStart(localPeer.consistentId(), cluster, dataPath, 
peersAndLearners, eventsClientListener, logicalTopologyService);
-            }
-
-            if (addr.port() == clientPort) {
-                assertTrue(isServerAddress.test(addr));
-
-                raftClient = startTopologyAwareClient(cluster, 
clusterServices, isServerAddress, nodes, eventsClientListener,
-                        logicalTopologyService, true);
-
-                raftClients.add(raftClient);
             }
         }
 
         return raftClient;
     }
 
-    private TopologyAwareRaftGroupService startTopologyAwareClient(
+    private PhysicalTopologyAwareRaftGroupService startTopologyAwareClient(
             ClusterService localClusterService,
-            Map<NetworkAddress, ClusterService> clusterServices,
-            Predicate<NetworkAddress> isServerAddress,
-            int nodes,
-            RaftGroupEventsClientListener eventsClientListener,
-            LogicalTopologyService logicalTopologyService,
-            boolean notifyOnSubscription
+            PeersAndLearners peersAndLearners,
+            RaftGroupEventsClientListener eventsClientListener
     ) {
         if (eventsClientListener == null) {
             eventsClientListener = new RaftGroupEventsClientListener();
@@ -531,46 +466,28 @@ public abstract class 
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
 
         var commandsMarshaller = new 
ThreadLocalOptimizedMarshaller(localClusterService.serializationRegistry());
 
-        return TopologyAwareRaftGroupService.start(
+        return PhysicalTopologyAwareRaftGroupService.start(
                 GROUP_ID,
                 localClusterService,
                 FACTORY,
                 raftConfiguration,
-                peersAndLearners(clusterServices, isServerAddress, nodes),
+                peersAndLearners,
                 executor,
-                logicalTopologyService,
                 eventsClientListener,
-                notifyOnSubscription,
                 commandsMarshaller,
                 StoppingExceptionFactories.indicateComponentStop(),
-                throttlingContextHolder()
+                throttlingContextHolder(),
+                NOOP_FAILURE_PROCESSOR
         );
     }
 
     private static PeersAndLearners peersAndLearners(
-            Map<NetworkAddress, ClusterService> clusterServices,
+            TestInfo testInfo,
             Predicate<NetworkAddress> isServerAddress,
             int nodes
     ) {
         return PeersAndLearners.fromConsistentIds(
                 findLocalAddresses(PORT_BASE, PORT_BASE + 
nodes).stream().filter(isServerAddress)
-                        .map(netAddr -> 
clusterServices.get(netAddr).topologyService().localMember().name()).collect(
-                                toSet()));
-    }
-
-    private void subscribeLeader(TopologyAwareRaftGroupService client, 
LeaderElectionListener callback, String logMessage) {
-        CompletableFuture<Void> future = client.subscribeLeader((node, term) 
-> {
-            callback.onLeaderElected(node, term);
-
-            log.info(logMessage, node);
-        });
-
-        assertThat(future, willCompleteSuccessfully());
-    }
-
-    private static void unsubscribeLeader(TopologyAwareRaftGroupService 
client) {
-        CompletableFuture<Void> future = client.unsubscribeLeader();
-
-        assertThat(future, willCompleteSuccessfully());
+                        .map(netAddr -> IgniteTestUtils.testNodeName(testInfo, 
netAddr.port())).collect(toSet()));
     }
 }
diff --git 
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
 
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
index 589cac5bc5c..942c8b9c8c7 100644
--- 
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
+++ 
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
@@ -82,6 +82,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Abstract class containing test scenarios for {@link 
TopologyAwareRaftGroupService} related test classes.
+ * TODO: IGNITE-27257 Refactor the class to make it more readable and 
maintainable.
  */
 @ExtendWith(ConfigurationExtension.class)
 public abstract class AbstractTopologyAwareGroupServiceTest extends 
IgniteAbstractTest {

Reply via email to