This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ee26ae637c IGNITE-18640 Implement placement driver best-effort single actor selector and fail-over (#1692)
ee26ae637c is described below

commit ee26ae637c7b200c993b9a7dee36f2dbebdda4df
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Tue Feb 21 15:56:43 2023 +0200

    IGNITE-18640 Implement placement driver best-effort single actor selector and fail-over (#1692)
---
 .../internal/placementdriver/ActiveActorTest.java  |  88 +++++++++
 .../client/TopologyAwareRaftGroupServiceTest.java  | 212 +++++++++++++++++----
 .../placementdriver/PlacementDriverManager.java    | 142 +++++++++++++-
 .../raft/client/TopologyAwareRaftGroupService.java |  72 ++++---
 .../java/org/apache/ignite/internal/raft/Loza.java |  39 +++-
 .../raft/server/impl/RaftServiceEventListener.java |  17 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  33 +++-
 7 files changed, 526 insertions(+), 77 deletions(-)

diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
new file mode 100644
index 0000000000..4eb18b7e85
--- /dev/null
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest;
+import org.apache.ignite.network.ClusterService;
+import org.junit.jupiter.api.AfterEach;
+
+/**
+ * Placement driver active actor test.
+ */
+public class ActiveActorTest extends TopologyAwareRaftGroupServiceTest {
+    private Map<String, PlacementDriverManager> placementDriverManagers = new 
HashMap<>();
+
+    @AfterEach
+    @Override
+    protected void afterTest() throws Exception {
+        List<AutoCloseable> closeables = 
placementDriverManagers.values().stream().map(p -> (AutoCloseable) 
p::stop).collect(toList());
+
+        closeAll(closeables);
+
+        placementDriverManagers.clear();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void afterNodeStart(String nodeName, ClusterService 
clusterService, Set<String> placementDriverNodesNames) {
+        PlacementDriverManager placementDriverManager = new 
PlacementDriverManager(
+                TestReplicationGroup.GROUP_ID,
+                clusterService,
+                raftConfiguration,
+                () -> completedFuture(placementDriverNodesNames),
+                new LogicalTopologyServiceTestImpl(clusterService),
+                executor
+        );
+
+        placementDriverManager.start();
+
+        placementDriverManagers.put(nodeName, placementDriverManager);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected boolean afterInitCheckCondition(String leaderName) {
+        return checkSingleActiveActor(leaderName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected boolean afterLeaderChangeCheckCondition(String leaderName) {
+        return checkSingleActiveActor(leaderName);
+    }
+
+    private boolean checkSingleActiveActor(String leaderName) {
+        for (Map.Entry<String, PlacementDriverManager> e : 
placementDriverManagers.entrySet()) {
+            if (e.getValue().isActiveActor() != e.getKey().equals(leaderName)) 
{
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
index caef80fffe..14cc2537bd 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupServiceTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.raft.client;
 
+import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceTest.TestReplicationGroup.GROUP_ID;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -31,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -53,7 +56,6 @@ import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -62,6 +64,7 @@ import org.apache.ignite.network.StaticNodeFinder;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -78,10 +81,10 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
     private static final int PORT_BASE = 1234;
 
     @InjectConfiguration
-    private RaftConfiguration raftConfiguration;
+    protected RaftConfiguration raftConfiguration;
 
     /** RPC executor. */
-    private ScheduledExecutorService executor = new 
ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", 
log));
+    protected ScheduledExecutorService executor = new 
ScheduledThreadPoolExecutor(20, new NamedThreadFactory("Raft-Group-Client", 
log));
 
     @Test
     public void testOneNodeReplicationGroup(TestInfo testInfo) throws 
Exception {
@@ -99,15 +102,15 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
 
         CompletableFuture<ClusterNode> leaderFut = new CompletableFuture<>();
 
-        raftClient.subscribeLeader(node -> {
-            leaderFut.complete(node);
-        });
+        raftClient.subscribeLeader((node, term) -> leaderFut.complete(node));
 
         ClusterNode leader = leaderFut.get(10, TimeUnit.SECONDS);
 
         assertNotNull(leader);
         assertEquals(PORT_BASE, leader.address().port());
 
+        afterInitCheckConditionWithWait(leader.name());
+
         stopCluster(clusterServices, raftServers, raftClient, 2);
     }
 
@@ -115,23 +118,45 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
     public void testChangeLeaderWhenActualLeft(TestInfo testInfo) throws 
Exception {
         var clusterServices = new HashMap<NetworkAddress, ClusterService>();
         var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+        Predicate<NetworkAddress> isServerAddress = addr -> addr.port() < 
PORT_BASE + 3;
 
         TopologyAwareRaftGroupService raftClient = startCluster(
                 testInfo,
                 clusterServices,
                 raftServers,
-                addr -> addr.port() < PORT_BASE + 3,
+                isServerAddress,
                 4,
                 PORT_BASE + 3
         );
 
+        raftClient.refreshLeader().get();
+
+        TopologyAwareRaftGroupService raftClientNoInitialNotify = 
startTopologyAwareClient(
+                clusterServices.entrySet().iterator().next().getValue(),
+                clusterServices,
+                isServerAddress,
+                4,
+                false
+        );
+
         AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+        AtomicReference<ClusterNode> leaderRefNoInitialNotify = new 
AtomicReference<>();
+        AtomicInteger callsCount = new AtomicInteger();
+
+        raftClient.subscribeLeader((node, term) -> leaderRef.set(node));
 
-        raftClient.subscribeLeader(node -> {
-            leaderRef.set(node);
-        });
+        for (int i = 0; i < 2; i++) {
+            raftClientNoInitialNotify.unsubscribeLeader();
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> leaderRef.get() != 
null, 10_000));
+            raftClientNoInitialNotify.subscribeLeader((node, term) -> {
+                callsCount.incrementAndGet();
+                leaderRefNoInitialNotify.set(node);
+            });
+        }
+
+        assertTrue(callsCount.get() <= 1);
+
+        assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000));
 
         ClusterNode leader = leaderRef.get();
 
@@ -139,16 +164,21 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
 
         log.info("Leader: " + leader);
 
+        afterInitCheckConditionWithWait(leader.name());
+
         var raftServiceToStop = raftServers.remove(new 
NetworkAddress("localhost", leader.address().port()));
         raftServiceToStop.stopRaftNodes(GROUP_ID);
         raftServiceToStop.stop();
 
         clusterServices.remove(new NetworkAddress("localhost", 
leader.address().port())).stop();
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
!leader.equals(leaderRef.get()), 10_000));
+        assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()), 
10_000));
+        assertTrue(waitForCondition(() -> 
!leader.equals(leaderRefNoInitialNotify.get()), 1000));
 
         log.info("New Leader: " + leaderRef.get());
 
