This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 97c531227e IGNITE-19234 Enable and fix group reentry logic for 
volatile storages (#3763)
97c531227e is described below

commit 97c531227ebe74a2211626da5472ec05818152d4
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Thu May 16 10:52:24 2024 +0400

    IGNITE-19234 Enable and fix group reentry logic for volatile storages 
(#3763)
    
    The reentry logic (see IGNITE-16668) was erroneously disabled. This commit 
enables it back and contains fixes required to make it work (due to changes 
made to the common rebalancing code since it was disabled).
---
 .../ignite/internal/affinity/Assignments.java      |  16 +++
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |   6 +-
 .../PartitionReplicatorNodeRecovery.java           | 113 +++++++++++++++------
 .../internal/table/distributed/TableManager.java   |  17 ++--
 .../{HasDataResponse.java => DataPresence.java}    |  18 ++--
 .../table/distributed/message/HasDataResponse.java |  13 ++-
 .../ignite/internal/utils/RebalanceUtilEx.java     |  19 ++--
 7 files changed, 143 insertions(+), 59 deletions(-)

diff --git 
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
index d0498b8047..2f0d7fcbee 100644
--- 
a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
+++ 
b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
@@ -107,6 +107,22 @@ public class Assignments implements Serializable {
         return force;
     }
 
+    /**
+     * Adds an assignment to this collection of assignments.
+     *
+     * @param assignment Assignment to add.
+     */
+    public void add(Assignment assignment) {
+        nodes.add(assignment);
+    }
+
+    /**
+     * Returns {@code true} if this collection has no assignments, {@code 
false} if it has some assignments.
+     */
+    public boolean isEmpty() {
+        return nodes.isEmpty();
+    }
+
     /**
      * Serializes the instance into an array of bytes.
      */
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 70b68a98e6..fadc8e6510 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -65,6 +65,7 @@ import 
org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
 import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import 
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
@@ -172,9 +173,8 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
     @ParameterizedTest
     @ValueSource(strings = {
             RocksDbStorageEngine.ENGINE_NAME,
-            PersistentPageMemoryStorageEngine.ENGINE_NAME
-            // TODO: uncomment when 
https://issues.apache.org/jira/browse/IGNITE-19234 is fixed
-//            VolatilePageMemoryStorageEngine.ENGINE_NAME
+            PersistentPageMemoryStorageEngine.ENGINE_NAME,
+            VolatilePageMemoryStorageEngine.ENGINE_NAME
     })
     void leaderFeedsFollowerWithSnapshot(String storageEngine) throws 
Exception {
         testLeaderFeedsFollowerWithSnapshot(storageEngine);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
index fc34504bd5..8c22b1030e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
@@ -20,17 +20,20 @@ package org.apache.ignite.internal.table.distributed;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.IntFunction;
@@ -43,9 +46,12 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageClosedException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.message.DataPresence;
 import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
 import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
 import org.apache.ignite.internal.utils.RebalanceUtilEx;
@@ -70,6 +76,8 @@ class PartitionReplicatorNodeRecovery {
 
     private final TopologyService topologyService;
 
+    private final Executor storageAccessExecutor;
+
     /** Obtains a TableImpl instance by a table ID. */
     private final IntFunction<TableViewInternal> tableById;
 
@@ -77,11 +85,13 @@ class PartitionReplicatorNodeRecovery {
             MetaStorageManager metaStorageManager,
             MessagingService messagingService,
             TopologyService topologyService,
+            Executor storageAccessExecutor,
             IntFunction<TableViewInternal> tableById
     ) {
         this.metaStorageManager = metaStorageManager;
         this.messagingService = messagingService;
         this.topologyService = topologyService;
+        this.storageAccessExecutor = storageAccessExecutor;
         this.tableById = tableById;
     }
 
@@ -100,39 +110,58 @@ class PartitionReplicatorNodeRecovery {
 
                 HasDataRequest msg = (HasDataRequest) message;
 
-                int tableId = msg.tableId();
-                int partitionId = msg.partitionId();
+                storageAccessExecutor.execute(() -> handleHasDataRequest(msg, 
sender, correlationId));
+            }
+        });
+    }
 
-                boolean storageHasData = false;
+    private void handleHasDataRequest(HasDataRequest msg, ClusterNode sender, 
Long correlationId) {
+        int tableId = msg.tableId();
+        int partitionId = msg.partitionId();
 
-                TableViewInternal table = tableById.apply(tableId);
+        DataPresence dataPresence = DataPresence.UNKNOWN;
 
-                if (table != null) {
-                    MvTableStorage storage = table.internalTable().storage();
+        TableViewInternal table = tableById.apply(tableId);
 
-                    MvPartitionStorage mvPartition = 
storage.getMvPartition(partitionId);
+        if (table != null) {
+            MvTableStorage storage = table.internalTable().storage();
 
-                    // If node's recovery process is incomplete (no partition 
storage), then we consider this node's
-                    // partition storage empty.
-                    if (mvPartition != null) {
-                        storageHasData = 
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null;
-                    }
-                }
+            MvPartitionStorage mvPartition = 
storage.getMvPartition(partitionId);
 
-                messagingService.respond(sender, 
TABLE_MESSAGES_FACTORY.hasDataResponse().result(storageHasData).build(), 
correlationId);
+            if (mvPartition != null) {
+                try {
+                    dataPresence = 
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null
+                            ? DataPresence.HAS_DATA : DataPresence.EMPTY;
+                } catch (StorageClosedException | StorageRebalanceException 
ignored) {
+                    // Ignoring so we'll return UNKNOWN for storageHasData 
meaning that we have no idea.
+                }
             }
-        });
+        }
+
+        messagingService.respond(
+                sender,
+                
TABLE_MESSAGES_FACTORY.hasDataResponse().presenceString(dataPresence.name()).build(),
+                correlationId
+        );
     }
 
     /**
-     * Returns a future that completes with a decision: should we start the 
corresponding group locally or not.
+     * Initiates group reentry (that is, exits the group and then enters it 
again) if there is a possibility that
+     * this node lost its Raft metastorage state. This trick allows to solve 
the double-voting problem (this node
+     * could vote for one candidate, then do a restart (losing its Raft 
metastorage, including votedFor field), then
+     * vote for another candidate in the same term. As a result of removing 
itself and adding self back, the term
+     * will be incremented, so the possible old vote will be invalidated.
+     *
+     * <p>The possibility of losing the Raft metastorage state is detected by 
checking if the partition storage is
+     * volatile (and hence Raft metastorage is also volatile).
      *
      * @param tablePartitionId ID of the table partition.
      * @param internalTable Table we are working with.
      * @param newConfiguration New configuration that is going to be applied 
if we'll start the group.
      * @param localMemberAssignment Assignment of this node in this group.
+     * @return A future that completes with a decision: should we start the 
corresponding group locally or not.
      */
