This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new ee26ae637c IGNITE-18640 Implement placement driver best-effort single actor selector and fail-over (#1692) ee26ae637c is described below commit ee26ae637c7b200c993b9a7dee36f2dbebdda4df Author: Denis Chudov <moongll...@gmail.com> AuthorDate: Tue Feb 21 15:56:43 2023 +0200 IGNITE-18640 Implement placement driver best-effort single actor selector and fail-over (#1692) --- .../internal/placementdriver/ActiveActorTest.java | 88 +++++++++ .../client/TopologyAwareRaftGroupServiceTest.java | 212 +++++++++++++++++---- .../placementdriver/PlacementDriverManager.java | 142 +++++++++++++- .../raft/client/TopologyAwareRaftGroupService.java | 72 ++++--- .../java/org/apache/ignite/internal/raft/Loza.java | 39 +++- .../raft/server/impl/RaftServiceEventListener.java | 17 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 33 +++- 7 files changed, 526 insertions(+), 77 deletions(-) diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java new file mode 100644 index 0000000000..4eb18b7e85 --- /dev/null +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.placementdriver; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest; +import org.apache.ignite.network.ClusterService; +import org.junit.jupiter.api.AfterEach; + +/** + * Placement driver active actor test. + */ +public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest { + private Map<String, PlacementDriverManager> placementDriverManagers = new HashMap<>(); + + @AfterEach + @Override + protected void afterTest() throws Exception { + List<AutoCloseable> closeables = placementDriverManagers.values().stream().map(p -> (AutoCloseable) p::stop).collect(toList()); + + closeAll(closeables); + + placementDriverManagers.clear(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override + protected void afterNodeStart(String nodeName, ClusterService clusterService, Set<String> placementDriverNodesNames) { + PlacementDriverManager placementDriverManager = new PlacementDriverManager( + TestReplicationGroup.GROUP_ID, + clusterService, + raftConfiguration, + () -> completedFuture(placementDriverNodesNames), + new LogicalTopologyServiceTestImpl(clusterService), + executor + ); + + placementDriverManager.start(); + + placementDriverManagers.put(nodeName, placementDriverManager); + } + + /** {@inheritDoc} */ + @Override + protected boolean afterInitCheckCondition(String leaderName) { + return checkSingleActiveActor(leaderName); + } + + /** {@inheritDoc} */ + @Override + protected boolean afterLeaderChangeCheckCondition(String leaderName) { + return checkSingleActiveActor(leaderName); + } + + private boolean checkSingleActiveActor(String leaderName) { + for (Map.Entry<String, PlacementDriverManager> e : placementDriverManagers.entrySet()) { + if (e.getValue().isActiveActor() != e.getKey().equals(leaderName)) { + return false; + } + } + + return true; + } +} diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java index caef80fffe..14cc2537bd 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.raft.client; +import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest.TestReplicationGroup.GROUP_ID; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -31,6 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Predicate; @@ -53,7 +56,6 @@ import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.testframework.IgniteAbstractTest; -import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; @@ -62,6 +64,7 @@ import org.apache.ignite.network.StaticNodeFinder; import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.option.NodeOptions; import org.apache.ignite.utils.ClusterServiceTestUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; @@ -78,10 +81,10 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { private static final int PORT_BASE = 1234; @InjectConfiguration - private RaftConfiguration raftConfiguration; + protected RaftConfiguration raftConfiguration; /** RPC executor. */ - private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", log)); + protected ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", log)); @Test public void testOneNodeReplicationGroup(TestInfo testInfo) throws Exception { @@ -99,15 +102,15 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { CompletableFuture<ClusterNode> leaderFut = new CompletableFuture<>(); - raftClient.subscribeLeader(node -> { - leaderFut.complete(node); - }); + raftClient.subscribeLeader((node, term) -> leaderFut.complete(node)); ClusterNode leader = leaderFut.get(10, TimeUnit.SECONDS); assertNotNull(leader); assertEquals(PORT_BASE, leader.address().port()); + afterInitCheckConditionWithWait(leader.name()); + stopCluster(clusterServices, raftServers, raftClient, 2); } @@ -115,23 +118,45 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { public void testChangeLeaderWhenActualLeft(TestInfo testInfo) throws Exception { var clusterServices = new HashMap<NetworkAddress, ClusterService>(); var raftServers = new HashMap<NetworkAddress, JraftServerImpl>(); + Predicate<NetworkAddress> isServerAddress = addr -> addr.port() < PORT_BASE + 3; TopologyAwareRaftGroupService raftClient = startCluster( testInfo, clusterServices, raftServers, - addr -> addr.port() < PORT_BASE + 3, + isServerAddress, 4, PORT_BASE + 3 ); + raftClient.refreshLeader().get(); + + TopologyAwareRaftGroupService raftClientNoInitialNotify = startTopologyAwareClient( + clusterServices.entrySet().iterator().next().getValue(), + clusterServices, + isServerAddress, + 4, + false + ); + AtomicReference<ClusterNode> leaderRef = new AtomicReference<>(); + AtomicReference<ClusterNode> leaderRefNoInitialNotify = new AtomicReference<>(); + AtomicInteger callsCount = new AtomicInteger(); + + raftClient.subscribeLeader((node, term) -> leaderRef.set(node)); - raftClient.subscribeLeader(node -> { - leaderRef.set(node); - }); + for (int i = 0; i < 2; i++) { + raftClientNoInitialNotify.unsubscribeLeader(); - assertTrue(IgniteTestUtils.waitForCondition(() -> leaderRef.get() != null, 10_000)); + raftClientNoInitialNotify.subscribeLeader((node, term) -> { + callsCount.incrementAndGet(); + leaderRefNoInitialNotify.set(node); + }); + } + + assertTrue(callsCount.get() <= 1); + + assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000)); ClusterNode leader = leaderRef.get(); @@ -139,16 +164,21 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { log.info("Leader: " + leader); + afterInitCheckConditionWithWait(leader.name()); + var raftServiceToStop = raftServers.remove(new NetworkAddress("localhost", leader.address().port())); raftServiceToStop.stopRaftNodes(GROUP_ID); raftServiceToStop.stop(); clusterServices.remove(new NetworkAddress("localhost", leader.address().port())).stop(); - assertTrue(IgniteTestUtils.waitForCondition(() -> !leader.equals(leaderRef.get()), 10_000)); + assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()), 10_000)); + assertTrue(waitForCondition(() -> !leader.equals(leaderRefNoInitialNotify.get()), 1000)); log.info("New Leader: " + leaderRef.get()); + afterLeaderChangeCheckConditionWithWait(leaderRef.get().name()); + raftClient.refreshLeader().get(); assertEquals(raftClient.leader().consistentId(), leaderRef.get().name()); @@ -160,23 +190,45 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { public void testChangeLeaderForce(TestInfo testInfo) throws Exception { var clusterServices = new HashMap<NetworkAddress, ClusterService>(); var raftServers = new HashMap<NetworkAddress, JraftServerImpl>(); + Predicate<NetworkAddress> isServerAddress = addr -> addr.port() < PORT_BASE + 3; TopologyAwareRaftGroupService raftClient = startCluster( testInfo, clusterServices, raftServers, - addr -> addr.port() < PORT_BASE + 3, + isServerAddress, 4, PORT_BASE + 3 ); + raftClient.refreshLeader().get(); + + TopologyAwareRaftGroupService raftClientNoInitialNotify = startTopologyAwareClient( + clusterServices.entrySet().iterator().next().getValue(), + clusterServices, + isServerAddress, + 4, + false + ); + AtomicReference<ClusterNode> leaderRef = new AtomicReference<>(); + AtomicReference<ClusterNode> leaderRefNoInitialNotify = new AtomicReference<>(); + AtomicInteger callsCount = new AtomicInteger(); + + raftClient.subscribeLeader((node, term) -> leaderRef.set(node)); + + for (int i = 0; i < 2; i++) { + raftClientNoInitialNotify.unsubscribeLeader(); + + raftClientNoInitialNotify.subscribeLeader((node, term) -> { + callsCount.incrementAndGet(); + leaderRefNoInitialNotify.set(node); + }); + } - raftClient.subscribeLeader(node -> { - leaderRef.set(node); - }); + assertTrue(callsCount.get() <= 1); - assertTrue(IgniteTestUtils.waitForCondition(() -> leaderRef.get() != null, 10_000)); + assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000)); ClusterNode leader = leaderRef.get(); @@ -184,16 +236,25 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { log.info("Leader: " + leader); + afterInitCheckConditionWithWait(leader.name()); + Peer newLeaderPeer = raftClient.peers().stream().filter(peer -> !leader.name().equals(peer.consistentId())).findAny().get(); log.info("Peer to transfer leader: " + newLeaderPeer); raftClient.transferLeadership(newLeaderPeer).get(); - assertTrue(IgniteTestUtils.waitForCondition(() -> newLeaderPeer.consistentId().equals(leaderRef.get().name()), 10_000)); + String leaderId = newLeaderPeer.consistentId(); + + assertTrue(waitForCondition(() -> leaderId.equals(leaderRef.get().name()), 10_000)); + assertTrue(waitForCondition( + () -> leaderRefNoInitialNotify.get() != null && leaderId.equals(leaderRefNoInitialNotify.get().name()), 1000) + ); log.info("New Leader: " + leaderRef.get()); + afterLeaderChangeCheckConditionWithWait(leaderRef.get().name()); + raftClient.refreshLeader().get(); assertEquals(raftClient.leader().consistentId(), leaderRef.get().name()); @@ -266,14 +327,13 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { clusterServices.put(addr, cluster); } + PeersAndLearners peersAndLearners = peersAndLearners(clusterServices, isServerAddress, nodes); + + Set<String> placementDriverNodesNames = peersAndLearners.peers().stream().map(Peer::consistentId).collect(toSet()); + for (NetworkAddress addr : addresses) { var cluster = clusterServices.get(addr); - PeersAndLearners peersAndLearners = PeersAndLearners.fromConsistentIds( - addresses.stream().filter(isServerAddress) - .map(netAddr -> clusterServices.get(netAddr).topologyService().localMember().name()).collect( - Collectors.toSet())); - if (isServerAddress.test(addr)) { //RAFT server node var localPeer = peersAndLearners.peers().stream() .filter(peer -> peer.consistentId().equals(cluster.topologyService().localMember().name())).findAny().get(); @@ -289,24 +349,49 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { ); raftServers.put(addr, raftServer); + + afterNodeStart(localPeer.consistentId(), cluster, placementDriverNodesNames); } if (addr.port() == clientPort) { - raftClient = (TopologyAwareRaftGroupService) TopologyAwareRaftGroupService.start( - GROUP_ID, - cluster, - FACTORY, - raftConfiguration, - peersAndLearners, - true, - executor, - new LogicalTopologyServiceTestImpl(cluster) - ).join(); + raftClient = startTopologyAwareClient(cluster, clusterServices, isServerAddress, nodes, true); } } + return raftClient; } + private TopologyAwareRaftGroupService startTopologyAwareClient( + ClusterService localClusterService, + HashMap<NetworkAddress, ClusterService> clusterServices, + Predicate<NetworkAddress> isServerAddress, + int nodes, + boolean notifyOnSubscription + ) { + return (TopologyAwareRaftGroupService) TopologyAwareRaftGroupService.start( + GROUP_ID, + localClusterService, + FACTORY, + raftConfiguration, + peersAndLearners(clusterServices, isServerAddress, nodes), + true, + executor, + new LogicalTopologyServiceTestImpl(localClusterService), + notifyOnSubscription + ).join(); + } + + private static PeersAndLearners peersAndLearners( + HashMap<NetworkAddress, ClusterService> clusterServices, + Predicate<NetworkAddress> isServerAddress, + int nodes + ) { + return PeersAndLearners.fromConsistentIds( + getNetworkAddresses(nodes).stream().filter(isServerAddress) + .map(netAddr -> clusterServices.get(netAddr).topologyService().localMember().name()).collect( + toSet())); + } + /** * Generates a node address for each node. * @@ -320,6 +405,62 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { return addresses; } + @AfterEach + protected void afterTest() throws Exception { + // No-op. + } + + /** + * The method is called after every node of the cluster starts. + * + * @param nodeName Node name. + * @param clusterService Cluster service. + * @param placementDriverNodesNames Names of all nodes in raft group. + */ + protected void afterNodeStart(String nodeName, ClusterService clusterService, Set<String> placementDriverNodesNames) { + // No-op. + } + + /** + * Checks the condition after cluster and raft clients initialization, waiting for this condition. + * + * @param leaderName Current leader name. + * @throws InterruptedException If failed. + */ + private void afterInitCheckConditionWithWait(String leaderName) throws InterruptedException { + assertTrue(waitForCondition(() -> afterInitCheckCondition(leaderName), 10_000)); + } + + /** + * Checks the condition after cluster and raft clients initialization. + * + * @param leaderName Current leader name. + * @return Condition result. + */ + protected boolean afterInitCheckCondition(String leaderName) { + return true; + } + + /** + * Checks the condition after leader change, waiting for this condition. + * + * @param leaderName Current leader name. + * @throws InterruptedException If failed. + */ + private void afterLeaderChangeCheckConditionWithWait(String leaderName) throws InterruptedException { + assertTrue(waitForCondition(() -> afterLeaderChangeCheckCondition(leaderName), 10_000)); + } + + /** + * Checks the condition after leader change. + * + * @param leaderName Current leader name. + * @return Condition result. + */ + protected boolean afterLeaderChangeCheckCondition(String leaderName) { + return true; + } + /** * Replication test group class. */ @@ -360,7 +501,10 @@ public class TopologyAwareRaftGroupServiceTest extends IgniteAbstractTest { } } - private static class LogicalTopologyServiceTestImpl implements LogicalTopologyService { + /** + * Test implementation of {@link LogicalTopologyService}. + */ + protected static class LogicalTopologyServiceTestImpl implements LogicalTopologyService { private final ClusterService clusterService; public LogicalTopologyServiceTestImpl(ClusterService clusterService) { diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java index a30a9c040f..cbf07def6e 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java @@ -17,10 +17,25 @@ package org.apache.ignite.internal.placementdriver; +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.manager.IgniteComponent; -import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.raft.PeersAndLearners; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.raft.jraft.RaftMessagesFactory; +import org.jetbrains.annotations.TestOnly; /** * Placement driver manager. @@ -31,21 +46,100 @@ public class PlacementDriverManager implements IgniteComponent { /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); + private final RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory(); + /** Prevents double stopping of the component. */ private final AtomicBoolean isStopped = new AtomicBoolean(); + private final ReplicationGroupId replicationGroupId; + + private final ClusterService clusterService; + + private final Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider; + + /** + * Raft client future. Can contain null, if this node is not in placement driver group. + */ + private final CompletableFuture<TopologyAwareRaftGroupService> raftClientFuture; + + private final ScheduledExecutorService raftClientExecutor; + + private final LogicalTopologyService logicalTopologyService; + + private final RaftConfiguration raftConfiguration; + + private volatile boolean isActiveActor; + + private volatile long lastTermSeen = -1; + /** * The constructor. * - * @param metaStorageMgr Meta Storage manager. + * @param replicationGroupId Id of placement driver group. + * @param clusterService Cluster service. + * @param raftConfiguration Raft configuration. + * @param placementDriverNodesNamesProvider Provider of the set of placement driver nodes' names. + * @param logicalTopologyService Logical topology service. + * @param raftClientExecutor Raft client executor. */ - public PlacementDriverManager(MetaStorageManager metaStorageMgr) { + public PlacementDriverManager( + ReplicationGroupId replicationGroupId, + ClusterService clusterService, + RaftConfiguration raftConfiguration, + Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider, + LogicalTopologyService logicalTopologyService, + ScheduledExecutorService raftClientExecutor + ) { + this.replicationGroupId = replicationGroupId; + this.clusterService = clusterService; + this.raftConfiguration = raftConfiguration; + this.placementDriverNodesNamesProvider = placementDriverNodesNamesProvider; + this.logicalTopologyService = logicalTopologyService; + this.raftClientExecutor = raftClientExecutor; + + raftClientFuture = new CompletableFuture<>(); } /** {@inheritDoc} */ @Override public void start() { + placementDriverNodesNamesProvider.get() + .thenCompose(placementDriverNodes -> { + String thisNodeName = clusterService.topologyService().localMember().name(); + + if (placementDriverNodes.contains(thisNodeName)) { + return TopologyAwareRaftGroupService.start( + replicationGroupId, + clusterService, + raftMessagesFactory, + raftConfiguration, + PeersAndLearners.fromConsistentIds(placementDriverNodes), + true, + raftClientExecutor, + logicalTopologyService, + true + ).thenCompose(client -> { + TopologyAwareRaftGroupService topologyAwareClient = (TopologyAwareRaftGroupService) client; + return topologyAwareClient.subscribeLeader(this::onLeaderChange).thenApply(v -> topologyAwareClient); + }); + } else { + return completedFuture(null); + } + }) + .whenComplete((client, ex) -> { + if (ex == null) { + raftClientFuture.complete(client); + } else { + raftClientFuture.completeExceptionally(ex); + } + }); + } + + /** {@inheritDoc} */ + @Override + public void beforeNodeStop() { + withRaftClientIfPresent(c -> c.unsubscribeLeader().join()); } /** {@inheritDoc} */ @@ -56,5 +150,47 @@ public class PlacementDriverManager implements IgniteComponent { } busyLock.block(); + + withRaftClientIfPresent(TopologyAwareRaftGroupService::shutdown); + } + + private void withRaftClientIfPresent(Consumer<TopologyAwareRaftGroupService> closure) { + raftClientFuture.thenAccept(client -> { + if (client != null) { + closure.accept(client); + } + }); + } + + private void onLeaderChange(ClusterNode leader, Long term) { + if (term > lastTermSeen) { + if (leader.equals(clusterService.topologyService().localMember())) { + takeOverActiveActor(); + } else { + stepDownActiveActor(); + } + + lastTermSeen = term; + } + } + + /** + * Takes over active actor of placement driver group. + */ + private void takeOverActiveActor() { + isActiveActor = true; + } + + + /** + * Steps down as active actor. + */ + private void stepDownActiveActor() { + isActiveActor = false; + } + + @TestOnly + boolean isActiveActor() { + return isActiveActor; } } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java index 47d16c65cf..d77a1f58b5 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java @@ -25,7 +25,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; @@ -78,6 +78,11 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { /** RAFT configuration. */ private final RaftConfiguration raftConfiguration; + /** + * Whether to notify callback after subscription to pass the current leader and term into it, even if the leader + * did not change in that moment (see {@link #subscribeLeader(BiConsumer)}). + */ + private final boolean notifyOnSubscription; /** * The constructor. @@ -87,6 +92,8 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * @param executor RPC executor. * @param raftClient RPC RAFT client. * @param logicalTopologyService Logical topology. + * @param notifyOnSubscription Whether to notify callback after subscription to pass the current leader and term into it, + * even if the leader did not change in that moment (see {@link #subscribeLeader(BiConsumer)}). */ private TopologyAwareRaftGroupService( ClusterService cluster, @@ -94,7 +101,8 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { ScheduledExecutorService executor, RaftConfiguration raftConfiguration, RaftGroupService raftClient, - LogicalTopologyService logicalTopologyService + LogicalTopologyService logicalTopologyService, + boolean notifyOnSubscription ) { this.clusterService = cluster; this.factory = factory; @@ -103,6 +111,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { this.raftClient = raftClient; this.logicalTopologyService = logicalTopologyService; this.serverEventHandler = new ServerEventHandler(); + this.notifyOnSubscription = notifyOnSubscription; cluster.messagingService().addMessageHandler(RaftMessageGroup.class, (message, senderConsistentId, correlationId) -> { if (message instanceof LeaderChangeNotification) { @@ -154,6 +163,8 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * @param getLeader True to get the group's leader upon service creation. * @param executor RPC executor. * @param logicalTopologyService Logical topology service. + * @param notifyOnSubscription Whether to notify callback after subscription to pass the current leader and term into it, + * even if the leader did not change in that moment (see {@link #subscribeLeader(BiConsumer)}). * @return Future to create a raft client. */ public static CompletableFuture<RaftGroupService> start( @@ -164,11 +175,12 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { PeersAndLearners configuration, boolean getLeader, ScheduledExecutorService executor, - LogicalTopologyService logicalTopologyService + LogicalTopologyService logicalTopologyService, + boolean notifyOnSubscription ) { return RaftGroupServiceImpl.start(groupId, cluster, factory, raftConfiguration, configuration, getLeader, executor) .thenApply(raftGroupService -> new TopologyAwareRaftGroupService(cluster, factory, executor, raftConfiguration, - raftGroupService, logicalTopologyService)); + raftGroupService, logicalTopologyService, notifyOnSubscription)); } /** @@ -247,8 +259,9 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * Assigns a closure to call when a leader will is elected. * * @param callback Callback closure. + * @return Future that is completed when all subscription messages to peers are sent. */ - public void subscribeLeader(Consumer<ClusterNode> callback) { + public CompletableFuture<Void> subscribeLeader(BiConsumer<ClusterNode, Long> callback) { assert !serverEventHandler.isSubscribed() : "The node already subscribed"; int peers = peers().size(); @@ -272,38 +285,51 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { } } - CompletableFuture.allOf(futs).whenCompleteAsync((unused, throwable) -> { - if (throwable != null) { - throw new IgniteException(Common.UNEXPECTED_ERR, throwable); - } - - refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> { - if (leaderWithTerm.leader() != null) { - serverEventHandler.onLeaderElected( - clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId()), - leaderWithTerm.term() - ); + if (notifyOnSubscription) { + return CompletableFuture.allOf(futs).whenCompleteAsync((unused, throwable) -> { + if (throwable != null) { + throw new IgniteException(Common.UNEXPECTED_ERR, throwable); } + + refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> { + if (leaderWithTerm.leader() != null) { + serverEventHandler.onLeaderElected( + clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId()), + leaderWithTerm.term() + ); + } + }, executor); }, executor); - }, executor); + } else { + return CompletableFuture.allOf(futs); + } } /** * Unsubscribe of notification about a leader elected. + * + * @return Future that is completed when all messages about cancelling subscription to peers are sent. */ - public void unsubscribeLeader() { + public CompletableFuture<Void> unsubscribeLeader() { serverEventHandler.setOnLeaderElectedCallback(null); - for (Peer peer : peers()) { + var peers = peers(); + var futs = new CompletableFuture[peers.size()]; + + for (int i = 0; i < peers.size(); i++) { + Peer peer = peers.get(i); + ClusterNode node = clusterService.topologyService().getByConsistentId(peer.consistentId()); if (node != null) { - sendSubscribeMessage(node, factory.subscriptionLeaderChangeRequest() + futs[i] = sendSubscribeMessage(node, factory.subscriptionLeaderChangeRequest() .groupId(groupId()) .subscribe(false) .build()); } } + + return CompletableFuture.allOf(futs); } @Override @@ -409,7 +435,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { private long term = 0; /** A leader elected callback. */ - private Consumer<ClusterNode> onLeaderElectedCallback; + private BiConsumer<ClusterNode, Long> onLeaderElectedCallback; /** * Notifies about a new leader elected, if it did not make before. @@ -421,7 +447,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { if (onLeaderElectedCallback != null && term > this.term) { this.term = term; - onLeaderElectedCallback.accept(node); + onLeaderElectedCallback.accept(node, term); } } @@ -430,7 +456,7 @@ public class TopologyAwareRaftGroupService implements RaftGroupService { * * @param onLeaderElectedCallback A callback closure. */ - public synchronized void setOnLeaderElectedCallback(Consumer<ClusterNode> onLeaderElectedCallback) { + public synchronized void setOnLeaderElectedCallback(BiConsumer<ClusterNode, Long> onLeaderElectedCallback) { this.onLeaderElectedCallback = onLeaderElectedCallback; } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index 18e6836780..27d2c1d638 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -22,7 +22,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.logger.IgniteLogger; @@ -37,7 +36,6 @@ import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; -import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IgniteStringFormatter; import org.apache.ignite.lang.NodeStoppingException; @@ -62,7 +60,7 @@ public class Loza implements RaftManager { public static final String CLIENT_POOL_NAME = "Raft-Group-Client"; /** Raft client pool size. Size was taken from jraft's TimeManager. */ - private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20); + public static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20); /** Logger. */ private static final IgniteLogger LOG = Loggers.forClass(Loza.class); @@ -94,12 +92,14 @@ public class Loza implements RaftManager { * @param raftConfiguration Raft configuration. * @param dataPath Data path. * @param clock A hybrid logical clock. + * @param executor Executor for raft group services. */ public Loza( ClusterService clusterNetSvc, RaftConfiguration raftConfiguration, Path dataPath, - HybridClock clock + HybridClock clock, + ScheduledExecutorService executor ) { this.clusterNetSvc = clusterNetSvc; this.raftConfiguration = raftConfiguration; @@ -112,9 +112,32 @@ public class Loza implements RaftManager { this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, options); - this.executor = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE, - new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(), - CLIENT_POOL_NAME), LOG + this.executor = executor; + } + + /** + * The constructor. + * + * @param clusterNetSvc Cluster network service. + * @param raftConfiguration Raft configuration. + * @param dataPath Data path. + * @param clock A hybrid logical clock. + */ + public Loza( + ClusterService clusterNetSvc, + RaftConfiguration raftConfiguration, + Path dataPath, + HybridClock clock + ) { + this( + clusterNetSvc, + raftConfiguration, + dataPath, + clock, + new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE, + new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(), + CLIENT_POOL_NAME), LOG + ) ) ); } @@ -136,8 +159,6 @@ public class Loza implements RaftManager { busyLock.block(); - IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); - raftServer.stop(); } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java index cfe26eadc4..a538012a48 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java @@ -17,12 +17,13 @@ package org.apache.ignite.internal.raft.server.impl; +import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.network.ClusterNode; /** @@ -48,9 +49,13 @@ public class RaftServiceEventListener { actions = new HashSet<>(); } - actions.add(notifyAction); + var finalActions = actions; nodesSubscriptions.compute(subscriber, (node, nodeActions) -> { + if (!nullOrEmpty(finalActions) && !nullOrEmpty(nodeActions)) { + return nodeActions; + } + if (nodeActions == null) { nodeActions = new HashSet<>(); } @@ -60,6 +65,8 @@ public class RaftServiceEventListener { return nodeActions; }); + actions.add(notifyAction); + return actions; }); } @@ -77,7 +84,7 @@ public class RaftServiceEventListener { grpNodeActions.retainAll(nodeActions); - if (CollectionUtils.nullOrEmpty(grpNodeActions)) { + if (nullOrEmpty(grpNodeActions)) { return nodeActions; } @@ -88,14 +95,14 @@ public class RaftServiceEventListener { nodeActions.remove(actionToRemove); actions.remove(actionToRemove); - if (CollectionUtils.nullOrEmpty(nodeActions)) { + if (nullOrEmpty(nodeActions)) { return null; } return nodeActions; }); - if (CollectionUtils.nullOrEmpty(actions)) { + if (nullOrEmpty(actions)) { return null; } diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index afe6311409..8ac95b3be7 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.app; +import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_NAME; +import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_SIZE; + import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -29,6 +32,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; @@ -71,6 +77,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; +import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; import org.apache.ignite.internal.metrics.MetricManager; import org.apache.ignite.internal.metrics.configuration.MetricConfiguration; import org.apache.ignite.internal.metrics.rest.MetricRestFactory; @@ -108,6 +115,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.internal.tx.message.TxMessageGroup; +import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.VaultService; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; @@ -253,6 +261,8 @@ public class IgniteImpl implements Ignite { private final RestAddressReporter restAddressReporter; + private final ScheduledExecutorService raftExecutorService; + /** * The Constructor. * @@ -310,11 +320,20 @@ public class IgniteImpl implements Ignite { clock = new HybridClockImpl(); + RaftConfiguration raftConfiguration = nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY); + + raftExecutorService = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE, + new NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterSvc.localConfiguration().getName(), + CLIENT_POOL_NAME), LOG + ) + ); + raftMgr = new Loza( clusterSvc, - nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY), + raftConfiguration, workDir, - clock + clock, + raftExecutorService ); LockManager lockMgr = new HeapLockManager(); @@ -354,7 +373,14 @@ public class IgniteImpl implements Ignite { new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH)) ); - placementDriverMgr = new PlacementDriverManager(metaStorageMgr); + placementDriverMgr = new PlacementDriverManager( + MetastorageGroupId.INSTANCE, + clusterSvc, + raftConfiguration, + cmgMgr::metaStorageNodes, + logicalTopologyService, + raftExecutorService + ); this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, vaultMgr); @@ -675,6 +701,7 @@ public class IgniteImpl implements Ignite { public void stop() { lifecycleManager.stopNode(); restAddressReporter.removeReport(); + IgniteUtils.shutdownAndAwaitTermination(raftExecutorService, 10, TimeUnit.SECONDS); } /** {@inheritDoc} */