+        afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
+
         raftClient.refreshLeader().get();
 
         assertEquals(raftClient.leader().consistentId(), 
leaderRef.get().name());
@@ -160,23 +190,45 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
     public void testChangeLeaderForce(TestInfo testInfo) throws Exception {
         var clusterServices = new HashMap<NetworkAddress, ClusterService>();
         var raftServers = new HashMap<NetworkAddress, JraftServerImpl>();
+        Predicate<NetworkAddress> isServerAddress = addr -> addr.port() < 
PORT_BASE + 3;
 
         TopologyAwareRaftGroupService raftClient = startCluster(
                 testInfo,
                 clusterServices,
                 raftServers,
-                addr -> addr.port() < PORT_BASE + 3,
+                isServerAddress,
                 4,
                 PORT_BASE + 3
         );
 
+        raftClient.refreshLeader().get();
+
+        TopologyAwareRaftGroupService raftClientNoInitialNotify = 
startTopologyAwareClient(
+                clusterServices.entrySet().iterator().next().getValue(),
+                clusterServices,
+                isServerAddress,
+                4,
+                false
+        );
+
         AtomicReference<ClusterNode> leaderRef = new AtomicReference<>();
+        AtomicReference<ClusterNode> leaderRefNoInitialNotify = new 
AtomicReference<>();
+        AtomicInteger callsCount = new AtomicInteger();
+
+        raftClient.subscribeLeader((node, term) -> leaderRef.set(node));
+
+        for (int i = 0; i < 2; i++) {
+            raftClientNoInitialNotify.unsubscribeLeader();
+
+            raftClientNoInitialNotify.subscribeLeader((node, term) -> {
+                callsCount.incrementAndGet();
+                leaderRefNoInitialNotify.set(node);
+            });
+        }
 
-        raftClient.subscribeLeader(node -> {
-            leaderRef.set(node);
-        });
+        assertTrue(callsCount.get() <= 1);
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> leaderRef.get() != 
null, 10_000));
+        assertTrue(waitForCondition(() -> leaderRef.get() != null, 10_000));
 
         ClusterNode leader = leaderRef.get();
 
