This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8980c45e57 IGNITE-20210 Start partitions on corresponding 
assignments.stable, calculate if missing, cleanup obsolete resources (part 1) 
(#2867)
8980c45e57 is described below

commit 8980c45e57a69478285a03a5c7692dad6220bd4c
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Wed Dec 20 13:36:22 2023 +0300

    IGNITE-20210 Start partitions on corresponding assignments.stable, 
calculate if missing, cleanup obsolete resources (part 1) (#2867)
---
 .../distributionzones/rebalance/RebalanceUtil.java |  50 +-
 .../apache/ignite/internal/index/IndexManager.java |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        | 101 +++
 .../rebalance/ItRebalanceDistributedTest.java      |  21 +-
 .../internal/table/distributed/TableManager.java   | 703 +++++++++++----------
 5 files changed, 516 insertions(+), 361 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 9c3e178236..3e95302031 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -48,12 +48,12 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Util class for methods needed for the rebalance process.
@@ -465,19 +465,24 @@ public class RebalanceUtil {
     }
 
     /**
-     * Returns partition assignments from vault.
+     * Returns partition assignments from meta storage locally.
      *
-     * @param vaultManager Vault manager.
+     * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
      * @param partitionNumber Partition number.
-     * @return Returns partition assignments from vault or {@code null} if 
assignments is absent.
+     * @param revision Revision.
+     * @return Returns partition assignments from meta storage locally or 
{@code null} if assignments is absent.
      */
-    public static Set<Assignment> partitionAssignments(
-            VaultManager vaultManager, int tableId, int partitionNumber) {
-        VaultEntry entry =
-                vaultManager.get(stablePartAssignmentsKey(new 
TablePartitionId(tableId, partitionNumber))).join();
+    @Nullable
+    public static Set<Assignment> partitionAssignmentsGetLocally(
+            MetaStorageManager metaStorageManager,
+            int tableId,
+            int partitionNumber,
+            long revision
+    ) {
+        Entry entry = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
TablePartitionId(tableId, partitionNumber)), revision);
 
-        return (entry == null) ? null : ByteUtils.fromBytes(entry.value());
+        return (entry == null || entry.empty() || entry.tombstone()) ? null : 
ByteUtils.fromBytes(entry.value());
     }
 
     /**
@@ -516,25 +521,28 @@ public class RebalanceUtil {
     }
 
     /**
-     * Returns table assignments for all table partitions from vault.
+     * Returns table assignments for all table partitions from meta storage 
locally. Assignments must be present.
      *
-     * @param vaultManager Vault manager.
+     * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
      * @param numberOfPartitions Number of partitions.
+     * @param revision Revision.
      * @return Future with table assignments as a value.
      */
-    public static List<Set<Assignment>> tableAssignments(
-            VaultManager vaultManager,
+    public static List<Set<Assignment>> tableAssignmentsGetLocally(
+            MetaStorageManager metaStorageManager,
             int tableId,
-            int numberOfPartitions
+            int numberOfPartitions,
+            long revision
     ) {
         return IntStream.range(0, numberOfPartitions)
-                .mapToObj(i ->
-                        (Set<Assignment>) ByteUtils.fromBytes(
-                                vaultManager.get(
-                                        stablePartAssignmentsKey(new 
TablePartitionId(tableId, i))
-                                ).join().value())
-                )
+                .mapToObj(p -> {
+                    Entry e = 
metaStorageManager.getLocally(stablePartAssignmentsKey(new 
TablePartitionId(tableId, p)), revision);
+
+                    assert e != null && !e.empty() && !e.tombstone() : e;
+
+                    return (Set<Assignment>) ByteUtils.fromBytes(e.value());
+                })
                 .collect(Collectors.toList());
     }
 }
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 1fbc57e70a..8bb672fc4e 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -331,7 +331,7 @@ public class IndexManager implements IgniteComponent {
                     if (throwable != null) {
                         LOG.error("Error starting indexes", throwable);
                     } else {
-                        LOG.debug("Indexes started successfully");
+                        LOG.info("Indexes started successfully");
                     }
                 });
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index cc8fd11bd7..72f1b5f5c1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -18,7 +18,11 @@
 package org.apache.ignite.internal.runner.app;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -58,6 +62,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
 import org.apache.ignite.internal.BaseIgniteRestartTest;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.ClockWaiter;
@@ -92,6 +97,7 @@ import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
@@ -103,6 +109,7 @@ import 
org.apache.ignite.internal.network.recovery.VaultStateIds;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftNodeId;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -110,6 +117,7 @@ import 
org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
@@ -124,6 +132,7 @@ import 
org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -131,6 +140,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
@@ -1146,6 +1156,13 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      * The test for node restart when there is a gap between the node local 
configuration and distributed configuration.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19712")
+    // TODO https://issues.apache.org/jira/browse/IGNITE-19712 This test should work, but is disabled because of assertion errors.
+    // TODO Root cause of errors is the absence of the indexes on the partition after restart. Scenario: indexes are recovered from
+    // TODO the catalog, then partition storages are cleaned up on recovery due to the absence of the node in stable assignments,
+    // TODO then after recovery the pending assignments event is processed, and it creates the storages and partition again, but
+    // TODO doesn't register the indexes. As a result, indexes are not found and assertion happens.
+    // TODO This also causes https://issues.apache.org/jira/browse/IGNITE-20996 .
     public void testCfgGap() {
         List<IgniteImpl> nodes = startNodes(4);
 
@@ -1191,6 +1208,90 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         startNodes(3);
     }
 
