This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 84b760b54c IGNITE-22315 Make raft-client starting only once and only with raft-client and replica together (#3956) 84b760b54c is described below commit 84b760b54cbfe9ef5842b3b470a7e6f4be4ecdba Author: Mikhail Efremov <jakuten...@gmail.com> AuthorDate: Tue Jul 9 11:45:44 2024 +0600 IGNITE-22315 Make raft-client starting only once and only with raft-client and replica together (#3956) --- .../ignite/internal/index/ItBuildIndexTest.java | 10 +- .../metastorage/server/time/ClusterTime.java | 5 + .../metastorage/server/time/ClusterTimeImpl.java | 10 +- .../PartitionReplicaLifecycleManager.java | 4 +- .../ignite/internal/replicator/ReplicaManager.java | 61 ++-- .../app/ItIgniteInMemoryNodeRestartTest.java | 4 +- .../rebalance/ItRebalanceDistributedTest.java | 116 +++++-- .../internal/table/distributed/TableManager.java | 346 ++++++++++++--------- .../distributed/TableManagerRecoveryTest.java | 3 - .../table/distributed/TableManagerTest.java | 3 - .../apache/ignite/distributed/ItTxTestCluster.java | 5 - 11 files changed, 341 insertions(+), 226 deletions(-) diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java index 64ec13d0de..e873d3e895 100644 --- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java +++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.network.NetworkMessage; import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; import org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand; @@ -324,7 +325,14 @@ public class ItBuildIndexTest extends BaseSqlIntegrationTest { assertNotNull(indexDescriptor); for (int partitionId = 0; partitionId < internalTable.partitions(); partitionId++) { - RaftGroupService raftGroupService = internalTable.tableRaftService().partitionRaftGroupService(partitionId); + // Excluding partitions on the node outside of replication group + // TODO: will be replaced with replica usage in https://issues.apache.org/jira/browse/IGNITE-22218 + RaftGroupService raftGroupService; + try { + raftGroupService = internalTable.tableRaftService().partitionRaftGroupService(partitionId); + } catch (IgniteInternalException e) { + continue; + } List<Peer> allPeers = raftGroupService.peers(); diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java index 55d662bbfa..4dee69f665 100644 --- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java +++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTime.java @@ -34,6 +34,11 @@ public interface ClusterTime { */ long nowLong(); + /** + * Returns current safe time. + */ + HybridTimestamp currentSafeTime(); + /** * Provides the future that is completed when cluster time reaches given one. If the time is greater or equal * then the given one, returns completed future. 
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java index af74dcde63..9277d40bf0 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/time/ClusterTimeImpl.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.jetbrains.annotations.Nullable; -import org.jetbrains.annotations.TestOnly; /** * Cluster time implementation with additional methods to adjust time and update safe time. @@ -138,6 +137,11 @@ public class ClusterTimeImpl implements ClusterTime, MetaStorageMetrics, Manuall return clock.nowLong(); } + @Override + public HybridTimestamp currentSafeTime() { + return this.safeTime.current(); + } + @Override public CompletableFuture<Void> waitFor(HybridTimestamp time) { return safeTime.waitFor(time); @@ -233,10 +237,6 @@ public class ClusterTimeImpl implements ClusterTime, MetaStorageMetrics, Manuall IgniteUtils.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS); } - } - @TestOnly - public HybridTimestamp currentSafeTime() { - return this.safeTime.current(); } } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java index 8f7cd70732..c02d26f89c 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java +++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java @@ -631,7 +631,7 @@ public class PartitionReplicaLifecycleManager implements IgniteComponent { ) { // Update raft client peers and learners according to the actual assignments. if (replicaMgr.isReplicaStarted(zonePartitionId)) { - replicaMgr.getReplica(zonePartitionId).join() + replicaMgr.replica(zonePartitionId).join() .raftClient().updateConfiguration(fromAssignments(stableAssignments)); } @@ -785,7 +785,7 @@ public class PartitionReplicaLifecycleManager implements IgniteComponent { ? pendingAssignmentsNodes : RebalanceUtil.union(pendingAssignmentsNodes, stableAssignments.nodes()); - replicaMgr.getReplica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments)); + replicaMgr.replica(replicaGrpId).join().raftClient().updateConfiguration(fromAssignments(newAssignments)); }, ioExecutor); } diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index ca37e69f04..fd3d919f64 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -111,6 +111,7 @@ import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage; import org.apache.ignite.internal.replicator.message.TimestampAware; import org.apache.ignite.internal.thread.ExecutorChooser; +import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.thread.PublicApiThreading; import org.apache.ignite.internal.thread.ThreadAttributes; @@ -200,6 +201,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc private final ReplicaStateManager replicaStateManager; + private final ExecutorService replicasCreationExecutor; + private volatile String localNodeId; private volatile String localNodeConsistentId; @@ -331,6 +334,15 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc new LinkedBlockingQueue<>(), NamedThreadFactory.create(nodeName, "replica", LOG) ); + + replicasCreationExecutor = new ThreadPoolExecutor( + threadCount, + threadCount, + 30, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + IgniteThreadFactory.create(nodeName, "replica-manager", LOG, STORAGE_READ, STORAGE_WRITE) + ); } private void onReplicaMessageReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) { @@ -613,6 +625,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc * @param replicaGrpId Replication group id. * @param storageIndexTracker Storage index tracker. * @param newConfiguration A configuration for new raft group. + * * @return Future that promises ready new replica when done. 
*/ public CompletableFuture<Boolean> startReplica( @@ -752,14 +765,14 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc ) throws NodeStoppingException { LOG.info("Replica is about to start [replicationGroupId={}].", replicaGrpId); - CompletableFuture<Boolean> resultFuture = newRaftClientFut.thenAccept(updateTableRaftService) - .thenApply((v) -> true); - - CompletableFuture<ReplicaListener> newReplicaListenerFut = newRaftClientFut.thenApply(createListener); - - startReplica(replicaGrpId, storageIndexTracker, newReplicaListenerFut); - - return resultFuture; + return newRaftClientFut + .thenApplyAsync(raftClient -> { + // TODO: will be removed in https://issues.apache.org/jira/browse/IGNITE-22218 + updateTableRaftService.accept(raftClient); + return createListener.apply(raftClient); + }, replicasCreationExecutor) + .thenCompose(replicaListener -> startReplica(replicaGrpId, storageIndexTracker, completedFuture(replicaListener))) + .thenApply(r -> true); } /** @@ -777,7 +790,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc ReplicationGroupId replicaGrpId, PendingComparableValuesTracker<Long, Void> storageIndexTracker, CompletableFuture<ReplicaListener> newReplicaListenerFut - ) throws NodeStoppingException { + ) { ClusterNode localNode = clusterNetSvc.topologyService().localMember(); @@ -820,30 +833,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc .thenCompose(v -> replicaFuture); } - /** - * Temporary public method for RAFT-client starting. - * TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 - * - * @param replicaGrpId Replication Group ID. - * @param newConfiguration Peers and learners nodes for a raft group. - * @param raftClientCache Temporal supplier that returns RAFT-client from TableRaftService if it's already exists and was put into the - * service's map. - * @return Future that returns started RAFT-client. 
- * @throws NodeStoppingException In case if node was stopping. - */ - @Deprecated - public CompletableFuture<TopologyAwareRaftGroupService> startRaftClient( - ReplicationGroupId replicaGrpId, - PeersAndLearners newConfiguration, - Supplier<RaftGroupService> raftClientCache) - throws NodeStoppingException { - RaftGroupService cachedRaftClient = raftClientCache.get(); - return cachedRaftClient != null - ? completedFuture((TopologyAwareRaftGroupService) cachedRaftClient) - // TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online. - : raftManager.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller); - } - /** * Returns future with a replica if it was created or null if there no any replicas starting with given identifier. * @@ -1021,6 +1010,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS); shutdownAndAwaitTermination(executor, shutdownTimeoutSeconds, TimeUnit.SECONDS); + shutdownAndAwaitTermination(replicasCreationExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS); assert replicas.values().stream().noneMatch(CompletableFuture::isDone) : "There are replicas alive [replicas=" @@ -1219,11 +1209,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc return replicas.containsKey(replicaGrpId); } - @TestOnly - public CompletableFuture<Replica> getReplica(ReplicationGroupId replicationGroupId) { - return replicas.get(replicationGroupId); - } - /** * Returns started replication groups. 
* diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java index 378b2cb681..4e479fbe90 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java @@ -219,7 +219,9 @@ public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest { List<String> partitionAssignments = assignments.get(0); - return partitionAssignments.contains(restartingNodeConsistentId); + return !assignments.isEmpty() + && partitionAssignments != null + && partitionAssignments.contains(restartingNodeConsistentId); } private static boolean isRaftNodeStarted(TableViewInternal table, Loza loza) { diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index b48222de3c..8825e5ddf0 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -120,6 +120,7 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil; +import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil; import org.apache.ignite.internal.failure.FailureProcessor; import org.apache.ignite.internal.failure.NoOpFailureProcessor; 
import org.apache.ignite.internal.hlc.ClockService; @@ -160,12 +161,14 @@ import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory; import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils; import org.apache.ignite.internal.replicator.Replica; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; import org.apache.ignite.internal.rest.configuration.RestConfiguration; @@ -707,19 +710,23 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { )); // Check that raft clients on all nodes were updated with the new list of peers. 
+ Predicate<Node> isNodeInReplicationGroup = n -> isNodeInAssignments(n, newAssignment); assertTrue(waitForCondition( - () -> nodes.stream().allMatch(n -> - n.tableManager - .startedTables() - .get(getTableId(node, TABLE_NAME)) - .internalTable() - .tableRaftService() - .partitionRaftGroupService(0) - .peers() - .equals(List.of(new Peer(newNodeNameForAssignment)))), + () -> nodes.stream() + .filter(isNodeInReplicationGroup) + .allMatch(n -> isNodeUpdatesPeersOnGroupService(node, assignmentsToPeersSet(newAssignment))), (long) AWAIT_TIMEOUT_MILLIS * nodes.size() )); + // Checks that there no any replicas outside replication group + var replGrpId = new TablePartitionId(getTableId(node, TABLE_NAME), 0); + Predicate<Node> isNodeOutsideReplicationGroup = n -> !isNodeInAssignments(n, newAssignment); + assertTrue(waitForCondition( + () -> nodes.stream() + .filter(isNodeOutsideReplicationGroup) + .noneMatch(n -> isReplicationGroupStarted(n, replGrpId)), + (long) AWAIT_TIMEOUT_MILLIS * nodes.size() + )); } @@ -733,13 +740,14 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { waitPartitionAssignmentsSyncedToExpected(0, 1); - String assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0).stream() + var assignmentsBeforeRebalance = getPartitionClusterNodes(node, 0); + String nodeNameAssignedBeforeRebalance = assignmentsBeforeRebalance.stream() .findFirst() .orElseThrow() .consistentId(); String newNodeNameForAssignment = nodes.stream() - .filter(n -> !assignmentsBeforeRebalance.equals(n.clusterService.nodeName())) + .filter(n -> !nodeNameAssignedBeforeRebalance.equals(n.clusterService.nodeName())) .findFirst() .orElseThrow() .name; @@ -762,25 +770,65 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { node.metaStorageManager.put(partAssignmentsPendingKey, bytesPendingAssignments).get(AWAIT_TIMEOUT_MILLIS, MILLISECONDS); + Set<Assignment> union = RebalanceUtil.union(assignmentsBeforeRebalance, newAssignment); + // Check 
that raft clients on all nodes were updated with the new list of peers. + Predicate<Node> isNodeInReplicationGroup = n -> isNodeInAssignments(n, union); assertTrue(waitForCondition( - () -> nodes.stream().allMatch(n -> - n.tableManager - .startedTables() - .get(getTableId(node, TABLE_NAME)) - .internalTable() - .tableRaftService() - .partitionRaftGroupService(0) - .peers() - .stream() - .collect(toSet()) - .equals(Set.of(new Peer(newNodeNameForAssignment), new Peer(assignmentsBeforeRebalance)))), + () -> nodes.stream() + .filter(isNodeInReplicationGroup) + .allMatch(n -> isNodeUpdatesPeersOnGroupService(node, assignmentsToPeersSet(union))), + (long) AWAIT_TIMEOUT_MILLIS * nodes.size() + )); + + // Checks that there no any replicas outside replication group + Predicate<Node> isNodeOutsideReplicationGroup = n -> !isNodeInAssignments(n, union); + assertTrue(waitForCondition( + () -> nodes.stream() + .filter(isNodeOutsideReplicationGroup) + .noneMatch(n -> isReplicationGroupStarted(n, partId)), (long) AWAIT_TIMEOUT_MILLIS * nodes.size() )); dropMessages.set(false); } + private static Set<Peer> assignmentsToPeersSet(Set<Assignment> assignments) { + return assignments.stream() + .map(Assignment::consistentId) + .map(Peer::new) + .collect(toSet()); + } + + private static boolean isNodeInAssignments(Node node, Set<Assignment> assignments) { + return assignmentsToPeersSet(assignments).stream() + .map(Peer::consistentId) + .anyMatch(id -> id.equals(node.clusterService.nodeName())); + } + + private static boolean isReplicationGroupStarted(Node node, ReplicationGroupId replicationGroupId) { + return node.replicaManager.isReplicaStarted(replicationGroupId); + } + + private static boolean isNodeUpdatesPeersOnGroupService(Node node, Set<Peer> desiredPeers) { + // TODO: will be replaced with replica usage in https://issues.apache.org/jira/browse/IGNITE-22218 + TableRaftService tblRaftSvc = node.tableManager.startedTables() + .get(getTableId(node, TABLE_NAME)) + .internalTable() + 
.tableRaftService(); + RaftGroupService groupService; + try { + groupService = tblRaftSvc.partitionRaftGroupService(0); + } catch (IgniteInternalException e) { + return false; + } + List<Peer> peersList = groupService.peers(); + boolean isUpdated = peersList.stream() + .collect(toSet()) + .equals(desiredPeers); + return isUpdated; + } + private void clearSpyInvocations() { for (int i = 0; i < NODE_COUNT; i++) { clearInvocations(getNode(i).raftManager); @@ -846,16 +894,26 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { (long) AWAIT_TIMEOUT_MILLIS * nodes.size() )); + Node anyNode = nodes.get(0); + Set<Assignment> assignments = getPartitionClusterNodes(anyNode, tableName, replicasNum); assertTrue(waitForCondition( () -> { try { - return nodes.stream().allMatch(n -> - n.tableManager - .cachedTable(getTableId(n, tableName)) - .internalTable() - .tableRaftService() - .partitionRaftGroupService(partNum) != null - ); + return nodes.stream() + .filter(n -> isNodeInAssignments(n, assignments)) + .allMatch(n -> { + // TODO: will be replaced with replica usage in https://issues.apache.org/jira/browse/IGNITE-22218 + TableRaftService trs = n.tableManager + .cachedTable(getTableId(n, tableName)) + .internalTable() + .tableRaftService(); + + try { + return trs.partitionRaftGroupService(partNum) != null; + } catch (IgniteInternalException e) { + return false; + } + }); } catch (IgniteInternalException e) { // Raft group service not found. 
return false; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 13406d7f77..9bad9f3241 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -41,6 +41,7 @@ import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablesCounterKey; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union; @@ -99,6 +100,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.LongFunction; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -152,7 +154,6 @@ import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.service.LeaderWithTerm; import 
org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupService; @@ -161,6 +162,7 @@ import org.apache.ignite.internal.replicator.Replica; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaManager.WeakReplicaStopReason; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.schema.SchemaManager; @@ -413,6 +415,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final IndexMetaStorage indexMetaStorage; + private final Predicate<Assignment> isLocalNodeAssignment = assignment -> assignment.consistentId().equals(localNode().name()); + /** * Creates a new table manager. * @@ -646,7 +650,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery( stableAssignmentsPrefix, recoveryRevision, - (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev, true), + (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev, true), "stable" )); startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery( @@ -889,6 +893,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { int zoneId, boolean isRecovery ) { + if (localMemberAssignment == null) { + // (0) in case if node not in the assignments + return nullCompletedFuture(); + } + int tableId = table.tableId(); var internalTbl = (InternalTableImpl) table.internalTable(); @@ -902,137 +911,103 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { .tableRaftService() .updateInternalTableRaftGroupService(partId, raftClient); - CompletableFuture<Boolean> startGroupFut; - - if 
(localMemberAssignment != null) { - CompletableFuture<Boolean> shouldStartGroupFut = isRecovery - ? partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded( - replicaGrpId, - internalTbl, - stablePeersAndLearners, - localMemberAssignment - ) - : trueCompletedFuture(); - - Assignments forcedAssignments = stableAssignments.force() ? stableAssignments : null; - - startGroupFut = replicaMgr.weakStartReplica( - replicaGrpId, - () -> shouldStartGroupFut.thenComposeAsync(startGroup -> inBusyLock(busyLock, () -> { - // (1) if partitionReplicatorNodeRecovery#shouldStartGroup fails -> do start nothing - if (!startGroup) { - return falseCompletedFuture(); - } + CompletableFuture<Boolean> shouldStartGroupFut = isRecovery + ? partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded( + replicaGrpId, + internalTbl, + stablePeersAndLearners, + localMemberAssignment + ) + : trueCompletedFuture(); - // (2) Otherwise let's start replica manually - var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>(HybridTimestamp.MIN_VALUE); + Assignments forcedAssignments = stableAssignments.force() ? 
stableAssignments : null; - var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L); + Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> shouldStartGroupFut + .thenComposeAsync(startGroup -> inBusyLock(busyLock, () -> { + // (1) if partitionReplicatorNodeRecovery#shouldStartGroup fails -> do start nothing + if (!startGroup) { + return falseCompletedFuture(); + } - PartitionStorages partitionStorages = getPartitionStorages(table, partId); + // (2) Otherwise let's start replica manually + var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>(HybridTimestamp.MIN_VALUE); - PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(), - internalTbl, partId); + var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L); - storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null); + PartitionStorages partitionStorages = getPartitionStorages(table, partId); - PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( - partId, - partitionDataStorage, - table, - safeTimeTracker, - storageUpdateConfig - ); + PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(), + internalTbl, partId); - internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); + storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null); - mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler); + PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( + partId, + partitionDataStorage, + table, + safeTimeTracker, + storageUpdateConfig + ); - RaftGroupListener raftGroupListener = new PartitionListener( - txManager, - partitionDataStorage, - partitionUpdateHandlers.storageUpdateHandler, - partitionStorages.getTxStateStorage(), - safeTimeTracker, - storageIndexTracker, - catalogService, - table.schemaView(), - 
clockService, - indexMetaStorage - ); + internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); + + mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler); + + RaftGroupListener raftGroupListener = new PartitionListener( + txManager, + partitionDataStorage, + partitionUpdateHandlers.storageUpdateHandler, + partitionStorages.getTxStateStorage(), + safeTimeTracker, + storageIndexTracker, + catalogService, + table.schemaView(), + clockService, + indexMetaStorage + ); - SnapshotStorageFactory snapshotStorageFactory = createSnapshotStorageFactory(replicaGrpId, - partitionUpdateHandlers, internalTbl); + SnapshotStorageFactory snapshotStorageFactory = createSnapshotStorageFactory(replicaGrpId, + partitionUpdateHandlers, internalTbl); - Function<RaftGroupService, ReplicaListener> createListener = (raftClient) -> createReplicaListener( - replicaGrpId, - table, - safeTimeTracker, - partitionStorages.getMvPartitionStorage(), - partitionStorages.getTxStateStorage(), - partitionUpdateHandlers, - raftClient); - - RaftGroupEventsListener raftGroupEventsListener = createRaftGroupEventsListener(zoneId, replicaGrpId); - - MvTableStorage mvTableStorage = internalTbl.storage(); - - try { - return replicaMgr.startReplica( - raftGroupEventsListener, - raftGroupListener, - mvTableStorage.isVolatile(), - snapshotStorageFactory, - updateTableRaftService, - createListener, - storageIndexTracker, - replicaGrpId, - stablePeersAndLearners); - } catch (NodeStoppingException e) { - throw new AssertionError("Loza was stopped before Table manager", e); - } - }), ioExecutor), - forcedAssignments - ); - } else { - // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 - // (4) in case if node not in the assignments - startGroupFut = falseCompletedFuture(); - } + Function<RaftGroupService, ReplicaListener> createListener = (raftClient) -> createReplicaListener( + replicaGrpId, + table, + safeTimeTracker, + 
partitionStorages.getMvPartitionStorage(), + partitionStorages.getTxStateStorage(), + partitionUpdateHandlers, + raftClient); - return startGroupFut - // TODO: the stage will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 - .thenComposeAsync(isReplicaStarted -> inBusyLock(busyLock, () -> { - if (isReplicaStarted) { - return nullCompletedFuture(); - } + RaftGroupEventsListener raftGroupEventsListener = createRaftGroupEventsListener(zoneId, replicaGrpId); - // TODO: will be removed in https://issues.apache.org/jira/browse/IGNITE-22315 - Supplier<RaftGroupService> getCachedRaftClient = () -> { - try { - // Return existing service if it's already started. - return internalTbl - .tableRaftService() - .partitionRaftGroupService(replicaGrpId.partitionId()); - } catch (IgniteInternalException e) { - // We use "IgniteInternalException" in accordance with the javadoc of "partitionRaftGroupService" method. - return null; - } - }; + MvTableStorage mvTableStorage = internalTbl.storage(); - CompletableFuture<TopologyAwareRaftGroupService> newRaftClientFut; try { - newRaftClientFut = replicaMgr.startRaftClient(replicaGrpId, stablePeersAndLearners, getCachedRaftClient); + return replicaMgr.startReplica( + raftGroupEventsListener, + raftGroupListener, + mvTableStorage.isVolatile(), + snapshotStorageFactory, + updateTableRaftService, + createListener, + storageIndexTracker, + replicaGrpId, + stablePeersAndLearners); } catch (NodeStoppingException e) { - throw new CompletionException(e); + throw new AssertionError("Loza was stopped before Table manager", e); } - return newRaftClientFut.thenAccept(updateTableRaftService); - }), ioExecutor) - .whenComplete((res, ex) -> { - if (ex != null) { - LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId); - } - }); + }), ioExecutor); + + return replicaMgr.weakStartReplica( + replicaGrpId, + startReplicaSupplier, + forcedAssignments + ).handle((res, ex) -> { + if (ex != null) 
{ + LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId); + } + return null; + }); } @Nullable @@ -1110,6 +1085,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return peer.consistentId().equals(localNode().name()); } + private boolean isLocalNodeInAssignments(Collection<Assignment> assignments) { + return assignments.stream().anyMatch(isLocalNodeAssignment); + } + private PartitionDataStorage partitionDataStorage(MvPartitionStorage partitionStorage, InternalTable internalTbl, int partId) { return new SnapshotAwarePartitionDataStorage( partitionStorage, @@ -1141,6 +1120,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { @Override public CompletableFuture<Void> stopAsync(ComponentContext componentContext) { + // NB: the busy lock has already been taken in {@link #beforeNodeStop} assert beforeStopGuard.get() : "'stop' called before 'beforeNodeStop'"; if (!stopGuard.compareAndSet(false, true)) { @@ -1793,7 +1773,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { int catalogVersion = catalogService.latestCatalogVersion(); return setTablesPartitionCountersForRebalance(replicaGrpId, revision, pendingAssignments.force(), catalogVersion) - .thenCompose(r -> handleChangePendingAssignmentEvent( + .thenCompose(v -> handleChangePendingAssignmentEvent( replicaGrpId, table, stableAssignments, @@ -1802,7 +1782,20 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { isRecovery, catalogVersion )) - .thenCompose(v -> changePeersOnRebalance(table, replicaGrpId, pendingAssignments.nodes(), revision)); + .thenCompose(v -> { + boolean isLocalNodeInStableOrPending = isNodeInReducedStableOrPendingAssignments( + replicaGrpId, + stableAssignments, + pendingAssignments, + revision + ); + + if (!isLocalNodeInStableOrPending) { + return nullCompletedFuture(); + } + + return changePeersOnRebalance(table, replicaGrpId,
pendingAssignments.nodes(), revision); + }); } finally { busyLock.leaveBusy(); } @@ -1886,24 +1879,76 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }), ioExecutor); } else { localServicesStartFuture = runAsync(() -> { - if (pendingAssignmentsAreForced && replicaMgr.isReplicaStarted(replicaGrpId)) { + if (pendingAssignmentsAreForced && localMemberAssignment != null) { + + assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local node is outside of the replication group"; + replicaMgr.resetPeers(replicaGrpId, fromAssignments(computedStableAssignments.nodes())); + } else if (pendingAssignmentsAreForced && localMemberAssignment == null) { + assert !replicaMgr.isReplicaStarted(replicaGrpId) + : "The local node is inside of the replication group"; } }, ioExecutor); } - return localServicesStartFuture.thenRunAsync(() -> { - // For forced assignments, we exclude dead stable nodes, and all alive stable nodes are already in pending assignments. - // Union is not required in such a case. - Set<Assignment> newAssignments = pendingAssignmentsAreForced || stableAssignments == null - ? 
pendingAssignmentsNodes - : union(pendingAssignmentsNodes, stableAssignments.nodes()); + return localServicesStartFuture + .thenComposeAsync(v -> inBusyLock(busyLock, () -> isLocalNodeLeaseholder(replicaGrpId)), ioExecutor) + .thenAcceptAsync(isLeaseholder -> inBusyLock(busyLock, () -> { + boolean isLocalNodeInStableOrPending = isNodeInReducedStableOrPendingAssignments( + replicaGrpId, + stableAssignments, + pendingAssignments, + revision + ); - tbl.internalTable() - .tableRaftService() - .partitionRaftGroupService(partitionId) - .updateConfiguration(fromAssignments(newAssignments)); - }, ioExecutor); + if (!isLocalNodeInStableOrPending && !isLeaseholder) { + return; + } + + assert isLocalNodeInStableOrPending || isLeaseholder + : "The local node is outside of the replication group [inStableOrPending=" + isLocalNodeInStableOrPending + + ", isLeaseholder=" + isLeaseholder + "]."; + + // For forced assignments, we exclude dead stable nodes, and all alive stable nodes are already in pending assignments. + // Union is not required in such a case. + Set<Assignment> newAssignments = pendingAssignmentsAreForced || stableAssignments == null + ? pendingAssignmentsNodes + : union(pendingAssignmentsNodes, stableAssignments.nodes()); + + tbl.internalTable() + .tableRaftService() + .partitionRaftGroupService(partitionId) + .updateConfiguration(fromAssignments(newAssignments)); + }), ioExecutor); + } + + private boolean isNodeInReducedStableOrPendingAssignments( + TablePartitionId replicaGrpId, + @Nullable Assignments stableAssignments, + Assignments pendingAssignments, + long revision + ) { + Entry reduceEntry = metaStorageMgr.getLocally(RebalanceUtil.switchReduceKey(replicaGrpId), revision); + + Assignments reduceAssignments = reduceEntry != null + ? Assignments.fromBytes(reduceEntry.value()) + : null; + + Set<Assignment> reducedStableAssignments = reduceAssignments != null + ? 
subtract(stableAssignments.nodes(), reduceAssignments.nodes()) + : stableAssignments.nodes(); + + if (!isLocalNodeInAssignments(union(reducedStableAssignments, pendingAssignments.nodes()))) { + return false; + } + + assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local node is outside of the replication group [" + + ", stable=" + stableAssignments + + ", pending=" + pendingAssignments + + ", reduce=" + reduceAssignments + + ", localName=" + localNode().name() + "]."; + + return true; } private CompletableFuture<Void> setTablesPartitionCountersForRebalance( @@ -1966,6 +2011,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { } private CompletableFuture<Void> changePeersOnRebalance( + // TODO: remove excessive argument (used to get raft-client) https://issues.apache.org/jira/browse/IGNITE-22218 TableImpl table, TablePartitionId replicaGrpId, Set<Assignment> pendingAssignments, @@ -1999,7 +2045,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { // run update of raft configuration if this node is a leader LOG.info("Current node={} is the leader of partition raft group={}. 
" + "Initiate rebalance process for partition={}, table={}", - leaderWithTerm.leader(), replicaGrpId, partId, table.name()); + leaderWithTerm.leader(), replicaGrpId, partId, tables.get(replicaGrpId.tableId()).name()); return metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)) .thenCompose(latestPendingAssignmentsEntry -> { @@ -2010,8 +2056,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return nullCompletedFuture(); } - PeersAndLearners newConfiguration = - fromAssignments(pendingAssignments); + PeersAndLearners newConfiguration = fromAssignments(pendingAssignments); return partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term()); }); @@ -2244,18 +2289,41 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }, ioExecutor).thenCompose(identity()); } + private CompletableFuture<Boolean> isLocalNodeLeaseholder(ReplicationGroupId replicationGroupId) { + HybridTimestamp previousMetastoreSafeTime = metaStorageMgr.clusterTime() + .currentSafeTime() + .addPhysicalTime(-clockService.maxClockSkewMillis()); + + return executorInclinedPlacementDriver.getPrimaryReplica(replicationGroupId, previousMetastoreSafeTime) + .thenApply(replicaMeta -> replicaMeta != null + && replicaMeta.getLeaseholderId() != null + && replicaMeta.getLeaseholderId().equals(localNode().name())); + } + private CompletableFuture<Void> updatePartitionClients( TablePartitionId tablePartitionId, Set<Assignment> stableAssignments, long revision ) { - // Update raft client peers and learners according to the actual assignments. 
- return tablesById(revision).thenAccept(t -> { - t.get(tablePartitionId.tableId()).internalTable() + return isLocalNodeLeaseholder(tablePartitionId).thenCompose(isLeaseholder -> inBusyLock(busyLock, () -> { + boolean isLocalInStable = isLocalNodeInAssignments(stableAssignments); + + if (!isLocalInStable && !isLeaseholder) { + return nullCompletedFuture(); + } + + assert replicaMgr.isReplicaStarted(tablePartitionId) + : "The local node is outside of the replication group [inStable=" + isLocalInStable + + ", isLeaseholder=" + isLeaseholder + "]."; + + // Update raft client peers and learners according to the actual assignments. + return tablesById(revision).thenAccept(t -> t.get(tablePartitionId.tableId()) + .internalTable() .tableRaftService() .partitionRaftGroupService(tablePartitionId.partitionId()) - .updateConfiguration(fromAssignments(stableAssignments)); - }); + .updateConfiguration(fromAssignments(stableAssignments)) + ); + })); } private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients( @@ -2274,7 +2342,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { ? 
pendingAssignments.nodes().stream() : Stream.concat(stableAssignments.stream(), pendingAssignments.nodes().stream()) ) - .noneMatch(assignment -> assignment.consistentId().equals(localNode().name())); + .noneMatch(isLocalNodeAssignment); if (shouldStopLocalServices) { return allOf( diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java index 250f56c51b..a4127f5c7c 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java @@ -281,9 +281,6 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest { when(replicaMgr.getLogSyncer()).thenReturn(mock(LogSyncer.class)); when(replicaMgr.startReplica(any(), any(), any(), any(), any(PendingComparableValuesTracker.class), any())) .thenReturn(nullCompletedFuture()); - // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 - when(replicaMgr.startRaftClient(any(), any(), any())) - .thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class))); when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture()); when(replicaMgr.weakStartReplica(any(), any(), any())).thenReturn(trueCompletedFuture()); when(replicaMgr.weakStopReplica(any(), any(), any())).thenReturn(nullCompletedFuture()); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index df39756946..ecb6c2016b 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -282,9 +282,6 @@ public class 
TableManagerTest extends IgniteAbstractTest { when(replicaMgr.startReplica(any(), any(), anyBoolean(), any(), any(), any(), any(), any(), any())) .thenReturn(trueCompletedFuture()); - // TODO: will be removed after https://issues.apache.org/jira/browse/IGNITE-22315 - when(replicaMgr.startRaftClient(any(), any(), any())) - .thenReturn(completedFuture(mock(TopologyAwareRaftGroupService.class))); when(replicaMgr.stopReplica(any())).thenReturn(trueCompletedFuture()); when(replicaMgr.weakStartReplica(any(), any(), any())).thenAnswer(inv -> { Supplier<CompletableFuture<Void>> startOperation = inv.getArgument(1); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 7ec6ac9c5b..f1379fee24 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -32,7 +32,6 @@ import static org.apache.ignite.internal.util.IgniteUtils.stopAsync; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -682,7 +681,6 @@ public class ItTxTestCluster { topologyAwareRaftGroupServiceFactory ).thenAccept( raftSvc -> { - try { PartitionReplicaListener listener = newReplicaListener( mvPartStorage, raftSvc, @@ -713,9 +711,6 @@ public class ItTxTestCluster { storageIndexTracker, completedFuture(listener) ); - } catch (NodeStoppingException e) { - fail("Unexpected node stopping", e); - } } );