@@ -184,16 +236,25 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
 
         log.info("Leader: " + leader);
 
+        afterInitCheckConditionWithWait(leader.name());
+
         Peer newLeaderPeer = raftClient.peers().stream().filter(peer -> 
!leader.name().equals(peer.consistentId())).findAny().get();
 
         log.info("Peer to transfer leader: " + newLeaderPeer);
 
         raftClient.transferLeadership(newLeaderPeer).get();
 
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
newLeaderPeer.consistentId().equals(leaderRef.get().name()), 10_000));
+        String leaderId = newLeaderPeer.consistentId();
+
+        assertTrue(waitForCondition(() -> 
leaderId.equals(leaderRef.get().name()), 10_000));
+        assertTrue(waitForCondition(
+                () -> leaderRefNoInitialNotify.get() != null && 
leaderId.equals(leaderRefNoInitialNotify.get().name()), 1000)
+        );
 
         log.info("New Leader: " + leaderRef.get());
 
+        afterLeaderChangeCheckConditionWithWait(leaderRef.get().name());
+
         raftClient.refreshLeader().get();
 
         assertEquals(raftClient.leader().consistentId(), 
leaderRef.get().name());
@@ -266,14 +327,13 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
             clusterServices.put(addr, cluster);
         }
 
+        PeersAndLearners peersAndLearners = peersAndLearners(clusterServices, 
isServerAddress, nodes);
+
+        Set<String> placementDriverNodesNames = 
peersAndLearners.peers().stream().map(Peer::consistentId).collect(toSet());
+
         for (NetworkAddress addr : addresses) {
             var cluster = clusterServices.get(addr);
 
-            PeersAndLearners peersAndLearners = 
PeersAndLearners.fromConsistentIds(
-                    addresses.stream().filter(isServerAddress)
-                            .map(netAddr -> 
clusterServices.get(netAddr).topologyService().localMember().name()).collect(
-                                    Collectors.toSet()));
-
             if (isServerAddress.test(addr)) { //RAFT server node
                 var localPeer = peersAndLearners.peers().stream()
                         .filter(peer -> 
peer.consistentId().equals(cluster.topologyService().localMember().name())).findAny().get();
@@ -289,24 +349,49 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
                 );
 
                 raftServers.put(addr, raftServer);
+
+                afterNodeStart(localPeer.consistentId(), cluster, 
placementDriverNodesNames);
             }
 
             if (addr.port() == clientPort) {
-                raftClient = (TopologyAwareRaftGroupService) 
TopologyAwareRaftGroupService.start(
-                        GROUP_ID,
-                        cluster,
-                        FACTORY,
-                        raftConfiguration,
-                        peersAndLearners,
-                        true,
-                        executor,
-                        new LogicalTopologyServiceTestImpl(cluster)
-                ).join();
+                raftClient = startTopologyAwareClient(cluster, 
clusterServices, isServerAddress, nodes, true);
             }
         }
+
         return raftClient;
     }
 