+    @Test
+    public void destroyObsoleteStoragesOnRestart() throws InterruptedException 
{
+        int nodesCount = 3;
+        List<IgniteImpl> nodes = startNodes(nodesCount);
+
+        int partitions = nodesCount;
+        int replicas = nodesCount;
+        createTableWithData(nodes, TABLE_NAME, replicas, partitions);
+
+        int restartedNodeIndex = nodesCount - 1;
+
+        WatchListenerInhibitor inhibitor = 
WatchListenerInhibitor.metastorageEventsInhibitor(nodes.get(restartedNodeIndex));
+
+        inhibitor.startInhibit();
+
+        // Change the zone - one replica per partition.
+        alterZone(nodes.get(0).catalogManager(), String.format("ZONE_%s", 
TABLE_NAME.toUpperCase()), 1);
+
+        stopNode(restartedNodeIndex);
+
+        inhibitor.stopInhibit();
+
+        IgniteImpl restartedNode = startNode(restartedNodeIndex);
+
+        TableImpl table = (TableImpl) restartedNode.tables().table(TABLE_NAME);
+
+        assertTrue(waitForCondition(() -> {
+            // Check that only storage for 1 partition left on the restarted 
node.
+            return IntStream.range(0, partitions)
+                    .mapToObj(i -> 
table.internalTable().storage().getMvPartition(i))
+                    .filter(Objects::nonNull)
+                    .count() == 1;
+        }, 10_000));
+    }
+
+    @Test
+    public void testCorrectPartitionRecoveryOnSnapshot() throws 
InterruptedException {
+        int nodesCount = 3;
+        List<IgniteImpl> nodes = startNodes(nodesCount);
+
+        int partitions = nodesCount;
+        int replicas = nodesCount;
+        createTableWithData(nodes, TABLE_NAME, replicas, partitions);
+
+        int restartedNodeIndex = nodesCount - 1;
+
+        WatchListenerInhibitor inhibitor = 
WatchListenerInhibitor.metastorageEventsInhibitor(nodes.get(restartedNodeIndex));
+
+        inhibitor.startInhibit();
+
+        alterZone(nodes.get(0).catalogManager(), String.format("ZONE_%s", 
TABLE_NAME.toUpperCase()), 1);
+
+        stopNode(restartedNodeIndex);
+
+        inhibitor.stopInhibit();
+
+        forceSnapshotUsageOnRestart(nodes.get(0));
+
+        IgniteImpl restartedNode = startNode(restartedNodeIndex);
+
+        TableImpl table = (TableImpl) restartedNode.tables().table(TABLE_NAME);
+
+        long recoveryRevision = 
restartedNode.metaStorageManager().recoveryFinishedFuture().join();
+
+        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(nodes.stream().map(IgniteImpl::name)
+                .collect(toSet()), Set.of());
+
+        for (int p = 0; p < partitions; p++) {
+            TablePartitionId tablePartitionId = new 
TablePartitionId(table.tableId(), p);
+
+            Entry e = 
restartedNode.metaStorageManager().getLocally(stablePartAssignmentsKey(tablePartitionId),
 recoveryRevision);
+
+            Set<Assignment> assignment = ByteUtils.fromBytes(e.value());
+
+            boolean shouldBe = assignment.stream().anyMatch(n -> 
n.consistentId().equals(restartedNode.name()));
+
+            Peer peer = configuration.peer(restartedNode.name());
+
+            boolean isStarted = restartedNode.raftManager().isStarted(new 
RaftNodeId(tablePartitionId, peer));
+
+            assertEquals(shouldBe, isStarted);
+        }
+    }
+
     /**
      * Checks the table exists and validates all data in it.
      *
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index f78aef081f..f6f81281b0 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -154,6 +154,7 @@ import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryDataStora
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
@@ -503,7 +504,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
     @Test
     @UseTestTxStateStorage
     @UseRocksMetaStorage
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20210")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19170")
     void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo) 
throws Exception {
         Node node = getNode(0);
 
@@ -701,8 +702,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
     private void verifyThatRaftNodesAndReplicasWereStartedOnlyOnce() throws 
Exception {
         for (int i = 0; i < NODE_COUNT; i++) {
-            verify(getNode(i).raftManager, times(1))
-                    .startRaftGroupNode(any(), any(), any(), any(), 
any(RaftGroupOptions.class));
+            verify(getNode(i).raftManager, times(1)).startRaftGroupNode(any(), 
any(), any(), any(), any(RaftGroupOptions.class));
             verify(getNode(i).replicaManager, times(1)).startReplica(any(), 
any(), any(), any(), any());
         }
     }
@@ -712,6 +712,21 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                 () -> nodes.stream().allMatch(n -> getPartitionClusterNodes(n, 
partNum).size() == replicasNum),
                 (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
         ));
+
+        if (replicasNum == nodes.size()) {
+            assertTrue(waitForCondition(
+                    () -> {
+                        try {
+                            return ((TableImpl) 
nodes.get(0).tableManager.table(TABLE_NAME))
+                                    
.internalTable().partitionRaftGroupService(partNum) != null;
+                        } catch (IgniteInternalException e) {
+                            // Raft group service not found.
+                            return false;
+                        }
+                    },
+                    (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
+            ));
+        }
     }
 
     private void waitPartitionPendingAssignmentsSyncedToExpected(int partNum, 
int replicasNum) throws Exception {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index cacdb8edb7..423a3a03b7 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableMap;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.anyOf;
@@ -26,12 +27,13 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static 
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignments;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -68,11 +70,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
 import java.util.function.LongFunction;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -217,9 +219,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** Meta storage manager. */
     private final MetaStorageManager metaStorageMgr;
 
-    /** Vault manager. */
-    private final VaultManager vaultManager;
-
     /** Data storage manager. */
     private final DataStorageManager dataStorageMgr;
 
@@ -244,7 +243,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /**
      * Versioned store for tracking RAFT groups initialization and starting 
completion.
      *
-     * <p>Only explicitly updated in {@link 
#createTablePartitionsLocally(long, CompletableFuture, TableImpl)}.
+     * <p>Only explicitly updated in {@link 
#startLocalPartitionsAndUpdateClients(CompletableFuture, TableImpl)}.
      *
      * <p>Completed strictly after {@link #localPartsByTableIdVv}.
      */
@@ -393,7 +392,6 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.dataStorageMgr = dataStorageMgr;
         this.storagePath = storagePath;
         this.metaStorageMgr = metaStorageMgr;
-        this.vaultManager = vaultManager;
         this.schemaManager = schemaManager;
         this.volatileLogStorageFactoryCreator = 
volatileLogStorageFactoryCreator;
         this.clock = clock;
@@ -494,7 +492,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             startTables(recoveryRevision);
 
