This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 93c3636213e IGNITE-27154 Extend topology-aware RAFT client (#7152)
93c3636213e is described below
commit 93c3636213eb926d828b67cef714951b2cbd0925
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Dec 5 19:10:23 2025 +0300
IGNITE-27154 Extend topology-aware RAFT client (#7152)
---
.../internal/raft/service/RaftGroupService.java | 5 +-
modules/replicator/build.gradle | 3 +
.../PhysicalTopologyAwareRaftGroupService.java | 511 +++++++++++++++++++++
.../raft/client/TopologyAwareRaftGroupService.java | 1 +
...PhysicalTopologyAwareRaftGroupServiceTest.java} | 275 ++++-------
.../AbstractTopologyAwareGroupServiceTest.java | 1 +
6 files changed, 614 insertions(+), 182 deletions(-)
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index 72b939f75a6..2dfd5e87a0b 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -57,10 +57,9 @@ public interface RaftGroupService extends RaftCommandRunner {
@Nullable Peer leader();
/**
- * Returns a list of voting peers or {@code null} if it has not been yet
initialized. The order is corresponding to the time of joining
- * to the replication group.
+ * Returns a list of voting peers. The order is corresponding to the time
of joining to the replication group.
*/
- @Nullable List<Peer> peers();
+ List<Peer> peers();
/**
* Returns a list of leaners or {@code null} if it has not been yet
initialized. The order is corresponding to the time of joining to
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index 3b6c111ae8b..f37aa4a4207 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -58,10 +58,13 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-metrics'))
integrationTestImplementation testFixtures(project(':ignite-raft'))
+ testImplementation libs.awaitility
testImplementation testFixtures(project(':ignite-core'))
testImplementation testFixtures(project(':ignite-placement-driver-api'))
testImplementation testFixtures(project(':ignite-failure-handler'))
testImplementation testFixtures(project(':ignite-raft'))
+ testImplementation testFixtures(project(':ignite-configuration'))
+ testImplementation testFixtures(project(':ignite-network'))
testFixturesAnnotationProcessor
project(':ignite-network-annotation-processor')
testFixturesImplementation project(':ignite-network-api')
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
new file mode 100644
index 00000000000..4a8512cf659
--- /dev/null
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.client;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.network.TopologyEventHandler;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ExceptionFactory;
+import org.apache.ignite.internal.raft.LeaderElectionListener;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeerUnavailableException;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupServiceImpl;
+import org.apache.ignite.internal.raft.ThrottlingContextHolder;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.service.LeaderWithTerm;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import
org.apache.ignite.raft.jraft.rpc.CliRequests.SubscriptionLeaderChangeRequest;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A RAFT group service based on the cluster physical topology. This service allows subscribing to RAFT group leader updates.
+ */
+public class PhysicalTopologyAwareRaftGroupService implements RaftGroupService
{
+ /** The logger. Bound to this class so that its messages are attributed to the right source. */
+ private static final IgniteLogger LOG = Loggers.forClass(PhysicalTopologyAwareRaftGroupService.class);
+
+ /** Raft message factory. */
+ private static final RaftMessagesFactory MESSAGES_FACTORY = Loza.FACTORY;
+
+ /** General leader election listener. */
+ private final ServerEventHandler generalLeaderElectionListener;
+
+ /** Failure manager. */
+ private final FailureManager failureManager;
+
+ /** Cluster service. */
+ private final ClusterService clusterService;
+
+ /** RPC RAFT client. */
+ private final RaftGroupService raftClient;
+
+ /** Executor to invoke RPC requests. */
+ private final Executor executor;
+
+ /** RAFT configuration. */
+ private final RaftConfiguration raftConfiguration;
+
+ /**
+ * Constructor.
+ *
+ * <p>Besides storing the dependencies, the constructor performs the following steps in this order:
+ * <ol>
+ * <li>creates the general leader election listener and registers it in {@code eventsClientListener} for the group
+ * of {@code raftClient};</li>
+ * <li>adds a topology event handler which, for every node that appears, changes the subscription on that node if it
+ * is a peer of the group and then requests the leader manually;</li>
+ * <li>does the same for every member currently returned by the topology service and requests the leader manually
+ * once all those subscription futures have completed.</li>
+ * </ol>
+ *
+ * <p>NOTE(review): {@code this} is captured by the topology handler before the constructor returns, and no removal of
+ * the topology handler or of the leader election listener is visible in this class - confirm they are unregistered
+ * elsewhere on shutdown.
+ *
+ * @param failureManager Failure manager.
+ * @param clusterService Cluster service.
+ * @param executor Executor to invoke RPC requests and notify listeners.
+ * @param raftConfiguration RAFT configuration.
+ * @param raftClient RPC RAFT client.
+ * @param eventsClientListener Events client listener.
+ */
+ private PhysicalTopologyAwareRaftGroupService(
+ FailureManager failureManager,
+ ClusterService clusterService,
+ Executor executor,
+ RaftConfiguration raftConfiguration,
+ RaftGroupService raftClient,
+ RaftGroupEventsClientListener eventsClientListener
+ ) {
+ this.failureManager = failureManager;
+ this.clusterService = clusterService;
+ this.executor = executor;
+ this.raftConfiguration = raftConfiguration;
+ this.raftClient = raftClient;
+
+ this.generalLeaderElectionListener = new ServerEventHandler(executor);
+
+ eventsClientListener.addLeaderElectionListener(raftClient.groupId(),
generalLeaderElectionListener);
+
+ clusterService.topologyService().addEventHandler(new
TopologyEventHandler() {
+ @Override
+ public void onAppeared(InternalClusterNode member) {
+ CompletableFuture<Boolean> fut =
changeNodeSubscriptionIfNeed(member, true);
+
+ requestLeaderManually(clusterService, executor, raftClient,
fut);
+ }
+ });
+
+ ArrayList<CompletableFuture<?>> futures = new ArrayList<>();
+
+ for (InternalClusterNode member :
clusterService.topologyService().allMembers()) {
+ futures.add(changeNodeSubscriptionIfNeed(member, true));
+ }
+
+ requestLeaderManually(clusterService, executor, raftClient,
CompletableFutures.allOf(futures));
+ }
+
+ /**
+  * Requests the leader information for the RAFT group once the given subscription updates have completed and passes
+  * the result to the general leader election listener.
+  * TODO: IGNITE-27256 Remove the method after implementing a notification after subscription.
+  *
+  * <p>Nothing is reported when the refresh fails, when no leader is returned, or when the leader's node is not
+  * present in the topology any more; the first and the last case are logged.
+  *
+  * @param clusterService Cluster service to retrieve topology information.
+  * @param executor Executor to run asynchronous tasks.
+  * @param raftClient RAFT client to interact with the RAFT group.
+  * @param subscriptionsFut Future representing the completion of subscription updates.
+  */
+ private void requestLeaderManually(
+         ClusterService clusterService,
+         Executor executor,
+         RaftGroupService raftClient,
+         CompletableFuture<?> subscriptionsFut
+ ) {
+     subscriptionsFut.thenRunAsync(
+             () -> raftClient.refreshAndGetLeaderWithTerm().whenCompleteAsync(
+                     (leaderWithTerm, throwable) -> {
+                         if (throwable != null) {
+                             LOG.warn("Could not refresh and get leader with term [grp={}].", throwable, groupId());
+
+                             // There is no result on failure: going on would dereference a null leader holder.
+                             return;
+                         }
+
+                         if (leaderWithTerm == null || leaderWithTerm.leader() == null) {
+                             // Defensive: presumably the leader may be unknown at this point, nothing to report then.
+                             return;
+                         }
+
+                         InternalClusterNode leaderHost = clusterService.topologyService()
+                                 .getByConsistentId(leaderWithTerm.leader().consistentId());
+
+                         if (leaderHost != null) {
+                             generalLeaderElectionListener.onLeaderElected(
+                                     leaderHost,
+                                     leaderWithTerm.term()
+                             );
+                         } else {
+                             LOG.warn("Leader host occurred to leave the topology [nodeId = {}].",
+                                     leaderWithTerm.leader().consistentId());
+                         }
+                     }, executor),
+             executor);
+ }
+
+ /**
+  * Subscribes to or unsubscribes from leader updates on the given node, but only if the node is a voting peer of the
+  * group.
+  *
+  * @param member Cluster node to change the subscription on.
+  * @param subscribe {@code true} to subscribe, {@code false} to unsubscribe.
+  * @return Future with the outcome of sending the request; it is completed with {@code false} right away when the node
+  *         is not a peer of the group.
+  */
+ private CompletableFuture<Boolean> changeNodeSubscriptionIfNeed(InternalClusterNode member, boolean subscribe) {
+     Peer peer = new Peer(member.name());
+
+     if (!peers().contains(peer)) {
+         return CompletableFutures.booleanCompletedFuture(false);
+     }
+
+     SubscriptionLeaderChangeRequest msg = subscriptionLeaderChangeRequest(subscribe);
+
+     return sendMessage(member, msg).whenComplete((isSent, err) -> {
+         if (err != null) {
+             failureManager.process(new FailureContext(err, "Could not change subscription to leader updates [grp="
+                     + groupId() + "]."));
+
+             // The subscription status has not changed, so the message below would be misleading.
+             return;
+         }
+
+         LOG.info("Subscription status changed for the peer [grp={}, consistentId={}, subscribe={}, isSent={}].",
+                 groupId(), member.name(), subscribe, isSent);
+     });
+ }
+
+ /**
+ * Starts a new instance of the PhysicalTopologyAwareRaftGroupService.
+ *
+ * @param groupId The ID of the RAFT group.
+ * @param cluster Cluster service for communication.
+ * @param factory Factory for creating RAFT messages.
+ * @param raftConfiguration Configuration for the RAFT group.
+ * @param configuration Peers and learners configuration.
+ * @param executor Executor for asynchronous tasks.
+ * @param eventsClientListener Listener for RAFT group events.
+ * @param cmdMarshaller Marshaller for RAFT commands.
+ * @param stoppingExceptionFactory Factory for creating stopping
exceptions.
+ * @param throttlingContextHolder Context holder for throttling.
+ * @param failureManager Manager for handling failures.
+ * @return A new instance of PhysicalTopologyAwareRaftGroupService.
+ */
+ public static PhysicalTopologyAwareRaftGroupService start(
+ ReplicationGroupId groupId,
+ ClusterService cluster,
+ RaftMessagesFactory factory,
+ RaftConfiguration raftConfiguration,
+ PeersAndLearners configuration,
+ ScheduledExecutorService executor,
+ RaftGroupEventsClientListener eventsClientListener,
+ Marshaller cmdMarshaller,
+ ExceptionFactory stoppingExceptionFactory,
+ ThrottlingContextHolder throttlingContextHolder,
+ FailureManager failureManager
+ ) {
+ return new PhysicalTopologyAwareRaftGroupService(
+ failureManager,
+ cluster,
+ executor,
+ raftConfiguration,
+ RaftGroupServiceImpl.start(
+ groupId,
+ cluster,
+ factory,
+ raftConfiguration,
+ configuration,
+ executor,
+ cmdMarshaller,
+ stoppingExceptionFactory,
+ throttlingContextHolder
+ ),
+ eventsClientListener
+ );
+ }
+
+ /** Sends an unsubscribe request to every current topology member that is a peer of the group. */
+ private void finishSubscriptions() {
+     clusterService.topologyService().allMembers()
+             .forEach(node -> changeNodeSubscriptionIfNeed(node, false));
+ }
+
+ /**
+ * Registers a callback for leader election events. If a leader is already known to this service, the callback is
+ * also scheduled on the executor to be notified about that leader.
+ *
+ * @param callback Callback to register.
+ */
+ public void subscribeLeader(LeaderElectionListener callback) {
+ generalLeaderElectionListener.addCallbackAndNotify(callback);
+ }
+
+ /**
+ * Removes a previously registered leader election callback. No notification is sent to the callback on removal.
+ *
+ * @param callback Callback to remove.
+ */
+ public void unsubscribeLeader(LeaderElectionListener callback) {
+ generalLeaderElectionListener.removeCallbackAndNotify(callback);
+ }
+
+ /**
+ * Builds a leader change subscription request for the group of this service.
+ *
+ * @param subscribe Value of the {@code subscribe} flag of the request.
+ * @return The request.
+ */
+ private SubscriptionLeaderChangeRequest
subscriptionLeaderChangeRequest(boolean subscribe) {
+ return MESSAGES_FACTORY.subscriptionLeaderChangeRequest()
+ .groupId(groupId())
+ .subscribe(subscribe)
+ .build();
+ }
+
+ /**
+  * Sends the subscription message to the node, retrying where {@link #sendWithRetry} decides to.
+  *
+  * @param node Target node.
+  * @param msg Subscription message.
+  * @return Future that is completed by {@link #sendWithRetry} with the outcome of the send.
+  */
+ private CompletableFuture<Boolean> sendMessage(InternalClusterNode node, SubscriptionLeaderChangeRequest msg) {
+     CompletableFuture<Boolean> result = new CompletableFuture<>();
+
+     sendWithRetry(node, msg, result);
+
+     return result;
+ }
+
+ /**
+  * Sends the subscription message to the node and completes {@code msgSendFut} with the outcome.
+  *
+  * <ul>
+  * <li>Success completes the future with {@code true}.</li>
+  * <li>A failed unsubscribe completes it with {@code false}, unless the cause is an {@link Error}, which is propagated.</li>
+  * <li>A failed subscribe completes it with {@code false} when the recipient has left the cluster or a node is stopping,
+  * is retried when the cause is {@link #recoverable(Throwable) recoverable}, and is propagated otherwise.</li>
+  * </ul>
+  *
+  * @param node Target node.
+  * @param msg Subscription message.
+  * @param msgSendFut Future to complete with the outcome.
+  */
+ private void sendWithRetry(InternalClusterNode node, SubscriptionLeaderChangeRequest msg, CompletableFuture<Boolean> msgSendFut) {
+     long responseTimeout = raftConfiguration.responseTimeoutMillis().value();
+
+     clusterService.messagingService().invoke(node, msg, responseTimeout).whenCompleteAsync((unused, invokeThrowable) -> {
+         if (invokeThrowable == null) {
+             msgSendFut.complete(true);
+
+             return;
+         }
+
+         Throwable invokeCause = unwrapCause(invokeThrowable);
+
+         if (!msg.subscribe()) {
+             // We don't want to propagate exceptions when unsubscribing (if it's not an Error!).
+             if (invokeCause instanceof Error) {
+                 msgSendFut.completeExceptionally(invokeThrowable);
+             } else {
+                 LOG.debug("An exception while trying to unsubscribe.", invokeThrowable);
+
+                 msgSendFut.complete(false);
+             }
+         } else if (invokeCause instanceof RecipientLeftException) {
+             // Must be checked before recoverable(): that method also accepts this exception, which used to make
+             // this branch unreachable and caused endless retries against a node that is gone.
+             LOG.info(
+                     "Could not subscribe to leader update from a specific node, because the node had left the cluster: [node={}].",
+                     node
+             );
+
+             msgSendFut.complete(false);
+         } else if (invokeCause instanceof NodeStoppingException) {
+             msgSendFut.complete(false);
+         } else if (recoverable(invokeCause)) {
+             sendWithRetry(node, msg, msgSendFut);
+         } else {
+             LOG.error("Could not send the subscribe message to the node: [node={}, msg={}].", invokeThrowable, node, msg);
+
+             msgSendFut.completeExceptionally(invokeThrowable);
+         }
+     }, executor);
+ }
+
+ /**
+  * Tells whether a failed request may be sent again.
+  *
+  * @param t Failure; it is unwrapped before being examined.
+  * @return {@code true} if the unwrapped cause is a timeout, an I/O error, an unavailable peer or a recipient that left.
+  */
+ private static boolean recoverable(Throwable t) {
+     Throwable cause = unwrapCause(t);
+
+     if (cause instanceof TimeoutException || cause instanceof IOException) {
+         return true;
+     }
+
+     return cause instanceof PeerUnavailableException || cause instanceof RecipientLeftException;
+ }
+
+ // The RaftGroupService methods below forward the call to the wrapped RPC RAFT client unchanged;
+ // only shutdown() does additional work before delegating.
+ @Override
+ public ReplicationGroupId groupId() {
+ return raftClient.groupId();
+ }
+
+ @Override
+ public @Nullable Peer leader() {
+ return raftClient.leader();
+ }
+
+ @Override
+ public List<Peer> peers() {
+ return raftClient.peers();
+ }
+
+ @Override
+ public @Nullable List<Peer> learners() {
+ return raftClient.learners();
+ }
+
+ @Override
+ public CompletableFuture<Void> refreshLeader() {
+ return raftClient.refreshLeader();
+ }
+
+ @Override
+ public CompletableFuture<LeaderWithTerm> refreshAndGetLeaderWithTerm() {
+ return raftClient.refreshAndGetLeaderWithTerm();
+ }
+
+ @Override
+ public CompletableFuture<Void> refreshMembers(boolean onlyAlive) {
+ return raftClient.refreshMembers(onlyAlive);
+ }
+
+ @Override
+ public CompletableFuture<Void> addPeer(Peer peer, long sequenceToken) {
+ return raftClient.addPeer(peer, sequenceToken);
+ }
+
+ @Override
+ public CompletableFuture<Void> removePeer(Peer peer, long sequenceToken) {
+ return raftClient.removePeer(peer, sequenceToken);
+ }
+
+ @Override
+ public CompletableFuture<Void> changePeersAndLearners(PeersAndLearners
peersAndLearners, long term, long sequenceToken) {
+ return raftClient.changePeersAndLearners(peersAndLearners, term,
sequenceToken);
+ }
+
+ @Override
+ public CompletableFuture<Void>
changePeersAndLearnersAsync(PeersAndLearners peersAndLearners, long term, long
sequenceToken) {
+ return raftClient.changePeersAndLearnersAsync(peersAndLearners, term,
sequenceToken);
+ }
+
+ @Override
+ public CompletableFuture<Void> addLearners(Collection<Peer> learners, long
sequenceToken) {
+ return raftClient.addLearners(learners, sequenceToken);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeLearners(Collection<Peer> learners,
long sequenceToken) {
+ return raftClient.removeLearners(learners, sequenceToken);
+ }
+
+ @Override
+ public CompletableFuture<Void> resetLearners(Collection<Peer> learners,
long sequenceToken) {
+ return raftClient.resetLearners(learners, sequenceToken);
+ }
+
+ @Override
+ public CompletableFuture<Void> snapshot(Peer peer, boolean forced) {
+ return raftClient.snapshot(peer, forced);
+ }
+
+ @Override
+ public CompletableFuture<Void> transferLeadership(Peer newLeader) {
+ return raftClient.transferLeadership(newLeader);
+ }
+
+ // Sends unsubscribe requests to the peers currently in the topology (without waiting for the replies), then
+ // shuts the wrapped client down.
+ // NOTE(review): the topology event handler and the leader election listener registered in the constructor are
+ // not removed here - confirm whether that is intended.
+ @Override
+ public void shutdown() {
+ finishSubscriptions();
+
+ raftClient.shutdown();
+ }
+
+ @Override
+ public CompletableFuture<Long> readIndex() {
+ return raftClient.readIndex();
+ }
+
+ @Override
+ public ClusterService clusterService() {
+ return raftClient.clusterService();
+ }
+
+ @Override
+ public void updateConfiguration(PeersAndLearners configuration) {
+ raftClient.updateConfiguration(configuration);
+ }
+
+ @Override
+ public <R> CompletableFuture<R> run(Command cmd) {
+ return raftClient.run(cmd);
+ }
+
+ @Override
+ public <R> CompletableFuture<R> run(Command cmd, long timeoutMillis) {
+ return raftClient.run(cmd, timeoutMillis);
+ }
+
+ /**
+  * Leader election handler. Remembers the leader with the highest term seen so far and relays it to the registered
+  * callbacks. Notifications are submitted to the executor and chained, so they are delivered in the order they were
+  * produced.
+  */
+ private static class ServerEventHandler implements LeaderElectionListener {
+     /** Executor the callbacks are notified on. */
+     private final Executor executor;
+
+     /** Tail of the notification chain. Accessed only from synchronized methods. */
+     private CompletableFuture<Void> fut = CompletableFutures.nullCompletedFuture();
+
+     /** Leader elected callbacks. */
+     private final List<LeaderElectionListener> callbacks = new ArrayList<>();
+
+     /** Last elected leader. */
+     private InternalClusterNode leaderNode = null;
+
+     /** Term of the last elected leader, {@code -1} until the first leader is reported. */
+     private long leaderTerm = -1;
+
+     /**
+      * Constructor.
+      *
+      * @param executor Executor.
+      */
+     ServerEventHandler(Executor executor) {
+         this.executor = executor;
+     }
+
+     /**
+      * Records the new leader and notifies the callbacks, unless a leader with the same or a higher term has already
+      * been reported.
+      *
+      * @param node Node.
+      * @param term Term.
+      */
+     @Override
+     public synchronized void onLeaderElected(InternalClusterNode node, long term) {
+         if (term <= leaderTerm) {
+             return;
+         }
+
+         leaderTerm = term;
+         leaderNode = node;
+
+         if (callbacks.isEmpty()) {
+             return;
+         }
+
+         // Snapshot of the callbacks: the notification is executed later, when the list may have changed.
+         List<LeaderElectionListener> listeners = new ArrayList<>(callbacks);
+
+         enqueueNotification(() -> {
+             for (LeaderElectionListener listener : listeners) {
+                 listener.onLeaderElected(node, term);
+             }
+         });
+     }
+
+     /**
+      * Registers the callback and, if a leader is already known, schedules a notification about it for this callback.
+      *
+      * @param callback Callback to register.
+      */
+     synchronized void addCallbackAndNotify(LeaderElectionListener callback) {
+         callbacks.add(callback);
+
+         if (leaderTerm == -1) {
+             return;
+         }
+
+         long finalLeaderTerm = this.leaderTerm;
+         InternalClusterNode finalLeaderNode = this.leaderNode;
+
+         enqueueNotification(() -> callback.onLeaderElected(finalLeaderNode, finalLeaderTerm));
+     }
+
+     /**
+      * Removes the callback. Despite the method name, nothing is notified on removal.
+      *
+      * @param callback Callback to remove.
+      */
+     synchronized void removeCallbackAndNotify(LeaderElectionListener callback) {
+         callbacks.remove(callback);
+     }
+
+     /**
+      * Appends a notification to the chain. It is run on the executor after the previous notification has finished,
+      * whether that one succeeded or failed: a callback that throws must not make the next notification get lost.
+      * Must be called from a synchronized method.
+      *
+      * @param notification Notification to run.
+      */
+     private void enqueueNotification(Runnable notification) {
+         fut = fut.handleAsync((ignoredResult, ignoredError) -> {
+             notification.run();
+
+             return null;
+         }, executor);
+     }
+ }
+}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 2699db88ecd..aeab8e01454 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -61,6 +61,7 @@ import org.jetbrains.annotations.Nullable;
/**
* RAFT client aware of a logical topology to handle distributed events.
+ * TODO: IGNITE-27257 Refactor the class to make it more readable and
maintainable.
*/
public class TopologyAwareRaftGroupService implements RaftGroupService {
diff --git
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceTest.java
similarity index 64%
copy from
modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
copy to
modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceTest.java
index 589cac5bc5c..5b529d27e02 100644
---
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceTest.java
@@ -21,12 +21,11 @@ import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
import static
org.apache.ignite.internal.raft.TestThrottlingContextHolder.throttlingContextHolder;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
@@ -41,15 +40,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
-import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.StaticNodeFinder;
-import org.apache.ignite.internal.raft.LeaderElectionListener;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftNodeId;
@@ -64,8 +63,8 @@ import
org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
-import org.apache.ignite.internal.topology.TestLogicalTopologyService;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.NetworkAddress;
@@ -81,23 +80,22 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
/**
- * Abstract class containing test scenarios for {@link
TopologyAwareRaftGroupService} related test classes.
+ * Tests for {@link PhysicalTopologyAwareRaftGroupService}.
*/
@ExtendWith(ConfigurationExtension.class)
-public abstract class AbstractTopologyAwareGroupServiceTest extends
IgniteAbstractTest {
+public class PhysicalTopologyAwareRaftGroupServiceTest extends
IgniteAbstractTest {
/** RAFT message factory. */
private static final RaftMessagesFactory FACTORY = new
RaftMessagesFactory();
/** Base node port. */
private static final int PORT_BASE = 1234;
- /** Wait timeout, in milliseconds. */
- protected static final int WAIT_TIMEOUT_MILLIS = 10_000;
+ private static final TestReplicationGroupId GROUP_ID = new
TestReplicationGroupId("group_1");
- protected static final TestReplicationGroupId GROUP_ID = new
TestReplicationGroupId("group_1");
+ private static final FailureManager NOOP_FAILURE_PROCESSOR = new
FailureManager(new NoOpFailureHandler());
/** RPC executor. */
- protected final ScheduledExecutorService executor = new
ScheduledThreadPoolExecutor(
+ private final ScheduledExecutorService executor = new
ScheduledThreadPoolExecutor(
20,
IgniteThreadFactory.create("Test", "Raft-Group-Client", log)
);
@@ -108,63 +106,23 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
private final Map<NetworkAddress, LogStorageFactory> logStorageFactories =
new HashMap<>();
- private final List<TopologyAwareRaftGroupService> raftClients = new
ArrayList<>();
+ private final List<PhysicalTopologyAwareRaftGroupService> raftClients =
new ArrayList<>();
@InjectConfiguration
- protected RaftConfiguration raftConfiguration;
+ private RaftConfiguration raftConfiguration;
@AfterEach
- protected void tearDown() throws Exception {
+ public void afterTest() throws Exception {
IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
stopCluster();
}
- /**
- * The method is called after every node of the cluster starts.
- *
- * @param nodeName Node name.
- * @param clusterService Cluster service.
- * @param dataPath Data path for raft node.
- * @param peersAndLearners Peers and learners.
- * @param eventsClientListener Raft events listener for client.
- * @param logicalTopologyService Logical topology service.
- */
- protected abstract void afterNodeStart(
- String nodeName,
- ClusterService clusterService,
- Path dataPath,
- PeersAndLearners peersAndLearners,
- RaftGroupEventsClientListener eventsClientListener,
- LogicalTopologyService logicalTopologyService
- );
-
- /**
- * Checks the condition after cluster and raft clients initialization.
- *
- * @param leaderName Current leader name.
- */
- protected abstract void afterClusterInit(String leaderName) throws
InterruptedException;
-
- /**
- * Checks the condition after leader change.
- *
- * @param leaderName Current leader name.
- */
- protected abstract void afterLeaderChange(String leaderName) throws
InterruptedException;
-
- /**
- * The method is called after every node of the cluster stops.
- *
- * @param nodeName Node name.
- */
- protected abstract void afterNodeStop(String nodeName) throws Exception;
-
@Test
public void testOneNodeReplicationGroup(TestInfo testInfo) throws
Exception {
int nodes = 2;
- TopologyAwareRaftGroupService raftClient = startCluster(
+ PhysicalTopologyAwareRaftGroupService raftClient = startCluster(
testInfo,
addr -> true,
nodes,
@@ -175,21 +133,24 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
CompletableFuture<InternalClusterNode> leaderFut = new
CompletableFuture<>();
- subscribeLeader(raftClient, (node, term) -> leaderFut.complete(node),
"New leader: {}");
+ raftClient.subscribeLeader((node, term) -> {
+ log.info("Received leader node: {}", node);
- InternalClusterNode leader = leaderFut.get(WAIT_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS);
+ leaderFut.complete(node);
+ });
- assertNotNull(leader);
+ assertThat(leaderFut, willCompleteSuccessfully());
+
+ InternalClusterNode leader = leaderFut.get();
- afterClusterInit(leader.name());
+ assertNotNull(leader);
// Below we check that leaderElectionCallback is called once only.
AtomicInteger leaderElectionCallbackCallsCounter = new
AtomicInteger(0);
raftClient.subscribeLeader((leader0, term) ->
leaderElectionCallbackCallsCounter.incrementAndGet());
- // Leader election callback triggering is asynchronous, thus it's
required to give it some time to be called. With 1 second await
- // interval the test will fail 100/100 if there's no fix.
- Thread.sleep(1_000);
+ // Leader election callback triggering is asynchronous, thus it's
required to give it some time to be called.
+ await().until(() -> leaderElectionCallbackCallsCounter.get() == 1);
assertEquals(1, leaderElectionCallbackCallsCounter.get());
}
@@ -205,11 +166,11 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
* client.
* @return Raft clients.
*/
- private IgniteBiTuple<TopologyAwareRaftGroupService,
TopologyAwareRaftGroupService>
startClusterWithClientsAndSubscribeToLeaderChange(
+ private IgniteBiTuple<PhysicalTopologyAwareRaftGroupService,
PhysicalTopologyAwareRaftGroupService> startClusterWithClientsAndSubscribe(
TestInfo testInfo,
AtomicReference<InternalClusterNode> leaderRef,
AtomicReference<InternalClusterNode> leaderRefNoInitialNotify
- ) throws Exception {
+ ) {
int nodes = 3;
assertTrue(clusterServices.isEmpty());
@@ -218,16 +179,16 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
Predicate<NetworkAddress> isServerAddress = addr -> true;
// Start cluster and the first topology aware client.
- TopologyAwareRaftGroupService raftClient = startCluster(
+ PhysicalTopologyAwareRaftGroupService firstRaftClient = startCluster(
testInfo,
isServerAddress,
nodes,
PORT_BASE
);
- assertNotNull(raftClient);
+ assertNotNull(firstRaftClient);
- raftClient.refreshLeader().get();
+ assertThat(firstRaftClient.refreshLeader(),
willCompleteSuccessfully());
// Start client service for the second client.
int clientPort = PORT_BASE + nodes + 1;
@@ -236,18 +197,12 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
assertThat(clientClusterService.startAsync(new ComponentContext()),
willCompleteSuccessfully());
// Start the second topology aware client, that should not get the
initial leader notification.
- TopologyAwareRaftGroupService raftClientNoInitialNotify =
startTopologyAwareClient(
+ PhysicalTopologyAwareRaftGroupService secondRaftClient =
startTopologyAwareClient(
clientClusterService,
- clusterServices,
- isServerAddress,
- nodes,
- null,
- new TestLogicalTopologyService(clientClusterService),
- false
+ peersAndLearners(testInfo, isServerAddress, nodes),
+ null
);
- raftClientNoInitialNotify.refreshLeader().get();
-
List<NetworkAddress> clientAddress = findLocalAddresses(clientPort,
clientPort + 1);
assertEquals(1, clientAddress.size());
clusterServices.put(clientAddress.get(0), clientClusterService);
@@ -255,52 +210,49 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
AtomicInteger callsCount = new AtomicInteger();
// Subscribing clients.
- subscribeLeader(raftClient, (node, term) -> leaderRef.set(node), "New
leader: {}");
+ firstRaftClient.subscribeLeader((leader, term) -> {
+ log.info("First client got notification [node={}, term={}].",
leader, term);
- for (int i = 0; i < 2; i++) {
- unsubscribeLeader(raftClientNoInitialNotify);
+ leaderRef.set(leader);
+ });
- subscribeLeader(raftClientNoInitialNotify, (node, term) -> {
- callsCount.incrementAndGet();
- leaderRefNoInitialNotify.set(node);
- }, "New leader (client without initial notification): {}");
- }
+ secondRaftClient.subscribeLeader((node, term) -> {
+ log.info("Second client got notification [node={}, term={}].",
node, term);
- // Checking invariants.
- assertTrue(callsCount.get() <= 1);
+ callsCount.incrementAndGet();
+ leaderRefNoInitialNotify.set(node);
+ });
- assertTrue(waitForCondition(() -> leaderRef.get() != null,
WAIT_TIMEOUT_MILLIS));
+ await().until(() -> leaderRef.get() != null);
InternalClusterNode leader = leaderRef.get();
- assertNotNull(leader);
-
log.info("Leader: " + leader);
- afterClusterInit(leader.name());
+ // Checking invariants.
+ await().until(() ->
leaderRef.get().equals(leaderRefNoInitialNotify.get()));
+ assertEquals(1, callsCount.get());
- raftClients.add(raftClientNoInitialNotify);
+ raftClients.add(secondRaftClient);
- return new IgniteBiTuple<>(raftClient, raftClientNoInitialNotify);
+ return new IgniteBiTuple<>(firstRaftClient, secondRaftClient);
}
@Test
public void testChangeLeaderWhenActualLeft(TestInfo testInfo) throws
Exception {
- AtomicReference<InternalClusterNode> leaderRef = new
AtomicReference<>();
- AtomicReference<InternalClusterNode> leaderRefNoInitialNotify = new
AtomicReference<>();
-
- IgniteBiTuple<TopologyAwareRaftGroupService,
TopologyAwareRaftGroupService> raftClients =
- startClusterWithClientsAndSubscribeToLeaderChange(
- testInfo,
- leaderRef,
- leaderRefNoInitialNotify
- );
+ AtomicReference<InternalClusterNode> firstLeaderRef = new
AtomicReference<>();
+ AtomicReference<InternalClusterNode> secondLeaderRef = new
AtomicReference<>();
- TopologyAwareRaftGroupService raftClientNoInitialNotify =
raftClients.get2();
+ IgniteBiTuple<PhysicalTopologyAwareRaftGroupService,
PhysicalTopologyAwareRaftGroupService> raftClients =
+ startClusterWithClientsAndSubscribe(
+ testInfo,
+ firstLeaderRef,
+ secondLeaderRef
+ );
- InternalClusterNode leader = leaderRef.get();
+ PhysicalTopologyAwareRaftGroupService secondRaftClient =
raftClients.get2();
- assertNull(leaderRefNoInitialNotify.get());
+ InternalClusterNode leader = firstLeaderRef.get();
// Forcing the leader change by stopping the actual leader.
var raftServerToStop = raftServers.remove(new
NetworkAddress("localhost", leader.address().port()));
@@ -308,8 +260,6 @@ public abstract class AbstractTopologyAwareGroupServiceTest
extends IgniteAbstra
ComponentContext componentContext = new ComponentContext();
assertThat(raftServerToStop.stopAsync(componentContext),
willCompleteSuccessfully());
- afterNodeStop(leader.name());
-
var logStorageToStop = logStorageFactories.remove(new
NetworkAddress("localhost", leader.address().port()));
assertThat(logStorageToStop.stopAsync(componentContext),
willCompleteSuccessfully());
@@ -322,40 +272,34 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
if (leader.address().port() != PORT_BASE) {
// leaderRef is updated through raftClient hosted on PORT_BASE,
thus if corresponding node was stopped (and it will be stopped
// if it occurred to be a leader) leaderRef won't be updated.
- assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()),
WAIT_TIMEOUT_MILLIS));
+ await().until(() -> !leader.equals(firstLeaderRef.get()));
}
- assertTrue(waitForCondition(
- () -> leaderRefNoInitialNotify.get() != null &&
!leader.equals(leaderRefNoInitialNotify.get()),
- WAIT_TIMEOUT_MILLIS)
- );
- log.info("New Leader: " + leaderRefNoInitialNotify.get());
+ await().until(() -> secondLeaderRef.get() != null &&
!leader.equals(secondLeaderRef.get()));
- afterLeaderChange(leaderRefNoInitialNotify.get().name());
+ log.info("New Leader: " + secondLeaderRef.get());
- raftClientNoInitialNotify.refreshLeader().get();
+ secondRaftClient.refreshLeader().get();
- assertEquals(raftClientNoInitialNotify.leader().consistentId(),
leaderRefNoInitialNotify.get().name());
+ assertEquals(secondRaftClient.leader().consistentId(),
secondLeaderRef.get().name());
}
@Test
public void testChangeLeaderForce(TestInfo testInfo) throws Exception {
AtomicReference<InternalClusterNode> leaderRef = new
AtomicReference<>();
- AtomicReference<InternalClusterNode> leaderRefNoInitialNotify = new
AtomicReference<>();
+ AtomicReference<InternalClusterNode> secondLeaderRef = new
AtomicReference<>();
- IgniteBiTuple<TopologyAwareRaftGroupService,
TopologyAwareRaftGroupService> raftClients =
- startClusterWithClientsAndSubscribeToLeaderChange(
+ IgniteBiTuple<PhysicalTopologyAwareRaftGroupService,
PhysicalTopologyAwareRaftGroupService> raftClients =
+ startClusterWithClientsAndSubscribe(
testInfo,
leaderRef,
- leaderRefNoInitialNotify
+ secondLeaderRef
);
- TopologyAwareRaftGroupService raftClient = raftClients.get1();
+ PhysicalTopologyAwareRaftGroupService raftClient = raftClients.get1();
InternalClusterNode leader = leaderRef.get();
- assertNull(leaderRefNoInitialNotify.get());
-
// Forcing the leader change by transferring leadership.
Peer newLeaderPeer = raftClient.peers().stream().filter(peer ->
!leader.name().equals(peer.consistentId())).findAny().get();
@@ -366,16 +310,11 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
String leaderId = newLeaderPeer.consistentId();
// Waiting for the notifications to check.
- assertTrue(waitForCondition(() ->
leaderId.equals(leaderRef.get().name()), WAIT_TIMEOUT_MILLIS));
- assertTrue(waitForCondition(
- () -> leaderRefNoInitialNotify.get() != null &&
leaderId.equals(leaderRefNoInitialNotify.get().name()),
- WAIT_TIMEOUT_MILLIS
- ));
+ await().until(() -> leaderId.equals(leaderRef.get().name()));
+ await().until(() -> secondLeaderRef.get() != null &&
leaderId.equals(secondLeaderRef.get().name()));
log.info("New Leader: " + leaderRef.get());
- afterLeaderChange(leaderRef.get().name());
-
raftClient.refreshLeader().get();
assertEquals(raftClient.leader().consistentId(),
leaderRef.get().name());
@@ -388,7 +327,7 @@ public abstract class AbstractTopologyAwareGroupServiceTest
extends IgniteAbstra
*/
private void stopCluster() throws Exception {
if (!CollectionUtils.nullOrEmpty(raftClients)) {
- raftClients.forEach(TopologyAwareRaftGroupService::shutdown);
+
raftClients.forEach(PhysicalTopologyAwareRaftGroupService::shutdown);
raftClients.clear();
}
@@ -421,7 +360,7 @@ public abstract class AbstractTopologyAwareGroupServiceTest
extends IgniteAbstra
* @param clientPort Port of node where a client will start.
* @return Topology aware client.
*/
- private @Nullable TopologyAwareRaftGroupService startCluster(
+ private @Nullable PhysicalTopologyAwareRaftGroupService startCluster(
TestInfo testInfo,
Predicate<NetworkAddress> isServerAddress,
int nodes,
@@ -431,24 +370,35 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
var nodeFinder = new StaticNodeFinder(addresses);
- TopologyAwareRaftGroupService raftClient = null;
+ PeersAndLearners peersAndLearners = peersAndLearners(testInfo,
isServerAddress, nodes);
+
+ PhysicalTopologyAwareRaftGroupService raftClient = null;
+
+ RaftGroupEventsClientListener clientRaftListener = null;
for (NetworkAddress addr : addresses) {
var cluster = clusterService(testInfo, addr.port(), nodeFinder);
assertThat(cluster.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ // Starting the topology-aware client earlier than the other
components to provoke a situation where the client starts earlier
+ // than the other nodes but works anyway.
+ if (addr.port() == clientPort) {
+ clientRaftListener = new RaftGroupEventsClientListener();
+
+ raftClient = startTopologyAwareClient(cluster,
peersAndLearners, clientRaftListener);
+
+ raftClients.add(raftClient);
+ }
+
clusterServices.put(addr, cluster);
}
- PeersAndLearners peersAndLearners = peersAndLearners(clusterServices,
isServerAddress, nodes);
-
for (NetworkAddress addr : addresses) {
ClusterService cluster = clusterServices.get(addr);
- LogicalTopologyService logicalTopologyService = new
TestLogicalTopologyService(cluster);
-
- RaftGroupEventsClientListener eventsClientListener = new
RaftGroupEventsClientListener();
+ RaftGroupEventsClientListener eventsClientListener =
+ addr.port() == clientPort ? clientRaftListener : new
RaftGroupEventsClientListener();
if (isServerAddress.test(addr)) { // RAFT server node
var localPeer = peersAndLearners.peers().stream()
@@ -490,31 +440,16 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
);
raftServers.put(addr, raftServer);
-
- afterNodeStart(localPeer.consistentId(), cluster, dataPath,
peersAndLearners, eventsClientListener, logicalTopologyService);
- }
-
- if (addr.port() == clientPort) {
- assertTrue(isServerAddress.test(addr));
-
- raftClient = startTopologyAwareClient(cluster,
clusterServices, isServerAddress, nodes, eventsClientListener,
- logicalTopologyService, true);
-
- raftClients.add(raftClient);
}
}
return raftClient;
}
- private TopologyAwareRaftGroupService startTopologyAwareClient(
+ private PhysicalTopologyAwareRaftGroupService startTopologyAwareClient(
ClusterService localClusterService,
- Map<NetworkAddress, ClusterService> clusterServices,
- Predicate<NetworkAddress> isServerAddress,
- int nodes,
- RaftGroupEventsClientListener eventsClientListener,
- LogicalTopologyService logicalTopologyService,
- boolean notifyOnSubscription
+ PeersAndLearners peersAndLearners,
+ RaftGroupEventsClientListener eventsClientListener
) {
if (eventsClientListener == null) {
eventsClientListener = new RaftGroupEventsClientListener();
@@ -531,46 +466,28 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
var commandsMarshaller = new
ThreadLocalOptimizedMarshaller(localClusterService.serializationRegistry());
- return TopologyAwareRaftGroupService.start(
+ return PhysicalTopologyAwareRaftGroupService.start(
GROUP_ID,
localClusterService,
FACTORY,
raftConfiguration,
- peersAndLearners(clusterServices, isServerAddress, nodes),
+ peersAndLearners,
executor,
- logicalTopologyService,
eventsClientListener,
- notifyOnSubscription,
commandsMarshaller,
StoppingExceptionFactories.indicateComponentStop(),
- throttlingContextHolder()
+ throttlingContextHolder(),
+ NOOP_FAILURE_PROCESSOR
);
}
private static PeersAndLearners peersAndLearners(
- Map<NetworkAddress, ClusterService> clusterServices,
+ TestInfo testInfo,
Predicate<NetworkAddress> isServerAddress,
int nodes
) {
return PeersAndLearners.fromConsistentIds(
findLocalAddresses(PORT_BASE, PORT_BASE +
nodes).stream().filter(isServerAddress)
- .map(netAddr ->
clusterServices.get(netAddr).topologyService().localMember().name()).collect(
- toSet()));
- }
-
- private void subscribeLeader(TopologyAwareRaftGroupService client,
LeaderElectionListener callback, String logMessage) {
- CompletableFuture<Void> future = client.subscribeLeader((node, term)
-> {
- callback.onLeaderElected(node, term);
-
- log.info(logMessage, node);
- });
-
- assertThat(future, willCompleteSuccessfully());
- }
-
- private static void unsubscribeLeader(TopologyAwareRaftGroupService
client) {
- CompletableFuture<Void> future = client.unsubscribeLeader();
-
- assertThat(future, willCompleteSuccessfully());
+ .map(netAddr -> IgniteTestUtils.testNodeName(testInfo,
netAddr.port())).collect(toSet()));
}
}
diff --git
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
index 589cac5bc5c..942c8b9c8c7 100644
---
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
+++
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
@@ -82,6 +82,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Abstract class containing test scenarios for {@link
TopologyAwareRaftGroupService} related test classes.
+ * TODO: IGNITE-27257 Refactor the class to make it more readable and
maintainable.
*/
@ExtendWith(ConfigurationExtension.class)
public abstract class AbstractTopologyAwareGroupServiceTest extends
IgniteAbstractTest {