+    private TopologyAwareRaftGroupService startTopologyAwareClient(
+            ClusterService localClusterService,
+            HashMap<NetworkAddress, ClusterService> clusterServices,
+            Predicate<NetworkAddress> isServerAddress,
+            int nodes,
+            boolean notifyOnSubscription
+    ) {
+        return (TopologyAwareRaftGroupService) 
TopologyAwareRaftGroupService.start(
+                GROUP_ID,
+                localClusterService,
+                FACTORY,
+                raftConfiguration,
+                peersAndLearners(clusterServices, isServerAddress, nodes),
+                true,
+                executor,
+                new LogicalTopologyServiceTestImpl(localClusterService),
+                notifyOnSubscription
+        ).join();
+    }
+
+    private static PeersAndLearners peersAndLearners(
+            HashMap<NetworkAddress, ClusterService> clusterServices,
+            Predicate<NetworkAddress> isServerAddress,
+            int nodes
+    ) {
+        return PeersAndLearners.fromConsistentIds(
+                getNetworkAddresses(nodes).stream().filter(isServerAddress)
+                        .map(netAddr -> 
clusterServices.get(netAddr).topologyService().localMember().name()).collect(
+                                toSet()));
+    }
+
     /**
      * Generates a node address for each node.
      *
@@ -320,6 +405,62 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
         return addresses;
     }
 
+    @AfterEach
+    protected void afterTest() throws Exception {
+        // No-op.
+    }
+
+    /**
+     * The method is called after every node of the cluster starts.
+     *
+     * @param nodeName Node name.
+     * @param clusterService Cluster service.
+     * @param placementDriverNodesNames Names of all nodes in raft group.
+     */
+    protected void afterNodeStart(String nodeName, ClusterService 
clusterService, Set<String> placementDriverNodesNames) {
+        // No-op.
+    }
+
+    /**
+     * Checks the condition after cluster and raft clients initialization, 
waiting for this condition.
+     *
+     * @param leaderName Current leader name.
+     * @throws InterruptedException If failed.
+     */
+    private void afterInitCheckConditionWithWait(String leaderName) throws 
InterruptedException {
+        assertTrue(waitForCondition(() -> afterInitCheckCondition(leaderName), 
10_000));
+    }
+
+    /**
+     * Checks the condition after cluster and raft clients initialization.
+     *
+     * @param leaderName Current leader name.
+     * @return Condition result.
+     */
+    protected boolean afterInitCheckCondition(String leaderName) {
+        return true;
+    }
+
+    /**
+     * Checks the condition after leader change, waiting for this condition.
+     *
+     * @param leaderName Current leader name.
+     * @throws InterruptedException If failed.
+     */
+    private void afterLeaderChangeCheckConditionWithWait(String leaderName) 
throws InterruptedException {
+        assertTrue(waitForCondition(() -> 
afterLeaderChangeCheckCondition(leaderName), 10_000));
+    }
+
+    /**
+     * Checks the condition after leader change.
+     *
+     * @param leaderName Current leader name.
+     * @return Condition result.
+     */
+    protected boolean afterLeaderChangeCheckCondition(String leaderName) {
+        return true;
+    }
+
     /**
      * Replication test group class.
      */
@@ -360,7 +501,10 @@ public class TopologyAwareRaftGroupServiceTest extends 
IgniteAbstractTest {
         }
     }
 