-            performRebalanceOnRecovery(recoveryRevision);
+            processAssignmentsOnRecovery(recoveryRevision);
 
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
             
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
@@ -516,18 +514,38 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         });
     }
 
-    private void performRebalanceOnRecovery(long recoveryRevision) {
-        CompletableFuture<Void> pendingAssignmentsRecoveryFuture;
+    private void processAssignmentsOnRecovery(long recoveryRevision) {
+        var stableAssignmentsPrefix = new ByteArray(STABLE_ASSIGNMENTS_PREFIX);
+        var pendingAssignmentsPrefix = new 
ByteArray(PENDING_ASSIGNMENTS_PREFIX);
 
-        var prefix = new ByteArray(PENDING_ASSIGNMENTS_PREFIX);
+        startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery(
+                stableAssignmentsPrefix,
+                recoveryRevision,
+                (entry, rev) -> handleChangeStableAssignmentEvent(entry, rev, 
true),
+                "stable"
+        ));
+        startVv.update(recoveryRevision, (v, e) -> handleAssignmentsOnRecovery(
+                pendingAssignmentsPrefix,
+                recoveryRevision,
+                (entry, rev) -> handleChangePendingAssignmentEvent(entry, rev, 
true),
+                "pending"
+        ));
+    }
 
-        try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, 
recoveryRevision)) {
+    private CompletableFuture<Void> handleAssignmentsOnRecovery(
+            ByteArray prefix,
+            long revision,
+            BiFunction<Entry, Long, CompletableFuture<Void>> 
assignmentsEventHandler,
+            String assignmentsType
+    ) {
+        try (Cursor<Entry> cursor = metaStorageMgr.prefixLocally(prefix, 
revision)) {
             CompletableFuture<?>[] futures = cursor.stream()
-                    .map(pendingAssignmentEntry -> {
+                    .map(entry -> {
                         if (LOG.isInfoEnabled()) {
                             LOG.info(
-                                    "Missed pending assignments for key '{}' 
discovered, performing recovery",
-                                    new String(pendingAssignmentEntry.key(), 
UTF_8)
+                                    "Missed {} assignments for key '{}' 
discovered, performing recovery",
+                                    assignmentsType,
+                                    new String(entry.key(), UTF_8)
                             );
                         }
 
@@ -535,20 +553,19 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                         // 'handleChangePendingAssignmentEvent' accesses some 
Versioned Values that only store values starting with
                         // tokens equal to Meta Storage recovery revision. In 
other words, if the entry has a lower revision than the
                         // recovery revision, there will never be a Versioned 
Value corresponding to its revision.
-                        return 
handleChangePendingAssignmentEvent(pendingAssignmentEntry, recoveryRevision);
+                        return assignmentsEventHandler.apply(entry, revision);
                     })
                     .toArray(CompletableFuture[]::new);
 
-            pendingAssignmentsRecoveryFuture = allOf(futures)
+
+            return allOf(futures)
                     // Simply log any errors, we don't want to block watch 
processing.
                     .exceptionally(e -> {
-                        LOG.error("Error when performing pending assignments 
recovery", e);
+                        LOG.error("Error when performing assignments 
recovery", e);
 
                         return null;
                     });
         }
-
-        startVv.update(recoveryRevision, (v, e) -> 
pendingAssignmentsRecoveryFuture);
     }
 
     private CompletableFuture<?> onTableCreate(CreateTableEventParameters 
parameters) {
@@ -602,220 +619,188 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     /**
      * Updates or creates partition raft groups and storages.
      *
-     * @param causalityToken Causality token.
      * @param assignmentsFuture Table assignments.
      * @param table Initialized table entity.
      * @return future, which will be completed when the partitions creations 
done.
      */
-    private CompletableFuture<?> createTablePartitionsLocally(
-            long causalityToken,
+    private CompletableFuture<Void> startLocalPartitionsAndUpdateClients(
             CompletableFuture<List<Set<Assignment>>> assignmentsFuture,
             TableImpl table
     ) {
         int tableId = table.tableId();
 
         // Create new raft nodes according to new assignments.
-        Supplier<CompletableFuture<Void>> updateAssignmentsClosure = () -> 
assignmentsFuture.thenCompose(newAssignments -> {
+        return assignmentsFuture.thenCompose(assignments -> {
             // Empty assignments might be a valid case if tables are created 
from within cluster init HOCON
             // configuration, which is not supported now.
-            assert newAssignments != null : 
IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
+            assert assignments != null : IgniteStringFormatter.format("Table 
[id={}] has empty assignments.", tableId);
 
-            int partitions = newAssignments.size();
+            int partitions = assignments.size();
 
-            CompletableFuture<?>[] futures = new 
CompletableFuture<?>[partitions];
+            List<CompletableFuture<?>> futures = new ArrayList<>();
 
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 
Process assignments and set partitions only for assigned partitions.
-            PartitionSet parts = new BitSetPartitionSet();
+            for (int i = 0; i < partitions; i++) {
+                int partId = i;
 
-            for (int i = 0; i < futures.length; i++) {
-                futures[i] = new CompletableFuture<>();
+                CompletableFuture<?> future = startPartitionAndStartClient(
+                                table,
+                                partId,
+                                assignments.get(partId),
+                                false
+                        )
+                        .whenComplete((res, ex) -> {
+                            if (ex != null) {
+                                LOG.warn("Unable to update raft groups on the 
node [tableId={}, partitionId={}]", ex, tableId, partId);
+                            }
+                        });
 
-                parts.set(i);
+                futures.add(future);
             }
 
-            String localMemberName = localNode().name();
+            return allOf(futures.toArray(new CompletableFuture<?>[0]));
+        });
+    }
 
-            for (int i = 0; i < partitions; i++) {
-                int partId = i;
+    private CompletableFuture<Void> startPartitionAndStartClient(
+            TableImpl table,
+            int partId,
+            Set<Assignment> newPartAssignment,
+            boolean isRecovery
+    ) {
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+
+        int tableId = table.tableId();
+
+        InternalTable internalTbl = table.internalTable();
 
-                Set<Assignment> newPartAssignment = newAssignments.get(partId);
+        Assignment localMemberAssignment = newPartAssignment.stream()
+                .filter(a -> a.consistentId().equals(localNode().name()))
+                .findAny()
+                .orElse(null);
 
-                InternalTable internalTbl = table.internalTable();
+        PeersAndLearners newConfiguration = 
configurationFromAssignments(newPartAssignment);
 
-                Assignment localMemberAssignment = newPartAssignment.stream()
-                        .filter(a -> a.consistentId().equals(localMemberName))
-                        .findAny()
-                        .orElse(null);
+        TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
 
-                PeersAndLearners newConfiguration = 
configurationFromAssignments(newPartAssignment);
+        var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, Void>(
+                new HybridTimestamp(1, 0)
+        );
+        var storageIndexTracker = new PendingComparableValuesTracker<Long, 
Void>(0L);
 
-                TablePartitionId replicaGrpId = new TablePartitionId(tableId, 
partId);
+        PartitionStorages partitionStorages = getPartitionStorages(table, 
partId);
 
-                var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, Void>(
-                        new HybridTimestamp(1, 0)
-                );
-                var storageIndexTracker = new 
PendingComparableValuesTracker<Long, Void>(0L);
+        PartitionDataStorage partitionDataStorage = 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+                internalTbl, partId);
 
-                ((InternalTableImpl) 
internalTbl).updatePartitionTrackers(partId, safeTimeTracker, 
storageIndexTracker);
+        storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), 
null);
 
-                PartitionStorages partitionStorages = 
getPartitionStorages(table, partId);
+        PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
+                partId,
+                partitionDataStorage,
+                table,
+                safeTimeTracker
+        );
+
+        Peer serverPeer = newConfiguration.peer(localNode().name());
 
-                PartitionDataStorage partitionDataStorage = 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
-                        internalTbl, partId);
+        var raftNodeId = localMemberAssignment == null ? null : new 
RaftNodeId(replicaGrpId, serverPeer);
 
-                
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
+        boolean shouldStartRaftListeners = localMemberAssignment != null && 
!((Loza) raftMgr).isStarted(raftNodeId);
 
-                PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
-                        partId,
-                        partitionDataStorage,
-                        table,
-                        safeTimeTracker
-                );
+        if (shouldStartRaftListeners) {
+            ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
 
-                mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
+            mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
+        }
 
-                CompletableFuture<Boolean> startGroupFut;
+        CompletableFuture<Boolean> startGroupFut;
 
-                // start new nodes, only if it is table creation, other cases 
will be covered by rebalance logic
-                if (localMemberAssignment != null) {
-                    CompletableFuture<Boolean> shouldStartGroupFut = 
partitionReplicatorNodeRecovery.shouldStartGroup(
+        // start new nodes, only if it is table creation, other cases will be 
covered by rebalance logic
+        if (localMemberAssignment != null) {
+            CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
+                    ? partitionReplicatorNodeRecovery.shouldStartGroup(
                             replicaGrpId,
                             internalTbl,
                             newConfiguration,
                             localMemberAssignment
-                    );
-
-                    startGroupFut = 
shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> {
-                        if (!startGroup) {
-                            return false;
-                        }
-                        TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
-
-                        RaftGroupOptions groupOptions = 
groupOptionsForPartition(
-                                internalTbl.storage(),
-                                internalTbl.txStateStorage(),
-                                partitionKey(internalTbl, partId),
-                                partitionUpdateHandlers
-                        );
-
-                        Peer serverPeer = 
newConfiguration.peer(localMemberName);
-
-                        var raftNodeId = new RaftNodeId(replicaGrpId, 
serverPeer);
-
-                        try {
-                            // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
-                            ((Loza) raftMgr).startRaftGroupNode(
-                                    raftNodeId,
-                                    newConfiguration,
-                                    new PartitionListener(
-                                            txManager,
-                                            partitionDataStorage,
-                                            
partitionUpdateHandlers.storageUpdateHandler,
-                                            txStatePartitionStorage,
-                                            safeTimeTracker,
-                                            storageIndexTracker
-                                    ),
-                                    new RebalanceRaftGroupEventsListener(
-                                            metaStorageMgr,
-                                            replicaGrpId,
-                                            busyLock,
-                                            createPartitionMover(internalTbl, 
partId),
-                                            this::calculateAssignments,
-                                            rebalanceScheduler
-                                    ),
-                                    groupOptions
-                            );
+                    )
+                    : trueCompletedFuture();
 
-                            return true;
-                        } catch (NodeStoppingException ex) {
-                            throw new CompletionException(ex);
-                        }
-                    }), ioExecutor);
-                } else {
-                    startGroupFut = falseCompletedFuture();
+            startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> 
inBusyLock(busyLock, () -> {
+                if (!startGroup) {
+                    return false;
                 }
 
-                startGroupFut
-                        .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
-                            try {
-                                //TODO IGNITE-19614 This procedure takes 10 
seconds if there's no majority online.
-                                return raftMgr.startRaftGroupService(
-                                        replicaGrpId, newConfiguration, 
raftGroupServiceFactory, raftCommandsMarshaller
-                                );
-                            } catch (NodeStoppingException ex) {
-                                return failedFuture(ex);
-                            }
-                        }), ioExecutor)
-                        .thenAcceptAsync(updatedRaftGroupService -> 
inBusyLock(busyLock, () -> {
-                            ((InternalTableImpl) 
internalTbl).updateInternalTableRaftGroupService(partId, 
updatedRaftGroupService);
-
-                            boolean startedRaftNode = startGroupFut.join();
-                            if (localMemberAssignment == null || 
!startedRaftNode) {
-                                return;
-                            }
-
-                            MvPartitionStorage partitionStorage = 
partitionStorages.getMvPartitionStorage();
-                            TxStateStorage txStateStorage = 
partitionStorages.getTxStateStorage();
-
-                            try {
-                                startReplicaWithNewListener(
-                                        replicaGrpId,
-                                        table,
-                                        safeTimeTracker,
-                                        storageIndexTracker,
-                                        partitionStorage,
-                                        txStateStorage,
-                                        partitionUpdateHandlers,
-                                        updatedRaftGroupService
-                                );
-                            } catch (NodeStoppingException ex) {
-                                throw new AssertionError("Loza was stopped 
before Table manager", ex);
-                            }
-                        }), ioExecutor)
-                        .whenComplete((res, ex) -> {
-                            if (ex != null) {
-                                LOG.warn("Unable to update raft groups on the 
node [tableId={}, partitionId={}]", ex, tableId, partId);
-
-                                futures[partId].completeExceptionally(ex);
-                            } else {
-                                futures[partId].complete(null);
-                            }
-                        });
-            }
+                if (((Loza) raftMgr).isStarted(raftNodeId)) {
+                    return true;
+                }
 
-            return allOf(futures);
-        });
+                try {
+                    startPartitionRaftGroupNode(
+                            replicaGrpId,
+                            raftNodeId,
+                            newConfiguration,
+                            safeTimeTracker,
+                            storageIndexTracker,
+                            internalTbl,
+                            partitionStorages.getTxStateStorage(),
+                            partitionDataStorage,
+                            partitionUpdateHandlers
+                    );
 
-        // NB: all vv.update() calls must be made from the synchronous part of 
the method (not in thenCompose()/etc!).
-        CompletableFuture<?> localPartsUpdateFuture = 
localPartsByTableIdVv.update(causalityToken,
-                (previous, throwable) -> inBusyLock(busyLock, () -> 
assignmentsFuture.thenCompose(newAssignments -> {
-                    PartitionSet parts = new BitSetPartitionSet();
+                    return true;
+                } catch (NodeStoppingException ex) {
+                    throw new CompletionException(ex);
+                }
+            }), ioExecutor);
+        } else {
+            startGroupFut = falseCompletedFuture();
+        }
 
-                    for (int i = 0; i < newAssignments.size(); i++) {
-                        parts.set(i);
+        startGroupFut
+                .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
+                    try {
+                        //TODO IGNITE-19614 This procedure takes 10 seconds if 
there's no majority online.
+                        return raftMgr
+                                .startRaftGroupService(replicaGrpId, 
newConfiguration, raftGroupServiceFactory, raftCommandsMarshaller);
+                    } catch (NodeStoppingException ex) {
+                        return failedFuture(ex);
                     }
+                }), ioExecutor)
+                .thenAcceptAsync(updatedRaftGroupService -> 
inBusyLock(busyLock, () -> {
+                    ((InternalTableImpl) 
internalTbl).updateInternalTableRaftGroupService(partId, 
updatedRaftGroupService);
 
-                    return getOrCreatePartitionStorages(table, 
parts).thenApply(u -> {
-                        var newValue = new HashMap<>(previous);
-
-                        newValue.put(tableId, parts);
-
-                        return newValue;
-                    });
-                })));
+                    boolean startedRaftNode = startGroupFut.join();
+                    if (localMemberAssignment == null || !startedRaftNode || 
replicaMgr.isReplicaStarted(replicaGrpId)) {
+                        return;
+                    }
 
-        // We bring the future outside to avoid OutdatedTokenException.
-        CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = 
tablesByIdVv.get(causalityToken);
+                    try {
+                        startReplicaWithNewListener(
+                                replicaGrpId,
+                                table,
+                                safeTimeTracker,
+                                storageIndexTracker,
+                                partitionStorages.getMvPartitionStorage(),
+                                partitionStorages.getTxStateStorage(),
+                                partitionUpdateHandlers,
+                                updatedRaftGroupService
+                        );
+                    } catch (NodeStoppingException ex) {
+                        throw new AssertionError("Loza was stopped before 
Table manager", ex);
+                    }
+                }), ioExecutor)
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        LOG.warn("Unable to update raft groups on the node 
[tableId={}, partitionId={}]", ex, tableId, partId);
 
-        return assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
-            if (e != null) {
-                return failedFuture(e);
-            }
+                        resultFuture.completeExceptionally(ex);
+                    } else {
+                        resultFuture.complete(null);
+                    }
+                });
 
