This is an automated email from the ASF dual-hosted git repository. sk0x50 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new bcb20bd11e IGNITE-18737 Fixed updating of partition assignments when new assignments equal to table configuration assignments and stable assignments are empty. Fixes #1655 bcb20bd11e is described below commit bcb20bd11ec9bf112b502f3a0fb18a5e696e864a Author: Sergey Uttsel <utt...@gmail.com> AuthorDate: Wed Feb 22 09:31:09 2023 +0200 IGNITE-18737 Fixed updating of partition assignments when new assignments equal to table configuration assignments and stable assignments are empty. Fixes #1655 Signed-off-by: Slava Koptilin <slava.kopti...@gmail.com> --- .../runner/app/ItIgniteNodeRestartTest.java | 47 +- .../internal/table/distributed/TableManager.java | 23 +- .../ignite/internal/utils/RebalanceUtil.java | 74 ++- .../TableManagerDistributionZonesTest.java | 22 + .../utils/RebalanceUtilUpdateAssignmentsTest.java | 541 +++++++++++++++++++++ 5 files changed, 652 insertions(+), 55 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 71e53c799e..340052a38a 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -23,7 +23,6 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -128,7 +127,6 @@ import org.junit.jupiter.api.extension.ExtendWith; */ @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = "0") @ExtendWith(ConfigurationExtension.class) -@Disabled("https://issues.apache.org/jira/browse/IGNITE-18737") public class ItIgniteNodeRestartTest extends IgniteAbstractTest { /** Default node port. */ private static final int DEFAULT_NODE_PORT = 3344; @@ -668,46 +666,11 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { assertEquals(newPort, nodePort); } - /** - * Checks that the only one non-default property overwrites after another configuration is passed on the node restart. - */ - @Test - public void twoCustomPropertiesTest() { - String startCfg = "network: {\n" - + " port:3344,\n" - + " nodeFinder: {netClusterNodes:[ \"localhost:3344\" ]}\n" - + "}"; - - IgniteImpl ignite = startNode(0, startCfg); - - assertEquals( - 3344, - ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value() - ); - - assertArrayEquals( - new String[]{"localhost:3344"}, - ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).nodeFinder().netClusterNodes().value() - ); - - stopNode(0); - - ignite = startNode(0, "network.nodeFinder.netClusterNodes=[ \"localhost:3344\", \"localhost:3343\" ]"); - - assertEquals( - 3344, - ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value() - ); - - assertArrayEquals( - new String[]{"localhost:3344", "localhost:3343"}, - ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).nodeFinder().netClusterNodes().value() - ); - } - /** * Restarts the node which stores some data. */ + @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because indexes are required to apply RAFT commands on restart, " + + "but the table have not started yet.") @Test public void nodeWithDataTest() throws InterruptedException { IgniteImpl ignite = startNode(0); @@ -724,6 +687,8 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { /** * Starts two nodes and checks that the data are storing through restarts. Nodes restart in the same order when they started at first. */ + @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because indexes are required to apply RAFT commands on restart, " + + "but the table have not started yet.") @Test public void testTwoNodesRestartDirect() throws InterruptedException { twoNodesRestart(true); @@ -732,6 +697,8 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { /** * Starts two nodes and checks that the data are storing through restarts. Nodes restart in reverse order when they started at first. */ + @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because indexes are required to apply RAFT commands on restart, " + + "but the table have not started yet.") @Test public void testTwoNodesRestartReverse() throws InterruptedException { twoNodesRestart(false); @@ -870,6 +837,8 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { /** * Checks that a cluster is able to restart when some changes were made in configuration. */ + @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because indexes are required to apply RAFT commands on restart, " + + "but the table have not started yet.") @Test public void testRestartDiffConfig() throws InterruptedException { List<IgniteImpl> ignites = startNodes(2); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 3a176604b9..22ba3f815e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -632,12 +632,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp CompletableFuture<?>[] futures = new CompletableFuture<?>[partCnt]; + byte[] assignmentsBytes = ((ExtendedTableConfiguration) tblCfg).assignments().value(); + + List<Set<Assignment>> tableAssignments = ByteUtils.fromBytes(assignmentsBytes); + for (int i = 0; i < partCnt; i++) { TablePartitionId replicaGrpId = new TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i); futures[i] = updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId, baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), newReplicas, - replicasCtx.storageRevision(), metaStorageMgr, i); + replicasCtx.storageRevision(), metaStorageMgr, i, tableAssignments.get(i)); } return allOf(futures); @@ -983,14 +987,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp return; } + busyLock.block(); + metaStorageMgr.unregisterWatch(distributionZonesDataNodesListener); metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener); metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener); metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener); - busyLock.block(); - Map<UUID, TableImpl> tables = tablesByIdVv.latest(); cleanUpTablesResources(tables); @@ -1863,19 +1867,26 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp if (zoneId == tableZoneId) { TableConfiguration tableCfg = tables.get(tableView.name()); + byte[] assignmentsBytes = ((ExtendedTableConfiguration) tableCfg).assignments().value(); + + List<Set<Assignment>> tableAssignments = ByteUtils.fromBytes(assignmentsBytes); + for (int part = 0; part < tableView.partitions(); part++) { UUID tableId = ((ExtendedTableConfiguration) tableCfg).id().value(); TablePartitionId replicaGrpId = new TablePartitionId(tableId, part); + int replicas = tableView.replicas(); + int partId = part; updatePendingAssignmentsKeys( - tableView.name(), replicaGrpId, dataNodes, tableView.replicas(), - evt.entryEvent().newEntry().revision(), metaStorageMgr, part + tableView.name(), replicaGrpId, dataNodes, replicas, + evt.entryEvent().newEntry().revision(), metaStorageMgr, part, tableAssignments.get(part) ).exceptionally(e -> { LOG.error( - "Exception on updating assignments for [table={}, partition={}]", e, tableView.name(), partId + "Exception on updating assignments for [table={}, partition={}]", e, tableView.name(), + partId ); return null; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java index a3dea5ffa2..8d9d3953dc 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.utils; import static org.apache.ignite.internal.metastorage.dsl.Conditions.and; +import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.or; import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; @@ -41,6 +42,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.metastorage.dsl.Operations; import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; @@ -69,12 +71,22 @@ public class RebalanceUtil { /** Return code of metastore multi-invoke which identifies, * that planned key was removed, because current rebalance is already have the same target. */ - private static final int PLANNED_KEY_REMOVED = 2; + private static final int PLANNED_KEY_REMOVED_EQUALS_PENDING = 2; + + /** Return code of metastore multi-invoke which identifies, + * that planned key was removed, because current assignment is empty. + */ + private static final int PLANNED_KEY_REMOVED_EMPTY_PENDING = 3; + + /** Return code of metastore multi-invoke which identifies, + * that assignments do not need to be updated. + */ + private static final int ASSIGNMENT_NOT_UPDATED = 4; /** Return code of metastore multi-invoke which identifies, * that this trigger event was already processed by another node and must be skipped. */ - private static final int OUTDATED_UPDATE_RECEIVED = 3; + private static final int OUTDATED_UPDATE_RECEIVED = 5; /** * Update keys that related to rebalance algorithm in Meta Storage. Keys are specific for partition. @@ -85,11 +97,13 @@ public class RebalanceUtil { * @param replicas Number of replicas for a table. * @param revision Revision of Meta Storage that is specific for the assignment update. * @param metaStorageMgr Meta Storage manager. + * @param partNum Partition id. + * @param tableCfgPartAssignments Table configuration assignments. * @return Future representing result of updating keys in {@code metaStorageMgr} */ public static @NotNull CompletableFuture<Void> updatePendingAssignmentsKeys( String tableName, TablePartitionId partId, Collection<String> dataNodes, - int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) { + int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum, Set<Assignment> tableCfgPartAssignments) { ByteArray partChangeTriggerKey = partChangeTriggerKey(partId); ByteArray partAssignmentsPendingKey = pendingPartAssignmentsKey(partId); @@ -100,32 +114,59 @@ public class RebalanceUtil { Set<Assignment> partAssignments = AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas); + boolean isNewAssignments = !tableCfgPartAssignments.equals(partAssignments); + byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments); // if empty(partition.change.trigger.revision) || partition.change.trigger.revision < event.revision: - // if empty(partition.assignments.pending) && partition.assignments.stable != calcPartAssighments(): + // if empty(partition.assignments.pending) + // && ((isNewAssignments && empty(partition.assignments.stable)) + // || (partition.assignments.stable != calcPartAssighments() && !empty(partition.assignments.stable))): // partition.assignments.pending = calcPartAssignments() // partition.change.trigger.revision = event.revision // else: - // if partition.assignments.pending != calcPartAssignments + // if partition.assignments.pending != calcPartAssignments && !empty(partition.assignments.pending) // partition.assignments.planned = calcPartAssignments() // partition.change.trigger.revision = event.revision - // else + // else if partition.assignments.pending == calcPartAssignments + // remove(partition.assignments.planned) + // message after the metastorage invoke: + // "Remove planned key because current pending key has the same value." + // else if empty(partition.assignments.pending) // remove(partition.assignments.planned) + // message after the metastorage invoke: + // "Remove planned key because pending is empty and calculated assignments are equal to current assignments." // else: // skip + + Condition newAssignmentsCondition; + + if (isNewAssignments) { + newAssignmentsCondition = or( + notExists(partAssignmentsStableKey), + and(value(partAssignmentsStableKey).ne(partAssignmentsBytes), exists(partAssignmentsStableKey)) + ); + } else { + newAssignmentsCondition = and(value(partAssignmentsStableKey).ne(partAssignmentsBytes), exists(partAssignmentsStableKey)); + } + var iif = iif(or(notExists(partChangeTriggerKey), value(partChangeTriggerKey).lt(ByteUtils.longToBytes(revision))), - iif(and(notExists(partAssignmentsPendingKey), value(partAssignmentsStableKey).ne(partAssignmentsBytes)), + iif(and(notExists(partAssignmentsPendingKey), newAssignmentsCondition), ops( put(partAssignmentsPendingKey, partAssignmentsBytes), put(partChangeTriggerKey, ByteUtils.longToBytes(revision)) ).yield(PENDING_KEY_UPDATED), - iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes), + iif(and(value(partAssignmentsPendingKey).ne(partAssignmentsBytes), exists(partAssignmentsPendingKey)), ops( put(partAssignmentsPlannedKey, partAssignmentsBytes), put(partChangeTriggerKey, ByteUtils.longToBytes(revision)) ).yield(PLANNED_KEY_UPDATED), - ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED))), + iif(value(partAssignmentsPendingKey).eq(partAssignmentsBytes), + ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED_EQUALS_PENDING), + iif(notExists(partAssignmentsPendingKey), + ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED_EMPTY_PENDING), + ops().yield(ASSIGNMENT_NOT_UPDATED)) + ))), ops().yield(OUTDATED_UPDATE_RECEIVED)); return metaStorageMgr.invoke(iif).thenAccept(sr -> { @@ -143,11 +184,24 @@ public class RebalanceUtil { partAssignmentsPlannedKey, partNum, tableName, ByteUtils.fromBytes(partAssignmentsBytes)); break; - case PLANNED_KEY_REMOVED: + case PLANNED_KEY_REMOVED_EQUALS_PENDING: LOG.info( "Remove planned key because current pending key has the same value [key={}, partition={}, table={}, val={}]", partAssignmentsPlannedKey.toString(), partNum, tableName, ByteUtils.fromBytes(partAssignmentsBytes)); + break; + case PLANNED_KEY_REMOVED_EMPTY_PENDING: + LOG.info( + "Remove planned key because pending is empty and calculated assignments are equal to current assignments " + + "[key={}, partition={}, table={}, val={}]", + partAssignmentsPlannedKey.toString(), partNum, tableName, ByteUtils.fromBytes(partAssignmentsBytes)); + + break; + case ASSIGNMENT_NOT_UPDATED: + LOG.debug( + "Assignments are not updated [key={}, partition={}, table={}, val={}]", + partAssignmentsPlannedKey.toString(), partNum, tableName, ByteUtils.fromBytes(partAssignmentsBytes)); + break; case OUTDATED_UPDATE_RECEIVED: LOG.debug( diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java index fba4182dce..64e3fed14a 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java @@ -88,6 +88,7 @@ import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.MessagingService; import org.apache.ignite.network.TopologyService; import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -230,6 +231,15 @@ public class TableManagerDistributionZonesTest extends IgniteAbstractTest { ); } + @AfterEach + public void tearDown() throws Exception { + clusterCfgMgr.stop(); + + keyValueStorage.close(); + + tableManager.stop(); + } + @Test void dataNodesTriggersAssignmentsChanging() { IgniteBiTuple<TableView, ExtendedTableConfiguration> table0 = mockTable(0, 1, 0); @@ -429,6 +439,18 @@ public class TableManagerDistributionZonesTest extends IgniteAbstractTest { when(valueId.value()).thenReturn(new UUID(0, tableNum)); when(tableCfg.id()).thenReturn(valueId); + List<Set<Assignment>> tableAssignments = new ArrayList<>(); + + for (int i = 0; i < partNum; i++) { + tableAssignments.add(Set.of(Assignment.forPeer("fakeAssignment"))); + } + + ConfigurationValue assignmentValue = mock(ConfigurationValue.class); + + when(assignmentValue.value()).thenReturn(toBytes(tableAssignments)); + + when(tableCfg.assignments()).thenReturn(assignmentValue); + return new IgniteBiTuple<>(tableView, tableCfg); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java new file mode 100644 index 0000000000..a9f62a449c --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.utils; + +import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED; +import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition; +import static org.apache.ignite.internal.util.ByteUtils.toBytes; +import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey; +import static org.apache.ignite.internal.utils.RebalanceUtil.plannedPartAssignmentsKey; +import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; +import org.apache.ignite.internal.affinity.Assignment; +import org.apache.ignite.internal.configuration.ConfigurationManager; +import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory; +import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand; +import org.apache.ignite.internal.metastorage.dsl.Iif; +import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; +import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener; +import org.apache.ignite.internal.raft.Command; +import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.raft.service.CommandClosure; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.MessagingService; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +/** + * Tests for updating assignment in the meta storage. + */ +@ExtendWith({MockitoExtension.class, ConfigurationExtension.class}) +@MockitoSettings(strictness = Strictness.LENIENT) +public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { + private static final IgniteLogger LOG = Loggers.forClass(RebalanceUtilUpdateAssignmentsTest.class); + + private SimpleInMemoryKeyValueStorage keyValueStorage; + + private ConfigurationManager clusterCfgMgr; + + private ClusterService clusterService; + + private MetaStorageManager metaStorageManager; + + private static final int partNum = 2; + private static final int replicas = 2; + + private static final Set<String> nodes1 = IntStream.of(5).mapToObj(i -> "nodes1_" + i).collect(toSet()); + private static final Set<String> nodes2 = IntStream.of(5).mapToObj(i -> "nodes2_" + i).collect(toSet()); + private static final Set<String> nodes3 = IntStream.of(5).mapToObj(i -> "nodes3_" + i).collect(toSet()); + private static final Set<String> nodes4 = IntStream.of(5).mapToObj(i -> "nodes4_" + i).collect(toSet()); + + private static final Set<Assignment> assignments1 = calculateAssignmentForPartition(nodes1, partNum, replicas); + private static final Set<Assignment> assignments2 = calculateAssignmentForPartition(nodes2, partNum, replicas); + private static final Set<Assignment> assignments3 = calculateAssignmentForPartition(nodes3, partNum, replicas); + private static final Set<Assignment> assignments4 = calculateAssignmentForPartition(nodes4, partNum, replicas); + + @BeforeEach + public void setUp() { + clusterCfgMgr = new ConfigurationManager( + List.of(DistributionZonesConfiguration.KEY), + Set.of(), + new TestConfigurationStorage(DISTRIBUTED), + List.of(), + List.of() + ); + + clusterService = mock(ClusterService.class); + + metaStorageManager = mock(MetaStorageManager.class); + + clusterCfgMgr.start(); + + AtomicLong raftIndex = new AtomicLong(); + + keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test")); + + MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage); + + RaftGroupService metaStorageService = mock(RaftGroupService.class); + + // Delegate directly to listener. + lenient().doAnswer( + invocationClose -> { + Command cmd = invocationClose.getArgument(0); + + long commandIndex = raftIndex.incrementAndGet(); + + CompletableFuture<Serializable> res = new CompletableFuture<>(); + + CommandClosure<WriteCommand> clo = new CommandClosure<>() { + /** {@inheritDoc} */ + @Override + public long index() { + return commandIndex; + } + + /** {@inheritDoc} */ + @Override + public WriteCommand command() { + return (WriteCommand) cmd; + } + + /** {@inheritDoc} */ + @Override + public void result(@Nullable Serializable r) { + if (r instanceof Throwable) { + res.completeExceptionally((Throwable) r); + } else { + res.complete(r); + } + } + }; + + try { + metaStorageListener.onWrite(List.of(clo).iterator()); + } catch (Throwable e) { + res.completeExceptionally(new IgniteInternalException(e)); + } + + return res; + } + ).when(metaStorageService).run(any()); + + MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory(); + + lenient().doAnswer(invocationClose -> { + Iif iif = invocationClose.getArgument(0); + + MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(iif).build(); + + return metaStorageService.run(multiInvokeCommand); + }).when(metaStorageManager).invoke(any()); + + when(clusterService.messagingService()).thenAnswer(invocation -> { + MessagingService ret = mock(MessagingService.class); + + return ret; + }); + } + + @AfterEach + public void tearDown() throws Exception { + clusterCfgMgr.stop(); + + keyValueStorage.close(); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=null, pending=null, planned=null. + * Expected assignments in the metastorage after updating: stable=null, pending=assignments1, planned=null. + */ + @Test + void test1() { + test( + nodes1, assignments2, + null, null, null, + null, assignments1, null + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments1. + * Current assignments in the metastorage: stable=null, pending=null, planned=null. + * Expected assignments in the metastorage after updating: stable=null, pending=null, planned=null. + */ + @Test + void test2() { + test( + nodes1, assignments1, + null, null, null, + null, null, null + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=null, pending=assignments3, planned=null. + * Expected assignments in the metastorage after updating: stable=null, pending=assignments3, planned=assignments1. + */ + @Test + void test3() { + test( + nodes1, assignments2, + null, assignments3, null, + null, assignments3, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments1. + * Current assignments in the metastorage: stable=null, pending=assignments3, planned=null. + * Expected assignments in the metastorage after updating: stable=null, pending=assignments3, planned=assignments1. + */ + @Test + void test4() { + test( + nodes1, assignments1, + null, assignments3, null, + null, assignments3, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=assignments3, pending=null, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments1, planned=null. + */ + @Test + void test5() { + test( + nodes1, assignments2, + assignments3, null, null, + assignments3, assignments1, null + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments1. + * Current assignments in the metastorage: stable=assignments3, pending=null, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments1, planned=null. + */ + @Test + void test6() { + test( + nodes1, assignments1, + assignments3, null, null, + assignments3, assignments1, null + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=assignments1, pending=null, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments1, pending=null, planned=null. + */ + @Test + void test7() { + test( + nodes1, assignments2, + assignments1, null, null, + assignments1, null, null + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments1. + * Current assignments in the metastorage: stable=assignments1, pending=null, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments1, pending=null, planned=null. + */ + @Test + void test8() { + test( + nodes1, assignments1, + assignments1, null, null, + assignments1, null, null + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=assignments2, pending=null, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments2, pending=assignments1, planned=null. + */ + @Test + void test9() { + test( + nodes1, assignments2, + assignments2, null, null, + assignments2, assignments1, null + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=assignments4, pending=assignments3, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments4, pending=assignments3, planned=assignments1. + */ + @Test + void test10() { + test( + nodes1, assignments2, + assignments4, assignments3, null, + assignments4, assignments3, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments1. + * Current assignments in the metastorage: stable=assignments3, pending=assignments2, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments3, pending=assignments2, planned=assignments1. + */ + @Test + void test11() { + test( + nodes1, assignments1, + assignments3, assignments2, null, + assignments3, assignments2, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=assignments1, pending=assignments3, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments3, planned=assignments1. + */ + @Test + void test12() { + test( + nodes1, assignments2, + assignments1, assignments3, null, + assignments1, assignments3, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=assignments2, pending=assignments3, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments2, pending=assignments3, planned=assignments1. + */ + @Test + void test13() { + test( + nodes1, assignments2, + assignments2, assignments3, null, + assignments2, assignments3, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments1. + * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=null. + * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1. + */ + @Test + void test14() { + test( + nodes1, assignments1, + assignments1, assignments2, null, + assignments1, assignments2, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments1. + * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments3. + * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1. + */ + @Test + void test15() { + test( + nodes1, assignments1, + assignments1, assignments2, assignments3, + assignments1, assignments2, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes1. + * The table configuration assignments: assignments4. + * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1. + * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=assignments1. + */ + @Test + void test16() { + test( + nodes1, assignments4, + assignments1, assignments2, assignments1, + assignments1, assignments2, assignments1 + ); + } + + /** + * Nodes for new assignments calculating: nodes2. + * The table configuration assignments: assignments2. + * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1. + * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=null. + */ + @Test + void test17() { + test( + nodes2, assignments2, + assignments1, assignments2, assignments1, + assignments1, assignments2, null + ); + } + + /** + * Nodes for new assignments calculating: nodes2. + * The table configuration assignments: assignments4. + * Current assignments in the metastorage: stable=assignments1, pending=assignments2, planned=assignments1. + * Expected assignments in the metastorage after updating: stable=assignments1, pending=assignments2, planned=null. + */ + @Test + void test18() { + test( + nodes2, assignments4, + assignments1, assignments2, assignments1, + assignments1, assignments2, null + ); + } + + private void test( + Collection<String> nodesForNewAssignments, + Set<Assignment> tableCfgAssignments, + Set<Assignment> currentStableAssignments, + Set<Assignment> currentPendingAssignments, + Set<Assignment> currentPlannedAssignments, + Set<Assignment> expectedStableAssignments, + Set<Assignment> expectedPendingAssignments, + Set<Assignment> expectedPlannedAssignments + ) { + TablePartitionId tablePartitionId = new TablePartitionId(UUID.randomUUID(), 1); + + if (currentStableAssignments != null) { + keyValueStorage.put(stablePartAssignmentsKey(tablePartitionId).bytes(), toBytes(currentStableAssignments)); + } + + if (currentPendingAssignments != null) { + keyValueStorage.put(pendingPartAssignmentsKey(tablePartitionId).bytes(), toBytes(currentPendingAssignments)); + } + + if (currentPlannedAssignments != null) { + keyValueStorage.put(plannedPartAssignmentsKey(tablePartitionId).bytes(), toBytes(currentPlannedAssignments)); + } + + RebalanceUtil.updatePendingAssignmentsKeys( + "table1", tablePartitionId, nodesForNewAssignments, + replicas, 1, metaStorageManager, partNum, tableCfgAssignments + ); + + byte[] actualStableBytes = keyValueStorage.get(stablePartAssignmentsKey(tablePartitionId).bytes()).value(); + Set<Assignment> actualStableAssignments = null; + + if (actualStableBytes != null) { + actualStableAssignments = ByteUtils.fromBytes(actualStableBytes); + } + + byte[] actualPendingBytes = keyValueStorage.get(pendingPartAssignmentsKey(tablePartitionId).bytes()).value(); + Set<Assignment> actualPendingAssignments = null; + + if (actualPendingBytes != null) { + actualPendingAssignments = ByteUtils.fromBytes(actualPendingBytes); + } + + byte[] actualPlannedBytes = keyValueStorage.get(plannedPartAssignmentsKey(tablePartitionId).bytes()).value(); + Set<Assignment> actualPlannedAssignments = null; + + if (actualPlannedBytes != null) { + actualPlannedAssignments = ByteUtils.fromBytes(actualPlannedBytes); + } + + LOG.info("stableAssignments " + actualStableAssignments); + LOG.info("pendingAssignments " + actualPendingAssignments); + LOG.info("plannedAssignments " + actualPlannedAssignments); + + if (expectedStableAssignments != null) { + assertNotNull(actualStableBytes); + assertEquals(actualStableAssignments, expectedStableAssignments); + } else { + assertNull(actualStableBytes); + } + + if (expectedPendingAssignments != null) { + assertNotNull(actualPendingBytes); + assertEquals(actualPendingAssignments, expectedPendingAssignments); + } else { + assertNull(actualPendingBytes); + } + + if (expectedPlannedAssignments != null) { + assertNotNull(actualPlannedBytes); + assertEquals(actualPlannedAssignments, expectedPlannedAssignments); + } else { + assertNull(actualPlannedBytes); + } + } +}