This is an automated email from the ASF dual-hosted git repository. sk0x50 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new f5de973f4b IGNITE-20209 Recovery for rebalance triggers (#2919) f5de973f4b is described below commit f5de973f4b9a78f3f30a48f04c33353252544e5e Author: Kirill Gusakov <kgusa...@gmail.com> AuthorDate: Mon Jan 22 19:06:28 2024 +0300 IGNITE-20209 Recovery for rebalance triggers (#2919) --- .../distributionzones/DistributionZoneManager.java | 2 +- .../rebalance/DistributionZoneRebalanceEngine.java | 98 +++++++-- .../DistributionZoneRebalanceEngineTest.java | 2 + .../rebalance/ItRebalanceTriggersRecoveryTest.java | 245 +++++++++++++++++++++ .../internal/table/distributed/TableManager.java | 15 +- 5 files changed, 334 insertions(+), 28 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index 1347bb02d2..8ca12f33df 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -264,7 +264,7 @@ public class DistributionZoneManager implements IgniteComponent { return allOf( createOrRestoreZonesStates(recoveryRevision), restoreLogicalTopologyChangeEventAndStartTimers(recoveryRevision) - ).thenRun(rebalanceEngine::start); + ).thenCompose((notUsed) -> rebalanceEngine.start()); }); } diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java index 94ea672eac..1a6d958b76 100644 --- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.distributionzones.rebalance; import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId; @@ -32,6 +33,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; @@ -99,8 +101,8 @@ public class DistributionZoneRebalanceEngine { /** * Starts the rebalance engine by registering corresponding meta storage and configuration listeners. */ - public void start() { - IgniteUtils.inBusyLock(busyLock, () -> { + public CompletableFuture<Void> start() { + return IgniteUtils.inBusyLockAsync(busyLock, () -> { catalogService.listen(ZONE_ALTER, new CatalogAlterZoneEventListener(catalogService) { @Override protected CompletableFuture<Void> onReplicasUpdate(AlterZoneEventParameters parameters, int oldReplicas) { @@ -110,9 +112,45 @@ public class DistributionZoneRebalanceEngine { // TODO: IGNITE-18694 - Recovery for the case when zones watch listener processed event but assignments were not updated. 
metaStorageManager.registerPrefixWatch(zoneDataNodesKey(), dataNodesListener); + + CompletableFuture<Long> recoveryFinishFuture = metaStorageManager.recoveryFinishedFuture(); + + // At the moment of the start of this manager, it is guaranteed that Meta Storage has been recovered. + assert recoveryFinishFuture.isDone(); + + long recoveryRevision = recoveryFinishFuture.join(); + + return rebalanceTriggersRecovery(recoveryRevision); }); } + /** + * Run the update of rebalance metastore's state. + * + * @param recoveryRevision Recovery revision. + */ + // TODO: https://issues.apache.org/jira/browse/IGNITE-21058 At the moment this method produces many metastore multi-invokes + // TODO: which can be avoided by the local logic, which mirrors the logic of metastore invokes. + // TODO: And then run the remote invoke, only if needed. + private CompletableFuture<Void> rebalanceTriggersRecovery(long recoveryRevision) { + if (recoveryRevision > 0) { + List<CompletableFuture<Void>> zonesRecoveryFutures = catalogService.zones(catalogService.latestCatalogVersion()) + .stream() + .map(zoneDesc -> + recalculateAssignmentsAndScheduleRebalance( + zoneDesc, + recoveryRevision, + catalogService.latestCatalogVersion() + ) + ) + .collect(Collectors.toUnmodifiableList()); + + return allOf(zonesRecoveryFutures.toArray(new CompletableFuture[0])); + } else { + return completedFuture(null); + } + } + /** * Stops the rebalance engine by unregistering meta storage watches. 
*/ @@ -177,27 +215,41 @@ public class DistributionZoneRebalanceEngine { } private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters parameters) { - return IgniteUtils.inBusyLockAsync(busyLock, () -> { - int zoneId = parameters.zoneDescriptor().id(); - long causalityToken = parameters.causalityToken(); - int catalogVersion = parameters.catalogVersion(); - - return distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId) - .thenCompose(dataNodes -> { - if (dataNodes.isEmpty()) { - return nullCompletedFuture(); - } - - List<CatalogTableDescriptor> tableDescriptors = findTablesByZoneId(zoneId, parameters.catalogVersion()); - - return triggerPartitionsRebalanceForAllTables( - causalityToken, - parameters.zoneDescriptor(), - dataNodes, - tableDescriptors - ); - }); - }); + return recalculateAssignmentsAndScheduleRebalance( + parameters.zoneDescriptor(), + parameters.causalityToken(), + parameters.catalogVersion() + ); + } + + /** + * Recalculates assignments for table partitions of the target zone and schedules rebalance (by updating the rebalance metastore keys). + * + * @param zoneDescriptor Zone descriptor. + * @param causalityToken Causality token. + * @param catalogVersion Catalog version. + * @return The future, which completes when all the metastore updates are done. 
+ */ + private CompletableFuture<Void> recalculateAssignmentsAndScheduleRebalance( + CatalogZoneDescriptor zoneDescriptor, + long causalityToken, + int catalogVersion) { + + return distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneDescriptor.id()) + .thenCompose(dataNodes -> { + if (dataNodes.isEmpty()) { + return nullCompletedFuture(); + } + + List<CatalogTableDescriptor> tableDescriptors = findTablesByZoneId(zoneDescriptor.id(), catalogVersion); + + return triggerPartitionsRebalanceForAllTables( + causalityToken, + zoneDescriptor, + dataNodes, + tableDescriptors + ); + }); } private CompletableFuture<Void> triggerPartitionsRebalanceForAllTables( diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java index 2b367a0f5e..bb499fd575 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java @@ -148,6 +148,8 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest { return null; }).when(metaStorageManager).registerPrefixWatch(any(), any()); + when(metaStorageManager.recoveryFinishedFuture()).thenReturn(completedFuture(1L)); + AtomicLong raftIndex = new AtomicLong(); keyValueStorage = spy(new SimpleInMemoryKeyValueStorage(nodeName)); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java new file mode 100644 index 0000000000..cfb23c83a8 --- /dev/null +++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rebalance; + +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey; +import static org.apache.ignite.internal.table.TableTestUtils.getTableId; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.util.ByteUtils.fromBytes; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.affinity.Assignment; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.metastorage.MetaStorageManager; 
+import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.distributed.TableManager; +import org.apache.ignite.internal.test.WatchListenerInhibitor; +import org.junit.jupiter.api.Test; + +/** + * Tests for recovery of the rebalance procedure. + */ +public class ItRebalanceTriggersRecoveryTest extends ClusterPerTestIntegrationTest { + private static final String US_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n" + + " network: {\n" + + " port: {},\n" + + " nodeFinder: {\n" + + " netClusterNodes: [ {} ]\n" + + " }\n" + + " },\n" + + " clientConnector: { port:{} },\n" + + " nodeAttributes: {\n" + + " nodeAttributes: {region: {attribute: \"US\"}, zone: {attribute: \"global\"}}\n" + + " },\n" + + " rest.port: {}\n" + + "}"; + + private static final String GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n" + + " network: {\n" + + " port: {},\n" + + " nodeFinder: {\n" + + " netClusterNodes: [ {} ]\n" + + " }\n" + + " },\n" + + " clientConnector: { port:{} },\n" + + " nodeAttributes: {\n" + + " nodeAttributes: {zone: {attribute: \"global\"}}\n" + + " },\n" + + " rest.port: {}\n" + + "}"; + + @Override + protected int initialNodes() { + return 1; + } + + @Test + void testRebalanceTriggersRecoveryAfterFilterUpdate() throws InterruptedException { + // The nodes from different regions/zones needed to implement the predictable way of nodes choice. 
+ startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE); + startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE); + + cluster.doInSession(0, session -> { + session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, REPLICAS=2, DATA_NODES_FILTER='$[?(@.region == \"US\")]'"); + session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name INT) WITH PRIMARY_ZONE='TEST_ZONE'"); + session.execute(null, "INSERT INTO TEST VALUES (0, 0)"); + }); + + assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 10_000)); + assertFalse(containsPartition(cluster.node(2))); + + // By this we guarantee that there will be no partition data nodes available to perform the rebalance. + // To run the actual changePeersAsync we need the partition leader, which catches the metastore event about new pending keys. + WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit(); + WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit(); + + cluster.doInSession(0, session -> { + session.execute(null, "ALTER ZONE TEST_ZONE SET DATA_NODES_FILTER='$[?(@.zone == \"global\")]'"); + }); + + // Check that the metastore node schedules the rebalance procedure. + assertTrue(waitForCondition( + (() -> getPartitionPendingClusterNodes(node(0), 0).equals(Set.of( + Assignment.forPeer(node(2).name()), + Assignment.forPeer(node(1).name())))), + 10_000)); + + // Remove the pending keys forcibly, so that the rebalance can now be triggered only by the recovery logic. 
+ assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 10_000)); + } + + @Test + void testRebalanceTriggersRecoveryAfterReplicasUpdate() throws InterruptedException { + // The nodes from different regions/zones are needed to make the choice of nodes predictable. + startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE); + startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE); + + cluster.doInSession(0, session -> { + session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, REPLICAS=1, DATA_NODES_FILTER='$[?(@.zone == \"global\")]'"); + session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name INT) WITH PRIMARY_ZONE='TEST_ZONE'"); + session.execute(null, "INSERT INTO TEST VALUES (0, 0)"); + }); + + assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 10_000)); + assertFalse(containsPartition(cluster.node(2))); + + // By this we guarantee that there will be no partition data nodes available to perform the rebalance. + // To run the actual changePeersAsync we need the partition leader, which catches the metastore event about new pending keys. + WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(1)).startInhibit(); + WatchListenerInhibitor.metastorageEventsInhibitor(cluster.node(2)).startInhibit(); + + cluster.doInSession(0, session -> { + session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2"); + }); + + // Check that the metastore node schedules the rebalance procedure. + assertTrue(waitForCondition( + (() -> getPartitionPendingClusterNodes(node(0), 0).equals(Set.of( + Assignment.forPeer(node(2).name()), + Assignment.forPeer(node(1).name())))), + 10_000)); + + // Remove the pending keys forcibly, so that the rebalance can now be triggered only by the recovery logic. 
+ Integer tableId = getTableId(node(0).catalogManager(), "TEST", new HybridClockImpl().nowLong()); + node(0) + .metaStorageManager() + .remove(pendingPartAssignmentsKey(new TablePartitionId(tableId, 0))).join(); + + restartNode(1); + restartNode(2); + + // Check that new replica from 'global' zone received the data and rebalance really happened. + assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 10_000)); + } + + @Test + void testRebalanceTriggersRecoveryWhenUpdatesWereProcessedByAnotherNodesAlready() throws Exception { + // The nodes from different regions/zones needed to implement the predictable way of nodes choice. + startNode(1, US_NODE_BOOTSTRAP_CFG_TEMPLATE); + startNode(2, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE); + startNode(3); + + cluster.doInSession(0, session -> { + session.execute(null, "CREATE ZONE TEST_ZONE WITH PARTITIONS=1, REPLICAS=1, DATA_NODES_FILTER='$[?(@.region == \"US\")]'"); + session.execute(null, "CREATE TABLE TEST (id INT PRIMARY KEY, name INT) WITH PRIMARY_ZONE='TEST_ZONE'"); + session.execute(null, "INSERT INTO TEST VALUES (0, 0)"); + }); + + assertTrue(waitForCondition(() -> containsPartition(cluster.node(1)), 10_000)); + assertFalse(containsPartition(cluster.node(2))); + + stopNode(3); + + cluster.doInSession(0, session -> { + session.execute(null, "ALTER ZONE TEST_ZONE SET REPLICAS=2, DATA_NODES_FILTER='$[?(@.zone == \"global\")]'"); + }); + + // Check that new replica from 'global' zone received the data and rebalance really happened. 
+ assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)), 10_000)); + assertTrue(waitForCondition( + (() -> getPartitionPendingClusterNodes(node(0), 0).equals(Set.of())), + 10_000)); + + TablePartitionId tablePartitionId = + new TablePartitionId( + getTableId(node(0).catalogManager(), + "TEST", + new HybridClockImpl().nowLong()), + 0 + ); + long pendingsKeysRevisionBeforeRecovery = node(0).metaStorageManager() + .get(pendingPartAssignmentsKey(tablePartitionId)) + .get(10, TimeUnit.SECONDS).revision(); + + + startNode(3, GLOBAL_NODE_BOOTSTRAP_CFG_TEMPLATE); + + long pendingsKeysRevisionAfterRecovery = node(0).metaStorageManager() + .get(pendingPartAssignmentsKey(tablePartitionId)) + .get(10, TimeUnit.SECONDS).revision(); + + // Check that recovered node doesn't produce new rebalances for already processed triggers. + assertEquals(pendingsKeysRevisionBeforeRecovery, pendingsKeysRevisionAfterRecovery); + } + + private static Set<Assignment> getPartitionPendingClusterNodes(IgniteImpl node, int partNum) { + return Optional.ofNullable(getTableId(node.catalogManager(), "TEST", new HybridClockImpl().nowLong())) + .map(tableId -> partitionPendingAssignments(node.metaStorageManager(), tableId, partNum).join()) + .orElse(Set.of()); + } + + private static CompletableFuture<Set<Assignment>> partitionPendingAssignments( + MetaStorageManager metaStorageManager, + int tableId, + int partitionNumber + ) { + return metaStorageManager + .get(pendingPartAssignmentsKey(new TablePartitionId(tableId, partitionNumber))) + .thenApply(e -> (e.value() == null) ? 
null : fromBytes(e.value())); + } + + private static boolean containsPartition(Ignite node) { + var tableManager = ((TableManager) node.tables()); + + MvPartitionStorage storage = tableManager.tableView("TEST") + .internalTable() + .storage() + .getMvPartition(0); + + return storage != null && storage.rowsCount() != 0; + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 976e0f2ae1..911aa1ff21 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -2140,10 +2140,17 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { int partitionId = tablePartitionId.partitionId(); - return allOf( - internalTable.storage().destroyPartition(partitionId), - runAsync(() -> internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor) - ); + List<CompletableFuture<?>> destroyFutures = new ArrayList<>(); + + if (internalTable.storage().getMvPartition(partitionId) != null) { + destroyFutures.add(internalTable.storage().destroyPartition(partitionId)); + } + + if (internalTable.txStateStorage().getTxStateStorage(partitionId) != null) { + destroyFutures.add(runAsync(() -> internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)); + } + + return allOf(destroyFutures.toArray(new CompletableFuture[]{})); } private int[] collectTableIndexIds(int tableId, int catalogVersion, boolean onNodeRecovery) {