-            return localPartsUpdateFuture.thenCompose(unused ->
-                    tablesByIdFuture.thenComposeAsync(tablesById -> 
inBusyLock(busyLock, updateAssignmentsClosure), ioExecutor)
-            );
-        });
+        return resultFuture;
     }
 
     private void startReplicaWithNewListener(
@@ -1054,9 +1039,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
             // Check if the table already has assignments in the vault.
             // So, it means, that it is a recovery process and we should use 
the vault assignments instead of calculation for the new ones.
-            // TODO: IGNITE-20210 Fix it
-            if (partitionAssignments(vaultManager, tableId, 0) != null) {
-                assignmentsFuture = 
completedFuture(tableAssignments(vaultManager, tableId, 
zoneDescriptor.partitions()));
+            // TODO https://issues.apache.org/jira/browse/IGNITE-20993
+            if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, 
causalityToken) != null) {
+                assignmentsFuture = completedFuture(
+                        tableAssignmentsGetLocally(metaStorageMgr, tableId, 
zoneDescriptor.partitions(), causalityToken));
             } else {
                 assignmentsFuture = 
distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId)
                         .thenApply(dataNodes -> 
AffinityUtils.calculateAssignments(
@@ -1136,7 +1122,43 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     });
         }));
 
-        CompletableFuture<?> createPartsFut = 
createTablePartitionsLocally(causalityToken, assignmentsFuture, table);
+        // NB: all vv.update() calls must be made from the synchronous part of 
the method (not in thenCompose()/etc!).
+        CompletableFuture<?> localPartsUpdateFuture = 
localPartsByTableIdVv.update(causalityToken,
+                (previous, throwable) -> inBusyLock(busyLock, () -> 
assignmentsFuture.thenCompose(newAssignments -> {
+                    PartitionSet parts = new BitSetPartitionSet();
+
+                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set 
partitions only for
+                    // TODO assigned partitions.
+                    for (int i = 0; i < newAssignments.size(); i++) {
+                        parts.set(i);
+                    }
+
+                    return getOrCreatePartitionStorages(table, 
parts).thenApply(u -> {
+                        var newValue = new HashMap<>(previous);
+
+                        newValue.put(tableId, parts);
+
+                        return newValue;
+                    });
+                })));
+
+        // We bring the future outside to avoid OutdatedTokenException.
+        CompletableFuture<Map<Integer, TableImpl>> tablesByIdFuture = 
tablesByIdVv.get(causalityToken);
+
+        // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions 
should be started only on the assignments change
+        // TODO event triggered by zone create or alter.
+        CompletableFuture<?> createPartsFut = 
assignmentsUpdatedVv.update(causalityToken, (token, e) -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
+            return localPartsUpdateFuture.thenCompose(unused ->
+                    tablesByIdFuture.thenComposeAsync(tablesById -> inBusyLock(
+                            busyLock,
+                            () -> 
startLocalPartitionsAndUpdateClients(assignmentsFuture, table)
+                    ), ioExecutor)
+            );
+        });
 
         pendingTables.put(tableId, table);
         startedTables.put(tableId, table);
