This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f5de973f4b IGNITE-20209 Recovery for rebalance triggers (#2919)
f5de973f4b is described below

commit f5de973f4b9a78f3f30a48f04c33353252544e5e
Author: Kirill Gusakov <kgusa...@gmail.com>
AuthorDate: Mon Jan 22 19:06:28 2024 +0300

    IGNITE-20209 Recovery for rebalance triggers (#2919)
---
 .../distributionzones/DistributionZoneManager.java |   2 +-
 .../rebalance/DistributionZoneRebalanceEngine.java |  98 +++++++--
 .../DistributionZoneRebalanceEngineTest.java       |   2 +
 .../rebalance/ItRebalanceTriggersRecoveryTest.java | 245 +++++++++++++++++++++
 .../internal/table/distributed/TableManager.java   |  15 +-
 5 files changed, 334 insertions(+), 28 deletions(-)

diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 1347bb02d2..8ca12f33df 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -264,7 +264,7 @@ public class DistributionZoneManager implements IgniteComponent {
             return allOf(
                     createOrRestoreZonesStates(recoveryRevision),
                     
restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision)
-            ).thenRun(rebalanceEngine::start);
+            ).thenCompose((notUsed) -> rebalanceEngine.start());
         });
     }
 
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index 94ea672eac..1a6d958b76 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.distributionzones.rebalance;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
@@ -32,6 +33,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -99,8 +101,8 @@ public class DistributionZoneRebalanceEngine {
     /**
      * Starts the rebalance engine by registering corresponding meta storage 
and configuration listeners.
      */
-    public void start() {
-        IgniteUtils.inBusyLock(busyLock, () -> {
+    public CompletableFuture<Void> start() {
+        return IgniteUtils.inBusyLockAsync(busyLock, () -> {
             catalogService.listen(ZONE_ALTER, new 
CatalogAlterZoneEventListener(catalogService) {
                 @Override
                 protected CompletableFuture<Void> 
onReplicasUpdate(AlterZoneEventParameters parameters, int oldReplicas) {
@@ -110,9 +112,45 @@ public class DistributionZoneRebalanceEngine {
 
             // TODO: IGNITE-18694 - Recovery for the case when zones watch 
listener processed event but assignments were not updated.
             metaStorageManager.registerPrefixWatch(zoneDataNodesKey(), 
dataNodesListener);
+
+            CompletableFuture<Long> recoveryFinishFuture = 
metaStorageManager.recoveryFinishedFuture();
+
+            // At the moment of the start of this manager, it is guaranteed 
that Meta Storage has been recovered.
+            assert recoveryFinishFuture.isDone();
+
+            long recoveryRevision = recoveryFinishFuture.join();
+
+            return rebalanceTriggersRecovery(recoveryRevision);
         });
     }
 
+    /**
+     * Run the update of rebalance metastore's state.
+     *
+     * @param recoveryRevision Recovery revision.
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-21058 At the moment 
this method produce many metastore multi-invokes
+    // TODO: which can be avoided by the local logic, which mirror the logic 
of metastore invokes.
+    // TODO: And then run the remote invoke, only if needed.
+    private CompletableFuture<Void> rebalanceTriggersRecovery(long 
recoveryRevision) {
+        if (recoveryRevision > 0) {
+            List<CompletableFuture<Void>> zonesRecoveryFutures = 
catalogService.zones(catalogService.latestCatalogVersion())
+                    .stream()
+                    .map(zoneDesc ->
+                            recalculateAssignmentsAndScheduleRebalance(
+                                    zoneDesc,
+                                    recoveryRevision,
+                                    catalogService.latestCatalogVersion()
+                            )
+                    )
+                    .collect(Collectors.toUnmodifiableList());
+
+            return allOf(zonesRecoveryFutures.toArray(new 
CompletableFuture[0]));
+        } else {
+            return completedFuture(null);
+        }
+    }
+
     /**
      * Stops the rebalance engine by unregistering meta storage watches.
      */
@@ -177,27 +215,41 @@ public class DistributionZoneRebalanceEngine {
     }
 
     private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters 
parameters) {
-        return IgniteUtils.inBusyLockAsync(busyLock, () -> {
-            int zoneId = parameters.zoneDescriptor().id();
-            long causalityToken = parameters.causalityToken();
-            int catalogVersion = parameters.catalogVersion();
-
-            return distributionZoneManager.dataNodes(causalityToken, 
catalogVersion, zoneId)
-                    .thenCompose(dataNodes -> {
-                        if (dataNodes.isEmpty()) {
-                            return nullCompletedFuture();
-                        }
-
-                        List<CatalogTableDescriptor> tableDescriptors = 
findTablesByZoneId(zoneId, parameters.catalogVersion());
-
-                        return triggerPartitionsRebalanceForAllTables(
-                                causalityToken,
-                                parameters.zoneDescriptor(),
-                                dataNodes,
-                                tableDescriptors
-                        );
-                    });
-        });
+        return recalculateAssignmentsAndScheduleRebalance(
+                parameters.zoneDescriptor(),
+                parameters.causalityToken(),
+                parameters.catalogVersion()
+        );
+    }
+
+    /**
+     * Recalculate assignments for table partitions of target zone and 
schedule rebalance (by update rebalance metastore keys).
+     *
+     * @param zoneDescriptor Zone descriptor.
+     * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
+     * @return The future, which completes when the all metastore updates done.
+     */
+    private CompletableFuture<Void> recalculateAssignmentsAndScheduleRebalance(
+            CatalogZoneDescriptor zoneDescriptor,
+            long causalityToken,
+            int catalogVersion) {
+
+        return distributionZoneManager.dataNodes(causalityToken, 
catalogVersion, zoneDescriptor.id())
+                .thenCompose(dataNodes -> {
+                    if (dataNodes.isEmpty()) {
+                        return nullCompletedFuture();
+                    }
+
+                    List<CatalogTableDescriptor> tableDescriptors = 
findTablesByZoneId(zoneDescriptor.id(), catalogVersion);
+
+                    return triggerPartitionsRebalanceForAllTables(
+                            causalityToken,
+                            zoneDescriptor,
+                            dataNodes,
+                            tableDescriptors
+                    );
+                });
     }
 
     private CompletableFuture<Void> triggerPartitionsRebalanceForAllTables(
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 2b367a0f5e..bb499fd575 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -148,6 +148,8 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest {
             return null;
         }).when(metaStorageManager).registerPrefixWatch(any(), any());
 
+        
when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(1L));
+
         AtomicLong raftIndex = new AtomicLong();
 
         keyValueStorage = spy(new SimpleInMemoryKeyValueStorage(nodeName));
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
new file mode 100644
index 0000000000..cfb23c83a8
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rebalance;
+
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
+import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for recovery of the rebalance procedure.
+ */
+public class ItRebalanceTriggersRecoveryTest extends 
ClusterPerTestIntegrationTest {
+    private static final String US_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+            + "  network: {\n"
+            + "    port: {},\n"
+            + "    nodeFinder: {\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  clientConnector: { port:{} },\n"
+            + "  nodeAttributes: {\n"
+            + "    nodeAttributes: {region: {attribute: \"US\"}, zone: 
{attribute: \"global\"}}\n"
+            + "  },\n"
+            + "  rest.port: {}\n"
+            + "}";
+
+    private static final String GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+            + "  network: {\n"
+            + "    port: {},\n"
+            + "    nodeFinder: {\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  clientConnector: { port:{} },\n"
+            + "  nodeAttributes: {\n"
+            + "    nodeAttributes: {zone: {attribute: \"global\"}}\n"
+            + "  },\n"
+            + "  rest.port: {}\n"
+            + "}";
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Test
+    void testRebalanceTriggersRecoveryAfterFilterUpdate() throws 
InterruptedException {
+        // The nodes from different regions/zones needed to implement the 
predictable way of nodes choice.
+        startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE);
+        startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
+
+        cluster.doInSession(0, session -> {
+            session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, 
REPLICAS=2, DATA_NODES_FILTER='$[?(@.region == \"US\")]'");
+            session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name 
INT) WITH PRIMARY_ZONE='TEST_ZONE'");
+            session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
+        });
+
+        assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 
10_000));
+        assertFalse(containsPartition(cluster.node(2)));
+
+        // By this we guarantee, that there will no any partition data nodes, 
which will be available to perform the rebalance.
+        // To run the actual changePeersAsync we need the partition leader, 
which catch the metastore event about new pending keys.
+        
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit();
+        
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit();
+
+        cluster.doInSession(0, session -> {
+            session.execute(null, "ALTER ZONE TEST_ZONE SET 
DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
+        });
+
+        // Check that metastore node schedule the rebalance procedure.
+        assertTrue(waitForCondition(
+                (() -> getPartitionPendingClusterNodes(node(0), 
0).equals(Set.of(
+                        Assignment.forPeer(node(2).name()),
+                        Assignment.forPeer(node(1).name())))),
+                10_000));
+
+        // Remove the pending keys in a barbarian way. So, the rebalance can 
be triggered only by the recovery logic now.
+        Integer tableId = getTableId(node(0).catalogManager(), "TEST", new 
HybridClockImpl().nowLong());
+        node(0)
+                .metaStorageManager()
+                .remove(pendingPartAssignmentsKey(new 
TablePartitionId(tableId, 0))).join();
+
+        restartNode(1);
+        restartNode(2);
+
+        // Check that new replica from 'global' zone received the data and 
rebalance really happened.
+        assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 
10_000));
+    }
+
+    @Test
+    void testRebalanceTriggersRecoveryAfterReplicasUpdate() throws 
InterruptedException {
+        // The nodes from different regions/zones needed to implement the 
predictable way of nodes choice.
+        startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE);
+        startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
+
+        cluster.doInSession(0, session -> {
+            session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, 
REPLICAS=1, DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
+            session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name 
INT) WITH PRIMARY_ZONE='TEST_ZONE'");
+            session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
+        });
+
+        assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 
10_000));
+        assertFalse(containsPartition(cluster.node(2)));
+
+        // By this we guarantee, that there will no any partition data nodes, 
which will be available to perform the rebalance.
+        // To run the actual changePeersAsync we need the partition leader, 
which catch the metastore event about new pending keys.
+        
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit();
+        
WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit();
+
+        cluster.doInSession(0, session -> {
+            session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2");
+        });
+
+        // Check that metastore node schedule the rebalance procedure.
+        assertTrue(waitForCondition(
+                (() -> getPartitionPendingClusterNodes(node(0), 
0).equals(Set.of(
+                        Assignment.forPeer(node(2).name()),
+                        Assignment.forPeer(node(1).name())))),
+                10_000));
+
+        // Remove the pending keys in a barbarian way. So, the rebalance can 
be triggered only by the recovery logic now.
+        Integer tableId = getTableId(node(0).catalogManager(), "TEST", new 
HybridClockImpl().nowLong());
+        node(0)
+                .metaStorageManager()
+                .remove(pendingPartAssignmentsKey(new 
TablePartitionId(tableId, 0))).join();
+
+        restartNode(1);
+        restartNode(2);
+
+        // Check that new replica from 'global' zone received the data and 
rebalance really happened.
+        assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 
10_000));
+    }
+
+    @Test
+    void 
testRebalanceTriggersRecoveryWhenUpdatesWereProcessedByAnotherNodesAlready() 
throws Exception {
+        // The nodes from different regions/zones needed to implement the 
predictable way of nodes choice.
+        startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE);
+        startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
+        startNode(3);
+
+        cluster.doInSession(0, session -> {
+            session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, 
REPLICAS=1, DATA_NODES_FILTER='$[?(@.region == \"US\")]'");
+            session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name 
INT) WITH PRIMARY_ZONE='TEST_ZONE'");
+            session.execute(null, "INSERT INTO TEST VALUES (0, 0)");
+        });
+
+        assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 
10_000));
+        assertFalse(containsPartition(cluster.node(2)));
+
+        stopNode(3);
+
+        cluster.doInSession(0, session -> {
+            session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2, 
DATA_NODES_FILTER='$[?(@.zone == \"global\")]'");
+        });
+
+        // Check that new replica from 'global' zone received the data and 
rebalance really happened.
+        assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 
10_000));
+        assertTrue(waitForCondition(
+                (() -> getPartitionPendingClusterNodes(node(0), 
0).equals(Set.of())),
+                10_000));
+
+        TablePartitionId tablePartitionId =
+                new TablePartitionId(
+                        getTableId(node(0).catalogManager(),
+                                "TEST",
+                                new HybridClockImpl().nowLong()),
+                        0
+                );
+        long pendingsKeysRevisionBeforeRecovery = node(0).metaStorageManager()
+                .get(pendingPartAssignmentsKey(tablePartitionId))
+                .get(10, TimeUnit.SECONDS).revision();
+
+
+        startNode(3, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE);
+
+        long pendingsKeysRevisionAfterRecovery = node(0).metaStorageManager()
+                .get(pendingPartAssignmentsKey(tablePartitionId))
+                .get(10, TimeUnit.SECONDS).revision();
+
+        // Check that recovered node doesn't produce new rebalances for 
already processed triggers.
+        assertEquals(pendingsKeysRevisionBeforeRecovery, 
pendingsKeysRevisionAfterRecovery);
+    }
+
+    private static Set<Assignment> getPartitionPendingClusterNodes(IgniteImpl 
node, int partNum) {
+        return Optional.ofNullable(getTableId(node.catalogManager(), "TEST", 
new HybridClockImpl().nowLong()))
+                .map(tableId -> 
partitionPendingAssignments(node.metaStorageManager(), tableId, partNum).join())
+                .orElse(Set.of());
+    }
+
+    private static CompletableFuture<Set<Assignment>> 
partitionPendingAssignments(
+            MetaStorageManager metaStorageManager,
+            int tableId,
+            int partitionNumber
+    ) {
+        return metaStorageManager
+                .get(pendingPartAssignmentsKey(new TablePartitionId(tableId, 
partitionNumber)))
+                .thenApply(e -> (e.value() == null) ? null : 
fromBytes(e.value()));
+    }
+
+    private static boolean containsPartition(Ignite node) {
+        var tableManager = ((TableManager) node.tables());
+
+        MvPartitionStorage storage = tableManager.tableView("TEST")
+                .internalTable()
+                .storage()
+                .getMvPartition(0);
+
+        return storage != null && storage.rowsCount() != 0;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 976e0f2ae1..911aa1ff21 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2140,10 +2140,17 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
         int partitionId = tablePartitionId.partitionId();
 
-        return allOf(
-                internalTable.storage().destroyPartition(partitionId),
-                runAsync(() -> 
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
-        );
+        List<CompletableFuture<?>> destroyFutures = new ArrayList<>();
+
+        if (internalTable.storage().getMvPartition(partitionId) != null) {
+            
destroyFutures.add(internalTable.storage().destroyPartition(partitionId));
+        }
+
+        if (internalTable.txStateStorage().getTxStateStorage(partitionId) != 
null) {
+            destroyFutures.add(runAsync(() -> 
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor));
+        }
+
+        return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
     }
 
     private int[] collectTableIndexIds(int tableId, int catalogVersion, 
boolean onNodeRecovery) {

Reply via email to