This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bcb20bd11e IGNITE-18737 Fixed updating of partition assignments when 
new assignments equal to table configuration assignments and stable assignments 
are empty. Fixes #1655
bcb20bd11e is described below

commit bcb20bd11ec9bf112b502f3a0fb18a5e696e864a
Author: Sergey Uttsel <utt...@gmail.com>
AuthorDate: Wed Feb 22 09:31:09 2023 +0200

    IGNITE-18737 Fixed updating of partition assignments when new assignments 
equal to table configuration assignments and stable assignments are empty. 
Fixes #1655
    
    Signed-off-by: Slava Koptilin <slava.kopti...@gmail.com>
---
 .../runner/app/ItIgniteNodeRestartTest.java        |  47 +-
 .../internal/table/distributed/TableManager.java   |  23 +-
 .../ignite/internal/utils/RebalanceUtil.java       |  74 ++-
 .../TableManagerDistributionZonesTest.java         |  22 +
 .../utils/RebalanceUtilUpdateAssignmentsTest.java  | 541 +++++++++++++++++++++
 5 files changed, 652 insertions(+), 55 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 71e53c799e..340052a38a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -23,7 +23,6 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -128,7 +127,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = 
"0")
 @ExtendWith(ConfigurationExtension.class)
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-18737")
 public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
     /** Default node port. */
     private static final int DEFAULT_NODE_PORT = 3344;
@@ -668,46 +666,11 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
         assertEquals(newPort, nodePort);
     }
 
-    /**
-     * Checks that the only one non-default property overwrites after another 
configuration is passed on the node restart.
-     */
-    @Test
-    public void twoCustomPropertiesTest() {
-        String startCfg = "network: {\n"
-                + "  port:3344,\n"
-                + "  nodeFinder: {netClusterNodes:[ \"localhost:3344\" ]}\n"
-                + "}";
-
-        IgniteImpl ignite = startNode(0, startCfg);
-
-        assertEquals(
-                3344,
-                
ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value()
-        );
-
-        assertArrayEquals(
-                new String[]{"localhost:3344"},
-                
ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).nodeFinder().netClusterNodes().value()
-        );
-
-        stopNode(0);
-
-        ignite = startNode(0, "network.nodeFinder.netClusterNodes=[ 
\"localhost:3344\", \"localhost:3343\" ]");
-
-        assertEquals(
-                3344,
-                
ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).port().value()
-        );
-
-        assertArrayEquals(
-                new String[]{"localhost:3344", "localhost:3343"},
-                
ignite.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY).nodeFinder().netClusterNodes().value()
-        );
-    }
-
     /**
      * Restarts the node which stores some data.
      */
+    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, 
because indexes are required to apply RAFT commands on restart, "
+            + "but the table have not started yet.")
     @Test
     public void nodeWithDataTest() throws InterruptedException {
         IgniteImpl ignite = startNode(0);
@@ -724,6 +687,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Starts two nodes and checks that the data are storing through restarts. 
Nodes restart in the same order when they started at first.
      */
+    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, 
because indexes are required to apply RAFT commands on restart, "
+            + "but the table have not started yet.")
     @Test
     public void testTwoNodesRestartDirect() throws InterruptedException {
         twoNodesRestart(true);
@@ -732,6 +697,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Starts two nodes and checks that the data are storing through restarts. 
Nodes restart in reverse order when they started at first.
      */
+    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, 
because indexes are required to apply RAFT commands on restart, "
+            + "but the table have not started yet.")
     @Test
     public void testTwoNodesRestartReverse() throws InterruptedException {
         twoNodesRestart(false);
@@ -870,6 +837,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Checks that a cluster is able to restart when some changes were made in 
configuration.
      */
+    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, 
because indexes are required to apply RAFT commands on restart, "
+            + "but the table have not started yet.")
     @Test
     public void testRestartDiffConfig() throws InterruptedException {
         List<IgniteImpl> ignites = startNodes(2);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3a176604b9..22ba3f815e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -632,12 +632,16 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 CompletableFuture<?>[] futures = new 
CompletableFuture<?>[partCnt];
 
+                byte[] assignmentsBytes = ((ExtendedTableConfiguration) 
tblCfg).assignments().value();
+
+                List<Set<Assignment>> tableAssignments = 
ByteUtils.fromBytes(assignmentsBytes);
+
                 for (int i = 0; i < partCnt; i++) {
                     TablePartitionId replicaGrpId = new 
TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i);
 
                     futures[i] = 
updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId,
                             
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()), 
newReplicas,
-                            replicasCtx.storageRevision(), metaStorageMgr, i);
+                            replicasCtx.storageRevision(), metaStorageMgr, i, 
tableAssignments.get(i));
                 }
 
                 return allOf(futures);
