This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 97c531227e IGNITE-19234 Enable and fix group reentry logic for volatile storages (#3763) 97c531227e is described below commit 97c531227ebe74a2211626da5472ec05818152d4 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu May 16 10:52:24 2024 +0400 IGNITE-19234 Enable and fix group reentry logic for volatile storages (#3763) The reentry logic (see IGNITE-16668) was erroneously disabled. This commit enables it back and contains fixes required to make it work (due to changes made to the common rebalancing code since it was disabled). --- .../ignite/internal/affinity/Assignments.java | 16 +++ .../raftsnapshot/ItTableRaftSnapshotsTest.java | 6 +- .../PartitionReplicatorNodeRecovery.java | 113 +++++++++++++++------ .../internal/table/distributed/TableManager.java | 17 ++-- .../{HasDataResponse.java => DataPresence.java} | 18 ++-- .../table/distributed/message/HasDataResponse.java | 13 ++- .../ignite/internal/utils/RebalanceUtilEx.java | 19 ++-- 7 files changed, 143 insertions(+), 59 deletions(-) diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java index d0498b8047..2f0d7fcbee 100644 --- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java +++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java @@ -107,6 +107,22 @@ public class Assignments implements Serializable { return force; } + /** + * Adds an assignment to this collection of assignments. + * + * @param assignment Assignment to add. + */ + public void add(Assignment assignment) { + nodes.add(assignment); + } + + /** + * Returns {@code true} if this collection has no assignments, {@code false} if it has some assignments. 
+ */ + public boolean isEmpty() { + return nodes.isEmpty(); + } + /** * Serializes the instance into an array of bytes. */ diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java index 70b68a98e6..fadc8e6510 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand; import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine; +import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine; import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine; import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier; import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse; @@ -172,9 +173,8 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { @ParameterizedTest @ValueSource(strings = { RocksDbStorageEngine.ENGINE_NAME, - PersistentPageMemoryStorageEngine.ENGINE_NAME - // TODO: uncomment when https://issues.apache.org/jira/browse/IGNITE-19234 is fixed -// VolatilePageMemoryStorageEngine.ENGINE_NAME + PersistentPageMemoryStorageEngine.ENGINE_NAME, + VolatilePageMemoryStorageEngine.ENGINE_NAME }) void leaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception { testLeaderFeedsFollowerWithSnapshot(storageEngine); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java index fc34504bd5..8c22b1030e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java @@ -20,17 +20,20 @@ package org.apache.ignite.internal.table.distributed; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.IntFunction; @@ -43,9 +46,12 @@ import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageClosedException; +import org.apache.ignite.internal.storage.StorageRebalanceException; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.distributed.message.DataPresence; import 
org.apache.ignite.internal.table.distributed.message.HasDataRequest; import org.apache.ignite.internal.table.distributed.message.HasDataResponse; import org.apache.ignite.internal.utils.RebalanceUtilEx; @@ -70,6 +76,8 @@ class PartitionReplicatorNodeRecovery { private final TopologyService topologyService; + private final Executor storageAccessExecutor; + /** Obtains a TableImpl instance by a table ID. */ private final IntFunction<TableViewInternal> tableById; @@ -77,11 +85,13 @@ class PartitionReplicatorNodeRecovery { MetaStorageManager metaStorageManager, MessagingService messagingService, TopologyService topologyService, + Executor storageAccessExecutor, IntFunction<TableViewInternal> tableById ) { this.metaStorageManager = metaStorageManager; this.messagingService = messagingService; this.topologyService = topologyService; + this.storageAccessExecutor = storageAccessExecutor; this.tableById = tableById; } @@ -100,39 +110,58 @@ class PartitionReplicatorNodeRecovery { HasDataRequest msg = (HasDataRequest) message; - int tableId = msg.tableId(); - int partitionId = msg.partitionId(); + storageAccessExecutor.execute(() -> handleHasDataRequest(msg, sender, correlationId)); + } + }); + } - boolean storageHasData = false; + private void handleHasDataRequest(HasDataRequest msg, ClusterNode sender, Long correlationId) { + int tableId = msg.tableId(); + int partitionId = msg.partitionId(); - TableViewInternal table = tableById.apply(tableId); + DataPresence dataPresence = DataPresence.UNKNOWN; - if (table != null) { - MvTableStorage storage = table.internalTable().storage(); + TableViewInternal table = tableById.apply(tableId); - MvPartitionStorage mvPartition = storage.getMvPartition(partitionId); + if (table != null) { + MvTableStorage storage = table.internalTable().storage(); - // If node's recovery process is incomplete (no partition storage), then we consider this node's - // partition storage empty. 
- if (mvPartition != null) { - storageHasData = mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null; - } - } + MvPartitionStorage mvPartition = storage.getMvPartition(partitionId); - messagingService.respond(sender, TABLE_MESSAGES_FACTORY.hasDataResponse().result(storageHasData).build(), correlationId); + if (mvPartition != null) { + try { + dataPresence = mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null + ? DataPresence.HAS_DATA : DataPresence.EMPTY; + } catch (StorageClosedException | StorageRebalanceException ignored) { + // Ignoring so we'll return UNKNOWN as dataPresence, meaning that we have no idea. + } } - }); + } + + messagingService.respond( + sender, + TABLE_MESSAGES_FACTORY.hasDataResponse().presenceString(dataPresence.name()).build(), + correlationId + ); } /** - * Returns a future that completes with a decision: should we start the corresponding group locally or not. + * Initiates group reentry (that is, exits the group and then enters it again) if there is a possibility that + * this node lost its Raft metastorage state. This trick solves the double-voting problem (this node + * could vote for one candidate, then do a restart (losing its Raft metastorage, including votedFor field), then + * vote for another candidate in the same term). As a result of removing itself and adding itself back, the term + * will be incremented, so the possible old vote will be invalidated. + * + * <p>The possibility of losing the Raft metastorage state is detected by checking if the partition storage is + * volatile (and hence Raft metastorage is also volatile). * * @param tablePartitionId ID of the table partition. * @param internalTable Table we are working with. * @param newConfiguration New configuration that is going to be applied if we'll start the group. * @param localMemberAssignment Assignment of this node in this group.
+ * @return A future that completes with a decision: should we start the corresponding group locally or not. */ - CompletableFuture<Boolean> shouldStartGroup( + CompletableFuture<Boolean> initiateGroupReentryIfNeeded( TablePartitionId tablePartitionId, InternalTable internalTable, PeersAndLearners newConfiguration, @@ -163,15 +192,15 @@ class PartitionReplicatorNodeRecovery { // No majority and not a full partition restart - need to 'remove, then add' nodes // with current partition. - return waitForPeersAndQueryDataNodesCount(tableId, partId, newConfiguration.peers()) - .thenApply(dataNodesCount -> { - boolean fullPartitionRestart = dataNodesCount == 0; + return waitForPeersAndQueryDataNodesCounts(tableId, partId, newConfiguration.peers()) + .thenApply(dataNodesCounts -> { + boolean fullPartitionRestart = dataNodesCounts.emptyNodes == newConfiguration.peers().size(); if (fullPartitionRestart) { return true; } - boolean majorityAvailable = dataNodesCount >= (newConfiguration.peers().size() / 2) + 1; + boolean majorityAvailable = dataNodesCounts.nonEmptyNodes >= (newConfiguration.peers().size() / 2) + 1; if (majorityAvailable) { RebalanceUtilEx.startPeerRemoval(tablePartitionId, localMemberAssignment, metaStorageManager); @@ -193,13 +222,13 @@ class PartitionReplicatorNodeRecovery { * @param tblId Table id. * @param partId Partition id. * @param peers Raft peers. - * @return A future that will hold the quantity of data nodes. + * @return A future that will hold the counts of data nodes. 
*/ - private CompletableFuture<Long> waitForPeersAndQueryDataNodesCount(int tblId, int partId, Collection<Peer> peers) { + private CompletableFuture<DataNodesCounts> waitForPeersAndQueryDataNodesCounts(int tblId, int partId, Collection<Peer> peers) { HasDataRequest request = TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build(); return allPeersAreInTopology(peers) - .thenCompose(unused -> queryDataNodesCount(peers, request)); + .thenCompose(unused -> queryDataNodesCounts(peers, request)); } private CompletableFuture<?> allPeersAreInTopology(Collection<Peer> peers) { @@ -274,9 +303,9 @@ class PartitionReplicatorNodeRecovery { .thenCompose(identity()); } - private CompletableFuture<Long> queryDataNodesCount(Collection<Peer> peers, HasDataRequest request) { + private CompletableFuture<DataNodesCounts> queryDataNodesCounts(Collection<Peer> peers, HasDataRequest request) { //noinspection unchecked - CompletableFuture<Boolean>[] requestFutures = peers.stream() + CompletableFuture<DataPresence>[] presenceFutures = peers.stream() .map(Peer::consistentId) .map(topologyService::getByConsistentId) .filter(Objects::nonNull) @@ -285,12 +314,36 @@ class PartitionReplicatorNodeRecovery { .thenApply(response -> { assert response instanceof HasDataResponse : response; - return ((HasDataResponse) response).result(); + return ((HasDataResponse) response).presence(); }) - .exceptionally(unused -> false)) + .exceptionally(unused -> DataPresence.UNKNOWN)) .toArray(CompletableFuture[]::new); - return allOf(requestFutures) - .thenApply(unused -> Arrays.stream(requestFutures).filter(CompletableFuture::join).count()); + return allOf(presenceFutures) + .thenApply(unused -> { + List<DataPresence> hasDataFlags = Arrays.stream(presenceFutures) + .map(CompletableFuture::join) + .collect(toList()); + + long nodesSurelyHavingData = hasDataFlags.stream().filter(presence -> presence == DataPresence.HAS_DATA).count(); + long nodesSurelyEmpty = 
hasDataFlags.stream().filter(presence -> presence == DataPresence.EMPTY).count(); + return new DataNodesCounts(nodesSurelyHavingData, nodesSurelyEmpty); + }); + } + + /** + * It is not guaranteed that {@link #nonEmptyNodes} plus {@link #emptyNodes} gives the replicator group size + * as for some nodes we don't know at the moment whether they have data or not. + */ + private static class DataNodesCounts { + /** Number of nodes that reported that they have some data for the partition of interest. */ + private final long nonEmptyNodes; + /* Number of nodes that reported that they don't have any data for the partition of interest. */ + private final long emptyNodes; + + private DataNodesCounts(long nonEmptyNodes, long emptyNodes) { + this.nonEmptyNodes = nonEmptyNodes; + this.emptyNodes = emptyNodes; + } } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index bda552db27..5dfb1ad1e4 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -287,7 +287,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** * Versioned value for tracking RAFT groups initialization and starting completion. * - * <p>Only explicitly updated in {@link #startLocalPartitionsAndClients(CompletableFuture, TableImpl, int)}. + * <p>Only explicitly updated in {@link #startLocalPartitionsAndClients(CompletableFuture, TableImpl, int, boolean)}. * * <p>Completed strictly after {@link #localPartitionsVv}. */ @@ -385,7 +385,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** Versioned value used only at manager startup to correctly fire table creation events. 
*/ private final IncrementalVersionedValue<Void> startVv; - /** Ends at the {@link #stop()} with an {@link NodeStoppingException}. */ + /** Ends at the {@link #stopAsync()} with an {@link NodeStoppingException}. */ private final CompletableFuture<Void> stopManagerFuture = new CompletableFuture<>(); /** Configuration for {@link StorageUpdateHandler}. */ @@ -570,6 +570,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { metaStorageMgr, messagingService, topologyService, + partitionOperationsExecutor, tableId -> tablesById().get(tableId) ); @@ -830,12 +831,14 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * @param assignmentsFuture Table assignments. * @param table Initialized table entity. * @param zoneId Zone id. + * @param isRecovery {@code true} if the node is being started up. * @return future, which will be completed when the partitions creations done. */ private CompletableFuture<Void> startLocalPartitionsAndClients( CompletableFuture<List<Assignments>> assignmentsFuture, TableImpl table, - int zoneId + int zoneId, + boolean isRecovery ) { int tableId = table.tableId(); @@ -858,7 +861,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { assignments.get(partId), null, zoneId, - false + isRecovery ) .whenComplete((res, ex) -> { if (ex != null) { @@ -934,7 +937,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { if (localMemberAssignment != null) { CompletableFuture<Boolean> shouldStartGroupFut = isRecovery - ? partitionReplicatorNodeRecovery.shouldStartGroup( + ? 
partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded( replicaGrpId, internalTbl, newConfiguration, @@ -1392,7 +1395,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { registerIndexesToTable(table, catalogService, partitionSet, schemaRegistry, lwm); } - return startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id()); + return startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id(), onNodeRecovery); } ), ioExecutor); }); @@ -2528,7 +2531,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** * Returns the future that will complete when, either the future from the argument or {@link #stopManagerFuture} will complete, - * successfully or exceptionally. Allows to protect from getting stuck at {@link #stop()} when someone is blocked (by using + * successfully or exceptionally. Allows to protect from getting stuck at {@link #stopAsync()} when someone is blocked (by using * {@link #busyLock}) for a long time. * * @param future Future. 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java similarity index 65% copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java index 66f7b0f6d7..67fc4cbdbc 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java @@ -17,15 +17,15 @@ package org.apache.ignite.internal.table.distributed.message; -import org.apache.ignite.internal.network.NetworkMessage; -import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.table.distributed.TableMessageGroup; - /** - * A response to the {@link HasDataRequest}. + * Whether a node has data or not (or it's not known because it did not respond in time, or the corresponding storage is + * already closed or still being rebalanced to). */ -@Transferable(TableMessageGroup.HAS_DATA_RESPONSE) -public interface HasDataResponse extends NetworkMessage { - /** {@code true} if a node has data for a partition of a table, {@code false} otherwise. */ - boolean result(); +public enum DataPresence { + /** The storage is empty. */ + EMPTY, + /** The storage has some data. */ + HAS_DATA, + /** We don't know for some reason.
*/ + UNKNOWN } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java index 66f7b0f6d7..69d7bd994e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java @@ -26,6 +26,15 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup; */ @Transferable(TableMessageGroup.HAS_DATA_RESPONSE) public interface HasDataResponse extends NetworkMessage { - /** {@code true} if a node has data for a partition of a table, {@code false} otherwise. */ - boolean result(); + /** + * Data presence indicator. + */ + default DataPresence presence() { + return DataPresence.valueOf(presenceString()); + } + + /** + * String representation of {@link #presence()}. + */ + String presenceString(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java index 451e98c4c3..7a2a4da589 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.affinity.AffinityUtils; import org.apache.ignite.internal.affinity.Assignment; +import org.apache.ignite.internal.affinity.Assignments; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; @@ -73,23 +74,23 @@ public class RebalanceUtilEx { byte[] prevValue = retrievedAssignmentsSwitchReduce.value(); if (prevValue != null) { - Set<Assignment> prev 
= ByteUtils.fromBytes(prevValue); + Assignments prev = Assignments.fromBytes(prevValue); prev.add(peerAssignment); return metaStorageMgr.invoke( revision(key).eq(retrievedAssignmentsSwitchReduce.revision()), - put(key, ByteUtils.toBytes(prev)), + put(key, prev.toBytes()), Operations.noop() ); } else { - var newValue = new HashSet<>(); + var newValue = Assignments.of(new HashSet<>()); newValue.add(peerAssignment); return metaStorageMgr.invoke( notExists(key), - put(key, ByteUtils.toBytes(newValue)), + put(key, newValue.toBytes()), Operations.noop() ); } @@ -118,7 +119,9 @@ public class RebalanceUtilEx { Entry entry = event.entryEvent().newEntry(); byte[] eventData = entry.value(); - Set<Assignment> switchReduce = ByteUtils.fromBytes(eventData); + assert eventData != null : "Null event data for " + partId; + + Assignments switchReduce = Assignments.fromBytes(eventData); if (switchReduce.isEmpty()) { return nullCompletedFuture(); @@ -128,10 +131,10 @@ public class RebalanceUtilEx { ByteArray pendingKey = pendingPartAssignmentsKey(partId); - Set<Assignment> pendingAssignments = difference(assignments, switchReduce); + Set<Assignment> pendingAssignments = difference(assignments, switchReduce.nodes()); - byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments); - byte[] assignmentsByteArray = ByteUtils.toBytes(assignments); + byte[] pendingByteArray = Assignments.toBytes(pendingAssignments); + byte[] assignmentsByteArray = Assignments.toBytes(assignments); ByteArray changeTriggerKey = pendingChangeTriggerKey(partId); byte[] rev = ByteUtils.longToBytes(entry.revision());