@@ -1242,6 +1264,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
             CompletableFuture<?>[] stopReplicaFutures = new 
CompletableFuture<?>[partitions];
 
+            // TODO https://issues.apache.org/jira/browse/IGNITE-19170 
Partitions should be stopped on the assignments change
+            // TODO event triggered by zone drop or alter.
             for (int partitionId = 0; partitionId < partitions; partitionId++) 
{
                 var replicationGroupId = new TablePartitionId(tableId, 
partitionId);
 
@@ -1341,6 +1365,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * Returns the latest tables by ID map, for which all assignment updates 
have been completed.
      */
     private Map<Integer, TableImpl> latestTablesById() {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-20915 fix this.
         if (assignmentsUpdatedVv.latestCausalityToken() < 0L) {
             // No tables at all in case of empty causality token.
             return emptyMap();
@@ -1557,7 +1582,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 try {
                     Entry newEntry = evt.entryEvent().newEntry();
 
-                    return handleChangePendingAssignmentEvent(newEntry, 
evt.revision());
+                    return handleChangePendingAssignmentEvent(newEntry, 
evt.revision(), false);
                 } finally {
                     busyLock.leaveBusy();
                 }
@@ -1570,7 +1595,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         };
     }
 
-    private CompletableFuture<Void> handleChangePendingAssignmentEvent(Entry 
pendingAssignmentsEntry, long revision) {
+    private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+            Entry pendingAssignmentsEntry,
+            long revision,
+            boolean isRecovery
+    ) {
         if (pendingAssignmentsEntry.value() == null) {
             return nullCompletedFuture();
         }
@@ -1583,8 +1612,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         // Stable assignments from the meta store, which revision is bounded 
by the current pending event.
         CompletableFuture<Entry> stableAssignmentsFuture = 
metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId), revision);
 