@@ -983,14 +987,14 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             return;
         }
 
+        busyLock.block();
+
         metaStorageMgr.unregisterWatch(distributionZonesDataNodesListener);
 
         metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener);
         metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
         metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
 
-        busyLock.block();
-
         Map<UUID, TableImpl> tables = tablesByIdVv.latest();
 
         cleanUpTablesResources(tables);
@@ -1863,19 +1867,26 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         if (zoneId == tableZoneId) {
                             TableConfiguration tableCfg = 
tables.get(tableView.name());
 
+                            byte[] assignmentsBytes = 
((ExtendedTableConfiguration) tableCfg).assignments().value();
+
+                            List<Set<Assignment>> tableAssignments = 
ByteUtils.fromBytes(assignmentsBytes);
+
                             for (int part = 0; part < tableView.partitions(); 
part++) {
                                 UUID tableId = ((ExtendedTableConfiguration) 
tableCfg).id().value();
 
                                 TablePartitionId replicaGrpId = new 
TablePartitionId(tableId, part);
 
+                                int replicas = tableView.replicas();
+
                                 int partId = part;
 
                                 updatePendingAssignmentsKeys(
-                                        tableView.name(), replicaGrpId, 
dataNodes, tableView.replicas(),
-                                        
evt.entryEvent().newEntry().revision(), metaStorageMgr, part
+                                        tableView.name(), replicaGrpId, 
dataNodes, replicas,
+                                        
evt.entryEvent().newEntry().revision(), metaStorageMgr, part, 
tableAssignments.get(part)
                                 ).exceptionally(e -> {
                                     LOG.error(
-                                            "Exception on updating assignments 
for [table={}, partition={}]", e, tableView.name(), partId
+                                            "Exception on updating assignments 
for [table={}, partition={}]", e, tableView.name(),
+                                            partId
                                     );
 
                                     return null;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index a3dea5ffa2..8d9d3953dc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.utils;
 
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
@@ -69,12 +71,22 @@ public class RebalanceUtil {
     /** Return code of metastore multi-invoke which identifies,
      * that planned key was removed, because current rebalance is already have 
the same target.
      */
-    private static final int PLANNED_KEY_REMOVED = 2;
+    private static final int PLANNED_KEY_REMOVED_EQUALS_PENDING = 2;
+
+    /** Return code of metastore multi-invoke which identifies,
+     * that planned key was removed, because current assignment is empty.
+     */
+    private static final int PLANNED_KEY_REMOVED_EMPTY_PENDING = 3;
+
+    /** Return code of metastore multi-invoke which identifies,
+     * that assignments do not need to be updated.
+     */
+    private static final int ASSIGNMENT_NOT_UPDATED = 4;
 
     /** Return code of metastore multi-invoke which identifies,
      * that this trigger event was already processed by another node and must 
be skipped.
      */
-    private static final int OUTDATED_UPDATE_RECEIVED = 3;
+    private static final int OUTDATED_UPDATE_RECEIVED = 5;
 
     /**
      * Update keys that related to rebalance algorithm in Meta Storage. Keys 
are specific for partition.
@@ -85,11 +97,13 @@ public class RebalanceUtil {
      * @param replicas Number of replicas for a table.
      * @param revision Revision of Meta Storage that is specific for the 
assignment update.
      * @param metaStorageMgr Meta Storage manager.
+     * @param partNum Partition id.
+     * @param tableCfgPartAssignments Table configuration assignments.
      * @return Future representing result of updating keys in {@code 
metaStorageMgr}
      */
     public static @NotNull CompletableFuture<Void> 
updatePendingAssignmentsKeys(
             String tableName, TablePartitionId partId, Collection<String> 
dataNodes,
-            int replicas, long revision, MetaStorageManager metaStorageMgr, 
int partNum) {
+            int replicas, long revision, MetaStorageManager metaStorageMgr, 
int partNum, Set<Assignment> tableCfgPartAssignments) {
         ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
 
         ByteArray partAssignmentsPendingKey = 
pendingPartAssignmentsKey(partId);
@@ -100,32 +114,59 @@ public class RebalanceUtil {
 
         Set<Assignment> partAssignments = 
AffinityUtils.calculateAssignmentForPartition(dataNodes, partNum, replicas);
 
+        boolean isNewAssignments = 
!tableCfgPartAssignments.equals(partAssignments);
+
         byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments);
 
         //    if empty(partition.change.trigger.revision) || 
partition.change.trigger.revision < event.revision:
-        //        if empty(partition.assignments.pending) && 
partition.assignments.stable != calcPartAssighments():
+        //        if empty(partition.assignments.pending)
+        //              && ((isNewAssignments && 
empty(partition.assignments.stable))
+        //                  || (partition.assignments.stable != 
calcPartAssighments() && !empty(partition.assignments.stable))):
         //            partition.assignments.pending = calcPartAssignments()
         //            partition.change.trigger.revision = event.revision
         //        else:
-        //            if partition.assignments.pending != calcPartAssignments
+        //            if partition.assignments.pending != calcPartAssignments 
&& !empty(partition.assignments.pending)
         //                partition.assignments.planned = calcPartAssignments()
         //                partition.change.trigger.revision = event.revision
-        //            else
+        //            else if partition.assignments.pending == 
calcPartAssignments
+        //                remove(partition.assignments.planned)
+        //                message after the metastorage invoke:
+        //                "Remove planned key because current pending key has 
the same value."
+        //            else if empty(partition.assignments.pending)
         //                remove(partition.assignments.planned)
+        //                message after the metastorage invoke:
+        //                "Remove planned key because pending is empty and 
calculated assignments are equal to current assignments."
         //    else:
         //        skip
+
+        Condition newAssignmentsCondition;
+
+        if (isNewAssignments) {
+            newAssignmentsCondition = or(
+                    notExists(partAssignmentsStableKey),
+                    
and(value(partAssignmentsStableKey).ne(partAssignmentsBytes), 
exists(partAssignmentsStableKey))
+            );
+        } else {
+            newAssignmentsCondition = 
and(value(partAssignmentsStableKey).ne(partAssignmentsBytes), 
exists(partAssignmentsStableKey));
+        }
+
         var iif = iif(or(notExists(partChangeTriggerKey), 
value(partChangeTriggerKey).lt(ByteUtils.longToBytes(revision))),
-                iif(and(notExists(partAssignmentsPendingKey), 
value(partAssignmentsStableKey).ne(partAssignmentsBytes)),
+                iif(and(notExists(partAssignmentsPendingKey), 
newAssignmentsCondition),
                         ops(
                                 put(partAssignmentsPendingKey, 
partAssignmentsBytes),
                                 put(partChangeTriggerKey, 
ByteUtils.longToBytes(revision))
                         ).yield(PENDING_KEY_UPDATED),
-                        
iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
+                        
iif(and(value(partAssignmentsPendingKey).ne(partAssignmentsBytes), 
exists(partAssignmentsPendingKey)),
                                 ops(
                                         put(partAssignmentsPlannedKey, 
partAssignmentsBytes),
                                         put(partChangeTriggerKey, 
ByteUtils.longToBytes(revision))
                                 ).yield(PLANNED_KEY_UPDATED),
-                                
ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED))),
+                                
iif(value(partAssignmentsPendingKey).eq(partAssignmentsBytes),
+                                        
ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED_EQUALS_PENDING),
+                                        
iif(notExists(partAssignmentsPendingKey),
+                                                
ops(remove(partAssignmentsPlannedKey)).yield(PLANNED_KEY_REMOVED_EMPTY_PENDING),
+                                                
ops().yield(ASSIGNMENT_NOT_UPDATED))
+                                ))),
                 ops().yield(OUTDATED_UPDATE_RECEIVED));
 
         return metaStorageMgr.invoke(iif).thenAccept(sr -> {
@@ -143,11 +184,24 @@ public class RebalanceUtil {
                             partAssignmentsPlannedKey, partNum, tableName, 
ByteUtils.fromBytes(partAssignmentsBytes));
 
                     break;
-                case PLANNED_KEY_REMOVED:
+                case PLANNED_KEY_REMOVED_EQUALS_PENDING:
                     LOG.info(
                             "Remove planned key because current pending key 
has the same value [key={}, partition={}, table={}, val={}]",
                             partAssignmentsPlannedKey.toString(), partNum, 
tableName, ByteUtils.fromBytes(partAssignmentsBytes));
 
+                    break;
+                case PLANNED_KEY_REMOVED_EMPTY_PENDING:
+                    LOG.info(
+                            "Remove planned key because pending is empty and 
calculated assignments are equal to current assignments "
+                                    + "[key={}, partition={}, table={}, 
val={}]",
+                            partAssignmentsPlannedKey.toString(), partNum, 
tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+
+                    break;
+                case ASSIGNMENT_NOT_UPDATED:
+                    LOG.debug(
+                            "Assignments are not updated [key={}, 
partition={}, table={}, val={}]",
+                            partAssignmentsPlannedKey.toString(), partNum, 
tableName, ByteUtils.fromBytes(partAssignmentsBytes));
+
                     break;
                 case OUTDATED_UPDATE_RECEIVED:
                     LOG.debug(
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index fba4182dce..64e3fed14a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -88,6 +88,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.TopologyService;
 import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -230,6 +231,15 @@ public class TableManagerDistributionZonesTest extends 
IgniteAbstractTest {
         );
     }
 
+    @AfterEach
+    public void tearDown() throws Exception {
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+
+        tableManager.stop();
+    }
+
     @Test
     void dataNodesTriggersAssignmentsChanging() {
         IgniteBiTuple<TableView, ExtendedTableConfiguration> table0 = 
mockTable(0, 1, 0);
@@ -429,6 +439,18 @@ public class TableManagerDistributionZonesTest extends 
IgniteAbstractTest {
         when(valueId.value()).thenReturn(new UUID(0, tableNum));
         when(tableCfg.id()).thenReturn(valueId);
 
+        List<Set<Assignment>> tableAssignments = new ArrayList<>();
+
+        for (int i = 0; i < partNum; i++) {
+            tableAssignments.add(Set.of(Assignment.forPeer("fakeAssignment")));
+        }
+
+        ConfigurationValue assignmentValue = mock(ConfigurationValue.class);
+
+        when(assignmentValue.value()).thenReturn(toBytes(tableAssignments));
+
+        when(tableCfg.assignments()).thenReturn(assignmentValue);
+
         return new IgniteBiTuple<>(tableView, tableCfg);
 
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
new file mode 100644
index 0000000000..a9f62a449c
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/utils/RebalanceUtilUpdateAssignmentsTest.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.utils;
+
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.plannedPartAssignmentsKey;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+/**
+ * Tests for updating assignment in the meta storage.
+ */
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(RebalanceUtilUpdateAssignmentsTest.class);
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterService clusterService;
+
+    private MetaStorageManager metaStorageManager;
+
+    private static final int partNum = 2;
+    private static final int replicas = 2;
+
+    private static final Set<String> nodes1 = IntStream.of(5).mapToObj(i -> 
"nodes1_" + i).collect(toSet());
+    private static final Set<String> nodes2 = IntStream.of(5).mapToObj(i -> 
"nodes2_" + i).collect(toSet());
+    private static final Set<String> nodes3 = IntStream.of(5).mapToObj(i -> 
"nodes3_" + i).collect(toSet());
+    private static final Set<String> nodes4 = IntStream.of(5).mapToObj(i -> 
"nodes4_" + i).collect(toSet());
+
+    private static final Set<Assignment> assignments1 = 
calculateAssignmentForPartition(nodes1, partNum, replicas);
+    private static final Set<Assignment> assignments2 = 
calculateAssignmentForPartition(nodes2, partNum, replicas);
+    private static final Set<Assignment> assignments3 = 
calculateAssignmentForPartition(nodes3, partNum, replicas);
+    private static final Set<Assignment> assignments4 = 
calculateAssignmentForPartition(nodes4, partNum, replicas);
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Set.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        clusterService = mock(ClusterService.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        clusterCfgMgr.start();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage("test"));
+
+        MetaStorageListener metaStorageListener = new 
MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new 
CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new 
IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new 
MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            Iif iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = 
commandsFactory.multiInvokeCommand().iif(iif).build();
+
+            return metaStorageService.run(multiInvokeCommand);
+        }).when(metaStorageManager).invoke(any());
+
+        when(clusterService.messagingService()).thenAnswer(invocation -> {
+            MessagingService ret = mock(MessagingService.class);
+
+            return ret;
+        });
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    /**
+     * Nodes for new assignments calculating: nodes1.
+     * The table configuration assignments: assignments2.
+     * Current assignments in the metastorage: stable=null, pending=null, 
planned=null.
+     * Expected assignments in the metastorage after updating: stable=null, 
pending=assignments1, planned=null.
+     */
+    @Test
+    void test1() {
+        test(
+                nodes1, assignments2,
+                null, null, null,
+                null, assignments1, null
+        );
+    }
+
+    /**
+     * Scenario 2.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments1.
+     *
+     * <p>Metastorage before the update: stable=null, pending=null, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=null, pending=null, planned=null.
+     */
+    @Test
+    void test2() {
+        test(nodes1, assignments1,
+                null, null, null,
+                null, null, null);
+    }
+
+    /**
+     * Scenario 3.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments2.
+     *
+     * <p>Metastorage before the update: stable=null, pending=assignments3, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=null, pending=assignments3, planned=assignments1.
+     */
+    @Test
+    void test3() {
+        test(nodes1, assignments2,
+                null, assignments3, null,
+                null, assignments3, assignments1);
+    }
+
+    /**
+     * Scenario 4.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments1.
+     *
+     * <p>Metastorage before the update: stable=null, pending=assignments3, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=null, pending=assignments3, planned=assignments1.
+     */
+    @Test
+    void test4() {
+        test(nodes1, assignments1,
+                null, assignments3, null,
+                null, assignments3, assignments1);
+    }
+
+    /**
+     * Scenario 5.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments2.
+     *
+     * <p>Metastorage before the update: stable=assignments3, pending=null, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments3, pending=assignments1, planned=null.
+     */
+    @Test
+    void test5() {
+        test(nodes1, assignments2,
+                assignments3, null, null,
+                assignments3, assignments1, null);
+    }
+
+    /**
+     * Scenario 6.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments1.
+     *
+     * <p>Metastorage before the update: stable=assignments3, pending=null, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments3, pending=assignments1, planned=null.
+     */
+    @Test
+    void test6() {
+        test(nodes1, assignments1,
+                assignments3, null, null,
+                assignments3, assignments1, null);
+    }
+
+    /**
+     * Scenario 7.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments2.
+     *
+     * <p>Metastorage before the update: stable=assignments1, pending=null, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments1, pending=null, planned=null.
+     */
+    @Test
+    void test7() {
+        test(nodes1, assignments2,
+                assignments1, null, null,
+                assignments1, null, null);
+    }
+
+    /**
+     * Scenario 8.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments1.
+     *
+     * <p>Metastorage before the update: stable=assignments1, pending=null, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments1, pending=null, planned=null.
+     */
+    @Test
+    void test8() {
+        test(nodes1, assignments1,
+                assignments1, null, null,
+                assignments1, null, null);
+    }
+
+    /**
+     * Scenario 9.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments2.
+     *
+     * <p>Metastorage before the update: stable=assignments2, pending=null, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments2, pending=assignments1, planned=null.
+     */
+    @Test
+    void test9() {
+        test(nodes1, assignments2,
+                assignments2, null, null,
+                assignments2, assignments1, null);
+    }
+
+    /**
+     * Scenario 10.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments2.
+     *
+     * <p>Metastorage before the update: stable=assignments4, pending=assignments3, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments4, pending=assignments3, planned=assignments1.
+     */
+    @Test
+    void test10() {
+        test(nodes1, assignments2,
+                assignments4, assignments3, null,
+                assignments4, assignments3, assignments1);
+    }
+
+    /**
+     * Scenario 11.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments1.
+     *
+     * <p>Metastorage before the update: stable=assignments3, pending=assignments2, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments3, pending=assignments2, planned=assignments1.
+     */
+    @Test
+    void test11() {
+        test(nodes1, assignments1,
+                assignments3, assignments2, null,
+                assignments3, assignments2, assignments1);
+    }
+
+    /**
+     * Scenario 12.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments2.
+     *
+     * <p>Metastorage before the update: stable=assignments1, pending=assignments3, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments1, pending=assignments3, planned=assignments1.
+     */
+    @Test
+    void test12() {
+        test(nodes1, assignments2,
+                assignments1, assignments3, null,
+                assignments1, assignments3, assignments1);
+    }
+
+    /**
+     * Scenario 13.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments2.
+     *
+     * <p>Metastorage before the update: stable=assignments2, pending=assignments3, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments2, pending=assignments3, planned=assignments1.
+     */
+    @Test
+    void test13() {
+        test(nodes1, assignments2,
+                assignments2, assignments3, null,
+                assignments2, assignments3, assignments1);
+    }
+
+    /**
+     * Scenario 14.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments1.
+     *
+     * <p>Metastorage before the update: stable=assignments1, pending=assignments2, planned=null.
+     *
+     * <p>Metastorage expected after the update: stable=assignments1, pending=assignments2, planned=assignments1.
+     */
+    @Test
+    void test14() {
+        test(nodes1, assignments1,
+                assignments1, assignments2, null,
+                assignments1, assignments2, assignments1);
+    }
+
+    /**
+     * Scenario 15.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments1.
+     *
+     * <p>Metastorage before the update: stable=assignments1, pending=assignments2, planned=assignments3.
+     *
+     * <p>Metastorage expected after the update: stable=assignments1, pending=assignments2, planned=assignments1.
+     */
+    @Test
+    void test15() {
+        test(nodes1, assignments1,
+                assignments1, assignments2, assignments3,
+                assignments1, assignments2, assignments1);
+    }
+
+    /**
+     * Scenario 16.
+     *
+     * <p>Input: new assignments are calculated from nodes1; the table configuration holds assignments4.
+     *
+     * <p>Metastorage before the update: stable=assignments1, pending=assignments2, planned=assignments1.
+     *
+     * <p>Metastorage expected after the update: stable=assignments1, pending=assignments2, planned=assignments1.
+     */
+    @Test
+    void test16() {
+        test(nodes1, assignments4,
+                assignments1, assignments2, assignments1,
+                assignments1, assignments2, assignments1);
+    }
+
+    /**
+     * Scenario 17.
+     *
+     * <p>Input: new assignments are calculated from nodes2; the table configuration holds assignments2.
+     *
+     * <p>Metastorage before the update: stable=assignments1, pending=assignments2, planned=assignments1.
+     *
+     * <p>Metastorage expected after the update: stable=assignments1, pending=assignments2, planned=null.
+     */
+    @Test
+    void test17() {
+        test(nodes2, assignments2,
+                assignments1, assignments2, assignments1,
+                assignments1, assignments2, null);
+    }
+
+    /**
+     * Scenario 18.
+     *
+     * <p>Input: new assignments are calculated from nodes2; the table configuration holds assignments4.
+     *
+     * <p>Metastorage before the update: stable=assignments1, pending=assignments2, planned=assignments1.
+     *
+     * <p>Metastorage expected after the update: stable=assignments1, pending=assignments2, planned=null.
+     */
+    @Test
+    void test18() {
+        test(nodes2, assignments4,
+                assignments1, assignments2, assignments1,
+                assignments1, assignments2, null);
+    }
+
+    /**
+     * Runs one scenario of {@code RebalanceUtil.updatePendingAssignmentsKeys} and verifies the resulting metastorage content.
+     *
+     * <p>The method seeds the key-value storage with the given current assignments for a partition of a randomly generated table id,
+     * invokes the update, reads the stable, pending and planned keys back and compares them with the expected values.
+     * A {@code null} current value means that the corresponding key is not written before the update;
+     * a {@code null} expected value means that the corresponding key must hold no value after the update.
+     *
+     * @param nodesForNewAssignments Nodes passed to the update for calculating the new assignments.
+     * @param tableCfgAssignments Assignments passed to the update as the table configuration assignments.
+     * @param currentStableAssignments Stable assignments to seed, or {@code null}.
+     * @param currentPendingAssignments Pending assignments to seed, or {@code null}.
+     * @param currentPlannedAssignments Planned assignments to seed, or {@code null}.
+     * @param expectedStableAssignments Expected stable assignments after the update, or {@code null}.
+     * @param expectedPendingAssignments Expected pending assignments after the update, or {@code null}.
+     * @param expectedPlannedAssignments Expected planned assignments after the update, or {@code null}.
+     */
+    private void test(
+            Collection<String> nodesForNewAssignments,
+            Set<Assignment> tableCfgAssignments,
+            Set<Assignment> currentStableAssignments,
+            Set<Assignment> currentPendingAssignments,
+            Set<Assignment> currentPlannedAssignments,
+            Set<Assignment> expectedStableAssignments,
+            Set<Assignment> expectedPendingAssignments,
+            Set<Assignment> expectedPlannedAssignments
+    ) {
+        TablePartitionId tablePartitionId = new TablePartitionId(UUID.randomUUID(), 1);
+
+        byte[] stableKey = stablePartAssignmentsKey(tablePartitionId).bytes();
+        byte[] pendingKey = pendingPartAssignmentsKey(tablePartitionId).bytes();
+        byte[] plannedKey = plannedPartAssignmentsKey(tablePartitionId).bytes();
+
+        putAssignmentsIfPresent(stableKey, currentStableAssignments);
+        putAssignmentsIfPresent(pendingKey, currentPendingAssignments);
+        putAssignmentsIfPresent(plannedKey, currentPlannedAssignments);
+
+        RebalanceUtil.updatePendingAssignmentsKeys(
+                "table1", tablePartitionId, nodesForNewAssignments,
+                replicas, 1, metaStorageManager, partNum, tableCfgAssignments
+        );
+
+        Set<Assignment> actualStableAssignments = readAssignments(stableKey);
+        Set<Assignment> actualPendingAssignments = readAssignments(pendingKey);
+        Set<Assignment> actualPlannedAssignments = readAssignments(plannedKey);
+
+        LOG.info("stableAssignments " + actualStableAssignments);
+        LOG.info("pendingAssignments " + actualPendingAssignments);
+        LOG.info("plannedAssignments " + actualPlannedAssignments);
+
+        assertAssignments(expectedStableAssignments, actualStableAssignments);
+        assertAssignments(expectedPendingAssignments, actualPendingAssignments);
+        assertAssignments(expectedPlannedAssignments, actualPlannedAssignments);
+    }
+
+    /** Writes the serialized assignments under the given key; does nothing when the assignments are {@code null}. */
+    private void putAssignmentsIfPresent(byte[] key, Set<Assignment> assignments) {
+        if (assignments != null) {
+            keyValueStorage.put(key, toBytes(assignments));
+        }
+    }
+
+    /** Reads and deserializes the assignments stored under the given key, or returns {@code null} when the key holds no value. */
+    private Set<Assignment> readAssignments(byte[] key) {
+        byte[] bytes = keyValueStorage.get(key).value();
+
+        return bytes == null ? null : ByteUtils.fromBytes(bytes);
+    }
+
+    /** Asserts that the actual assignments are absent when none are expected, and equal to the expected ones otherwise. */
+    private static void assertAssignments(Set<Assignment> expected, Set<Assignment> actual) {
+        if (expected == null) {
+            assertNull(actual);
+        } else {
+            assertNotNull(actual);
+            // JUnit's contract is (expected, actual); the reversed order produced misleading failure messages.
+            assertEquals(expected, actual);
+        }
+    }
+}


Reply via email to