-    private static class LogicalTopologyServiceTestImpl implements 
LogicalTopologyService {
+    /**
+     * Test implementation of {@link LogicalTopologyService}.
+     */
+    protected static class LogicalTopologyServiceTestImpl implements 
LogicalTopologyService {
         private final ClusterService clusterService;
 
         public LogicalTopologyServiceTestImpl(ClusterService clusterService) {
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index a30a9c040f..cbf07def6e 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -17,10 +17,25 @@
 
 package org.apache.ignite.internal.placementdriver;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Placement driver manager.
@@ -31,21 +46,100 @@ public class PlacementDriverManager implements 
IgniteComponent {
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
+    private final RaftMessagesFactory raftMessagesFactory = new 
RaftMessagesFactory();
+
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
+    private final ReplicationGroupId replicationGroupId;
+
+    private final ClusterService clusterService;
+
+    private final Supplier<CompletableFuture<Set<String>>> 
placementDriverNodesNamesProvider;
+
+    /**
+     * Raft client future. Can contain null, if this node is not in placement 
driver group.
+     */
+    private final CompletableFuture<TopologyAwareRaftGroupService> 
raftClientFuture;
+
+    private final ScheduledExecutorService raftClientExecutor;
+
+    private final LogicalTopologyService logicalTopologyService;
+
+    private final RaftConfiguration raftConfiguration;
+
+    private volatile boolean isActiveActor;
+
+    private volatile long lastTermSeen = -1;
+
     /**
      * The constructor.
      *
-     * @param metaStorageMgr Meta Storage manager.
+     * @param replicationGroupId Id of placement driver group.
+     * @param clusterService Cluster service.
+     * @param raftConfiguration Raft configuration.
+     * @param placementDriverNodesNamesProvider Provider of the set of 
placement driver nodes' names.
+     * @param logicalTopologyService Logical topology service.
+     * @param raftClientExecutor Raft client executor.
      */
-    public PlacementDriverManager(MetaStorageManager metaStorageMgr) {
+    public PlacementDriverManager(
+            ReplicationGroupId replicationGroupId,
+            ClusterService clusterService,
+            RaftConfiguration raftConfiguration,
+            Supplier<CompletableFuture<Set<String>>> 
placementDriverNodesNamesProvider,
+            LogicalTopologyService logicalTopologyService,
+            ScheduledExecutorService raftClientExecutor
+    ) {
+        this.replicationGroupId = replicationGroupId;
+        this.clusterService = clusterService;
+        this.raftConfiguration = raftConfiguration;
+        this.placementDriverNodesNamesProvider = 
placementDriverNodesNamesProvider;
+        this.logicalTopologyService = logicalTopologyService;
+        this.raftClientExecutor = raftClientExecutor;
+
+        raftClientFuture = new CompletableFuture<>();
     }
 
     /** {@inheritDoc} */
     @Override
     public void start() {
+        placementDriverNodesNamesProvider.get()
+                .thenCompose(placementDriverNodes -> {
+                    String thisNodeName = 
clusterService.topologyService().localMember().name();
+
+                    if (placementDriverNodes.contains(thisNodeName)) {
+                        return TopologyAwareRaftGroupService.start(
+                                replicationGroupId,
+                                clusterService,
+                                raftMessagesFactory,
+                                raftConfiguration,
+                                
PeersAndLearners.fromConsistentIds(placementDriverNodes),
+                                true,
+                                raftClientExecutor,
+                                logicalTopologyService,
+                                true
+                            ).thenCompose(client -> {
+                                TopologyAwareRaftGroupService 
topologyAwareClient = (TopologyAwareRaftGroupService) client;
 
+                                return 
topologyAwareClient.subscribeLeader(this::onLeaderChange).thenApply(v -> 
topologyAwareClient);
+                            });
+                    } else {
+                        return completedFuture(null);
+                    }
+                })
+                .whenComplete((client, ex) -> {
+                    if (ex == null) {
+                        raftClientFuture.complete(client);
+                    } else {
+                        raftClientFuture.completeExceptionally(ex);
+                    }
+                });
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void beforeNodeStop() {
+        withRaftClientIfPresent(c -> c.unsubscribeLeader().join());
     }
 
     /** {@inheritDoc} */
@@ -56,5 +150,47 @@ public class PlacementDriverManager implements 
IgniteComponent {
         }
 
         busyLock.block();
+
+        withRaftClientIfPresent(TopologyAwareRaftGroupService::shutdown);
+    }
+
+    private void 
withRaftClientIfPresent(Consumer<TopologyAwareRaftGroupService> closure) {
+        raftClientFuture.thenAccept(client -> {
+            if (client != null) {
+                closure.accept(client);
+            }
+        });
+    }
+
+    private void onLeaderChange(ClusterNode leader, Long term) {
+        if (term > lastTermSeen) {
+            if (leader.equals(clusterService.topologyService().localMember())) 
{
+                takeOverActiveActor();
+            } else {
+                stepDownActiveActor();
+            }
+
+            lastTermSeen = term;
+        }
+    }
+
+    /**
+     * Takes over active actor of placement driver group.
+     */
+    private void takeOverActiveActor() {
+        isActiveActor = true;
+    }
+
+
+    /**
+     * Steps down as active actor.
+     */
+    private void stepDownActiveActor() {
+        isActiveActor = false;
+    }
+
+    @TestOnly
+    boolean isActiveActor() {
+        return isActiveActor;
     }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 47d16c65cf..d77a1f58b5 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -25,7 +25,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
@@ -78,6 +78,11 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
     /** RAFT configuration. */
     private final RaftConfiguration raftConfiguration;
 
+    /**
+     * Whether to notify callback after subscription to pass the current 
leader and term into it, even if the leader
+     * did not change in that moment (see {@link 
#subscribeLeader(BiConsumer)}).
+     */
+    private final boolean notifyOnSubscription;
 
     /**
      * The constructor.
@@ -87,6 +92,8 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      * @param executor RPC executor.
      * @param raftClient RPC RAFT client.
      * @param logicalTopologyService Logical topology.
+     * @param notifyOnSubscription Whether to notify callback after 
subscription to pass the current leader and term into it,
+     *        even if the leader did not change in that moment (see {@link 
#subscribeLeader(BiConsumer)}).
      */
     private TopologyAwareRaftGroupService(
             ClusterService cluster,
@@ -94,7 +101,8 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
             ScheduledExecutorService executor,
             RaftConfiguration raftConfiguration,
             RaftGroupService raftClient,
-            LogicalTopologyService logicalTopologyService
+            LogicalTopologyService logicalTopologyService,
+            boolean notifyOnSubscription
     ) {
         this.clusterService = cluster;
         this.factory = factory;
@@ -103,6 +111,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
         this.raftClient = raftClient;
         this.logicalTopologyService = logicalTopologyService;
         this.serverEventHandler = new ServerEventHandler();
+        this.notifyOnSubscription = notifyOnSubscription;
 
         cluster.messagingService().addMessageHandler(RaftMessageGroup.class, 
(message, senderConsistentId, correlationId) -> {
             if (message instanceof LeaderChangeNotification) {
@@ -154,6 +163,8 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      * @param getLeader True to get the group's leader upon service creation.
      * @param executor RPC executor.
      * @param logicalTopologyService Logical topology service.
+     * @param notifyOnSubscription Whether to notify callback after 
subscription to pass the current leader and term into it,
+     *        even if the leader did not change in that moment (see {@link 
#subscribeLeader(BiConsumer)}).
      * @return Future to create a raft client.
      */
     public static CompletableFuture<RaftGroupService> start(
@@ -164,11 +175,12 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
             PeersAndLearners configuration,
             boolean getLeader,
             ScheduledExecutorService executor,
-            LogicalTopologyService logicalTopologyService
+            LogicalTopologyService logicalTopologyService,
+            boolean notifyOnSubscription
     ) {
         return RaftGroupServiceImpl.start(groupId, cluster, factory, 
raftConfiguration, configuration, getLeader, executor)
                 .thenApply(raftGroupService -> new 
TopologyAwareRaftGroupService(cluster, factory, executor, raftConfiguration,
-                        raftGroupService, logicalTopologyService));
+                        raftGroupService, logicalTopologyService, 
notifyOnSubscription));
     }
 
     /**
@@ -247,8 +259,9 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
      * Assigns a closure to call when a leader will is elected.
      *
      * @param callback Callback closure.
+     * @return Future that is completed when all subscription messages to 
peers are sent.
      */
-    public void subscribeLeader(Consumer<ClusterNode> callback) {
+    public CompletableFuture<Void> subscribeLeader(BiConsumer<ClusterNode, 
Long> callback) {
         assert !serverEventHandler.isSubscribed() : "The node already 
subscribed";
 
         int peers = peers().size();
@@ -272,38 +285,51 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
             }
         }
 
-        CompletableFuture.allOf(futs).whenCompleteAsync((unused, throwable) -> 
{
-            if (throwable != null) {
-                throw new IgniteException(Common.UNEXPECTED_ERR, throwable);
-            }
-
-            refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm -> {
-                if (leaderWithTerm.leader() != null) {
-                    serverEventHandler.onLeaderElected(
-                            
clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId()),
-                            leaderWithTerm.term()
-                    );
+        if (notifyOnSubscription) {
+            return CompletableFuture.allOf(futs).whenCompleteAsync((unused, 
throwable) -> {
+                if (throwable != null) {
+                    throw new IgniteException(Common.UNEXPECTED_ERR, 
throwable);
                 }
+
+                refreshAndGetLeaderWithTerm().thenAcceptAsync(leaderWithTerm 
-> {
+                    if (leaderWithTerm.leader() != null) {
+                        serverEventHandler.onLeaderElected(
+                                
clusterService.topologyService().getByConsistentId(leaderWithTerm.leader().consistentId()),
+                                leaderWithTerm.term()
+                        );
+                    }
+                }, executor);
             }, executor);
-        }, executor);
+        } else {
+            return CompletableFuture.allOf(futs);
+        }
     }
 
     /**
      * Unsubscribe of notification about a leader elected.
+     *
+     * @return Future that is completed when all messages about cancelling 
subscription to peers are sent.
      */
-    public void unsubscribeLeader() {
+    public CompletableFuture<Void> unsubscribeLeader() {
         serverEventHandler.setOnLeaderElectedCallback(null);
 
-        for (Peer peer : peers()) {
+        var peers = peers();
+        var futs = new CompletableFuture[peers.size()];
+
+        for (int i = 0; i < peers.size(); i++) {
+            Peer peer = peers.get(i);
+
             ClusterNode node = 
clusterService.topologyService().getByConsistentId(peer.consistentId());
 
             if (node != null) {
-                sendSubscribeMessage(node, 
factory.subscriptionLeaderChangeRequest()
+                futs[i] = sendSubscribeMessage(node, 
factory.subscriptionLeaderChangeRequest()
                         .groupId(groupId())
                         .subscribe(false)
                         .build());
             }
         }
+
+        return CompletableFuture.allOf(futs);
     }
 
     @Override
@@ -409,7 +435,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
         private long term = 0;
 
         /** A leader elected callback. */
-        private Consumer<ClusterNode> onLeaderElectedCallback;
+        private BiConsumer<ClusterNode, Long> onLeaderElectedCallback;
 
         /**
          * Notifies about a new leader elected, if it did not make before.
@@ -421,7 +447,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
             if (onLeaderElectedCallback != null && term > this.term) {
                 this.term = term;
 
-                onLeaderElectedCallback.accept(node);
+                onLeaderElectedCallback.accept(node, term);
             }
         }
 
@@ -430,7 +456,7 @@ public class TopologyAwareRaftGroupService implements 
RaftGroupService {
          *
          * @param onLeaderElectedCallback A callback closure.
          */
-        public synchronized void 
setOnLeaderElectedCallback(Consumer<ClusterNode> onLeaderElectedCallback) {
+        public synchronized void 
setOnLeaderElectedCallback(BiConsumer<ClusterNode, Long> 
onLeaderElectedCallback) {
             this.onLeaderElectedCallback = onLeaderElectedCallback;
         }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 18e6836780..27d2c1d638 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -22,7 +22,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -37,7 +36,6 @@ import 
org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -62,7 +60,7 @@ public class Loza implements RaftManager {
     public static final String CLIENT_POOL_NAME = "Raft-Group-Client";
 
     /** Raft client pool size. Size was taken from jraft's TimeManager. */
-    private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
+    public static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
 
     /** Logger. */
     private static final IgniteLogger LOG = Loggers.forClass(Loza.class);
@@ -94,12 +92,14 @@ public class Loza implements RaftManager {
      * @param raftConfiguration Raft configuration.
      * @param dataPath Data path.
      * @param clock A hybrid logical clock.
+     * @param executor Executor for raft group services.
      */
     public Loza(
             ClusterService clusterNetSvc,
             RaftConfiguration raftConfiguration,
             Path dataPath,
-            HybridClock clock
+            HybridClock clock,
+            ScheduledExecutorService executor
     ) {
         this.clusterNetSvc = clusterNetSvc;
         this.raftConfiguration = raftConfiguration;
@@ -112,9 +112,32 @@ public class Loza implements RaftManager {
 
         this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, 
options);
 
-        this.executor = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
-                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),
-                        CLIENT_POOL_NAME), LOG
+        this.executor = executor;
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param clusterNetSvc Cluster network service.
+     * @param raftConfiguration Raft configuration.
+     * @param dataPath Data path.
+     * @param clock A hybrid logical clock.
+     */
+    public Loza(
+            ClusterService clusterNetSvc,
+            RaftConfiguration raftConfiguration,
+            Path dataPath,
+            HybridClock clock
+    ) {
+        this(
+                clusterNetSvc,
+                raftConfiguration,
+                dataPath,
+                clock,
+                new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
+                        new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),
+                                CLIENT_POOL_NAME), LOG
+                        )
                 )
         );
     }
@@ -136,8 +159,6 @@ public class Loza implements RaftManager {
 
         busyLock.block();
 
-        IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
-
         raftServer.stop();
     }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java
index cfe26eadc4..a538012a48 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/RaftServiceEventListener.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.internal.raft.server.impl;
 
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
-import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.network.ClusterNode;
 
 /**
@@ -48,9 +49,13 @@ public class RaftServiceEventListener {
                 actions = new HashSet<>();
             }
 
-            actions.add(notifyAction);
+            var finalActions = actions;
 
             nodesSubscriptions.compute(subscriber, (node, nodeActions) -> {
+                if (!nullOrEmpty(finalActions) && !nullOrEmpty(nodeActions)) {
+                    return nodeActions;
+                }
+
                 if (nodeActions == null) {
                     nodeActions = new HashSet<>();
                 }
@@ -60,6 +65,8 @@ public class RaftServiceEventListener {
                 return nodeActions;
             });
 
+            actions.add(notifyAction);
+
             return actions;
         });
     }
@@ -77,7 +84,7 @@ public class RaftServiceEventListener {
 
                 grpNodeActions.retainAll(nodeActions);
 
-                if (CollectionUtils.nullOrEmpty(grpNodeActions)) {
+                if (nullOrEmpty(grpNodeActions)) {
                     return nodeActions;
                 }
 
@@ -88,14 +95,14 @@ public class RaftServiceEventListener {
                 nodeActions.remove(actionToRemove);
                 actions.remove(actionToRemove);
 
-                if (CollectionUtils.nullOrEmpty(nodeActions)) {
+                if (nullOrEmpty(nodeActions)) {
                     return null;
                 }
 
                 return nodeActions;
             });
 
-            if (CollectionUtils.nullOrEmpty(actions)) {
+            if (nullOrEmpty(actions)) {
                 return null;
             }
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index afe6311409..8ac95b3be7 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.app;
 
+import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_NAME;
+import static org.apache.ignite.internal.raft.Loza.CLIENT_POOL_SIZE;
+
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -29,6 +32,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -71,6 +77,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
 import org.apache.ignite.internal.metrics.rest.MetricRestFactory;
@@ -108,6 +115,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.VaultService;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -253,6 +261,8 @@ public class IgniteImpl implements Ignite {
 
     private final RestAddressReporter restAddressReporter;
 
+    private final ScheduledExecutorService raftExecutorService;
+
     /**
      * The Constructor.
      *
@@ -310,11 +320,20 @@ public class IgniteImpl implements Ignite {
 
         clock = new HybridClockImpl();
 
+        RaftConfiguration raftConfiguration = 
nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY);
+
+        raftExecutorService = new ScheduledThreadPoolExecutor(CLIENT_POOL_SIZE,
+                new 
NamedThreadFactory(NamedThreadFactory.threadPrefix(clusterSvc.localConfiguration().getName(),
+                        CLIENT_POOL_NAME), LOG
+                )
+        );
+
         raftMgr = new Loza(
                 clusterSvc,
-                nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY),
+                raftConfiguration,
                 workDir,
-                clock
+                clock,
+                raftExecutorService
         );
 
         LockManager lockMgr = new HeapLockManager();
@@ -354,7 +373,14 @@ public class IgniteImpl implements Ignite {
                 new RocksDbKeyValueStorage(name, 
workDir.resolve(METASTORAGE_DB_PATH))
         );
 
-        placementDriverMgr = new PlacementDriverManager(metaStorageMgr);
+        placementDriverMgr = new PlacementDriverManager(
+                MetastorageGroupId.INSTANCE,
+                clusterSvc,
+                raftConfiguration,
+                cmgMgr::metaStorageNodes,
+                logicalTopologyService,
+                raftExecutorService
+        );
 
         this.cfgStorage = new DistributedConfigurationStorage(metaStorageMgr, 
vaultMgr);
 
@@ -675,6 +701,7 @@ public class IgniteImpl implements Ignite {
     public void stop() {
         lifecycleManager.stopNode();
         restAddressReporter.removeReport();
+        IgniteUtils.shutdownAndAwaitTermination(raftExecutorService, 10, 
TimeUnit.SECONDS);
     }
 
     /** {@inheritDoc} */


Reply via email to