-        return tablesById(revision)
-                .thenCombineAsync(stableAssignmentsFuture, (tables, 
stableAssignmentsEntry) -> {
+        Set<Assignment> pendingAssignments = 
ByteUtils.fromBytes(pendingAssignmentsEntry.value());
+
+        return tablesByIdVv.get(revision)
+                .thenCombine(stableAssignmentsFuture, (tables, 
stableAssignmentsEntry) -> {
                     if (!busyLock.enterBusy()) {
                         return CompletableFuture.<Void>failedFuture(new 
NodeStoppingException());
                     }
@@ -1602,163 +1633,126 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                             return 
CompletableFutures.<Void>nullCompletedFuture();
                         }
 
+                        if (LOG.isInfoEnabled()) {
+                            var stringKey = new 
String(pendingAssignmentsEntry.key(), UTF_8);
+
+                            LOG.info("Received update on pending assignments. 
Check if new raft group should be started"
+                                            + " [key={}, partition={}, 
table={}, localMemberAddress={}]",
+                                    stringKey, partId, table.name(), 
localNode().address());
+                        }
+
+                        Set<Assignment> stableAssignments = 
stableAssignmentsEntry.value() == null
+                                ? emptySet()
+                                : 
ByteUtils.fromBytes(stableAssignmentsEntry.value());
+
                         return handleChangePendingAssignmentEvent(
-                                replicaGrpId,
-                                table,
-                                pendingAssignmentsEntry,
-                                stableAssignmentsEntry,
-                                revision
-                        );
+                                    replicaGrpId,
+                                    table,
+                                    pendingAssignments,
+                                    stableAssignments,
+                                    revision,
+                                    isRecovery
+                                )
+                                .thenCompose(v -> 
changePeersOnRebalance(table, replicaGrpId, pendingAssignments, revision));
                     } finally {
                         busyLock.leaveBusy();
                     }
-                }, ioExecutor)
+                })
                 .thenCompose(Function.identity());
     }
 
     private CompletableFuture<Void> handleChangePendingAssignmentEvent(
             TablePartitionId replicaGrpId,
             TableImpl tbl,
-            Entry pendingAssignmentsEntry,
-            Entry stableAssignmentsEntry,
-            long revision
+            Set<Assignment> pendingAssignments,
+            Set<Assignment> stableAssignments,
+            long revision,
+            boolean isRecovery
     ) {
         ClusterNode localMember = localNode();
 
-        int partId = replicaGrpId.partitionId();
-
-        if (LOG.isInfoEnabled()) {
-            var stringKey = new String(pendingAssignmentsEntry.key(), UTF_8);
-
-            LOG.info("Received update on pending assignments. Check if new 
raft group should be started"
-                            + " [key={}, partition={}, table={}, 
localMemberAddress={}]",
-                    stringKey, partId, tbl.name(), localMember.address());
-        }
-
-        Set<Assignment> pendingAssignments = 
ByteUtils.fromBytes(pendingAssignmentsEntry.value());
-
-        Set<Assignment> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsEntry.value());
-
         // Start a new Raft node and Replica if this node has appeared in the 
new assignments.
-        boolean shouldStartLocalServices = pendingAssignments.stream()
+        boolean shouldStartLocalGroupNode = pendingAssignments.stream()
                 .filter(assignment -> 
localMember.name().equals(assignment.consistentId()))
                 .anyMatch(assignment -> 
!stableAssignments.contains(assignment));
 
         CompletableFuture<Void> localServicesStartFuture;
 
