This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 8980c45e57 IGNITE-20210 Start partitions on corresponding assignments.stable, calculate if missing, cleanup obsolete resources (part 1) (#2867) 8980c45e57 is described below commit 8980c45e57a69478285a03a5c7692dad6220bd4c Author: Denis Chudov <moongll...@gmail.com> AuthorDate: Wed Dec 20 13:36:22 2023 +0300 IGNITE-20210 Start partitions on corresponding assignments.stable, calculate if missing, cleanup obsolete resources (part 1) (#2867) --- .../distributionzones/rebalance/RebalanceUtil.java | 50 +- .../apache/ignite/internal/index/IndexManager.java | 2 +- .../runner/app/ItIgniteNodeRestartTest.java | 101 +++ .../rebalance/ItRebalanceDistributedTest.java | 21 +- .../internal/table/distributed/TableManager.java | 703 +++++++++++---------- 5 files changed, 516 insertions(+), 361 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java index 9c3e178236..3e95302031 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java @@ -48,12 +48,12 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.util.ByteUtils; -import org.apache.ignite.internal.vault.VaultEntry; -import org.apache.ignite.internal.vault.VaultManager; +import org.jetbrains.annotations.Nullable; /** * Util class for methods needed for the rebalance process. @@ -465,19 +465,24 @@ public class RebalanceUtil { } /** - * Returns partition assignments from vault. + * Returns partition assignments from meta storage locally. * - * @param vaultManager Vault manager. + * @param metaStorageManager Meta storage manager. * @param tableId Table id. * @param partitionNumber Partition number. - * @return Returns partition assignments from vault or {@code null} if assignments is absent. + * @param revision Revision. + * @return Returns partition assignments from meta storage locally or {@code null} if assignments is absent. */ - public static Set<Assignment> partitionAssignments( - VaultManager vaultManager, int tableId, int partitionNumber) { - VaultEntry entry = - vaultManager.get(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionNumber))).join(); + @Nullable + public static Set<Assignment> partitionAssignmentsGetLocally( + MetaStorageManager metaStorageManager, + int tableId, + int partitionNumber, + long revision + ) { + Entry entry = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionNumber)), revision); - return (entry == null) ? null : ByteUtils.fromBytes(entry.value()); + return (entry == null || entry.empty() || entry.tombstone()) ? null : ByteUtils.fromBytes(entry.value()); } /** @@ -516,25 +521,28 @@ public class RebalanceUtil { } /** - * Returns table assignments for all table partitions from vault. + * Returns table assignments for all table partitions from meta storage locally. Assignments must be present. * - * @param vaultManager Vault manager. + * @param metaStorageManager Meta storage manager. * @param tableId Table id. * @param numberOfPartitions Number of partitions. + * @param revision Revision. * @return Future with table assignments as a value. */ - public static List<Set<Assignment>> tableAssignments( - VaultManager vaultManager, + public static List<Set<Assignment>> tableAssignmentsGetLocally( + MetaStorageManager metaStorageManager, int tableId, - int numberOfPartitions + int numberOfPartitions, + long revision ) { return IntStream.range(0, numberOfPartitions) - .mapToObj(i -> - (Set<Assignment>) ByteUtils.fromBytes( - vaultManager.get( - stablePartAssignmentsKey(new TablePartitionId(tableId, i)) - ).join().value()) - ) + .mapToObj(p -> { + Entry e = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, p)), revision); + + assert e != null && !e.empty() && !e.tombstone() : e; + + return (Set<Assignment>) ByteUtils.fromBytes(e.value()); + }) .collect(Collectors.toList()); } } diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index 1fbc57e70a..8bb672fc4e 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -331,7 +331,7 @@ public class IndexManager implements IgniteComponent { if (throwable != null) { LOG.error("Error starting indexes", throwable); } else { - LOG.debug("Indexes started successfully"); + LOG.info("Indexes started successfully"); } }); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index cc8fd11bd7..72f1b5f5c1 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -18,7 +18,11 @@ package org.apache.ignite.internal.runner.app; import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; @@ -58,6 +62,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; import org.apache.ignite.InitParameters; import org.apache.ignite.internal.BaseIgniteRestartTest; +import org.apache.ignite.internal.affinity.Assignment; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.catalog.ClockWaiter; @@ -92,6 +97,7 @@ import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; @@ -103,6 +109,7 @@ import org.apache.ignite.internal.network.recovery.VaultStateIds; import org.apache.ignite.internal.placementdriver.PlacementDriverManager; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; @@ -110,6 +117,7 @@ import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.SchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; @@ -124,6 +132,7 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; +import org.apache.ignite.internal.test.WatchListenerInhibitor; import org.apache.ignite.internal.testframework.TestIgnitionManager; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; @@ -131,6 +140,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.internal.tx.message.TxMessageGroup; +import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.network.NettyBootstrapFactory; import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; @@ -1146,6 +1156,13 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * The test for node restart when there is a gap between the node local configuration and distributed configuration. */ @Test + @Disabled("https://issues.apache.org/jira/browse/IGNITE-19712") + // TODO https://issues.apache.org/jira/browse/IGNITE-19712 This test should work, but is disabled because of assertion errors. + // TODO Root cause of errors is the absence of the indexes on the partition after restart. Scenario: indexes are recovered from + // TODO the catalog, then partition storages are cleaned up on recovery due to the absence of the node in stable assignments, + // TODO then after recovery the pending assignments event is processed, and it creates the storages and partition again, but + // TODO doesn't register the indexes. As a result, indexes are not found and assertion happens. + // TODO This also causes https://issues.apache.org/jira/browse/IGNITE-20996 . public void testCfgGap() { List<IgniteImpl> nodes = startNodes(4); @@ -1191,6 +1208,90 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { startNodes(3); } + @Test + public void destroyObsoleteStoragesOnRestart() throws InterruptedException { + int nodesCount = 3; + List<IgniteImpl> nodes = startNodes(nodesCount); + + int partitions = nodesCount; + int replicas = nodesCount; + createTableWithData(nodes, TABLE_NAME, replicas, partitions); + + int restartedNodeIndex = nodesCount - 1; + + WatchListenerInhibitor inhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodes.get(restartedNodeIndex)); + + inhibitor.startInhibit(); + + // Change the zone - one replica per partition. + alterZone(nodes.get(0).catalogManager(), String.format("ZONE_%s", TABLE_NAME.toUpperCase()), 1); + + stopNode(restartedNodeIndex); + + inhibitor.stopInhibit(); + + IgniteImpl restartedNode = startNode(restartedNodeIndex); + + TableImpl table = (TableImpl) restartedNode.tables().table(TABLE_NAME); + + assertTrue(waitForCondition(() -> { + // Check that only storage for 1 partition left on the restarted node. + return IntStream.range(0, partitions) + .mapToObj(i -> table.internalTable().storage().getMvPartition(i)) + .filter(Objects::nonNull) + .count() == 1; + }, 10_000)); + } + + @Test + public void testCorrectPartitionRecoveryOnSnapshot() throws InterruptedException { + int nodesCount = 3; + List<IgniteImpl> nodes = startNodes(nodesCount); + + int partitions = nodesCount; + int replicas = nodesCount; + createTableWithData(nodes, TABLE_NAME, replicas, partitions); + + int restartedNodeIndex = nodesCount - 1; + + WatchListenerInhibitor inhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodes.get(restartedNodeIndex)); + + inhibitor.startInhibit(); + + alterZone(nodes.get(0).catalogManager(), String.format("ZONE_%s", TABLE_NAME.toUpperCase()), 1); + + stopNode(restartedNodeIndex); + + inhibitor.stopInhibit(); + + forceSnapshotUsageOnRestart(nodes.get(0)); + + IgniteImpl restartedNode = startNode(restartedNodeIndex); + + TableImpl table = (TableImpl) restartedNode.tables().table(TABLE_NAME); + + long recoveryRevision = restartedNode.metaStorageManager().recoveryFinishedFuture().join(); + + PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(nodes.stream().map(IgniteImpl::name) + .collect(toSet()), Set.of()); + + for (int p = 0; p < partitions; p++) { + TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), p); + + Entry e = restartedNode.metaStorageManager().getLocally(stablePartAssignmentsKey(tablePartitionId), recoveryRevision); + + Set<Assignment> assignment = ByteUtils.fromBytes(e.value()); + + boolean shouldBe = assignment.stream().anyMatch(n -> n.consistentId().equals(restartedNode.name())); + + Peer peer = configuration.peer(restartedNode.name()); + + boolean isStarted = restartedNode.raftManager().isStarted(new RaftNodeId(tablePartitionId, peer)); + + assertEquals(shouldBe, isStarted); + } + } + /** * Checks the table exists and validates all data in it. * diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index f78aef081f..f6f81281b0 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -154,6 +154,7 @@ import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryDataStora import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration; import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.table.distributed.TableManager; @@ -503,7 +504,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { @Test @UseTestTxStateStorage @UseRocksMetaStorage - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20210") + @Disabled("https://issues.apache.org/jira/browse/IGNITE-19170") void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo) throws Exception { Node node = getNode(0); @@ -701,8 +702,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { private void verifyThatRaftNodesAndReplicasWereStartedOnlyOnce() throws Exception { for (int i = 0; i < NODE_COUNT; i++) { - verify(getNode(i).raftManager, times(1)) - .startRaftGroupNode(any(), any(), any(), any(), any(RaftGroupOptions.class)); + verify(getNode(i).raftManager, times(1)).startRaftGroupNode(any(), any(), any(), any(), any(RaftGroupOptions.class)); verify(getNode(i).replicaManager, times(1)).startReplica(any(), any(), any(), any(), any()); } } @@ -712,6 +712,21 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { () -> nodes.stream().allMatch(n -> getPartitionClusterNodes(n, partNum).size() == replicasNum), (long) AWAIT_TIMEOUT_MILLIS * nodes.size() )); + + if (replicasNum == nodes.size()) { + assertTrue(waitForCondition( + () -> { + try { + return ((TableImpl) nodes.get(0).tableManager.table(TABLE_NAME)) + .internalTable().partitionRaftGroupService(partNum) != null; + } catch (IgniteInternalException e) { + // Raft group service not found. + return false; + } + }, + (long) AWAIT_TIMEOUT_MILLIS * nodes.size() + )); + } } private void waitPartitionPendingAssignmentsSyncedToExpected(int partNum, int replicasNum) throws Exception { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index cacdb8edb7..423a3a03b7 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableMap; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.anyOf; @@ -26,12 +27,13 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.CompletableFuture.runAsync; import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn; -import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments; -import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignments; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; @@ -68,11 +70,11 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.LongFunction; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -217,9 +219,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** Meta storage manager. */ private final MetaStorageManager metaStorageMgr; - /** Vault manager. */ - private final VaultManager vaultManager; - /** Data storage manager. */ private final DataStorageManager dataStorageMgr; @@ -244,7 +243,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** * Versioned store for tracking RAFT groups initialization and starting completion. * - * <p>Only explicitly updated in {@link #createTablePartitionsLocally(long, CompletableFuture, TableImpl)}. + * <p>Only explicitly updated in {@link #startLocalPartitionsAndUpdateClients(CompletableFuture, TableImpl)}. * * <p>Completed strictly after {@link #localPartsByTableIdVv}. */ @@ -393,7 +392,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { this.dataStorageMgr = dataStorageMgr; this.storagePath = storagePath; this.metaStorageMgr = metaStorageMgr; - this.vaultManager = vaultManager; this.schemaManager = schemaManager; this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator; this.clock = clock; @@ -494,7 +492,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { startTables(recoveryRevision); - performRebalanceOnRecovery(recoveryRevision); + processAssignmentsOnRecovery(recoveryRevision); metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX), pendingAssignmentsRebalanceListener); metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), stableAssignmentsRebalanceListener); @@ -516,18 +514,38 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }); } - private void performRebalanceOnRecovery(long recoveryRevision) { - CompletableFuture<Void> pendingAssignmentsRecoveryFuture; + private void processAssignmentsOnRecovery(long recoveryRevision) { + var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX); + var pendingAssignmentsPrefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX); - var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX); + startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery( + stableAssignmentsPrefix, + recoveryRevision, + (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev, true), + "stable" + )); + startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery( + pendingAssignmentsPrefix, + recoveryRevision, + (entry, rev) -> handleChangePendingAssignmentEvent(entry, rev, true), + "pending" + )); + } - try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, recoveryRevision)) { + private CompletableFuture<Void> handleAssignmentsOnRecovery( + ByteArray prefix, + long revision, + BiFunction<Entry, Long, CompletableFuture<Void>> assignmentsEventHandler, + String assignmentsType + ) { + try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, revision)) { CompletableFuture<?>[] futures = cursor.stream() - .map(pendingAssignmentEntry -> { + .map(entry -> { if (LOG.isInfoEnabled()) { LOG.info( - "Missed pending assignments for key '{}' discovered, performing recovery", - new String(pendingAssignmentEntry.key(), UTF_8) + "Missed {} assignments for key '{}' discovered, performing recovery", + assignmentsType, + new String(entry.key(), UTF_8) ); } @@ -535,20 +553,19 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { // 'handleChangePendingAssignmentEvent' accesses some Versioned Values that only store values starting with // tokens equal to Meta Storage recovery revision. In other words, if the entry has a lower revision than the // recovery revision, there will never be a Versioned Value corresponding to its revision. - return handleChangePendingAssignmentEvent(pendingAssignmentEntry, recoveryRevision); + return assignmentsEventHandler.apply(entry, revision); }) .toArray(CompletableFuture[]::new); - pendingAssignmentsRecoveryFuture = allOf(futures) + + return allOf(futures) // Simply log any errors, we don't want to block watch processing. .exceptionally(e -> { - LOG.error("Error when performing pending assignments recovery", e); + LOG.error("Error when performing assignments recovery", e); return null; }); } - - startVv.update(recoveryRevision, (v, e) -> pendingAssignmentsRecoveryFuture); } private CompletableFuture<?> onTableCreate(CreateTableEventParameters parameters) { @@ -602,220 +619,188 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** * Updates or creates partition raft groups and storages. * - * @param causalityToken Causality token. * @param assignmentsFuture Table assignments. * @param table Initialized table entity. * @return future, which will be completed when the partitions creations done. */ - private CompletableFuture<?> createTablePartitionsLocally( - long causalityToken, + private CompletableFuture<Void> startLocalPartitionsAndUpdateClients( CompletableFuture<List<Set<Assignment>>> assignmentsFuture, TableImpl table ) { int tableId = table.tableId(); // Create new raft nodes according to new assignments. - Supplier<CompletableFuture<Void>> updateAssignmentsClosure = () -> assignmentsFuture.thenCompose(newAssignments -> { + return assignmentsFuture.thenCompose(assignments -> { // Empty assignments might be a valid case if tables are created from within cluster init HOCON // configuration, which is not supported now. - assert newAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId); + assert assignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId); - int partitions = newAssignments.size(); + int partitions = assignments.size(); - CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions]; + List<CompletableFuture<?>> futures = new ArrayList<>(); - // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for assigned partitions. - PartitionSet parts = new BitSetPartitionSet(); + for (int i = 0; i < partitions; i++) { + int partId = i; - for (int i = 0; i < futures.length; i++) { - futures[i] = new CompletableFuture<>(); + CompletableFuture<?> future = startPartitionAndStartClient( + table, + partId, + assignments.get(partId), + false + ) + .whenComplete((res, ex) -> { + if (ex != null) { + LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId); + } + }); - parts.set(i); + futures.add(future); } - String localMemberName = localNode().name(); + return allOf(futures.toArray(new CompletableFuture<?>[0])); + }); + } - for (int i = 0; i < partitions; i++) { - int partId = i; + private CompletableFuture<Void> startPartitionAndStartClient( + TableImpl table, + int partId, + Set<Assignment> newPartAssignment, + boolean isRecovery + ) { + CompletableFuture<Void> resultFuture = new CompletableFuture<>(); + + int tableId = table.tableId(); + + InternalTable internalTbl = table.internalTable(); - Set<Assignment> newPartAssignment = newAssignments.get(partId); + Assignment localMemberAssignment = newPartAssignment.stream() + .filter(a -> a.consistentId().equals(localNode().name())) + .findAny() + .orElse(null); - InternalTable internalTbl = table.internalTable(); + PeersAndLearners newConfiguration = configurationFromAssignments(newPartAssignment); - Assignment localMemberAssignment = newPartAssignment.stream() - .filter(a -> a.consistentId().equals(localMemberName)) - .findAny() - .orElse(null); + TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId); - PeersAndLearners newConfiguration = configurationFromAssignments(newPartAssignment); + var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>( + new HybridTimestamp(1, 0) + ); + var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L); - TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId); + PartitionStorages partitionStorages = getPartitionStorages(table, partId); - var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>( - new HybridTimestamp(1, 0) - ); - var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L); + PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(), + internalTbl, partId); - ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); + storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null); - PartitionStorages partitionStorages = getPartitionStorages(table, partId); + PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( + partId, + partitionDataStorage, + table, + safeTimeTracker + ); + + Peer serverPeer = newConfiguration.peer(localNode().name()); - PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(), - internalTbl, partId); + var raftNodeId = localMemberAssignment == null ? null : new RaftNodeId(replicaGrpId, serverPeer); - storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null); + boolean shouldStartRaftListeners = localMemberAssignment != null && !((Loza) raftMgr).isStarted(raftNodeId); - PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( - partId, - partitionDataStorage, - table, - safeTimeTracker - ); + if (shouldStartRaftListeners) { + ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); - mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler); + mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler); + } - CompletableFuture<Boolean> startGroupFut; + CompletableFuture<Boolean> startGroupFut; - // start new nodes, only if it is table creation, other cases will be covered by rebalance logic - if (localMemberAssignment != null) { - CompletableFuture<Boolean> shouldStartGroupFut = partitionReplicatorNodeRecovery.shouldStartGroup( + // start new nodes, only if it is table creation, other cases will be covered by rebalance logic + if (localMemberAssignment != null) { + CompletableFuture<Boolean> shouldStartGroupFut = isRecovery + ? partitionReplicatorNodeRecovery.shouldStartGroup( replicaGrpId, internalTbl, newConfiguration, localMemberAssignment - ); - - startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> { - if (!startGroup) { - return false; - } - TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage(); - - RaftGroupOptions groupOptions = groupOptionsForPartition( - internalTbl.storage(), - internalTbl.txStateStorage(), - partitionKey(internalTbl, partId), - partitionUpdateHandlers - ); - - Peer serverPeer = newConfiguration.peer(localMemberName); - - var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer); - - try { - // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273 - ((Loza) raftMgr).startRaftGroupNode( - raftNodeId, - newConfiguration, - new PartitionListener( - txManager, - partitionDataStorage, - partitionUpdateHandlers.storageUpdateHandler, - txStatePartitionStorage, - safeTimeTracker, - storageIndexTracker - ), - new RebalanceRaftGroupEventsListener( - metaStorageMgr, - replicaGrpId, - busyLock, - createPartitionMover(internalTbl, partId), - this::calculateAssignments, - rebalanceScheduler - ), - groupOptions - ); + ) + : trueCompletedFuture(); - return true; - } catch (NodeStoppingException ex) { - throw new CompletionException(ex); - } - }), ioExecutor); - } else { - startGroupFut = falseCompletedFuture(); + startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> { + if (!startGroup) { + return false; } - startGroupFut - .thenComposeAsync(v -> inBusyLock(busyLock, () -> { - try { - //TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online. - return raftMgr.startRaftGroupService( - replicaGrpId, newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller - ); - } catch (NodeStoppingException ex) { - return failedFuture(ex); - } - }), ioExecutor) - .thenAcceptAsync(updatedRaftGroupService -> inBusyLock(busyLock, () -> { - ((InternalTableImpl) internalTbl).updateInternalTableRaftGroupService(partId, updatedRaftGroupService); - - boolean startedRaftNode = startGroupFut.join(); - if (localMemberAssignment == null || !startedRaftNode) { - return; - } - - MvPartitionStorage partitionStorage = partitionStorages.getMvPartitionStorage(); - TxStateStorage txStateStorage = partitionStorages.getTxStateStorage(); - - try { - startReplicaWithNewListener( - replicaGrpId, - table, - safeTimeTracker, - storageIndexTracker, - partitionStorage, - txStateStorage, - partitionUpdateHandlers, - updatedRaftGroupService - ); - } catch (NodeStoppingException ex) { - throw new AssertionError("Loza was stopped before Table manager", ex); - } - }), ioExecutor) - .whenComplete((res, ex) -> { - if (ex != null) { - LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId); - - futures[partId].completeExceptionally(ex); - } else { - futures[partId].complete(null); - } - }); - } + if (((Loza) raftMgr).isStarted(raftNodeId)) { + return true; + } - return allOf(futures); - }); + try { + startPartitionRaftGroupNode( + replicaGrpId, + raftNodeId, + newConfiguration, + safeTimeTracker, + storageIndexTracker, + internalTbl, + partitionStorages.getTxStateStorage(), + partitionDataStorage, + partitionUpdateHandlers + ); - // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!). - CompletableFuture<?> localPartsUpdateFuture = localPartsByTableIdVv.update(causalityToken, - (previous, throwable) -> inBusyLock(busyLock, () -> assignmentsFuture.thenCompose(newAssignments -> { - PartitionSet parts = new BitSetPartitionSet(); + return true; + } catch (NodeStoppingException ex) { + throw new CompletionException(ex); + } + }), ioExecutor); + } else { + startGroupFut = falseCompletedFuture(); + } - for (int i = 0; i < newAssignments.size(); i++) { - parts.set(i); + startGroupFut + .thenComposeAsync(v -> inBusyLock(busyLock, () -> { + try { + //TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online. + return raftMgr + .startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller); + } catch (NodeStoppingException ex) { + return failedFuture(ex); } + }), ioExecutor) + .thenAcceptAsync(updatedRaftGroupService -> inBusyLock(busyLock, () -> { + ((InternalTableImpl) internalTbl).updateInternalTableRaftGroupService(partId, updatedRaftGroupService); - return getOrCreatePartitionStorages(table, parts).thenApply(u -> { - var newValue = new HashMap<>(previous); - - newValue.put(tableId, parts); - - return newValue; - }); - }))); + boolean startedRaftNode = startGroupFut.join(); + if (localMemberAssignment == null || !startedRaftNode || replicaMgr.isReplicaStarted(replicaGrpId)) { + return; + } - // We bring the future outside to avoid OutdatedTokenException. - CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = tablesByIdVv.get(causalityToken); + try { + startReplicaWithNewListener( + replicaGrpId, + table, + safeTimeTracker, + storageIndexTracker, + partitionStorages.getMvPartitionStorage(), + partitionStorages.getTxStateStorage(), + partitionUpdateHandlers, + updatedRaftGroupService + ); + } catch (NodeStoppingException ex) { + throw new AssertionError("Loza was stopped before Table manager", ex); + } + }), ioExecutor) + .whenComplete((res, ex) -> { + if (ex != null) { + LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId); - return assignmentsUpdatedVv.update(causalityToken, (token, e) -> { - if (e != null) { - return failedFuture(e); - } + resultFuture.completeExceptionally(ex); + } else { + resultFuture.complete(null); + } + }); - return localPartsUpdateFuture.thenCompose(unused -> - tablesByIdFuture.thenComposeAsync(tablesById -> inBusyLock(busyLock, updateAssignmentsClosure), ioExecutor) - ); - }); + return resultFuture; } private void startReplicaWithNewListener( @@ -1054,9 +1039,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { // Check if the table already has assignments in the vault. // So, it means, that it is a recovery process and we should use the vault assignments instead of calculation for the new ones. - // TODO: IGNITE-20210 Fix it - if (partitionAssignments(vaultManager, tableId, 0) != null) { - assignmentsFuture = completedFuture(tableAssignments(vaultManager, tableId, zoneDescriptor.partitions())); + // TODO https://issues.apache.org/jira/browse/IGNITE-20993 + if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) { + assignmentsFuture = completedFuture( + tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken)); } else { assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId) .thenApply(dataNodes -> AffinityUtils.calculateAssignments( @@ -1136,7 +1122,43 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }); })); - CompletableFuture<?> createPartsFut = createTablePartitionsLocally(causalityToken, assignmentsFuture, table); + // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!). + CompletableFuture<?> localPartsUpdateFuture = localPartsByTableIdVv.update(causalityToken, + (previous, throwable) -> inBusyLock(busyLock, () -> assignmentsFuture.thenCompose(newAssignments -> { + PartitionSet parts = new BitSetPartitionSet(); + + // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for + // TODO assigned partitions. + for (int i = 0; i < newAssignments.size(); i++) { + parts.set(i); + } + + return getOrCreatePartitionStorages(table, parts).thenApply(u -> { + var newValue = new HashMap<>(previous); + + newValue.put(tableId, parts); + + return newValue; + }); + }))); + + // We bring the future outside to avoid OutdatedTokenException. + CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = tablesByIdVv.get(causalityToken); + + // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions should be started only on the assignments change + // TODO event triggered by zone create or alter. + CompletableFuture<?> createPartsFut = assignmentsUpdatedVv.update(causalityToken, (token, e) -> { + if (e != null) { + return failedFuture(e); + } + + return localPartsUpdateFuture.thenCompose(unused -> + tablesByIdFuture.thenComposeAsync(tablesById -> inBusyLock( + busyLock, + () -> startLocalPartitionsAndUpdateClients(assignmentsFuture, table) + ), ioExecutor) + ); + }); pendingTables.put(tableId, table); startedTables.put(tableId, table); @@ -1242,6 +1264,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { CompletableFuture<?>[] stopReplicaFutures = new CompletableFuture<?>[partitions]; + // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions should be stopped on the assignments change + // TODO event triggered by zone drop or alter. for (int partitionId = 0; partitionId < partitions; partitionId++) { var replicationGroupId = new TablePartitionId(tableId, partitionId); @@ -1341,6 +1365,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * Returns the latest tables by ID map, for which all assignment updates have been completed. */ private Map<Integer, TableImpl> latestTablesById() { + // TODO https://issues.apache.org/jira/browse/IGNITE-20915 fix this. if (assignmentsUpdatedVv.latestCausalityToken() < 0L) { // No tables at all in case of empty causality token. return emptyMap(); @@ -1557,7 +1582,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { try { Entry newEntry = evt.entryEvent().newEntry(); - return handleChangePendingAssignmentEvent(newEntry, evt.revision()); + return handleChangePendingAssignmentEvent(newEntry, evt.revision(), false); } finally { busyLock.leaveBusy(); } @@ -1570,7 +1595,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { }; } - private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry pendingAssignmentsEntry, long revision) { + private CompletableFuture<Void> handleChangePendingAssignmentEvent( + Entry pendingAssignmentsEntry, + long revision, + boolean isRecovery + ) { if (pendingAssignmentsEntry.value() == null) { return nullCompletedFuture(); } @@ -1583,8 +1612,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { // Stable assignments from the meta store, which revision is bounded by the current pending event. CompletableFuture<Entry> stableAssignmentsFuture = metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId), revision); - return tablesById(revision) - .thenCombineAsync(stableAssignmentsFuture, (tables, stableAssignmentsEntry) -> { + Set<Assignment> pendingAssignments = ByteUtils.fromBytes(pendingAssignmentsEntry.value()); + + return tablesByIdVv.get(revision) + .thenCombine(stableAssignmentsFuture, (tables, stableAssignmentsEntry) -> { if (!busyLock.enterBusy()) { return CompletableFuture.<Void>failedFuture(new NodeStoppingException()); } @@ -1602,163 +1633,126 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { return CompletableFutures.<Void>nullCompletedFuture(); } + if (LOG.isInfoEnabled()) { + var stringKey = new String(pendingAssignmentsEntry.key(), UTF_8); + + LOG.info("Received update on pending assignments. Check if new raft group should be started" + + " [key={}, partition={}, table={}, localMemberAddress={}]", + stringKey, partId, table.name(), localNode().address()); + } + + Set<Assignment> stableAssignments = stableAssignmentsEntry.value() == null + ? emptySet() + : ByteUtils.fromBytes(stableAssignmentsEntry.value()); + return handleChangePendingAssignmentEvent( - replicaGrpId, - table, - pendingAssignmentsEntry, - stableAssignmentsEntry, - revision - ); + replicaGrpId, + table, + pendingAssignments, + stableAssignments, + revision, + isRecovery + ) + .thenCompose(v -> changePeersOnRebalance(table, replicaGrpId, pendingAssignments, revision)); } finally { busyLock.leaveBusy(); } - }, ioExecutor) + }) .thenCompose(Function.identity()); } private CompletableFuture<Void> handleChangePendingAssignmentEvent( TablePartitionId replicaGrpId, TableImpl tbl, - Entry pendingAssignmentsEntry, - Entry stableAssignmentsEntry, - long revision + Set<Assignment> pendingAssignments, + Set<Assignment> stableAssignments, + long revision, + boolean isRecovery ) { ClusterNode localMember = localNode(); - int partId = replicaGrpId.partitionId(); - - if (LOG.isInfoEnabled()) { - var stringKey = new String(pendingAssignmentsEntry.key(), UTF_8); - - LOG.info("Received update on pending assignments. Check if new raft group should be started" - + " [key={}, partition={}, table={}, localMemberAddress={}]", - stringKey, partId, tbl.name(), localMember.address()); - } - - Set<Assignment> pendingAssignments = ByteUtils.fromBytes(pendingAssignmentsEntry.value()); - - Set<Assignment> stableAssignments = ByteUtils.fromBytes(stableAssignmentsEntry.value()); - // Start a new Raft node and Replica if this node has appeared in the new assignments. - boolean shouldStartLocalServices = pendingAssignments.stream() + boolean shouldStartLocalGroupNode = pendingAssignments.stream() .filter(assignment -> localMember.name().equals(assignment.consistentId())) .anyMatch(assignment -> !stableAssignments.contains(assignment)); CompletableFuture<Void> localServicesStartFuture; - if (shouldStartLocalServices) { + if (shouldStartLocalGroupNode) { localServicesStartFuture = localPartsByTableIdVv.get(revision) .thenComposeAsync(oldMap -> { - int tableId = tbl.tableId(); - - PartitionSet partitionSet = oldMap.get(tableId).copy(); - - return getOrCreatePartitionStorages(tbl, partitionSet).thenApply(u -> { - var newMap = new HashMap<>(oldMap); - - newMap.put(tableId, partitionSet); - - return newMap; - }); - }, ioExecutor) - .thenRunAsync(() -> inBusyLock(busyLock, () -> { - InternalTable internalTable = tbl.internalTable(); - - var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>(HybridTimestamp.MIN_VALUE); - var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L); - - PartitionStorages partitionStorages = getPartitionStorages(tbl, partId); - - MvPartitionStorage mvPartitionStorage = partitionStorages.getMvPartitionStorage(); - TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage(); - - PartitionDataStorage partitionDataStorage = partitionDataStorage(mvPartitionStorage, internalTable, partId); - - PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers( - partId, - partitionDataStorage, - tbl, - safeTimeTracker - ); + if (shouldStartLocalGroupNode) { + // TODO https://issues.apache.org/jira/browse/IGNITE-20957 This is incorrect usage of the value stored in + // TODO versioned value. See ticket for the details. + int tableId = tbl.tableId(); - PeersAndLearners pendingConfiguration = configurationFromAssignments(pendingAssignments); + PartitionSet partitionSet = oldMap.get(tableId).copy(); - try { - Peer serverPeer = pendingConfiguration.peer(localMember.name()); + return getOrCreatePartitionStorages(tbl, partitionSet).thenApply(u -> { + var newMap = new HashMap<>(oldMap); - RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, serverPeer); + newMap.put(tableId, partitionSet); - if (!((Loza) raftMgr).isStarted(raftNodeId)) { - PeersAndLearners stableConfiguration = configurationFromAssignments(stableAssignments); - - startPartitionRaftGroupNode( - replicaGrpId, - pendingConfiguration, - stableConfiguration, - safeTimeTracker, - storageIndexTracker, - internalTable, - txStatePartitionStorage, - partitionDataStorage, - partitionUpdateHandlers - ); - } - - if (!replicaMgr.isReplicaStarted(replicaGrpId)) { - startReplicaWithNewListener( - replicaGrpId, - tbl, - safeTimeTracker, - storageIndexTracker, - mvPartitionStorage, - txStatePartitionStorage, - partitionUpdateHandlers, - (TopologyAwareRaftGroupService) internalTable.partitionRaftGroupService(partId) - ); - } - } catch (NodeStoppingException ignored) { - // No-op. + return newMap; + }); + } else { + return nullCompletedFuture(); } - }), ioExecutor); + }, ioExecutor) + .thenComposeAsync(unused -> inBusyLock(busyLock, () -> startPartitionAndStartClient( + tbl, + replicaGrpId.partitionId(), + pendingAssignments, + isRecovery + )), ioExecutor); } else { localServicesStartFuture = nullCompletedFuture(); } - return localServicesStartFuture - .thenCompose(v -> { - RaftGroupService partGrpSvc = tbl.internalTable().partitionRaftGroupService(partId); + return localServicesStartFuture; + } - return partGrpSvc.refreshAndGetLeaderWithTerm() - .thenCompose(leaderWithTerm -> { - if (!isLocalPeer(leaderWithTerm.leader())) { - return nullCompletedFuture(); - } + private CompletableFuture<Void> changePeersOnRebalance( + TableImpl table, + TablePartitionId replicaGrpId, + Set<Assignment> pendingAssignments, + long revision + ) { + int partId = replicaGrpId.partitionId(); - // run update of raft configuration if this node is a leader - LOG.info("Current node={} is the leader of partition raft group={}. " - + "Initiate rebalance process for partition={}, table={}", - leaderWithTerm.leader(), replicaGrpId, partId, tbl.name()); + RaftGroupService partGrpSvc = table.internalTable().partitionRaftGroupService(partId); - return metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)) - .thenCompose(latestPendingAssignmentsEntry -> { - // Do not change peers of the raft group if this is a stale event. - // Note that we start raft node before for the sake of the consistency in a starting and - // stopping raft nodes. - if (revision < latestPendingAssignmentsEntry.revision()) { - return nullCompletedFuture(); - } + return partGrpSvc.refreshAndGetLeaderWithTerm() + .thenCompose(leaderWithTerm -> { + if (!isLocalPeer(leaderWithTerm.leader())) { + return nullCompletedFuture(); + } - PeersAndLearners newConfiguration = configurationFromAssignments(pendingAssignments); + // run update of raft configuration if this node is a leader + LOG.info("Current node={} is the leader of partition raft group={}. " + + "Initiate rebalance process for partition={}, table={}", + leaderWithTerm.leader(), replicaGrpId, partId, table.name()); + + return metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)) + .thenCompose(latestPendingAssignmentsEntry -> { + // Do not change peers of the raft group if this is a stale event. + // Note that we start raft node before for the sake of the consistency in a + // starting and stopping raft nodes. + if (revision < latestPendingAssignmentsEntry.revision()) { + return nullCompletedFuture(); + } - return partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term()); - }); + PeersAndLearners newConfiguration = + configurationFromAssignments(pendingAssignments); + + return partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term()); }); }); } private void startPartitionRaftGroupNode( TablePartitionId replicaGrpId, - PeersAndLearners pendingConfiguration, + RaftNodeId raftNodeId, PeersAndLearners stableConfiguration, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, PendingComparableValuesTracker<Long, Void> storageIndexTracker, @@ -1767,8 +1761,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { PartitionDataStorage partitionDataStorage, PartitionUpdateHandlers partitionUpdateHandlers ) throws NodeStoppingException { - ClusterNode localMember = localNode(); - RaftGroupOptions groupOptions = groupOptionsForPartition( internalTable.storage(), internalTable.txStateStorage(), @@ -1794,10 +1786,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { rebalanceScheduler ); - Peer serverPeer = pendingConfiguration.peer(localMember.name()); - - var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer); - // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273 ((Loza) raftMgr).startRaftGroupNode( raftNodeId, @@ -1966,12 +1954,29 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { // these updates always processing only 1 partition, so, only 1 stable partition key. assert evt.single() : evt; + if (evt.entryEvent().oldEntry() == null) { + // This means it's an event on table creation. + return nullCompletedFuture(); + } + Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry(); + long revision = evt.revision(); + + assert stableAssignmentsWatchEvent.revision() == revision : stableAssignmentsWatchEvent; + if (stableAssignmentsWatchEvent.value() == null) { return nullCompletedFuture(); } + return handleChangeStableAssignmentEvent(stableAssignmentsWatchEvent, evt.revision(), false); + } + + protected CompletableFuture<Void> handleChangeStableAssignmentEvent( + Entry stableAssignmentsWatchEvent, + long revision, + boolean isRecovery + ) { int partitionId = extractPartitionNumber(stableAssignmentsWatchEvent.key()); int tableId = extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX); @@ -1979,38 +1984,64 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { Set<Assignment> stableAssignments = ByteUtils.fromBytes(stableAssignmentsWatchEvent.value()); - return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId), stableAssignmentsWatchEvent.revision()) + return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId), revision) .thenComposeAsync(pendingAssignmentsEntry -> { - // Update raft client peers and learners according to the actual assignments. - CompletableFuture<Void> raftClientUpdateFuture = tablesById(evt.revision()).thenAccept(t -> { - t.get(tableId).internalTable() - .partitionRaftGroupService(tablePartitionId.partitionId()) - .updateConfiguration(configurationFromAssignments(stableAssignments)); - }); - byte[] pendingAssignmentsFromMetaStorage = pendingAssignmentsEntry.value(); Set<Assignment> pendingAssignments = pendingAssignmentsFromMetaStorage == null ? Set.of() : ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage); - String localMemberName = localNode().name(); + return stopAndDestroyPartitionAndUpdateClients( + tablePartitionId, + stableAssignments, + pendingAssignments, + isRecovery, + revision + ); + }, ioExecutor); + } - boolean shouldStopLocalServices = Stream.concat(stableAssignments.stream(), pendingAssignments.stream()) - .noneMatch(assignment -> assignment.consistentId().equals(localMemberName)); + private CompletableFuture<Void> updatePartitionClients( + TablePartitionId tablePartitionId, + Set<Assignment> stableAssignments, + long revision + ) { + // Update raft client peers and learners according to the actual assignments. + return tablesById(revision).thenAccept(t -> { + t.get(tablePartitionId.tableId()).internalTable() + .partitionRaftGroupService(tablePartitionId.partitionId()) + .updateConfiguration(configurationFromAssignments(stableAssignments)); + }); + } - if (shouldStopLocalServices) { - return allOf( - raftClientUpdateFuture, - stopAndDestroyPartition(tablePartitionId, evt.revision())); - } else { - return raftClientUpdateFuture; - } - }, ioExecutor); + private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients( + TablePartitionId tablePartitionId, + Set<Assignment> stableAssignments, + Set<Assignment> pendingAssignments, + boolean isRecovery, + long revision + ) { + CompletableFuture<Void> clientUpdateFuture = isRecovery + // Updating clients is not needed on recovery. + ? nullCompletedFuture() + : updatePartitionClients(tablePartitionId, stableAssignments, revision); + + boolean shouldStopLocalServices = Stream.concat(stableAssignments.stream(), pendingAssignments.stream()) + .noneMatch(assignment -> assignment.consistentId().equals(localNode().name())); + + if (shouldStopLocalServices) { + return allOf( + clientUpdateFuture, + stopAndDestroyPartition(tablePartitionId, revision) + ); + } else { + return clientUpdateFuture; + } } private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId tablePartitionId, long causalityToken) { - return tablesById(causalityToken) + return tablesByIdVv.get(causalityToken) .thenCompose(tables -> { TableImpl table = tables.get(tablePartitionId.tableId());