-    CompletableFuture<Boolean> shouldStartGroup(
+    CompletableFuture<Boolean> initiateGroupReentryIfNeeded(
             TablePartitionId tablePartitionId,
             InternalTable internalTable,
             PeersAndLearners newConfiguration,
@@ -163,15 +192,15 @@ class PartitionReplicatorNodeRecovery {
 
         // No majority and not a full partition restart - need to 'remove, 
then add' nodes
         // with current partition.
-        return waitForPeersAndQueryDataNodesCount(tableId, partId, 
newConfiguration.peers())
-                .thenApply(dataNodesCount -> {
-                    boolean fullPartitionRestart = dataNodesCount == 0;
+        return waitForPeersAndQueryDataNodesCounts(tableId, partId, 
newConfiguration.peers())
+                .thenApply(dataNodesCounts -> {
+                    boolean fullPartitionRestart = dataNodesCounts.emptyNodes 
== newConfiguration.peers().size();
 
                     if (fullPartitionRestart) {
                         return true;
                     }
 
-                    boolean majorityAvailable = dataNodesCount >= 
(newConfiguration.peers().size() / 2) + 1;
+                    boolean majorityAvailable = dataNodesCounts.nonEmptyNodes 
>= (newConfiguration.peers().size() / 2) + 1;
 
                     if (majorityAvailable) {
                         RebalanceUtilEx.startPeerRemoval(tablePartitionId, 
localMemberAssignment, metaStorageManager);
@@ -193,13 +222,13 @@ class PartitionReplicatorNodeRecovery {
      * @param tblId Table id.
      * @param partId Partition id.
      * @param peers Raft peers.
-     * @return A future that will hold the quantity of data nodes.
+     * @return A future that will hold the counts of data nodes.
      */
-    private CompletableFuture<Long> waitForPeersAndQueryDataNodesCount(int 
tblId, int partId, Collection<Peer> peers) {
+    private CompletableFuture<DataNodesCounts> 
waitForPeersAndQueryDataNodesCounts(int tblId, int partId, Collection<Peer> 
peers) {
         HasDataRequest request = 
TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId).build();
 
         return allPeersAreInTopology(peers)
-                .thenCompose(unused -> queryDataNodesCount(peers, request));
+                .thenCompose(unused -> queryDataNodesCounts(peers, request));
     }
 
     private CompletableFuture<?> allPeersAreInTopology(Collection<Peer> peers) 
{
@@ -274,9 +303,9 @@ class PartitionReplicatorNodeRecovery {
                 .thenCompose(identity());
     }
 
-    private CompletableFuture<Long> queryDataNodesCount(Collection<Peer> 
peers, HasDataRequest request) {
+    private CompletableFuture<DataNodesCounts> 
queryDataNodesCounts(Collection<Peer> peers, HasDataRequest request) {
         //noinspection unchecked
-        CompletableFuture<Boolean>[] requestFutures = peers.stream()
+        CompletableFuture<DataPresence>[] presenceFutures = peers.stream()
                 .map(Peer::consistentId)
                 .map(topologyService::getByConsistentId)
                 .filter(Objects::nonNull)
@@ -285,12 +314,36 @@ class PartitionReplicatorNodeRecovery {
                         .thenApply(response -> {
                             assert response instanceof HasDataResponse : 
response;
 
-                            return ((HasDataResponse) response).result();
+                            return ((HasDataResponse) response).presence();
                         })
-                        .exceptionally(unused -> false))
+                        .exceptionally(unused -> DataPresence.UNKNOWN))
                 .toArray(CompletableFuture[]::new);
 
-        return allOf(requestFutures)
-                .thenApply(unused -> 
Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
+        return allOf(presenceFutures)
+                .thenApply(unused -> {
+                    List<DataPresence> hasDataFlags = 
Arrays.stream(presenceFutures)
+                            .map(CompletableFuture::join)
+                            .collect(toList());
+
+                    long nodesSurelyHavingData = 
hasDataFlags.stream().filter(presence -> presence == 
DataPresence.HAS_DATA).count();
+                    long nodesSurelyEmpty = 
hasDataFlags.stream().filter(presence -> presence == 
DataPresence.EMPTY).count();
+                    return new DataNodesCounts(nodesSurelyHavingData, 
nodesSurelyEmpty);
+                });
+    }
+
+    /**
+     * It is not guaranteed that {@link #nonEmptyNodes} plus {@link 
#emptyNodes} gives the replicator group size
+     * as for some nodes we don't know at the moment whether they have data or 
not.
+     */
+    private static class DataNodesCounts {
+        /** Number of nodes that reported that they have some data for the 
partition of interest. */
+        private final long nonEmptyNodes;
+        /* Number of nodes that reported that they don't have any data for the 
partition of interest. */
+        private final long emptyNodes;
+
+        private DataNodesCounts(long nonEmptyNodes, long emptyNodes) {
+            this.nonEmptyNodes = nonEmptyNodes;
+            this.emptyNodes = emptyNodes;
+        }
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index bda552db27..5dfb1ad1e4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -287,7 +287,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /**
      * Versioned value for tracking RAFT groups initialization and starting 
completion.
      *
-     * <p>Only explicitly updated in {@link 
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int)}.
+     * <p>Only explicitly updated in {@link 
#startLocalPartitionsAndClients(CompletableFuture, TableImpl, int, boolean)}.
      *
      * <p>Completed strictly after {@link #localPartitionsVv}.
      */
@@ -385,7 +385,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     /** Versioned value used only at manager startup to correctly fire table 
creation events. */
     private final IncrementalVersionedValue<Void> startVv;
 
-    /** Ends at the {@link #stop()} with an {@link NodeStoppingException}. */
+    /** Ends at the {@link #stopAsync()} with an {@link 
NodeStoppingException}. */
     private final CompletableFuture<Void> stopManagerFuture = new 
CompletableFuture<>();
 
     /** Configuration for {@link StorageUpdateHandler}. */
@@ -570,6 +570,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 metaStorageMgr,
                 messagingService,
                 topologyService,
+                partitionOperationsExecutor,
                 tableId -> tablesById().get(tableId)
         );
 
@@ -830,12 +831,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @param assignmentsFuture Table assignments.
      * @param table Initialized table entity.
      * @param zoneId Zone id.
+     * @param isRecovery {@code true} if the node is being started up.
      * @return future, which will be completed when the partitions creations 
done.
      */
     private CompletableFuture<Void> startLocalPartitionsAndClients(
             CompletableFuture<List<Assignments>> assignmentsFuture,
             TableImpl table,
-            int zoneId
+            int zoneId,
+            boolean isRecovery
     ) {
         int tableId = table.tableId();
 
@@ -858,7 +861,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                                 assignments.get(partId),
                                 null,
                                 zoneId,
-                                false
+                                isRecovery
                         )
                         .whenComplete((res, ex) -> {
                             if (ex != null) {
@@ -934,7 +937,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
         if (localMemberAssignment != null) {
             CompletableFuture<Boolean> shouldStartGroupFut = isRecovery
-                    ? partitionReplicatorNodeRecovery.shouldStartGroup(
+                    ? 
partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
                             replicaGrpId,
                             internalTbl,
                             newConfiguration,
@@ -1392,7 +1395,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
                             registerIndexesToTable(table, catalogService, 
partitionSet, schemaRegistry, lwm);
                         }
-                        return 
startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id());
+                        return 
startLocalPartitionsAndClients(assignmentsFuture, table, zoneDescriptor.id(), 
onNodeRecovery);
                     }
             ), ioExecutor);
         });
@@ -2528,7 +2531,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
     /**
      * Returns the future that will complete when, either the future from the 
argument or {@link #stopManagerFuture} will complete,
-     * successfully or exceptionally. Allows to protect from getting stuck at 
{@link #stop()} when someone is blocked (by using
+     * successfully or exceptionally. Allows to protect from getting stuck at 
{@link #stopAsync()} when someone is blocked (by using
      * {@link #busyLock}) for a long time.
      *
      * @param future Future.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java
similarity index 65%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java
index 66f7b0f6d7..67fc4cbdbc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/DataPresence.java
@@ -17,15 +17,15 @@
 
 package org.apache.ignite.internal.table.distributed.message;
 
-import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.table.distributed.TableMessageGroup;
-
 /**
- * A response to the {@link HasDataRequest}.
+ * Whether a node has data or not (or it's not known because it did not 
respond in time, or the corresopnding storage is
+ * already closed or still being rebalanced to).
  */
-@Transferable(TableMessageGroup.HAS_DATA_RESPONSE)
-public interface HasDataResponse extends NetworkMessage {
-    /** {@code true} if a node has data for a partition of a table, {@code 
false} otherwise. */
-    boolean result();
+public enum DataPresence {
+    /** The storage is empty. */
+    EMPTY,
+    /** The storage has some data. */
+    HAS_DATA,
+    /** We don't know for some reason. */
+    UNKNOWN
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
index 66f7b0f6d7..69d7bd994e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/message/HasDataResponse.java
@@ -26,6 +26,15 @@ import 
org.apache.ignite.internal.table.distributed.TableMessageGroup;
  */
 @Transferable(TableMessageGroup.HAS_DATA_RESPONSE)
 public interface HasDataResponse extends NetworkMessage {
-    /** {@code true} if a node has data for a partition of a table, {@code 
false} otherwise. */
-    boolean result();
+    /**
+     * Data presence indicator.
+     */
+    default DataPresence presence() {
+        return DataPresence.valueOf(presenceString());
+    }
+
+    /**
+     * String representation of {@link #presence()}.
+     */
+    String presenceString();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
index 451e98c4c3..7a2a4da589 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtilEx.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -73,23 +74,23 @@ public class RebalanceUtilEx {
                     byte[] prevValue = 
retrievedAssignmentsSwitchReduce.value();
 
                     if (prevValue != null) {
-                        Set<Assignment> prev = ByteUtils.fromBytes(prevValue);
+                        Assignments prev = Assignments.fromBytes(prevValue);
 
                         prev.add(peerAssignment);
 
                         return metaStorageMgr.invoke(
                                 
revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
-                                put(key, ByteUtils.toBytes(prev)),
+                                put(key, prev.toBytes()),
                                 Operations.noop()
                         );
                     } else {
-                        var newValue = new HashSet<>();
+                        var newValue = Assignments.of(new HashSet<>());
 
                         newValue.add(peerAssignment);
 
                         return metaStorageMgr.invoke(
                                 notExists(key),
-                                put(key, ByteUtils.toBytes(newValue)),
+                                put(key, newValue.toBytes()),
                                 Operations.noop()
                         );
                     }
@@ -118,7 +119,9 @@ public class RebalanceUtilEx {
         Entry entry = event.entryEvent().newEntry();
         byte[] eventData = entry.value();
 
-        Set<Assignment> switchReduce = ByteUtils.fromBytes(eventData);
+        assert eventData != null : "Null event data for " + partId;
+
+        Assignments switchReduce = Assignments.fromBytes(eventData);
 
         if (switchReduce.isEmpty()) {
             return nullCompletedFuture();
@@ -128,10 +131,10 @@ public class RebalanceUtilEx {
 
         ByteArray pendingKey = pendingPartAssignmentsKey(partId);
 
-        Set<Assignment> pendingAssignments = difference(assignments, 
switchReduce);
+        Set<Assignment> pendingAssignments = difference(assignments, 
switchReduce.nodes());
 
-        byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
-        byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
+        byte[] pendingByteArray = Assignments.toBytes(pendingAssignments);
+        byte[] assignmentsByteArray = Assignments.toBytes(assignments);
 
         ByteArray changeTriggerKey = pendingChangeTriggerKey(partId);
         byte[] rev = ByteUtils.longToBytes(entry.revision());

Reply via email to