-        if (shouldStartLocalServices) {
+        if (shouldStartLocalGroupNode) {
             localServicesStartFuture = localPartsByTableIdVv.get(revision)
                     .thenComposeAsync(oldMap -> {
-                        int tableId = tbl.tableId();
-
-                        PartitionSet partitionSet = oldMap.get(tableId).copy();
-
-                        return getOrCreatePartitionStorages(tbl, 
partitionSet).thenApply(u -> {
-                            var newMap = new HashMap<>(oldMap);
-
-                            newMap.put(tableId, partitionSet);
-
-                            return newMap;
-                        });
-                    }, ioExecutor)
-                    .thenRunAsync(() -> inBusyLock(busyLock, () -> {
-                        InternalTable internalTable = tbl.internalTable();
-
-                        var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, 
Void>(HybridTimestamp.MIN_VALUE);
-                        var storageIndexTracker = new 
PendingComparableValuesTracker<Long, Void>(0L);
-
-                        PartitionStorages partitionStorages = 
getPartitionStorages(tbl, partId);
-
-                        MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
-                        TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
-
-                        PartitionDataStorage partitionDataStorage = 
partitionDataStorage(mvPartitionStorage, internalTable, partId);
-
-                        PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
-                                partId,
-                                partitionDataStorage,
-                                tbl,
-                                safeTimeTracker
-                        );
+                        if (shouldStartLocalGroupNode) {
+                            // TODO 
https://issues.apache.org/jira/browse/IGNITE-20957 This is incorrect usage of 
the value stored in
+                            // TODO versioned value. See ticket for the 
details.
+                            int tableId = tbl.tableId();
 
-                        PeersAndLearners pendingConfiguration = 
configurationFromAssignments(pendingAssignments);
+                            PartitionSet partitionSet = 
oldMap.get(tableId).copy();
 
-                        try {
-                            Peer serverPeer = 
pendingConfiguration.peer(localMember.name());
+                            return getOrCreatePartitionStorages(tbl, 
partitionSet).thenApply(u -> {
+                                var newMap = new HashMap<>(oldMap);
 
-                            RaftNodeId raftNodeId = new 
RaftNodeId(replicaGrpId, serverPeer);
+                                newMap.put(tableId, partitionSet);
 
-                            if (!((Loza) raftMgr).isStarted(raftNodeId)) {
-                                PeersAndLearners stableConfiguration = 
configurationFromAssignments(stableAssignments);
-
-                                startPartitionRaftGroupNode(
-                                        replicaGrpId,
-                                        pendingConfiguration,
-                                        stableConfiguration,
-                                        safeTimeTracker,
-                                        storageIndexTracker,
-                                        internalTable,
-                                        txStatePartitionStorage,
-                                        partitionDataStorage,
-                                        partitionUpdateHandlers
-                                );
-                            }
-
-                            if (!replicaMgr.isReplicaStarted(replicaGrpId)) {
-                                startReplicaWithNewListener(
-                                        replicaGrpId,
-                                        tbl,
-                                        safeTimeTracker,
-                                        storageIndexTracker,
-                                        mvPartitionStorage,
-                                        txStatePartitionStorage,
-                                        partitionUpdateHandlers,
-                                        (TopologyAwareRaftGroupService) 
internalTable.partitionRaftGroupService(partId)
-                                );
-                            }
-                        } catch (NodeStoppingException ignored) {
-                            // No-op.
+                                return newMap;
+                            });
+                        } else {
+                            return nullCompletedFuture();
                         }
-                    }), ioExecutor);
+                    }, ioExecutor)
+                    .thenComposeAsync(unused -> inBusyLock(busyLock, () -> 
startPartitionAndStartClient(
+                            tbl,
+                            replicaGrpId.partitionId(),
+                            pendingAssignments,
+                            isRecovery
+                    )), ioExecutor);
         } else {
             localServicesStartFuture = nullCompletedFuture();
         }
 
-        return localServicesStartFuture
-                .thenCompose(v -> {
-                    RaftGroupService partGrpSvc = 
tbl.internalTable().partitionRaftGroupService(partId);
+        return localServicesStartFuture;
+    }
 
-                    return partGrpSvc.refreshAndGetLeaderWithTerm()
-                            .thenCompose(leaderWithTerm -> {
-                                if (!isLocalPeer(leaderWithTerm.leader())) {
-                                    return nullCompletedFuture();
-                                }
+    private CompletableFuture<Void> changePeersOnRebalance(
+            TableImpl table,
+            TablePartitionId replicaGrpId,
+            Set<Assignment> pendingAssignments,
+            long revision
+    ) {
+        int partId = replicaGrpId.partitionId();
 
-                                // run update of raft configuration if this 
node is a leader
-                                LOG.info("Current node={} is the leader of 
partition raft group={}. "
-                                                + "Initiate rebalance process 
for partition={}, table={}",
-                                        leaderWithTerm.leader(), replicaGrpId, 
partId, tbl.name());
+        RaftGroupService partGrpSvc = 
table.internalTable().partitionRaftGroupService(partId);
 
-                                return 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
-                                        
.thenCompose(latestPendingAssignmentsEntry -> {
-                                            // Do not change peers of the raft 
group if this is a stale event.
-                                            // Note that we start raft node 
before for the sake of the consistency in a starting and
-                                            // stopping raft nodes.
-                                            if (revision < 
latestPendingAssignmentsEntry.revision()) {
-                                                return nullCompletedFuture();
-                                            }
+        return partGrpSvc.refreshAndGetLeaderWithTerm()
+                .thenCompose(leaderWithTerm -> {
+                    if (!isLocalPeer(leaderWithTerm.leader())) {
+                        return nullCompletedFuture();
+                    }
 
-                                            PeersAndLearners newConfiguration 
= configurationFromAssignments(pendingAssignments);
+                    // run update of raft configuration if this node is a 
leader
+                    LOG.info("Current node={} is the leader of partition raft 
group={}. "
+                                    + "Initiate rebalance process for 
partition={}, table={}",
+                            leaderWithTerm.leader(), replicaGrpId, partId, 
table.name());
+
+                    return 
metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId))
+                            .thenCompose(latestPendingAssignmentsEntry -> {
+                                // Do not change peers of the raft group if 
this is a stale event.
+                                // Note that we start raft node before for the 
sake of the consistency in a
+                                // starting and stopping raft nodes.
+                                if (revision < 
latestPendingAssignmentsEntry.revision()) {
+                                    return nullCompletedFuture();
+                                }
 
-                                            return 
partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term());
-                                        });
+                                PeersAndLearners newConfiguration =
+                                        
configurationFromAssignments(pendingAssignments);
+
+                                return 
partGrpSvc.changePeersAsync(newConfiguration, leaderWithTerm.term());
                             });
                 });
     }
 
     private void startPartitionRaftGroupNode(
             TablePartitionId replicaGrpId,
-            PeersAndLearners pendingConfiguration,
+            RaftNodeId raftNodeId,
             PeersAndLearners stableConfiguration,
             PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
@@ -1767,8 +1761,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             PartitionDataStorage partitionDataStorage,
             PartitionUpdateHandlers partitionUpdateHandlers
     ) throws NodeStoppingException {
-        ClusterNode localMember = localNode();
-
         RaftGroupOptions groupOptions = groupOptionsForPartition(
                 internalTable.storage(),
                 internalTable.txStateStorage(),
@@ -1794,10 +1786,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 rebalanceScheduler
         );
 
-        Peer serverPeer = pendingConfiguration.peer(localMember.name());
-
-        var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);
-
         // TODO: use RaftManager interface, see 
https://issues.apache.org/jira/browse/IGNITE-18273
         ((Loza) raftMgr).startRaftGroupNode(
                 raftNodeId,
@@ -1966,12 +1954,29 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         // these updates always processing only 1 partition, so, only 1 stable 
partition key.
         assert evt.single() : evt;
 
+        if (evt.entryEvent().oldEntry() == null) {
+            // This means it's an event on table creation.
+            return nullCompletedFuture();
+        }
+
         Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
 
+        long revision = evt.revision();
+
+        assert stableAssignmentsWatchEvent.revision() == revision : 
stableAssignmentsWatchEvent;
+
         if (stableAssignmentsWatchEvent.value() == null) {
             return nullCompletedFuture();
         }
 
+        return handleChangeStableAssignmentEvent(stableAssignmentsWatchEvent, 
evt.revision(), false);
+    }
+
+    protected CompletableFuture<Void> handleChangeStableAssignmentEvent(
+            Entry stableAssignmentsWatchEvent,
+            long revision,
+            boolean isRecovery
+    ) {
         int partitionId = 
extractPartitionNumber(stableAssignmentsWatchEvent.key());
         int tableId = extractTableId(stableAssignmentsWatchEvent.key(), 
STABLE_ASSIGNMENTS_PREFIX);
 
@@ -1979,38 +1984,64 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         Set<Assignment> stableAssignments = 
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
 
-        return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId), 
stableAssignmentsWatchEvent.revision())
+        return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId), 
revision)
                 .thenComposeAsync(pendingAssignmentsEntry -> {
-                    // Update raft client peers and learners according to the 
actual assignments.
-                    CompletableFuture<Void> raftClientUpdateFuture = 
tablesById(evt.revision()).thenAccept(t -> {
-                        t.get(tableId).internalTable()
-                                
.partitionRaftGroupService(tablePartitionId.partitionId())
-                                
.updateConfiguration(configurationFromAssignments(stableAssignments));
-                    });
-
                     byte[] pendingAssignmentsFromMetaStorage = 
pendingAssignmentsEntry.value();
 
                     Set<Assignment> pendingAssignments = 
pendingAssignmentsFromMetaStorage == null
                             ? Set.of()
                             : 
ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
 
-                    String localMemberName = localNode().name();
+                    return stopAndDestroyPartitionAndUpdateClients(
+                            tablePartitionId,
+                            stableAssignments,
+                            pendingAssignments,
+                            isRecovery,
+                            revision
+                    );
+                }, ioExecutor);
+    }
 
-                    boolean shouldStopLocalServices = 
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
-                            .noneMatch(assignment -> 
assignment.consistentId().equals(localMemberName));
+    private CompletableFuture<Void> updatePartitionClients(
+            TablePartitionId tablePartitionId,
+            Set<Assignment> stableAssignments,
+            long revision
+    ) {
+        // Update raft client peers and learners according to the actual 
assignments.
+        return tablesById(revision).thenAccept(t -> {
+            t.get(tablePartitionId.tableId()).internalTable()
+                    .partitionRaftGroupService(tablePartitionId.partitionId())
+                    
.updateConfiguration(configurationFromAssignments(stableAssignments));
+        });
+    }
 
-                    if (shouldStopLocalServices) {
-                        return allOf(
-                                raftClientUpdateFuture,
-                                stopAndDestroyPartition(tablePartitionId, 
evt.revision()));
-                    } else {
-                        return raftClientUpdateFuture;
-                    }
-                }, ioExecutor);
+    private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients(
+            TablePartitionId tablePartitionId,
+            Set<Assignment> stableAssignments,
+            Set<Assignment> pendingAssignments,
+            boolean isRecovery,
+            long revision
+    ) {
+        CompletableFuture<Void> clientUpdateFuture = isRecovery
+                // Updating clients is not needed on recovery.
+                ? nullCompletedFuture()
+                : updatePartitionClients(tablePartitionId, stableAssignments, 
revision);
+
+        boolean shouldStopLocalServices = 
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
+                .noneMatch(assignment -> 
assignment.consistentId().equals(localNode().name()));
+
+        if (shouldStopLocalServices) {
+            return allOf(
+                    clientUpdateFuture,
+                    stopAndDestroyPartition(tablePartitionId, revision)
+            );
+        } else {
+            return clientUpdateFuture;
+        }
     }
 
     private CompletableFuture<Void> stopAndDestroyPartition(TablePartitionId 
tablePartitionId, long causalityToken) {
-        return tablesById(causalityToken)
+        return tablesByIdVv.get(causalityToken)
                 .thenCompose(tables -> {
                     TableImpl table = tables.get(tablePartitionId.tableId());
 

Reply via email to