This is an automated email from the ASF dual-hosted git repository.

ashapkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d3c739b9e6c IGNITE-28013 Fix stale leaseholder ID handling in LeaseUpdater and lease batch serialization (#7769)
d3c739b9e6c is described below

commit d3c739b9e6c71d2e859b94cee1fa94331ade242a
Author: Anton Laletin <[email protected]>
AuthorDate: Wed Apr 8 17:13:01 2026 +0400

    IGNITE-28013 Fix stale leaseholder ID handling in LeaseUpdater and lease batch serialization (#7769)
---
 .../internal/placementdriver/LeaseUpdater.java     |   1 -
 .../leases/LeaseBatchSerializer.java               |  33 ++-
 .../placementdriver/leases/NodesDictionary.java    |   4 +
 .../internal/placementdriver/LeaseUpdaterTest.java | 246 +++++++++++++++++++--
 .../leases/LeaseBatchSerializerTest.java           | 118 ++++++++++
 ...ilablePartitionsRecoveryByFilterUpdateTest.java |   2 -
 6 files changed, 381 insertions(+), 23 deletions(-)

diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index b5cbf650144..6816eed83f5 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -486,7 +486,6 @@ public class LeaseUpdater {
                         : lease.proposedCandidate();
 
                 InternalClusterNode candidate = 
nextLeaseHolder(stableAssignments, pendingAssignments, grpId, 
proposedLeaseholder);
-
                 boolean canBeProlonged = lease.isAccepted()
                         && lease.isProlongable()
                         && candidate != null && 
candidate.id().equals(lease.getLeaseholderId());
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
index 6359bd2b527..11c82e47946 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
@@ -141,6 +141,15 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
     /** Mask to extract lease holder index from compact representation. */
     private static final int COMPACT_HOLDER_INDEX_MASK = (1 << 
BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1;
 
+    private static final byte PROTOCOL_V1 = 1;
+
+    private static final byte PROTOCOL_V2 = 2;
+
+    @Override
+    protected byte getProtocolVersion() {
+        return PROTOCOL_V2;
+    }
+
     @Override
     protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out) 
throws IOException {
         long minExpirationTimePhysical = minExpirationTimePhysicalPart(batch);
@@ -356,7 +365,9 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
     private static boolean 
holderIdAndProposedCandidateFitIn1Byte(NodesDictionary dictionary) {
         // Up to 8 names means that for name index it's enough to have 3 bits, 
same for node index, so, in sum, they
         // require up to 6 bits, and we have 7 bits in a varint byte.
-        return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE;
+        // We need to check both: name count (for proposed candidate index) 
and node count (for holder node index),
+        // as these can diverge when nodes restart with new UUIDs but the same 
name.
+        return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE && 
dictionary.nodeCount() <= MAX_NODES_FOR_COMPACT_MODE;
     }
 
     private static int flags(
@@ -378,6 +389,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
         long minExpirationTimePhysical = in.readVarInt();
         HybridTimestamp commonExpirationTime = new 
HybridTimestamp(minExpirationTimePhysical + in.readVarInt(), 
in.readVarIntAsInt());
         NodesDictionary nodesDictionary = NodesDictionary.readFrom(in);
+        boolean canReadNodesInfoCompactly = 
holderIdAndProposedCandidateFitIn1ByteForRead(protoVer, nodesDictionary);
 
         List<Lease> leases = new ArrayList<>();
 
@@ -385,6 +397,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
                 minExpirationTimePhysical,
                 commonExpirationTime,
                 nodesDictionary,
+                canReadNodesInfoCompactly,
                 leases,
                 in,
                 TablePartitionId::new
@@ -395,6 +408,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
                     minExpirationTimePhysical,
                     commonExpirationTime,
                     nodesDictionary,
+                    canReadNodesInfoCompactly,
                     leases,
                     in,
                     ZonePartitionId::new
@@ -408,6 +422,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
             long minExpirationTimePhysical,
             HybridTimestamp commonExpirationTime,
             NodesDictionary nodesDictionary,
+            boolean canReadNodesInfoCompactly,
             List<Lease> leases,
             IgniteDataInput in,
             GroupIdFactory groupIdFactory
@@ -420,6 +435,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
                     minExpirationTimePhysical,
                     commonExpirationTime,
                     nodesDictionary,
+                    canReadNodesInfoCompactly,
                     leases,
                     in,
                     groupIdFactory,
@@ -432,6 +448,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
             long minExpirationTimePhysical,
             HybridTimestamp commonExpirationTime,
             NodesDictionary nodesDictionary,
+            boolean canReadNodesInfoCompactly,
             List<Lease> leases,
             IgniteDataInput in,
             GroupIdFactory groupIdFactory,
@@ -447,6 +464,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
                     minExpirationTimePhysical,
                     commonExpirationTime,
                     nodesDictionary,
+                    canReadNodesInfoCompactly,
                     in,
                     groupIdFactory
             );
@@ -464,6 +482,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
             long minExpirationTimePhysical,
             HybridTimestamp commonExpirationTime,
             NodesDictionary nodesDictionary,
+            boolean canReadNodesInfoCompactly,
             IgniteDataInput in,
             GroupIdFactory groupIdFactory
     ) throws IOException {
@@ -477,7 +496,7 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
 
         int holderNodeIndex;
         int proposedCandidateNodeIndex = -1;
-        if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) {
+        if (canReadNodesInfoCompactly) {
             int nodesInfo = in.readVarIntAsInt();
 
             holderNodeIndex = unpackHolderNodeIndex(nodesInfo);
@@ -538,6 +557,16 @@ public class LeaseBatchSerializer extends 
VersionedSerializer<LeaseBatch> {
         return (flags & mask) != 0;
     }
 
+    private static boolean holderIdAndProposedCandidateFitIn1ByteForRead(byte 
protoVer, NodesDictionary dictionary) {
+        if (protoVer == PROTOCOL_V1) {
+            // In V1 format, we assumed that name and node tables have the 
same size,
+            // so compact-mode eligibility was determined only by the name 
table size.
+            return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE;
+        }
+
+        return holderIdAndProposedCandidateFitIn1Byte(dictionary);
+    }
+
     @FunctionalInterface
     private interface GroupIdFactory {
         PartitionGroupId create(int objectId, int partitionId);
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
index 967a0a4fa27..f1913bf8c2c 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
@@ -132,6 +132,10 @@ final class NodesDictionary {
         return nameIndexToName.size();
     }
 
+    int nodeCount() {
+        return nodeIndexToId.size();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index e23632ddef1..58181434cbe 100644
--- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -38,9 +38,12 @@ import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -119,6 +123,9 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
     @Mock
     MetaStorageManager metaStorageManager;
 
+    @Mock
+    private LeaseTracker leaseTracker;
+
     /** Lease updater for tests. */
     private LeaseUpdater leaseUpdater;
 
@@ -127,6 +134,9 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
     /** Closure to get a lease that is passed in Meta storage. */
     private volatile Consumer<Lease> renewLeaseConsumer = null;
 
+    /** Closure to get a lease batch that is passed in Meta storage. */
+    private volatile Consumer<LeaseBatch> renewLeaseBatchConsumer = null;
+
     private static ZonePartitionId replicationGroupId(int objectId, int 
partId) {
         return new ZonePartitionId(objectId, partId);
     }
@@ -142,13 +152,12 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
     @BeforeEach
     void setUp(
             @Mock ClusterService clusterService,
-            @Mock LeaseTracker leaseTracker,
             @Mock MessagingService messagingService
     ) {
         mockStableAssignments(Set.of(Assignment.forPeer(stableNode.name())));
         mockPendingAssignments(Set.of(Assignment.forPeer(pendingNode.name())));
 
-        when(messagingService.invoke(anyString(), any(), anyLong()))
+        lenient().when(messagingService.invoke(anyString(), any(), anyLong()))
                 .then(i -> 
completedFuture(PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse().accepted(true).build()));
 
         when(clusterService.messagingService()).thenReturn(messagingService);
@@ -163,6 +172,7 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
         lenient().when(metaStorageManager.invoke(any(Condition.class), 
any(Operation.class), any(Operation.class)))
                 .thenAnswer(invocation -> {
                     Consumer<Lease> leaseConsumer = renewLeaseConsumer;
+                    Consumer<LeaseBatch> leaseBatchConsumer = 
renewLeaseBatchConsumer;
 
                     if (leaseConsumer != null) {
                         OperationImpl op = invocation.getArgument(1);
@@ -173,6 +183,14 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
                         leaseConsumer.accept(lease);
                     }
 
+                    if (leaseBatchConsumer != null) {
+                        OperationImpl op = invocation.getArgument(1);
+
+                        LeaseBatch batch = 
LeaseBatch.fromBytes(toByteArray(op.value()));
+
+                        leaseBatchConsumer.accept(batch);
+                    }
+
                     return trueCompletedFuture();
                 });
 
@@ -338,6 +356,171 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
         assertEquals(lease.getLeaseholder(), renewedLease.getLeaseholder());
     }
 
+    @Test
+    public void testStaleLeaseholderIdCanCoexistWithCurrentNodeIdsInBatch() 
throws Exception {
+        List<LogicalNode> currentTopologyNodes = IntStream.range(0, 8)
+                .mapToObj(i -> new LogicalNode(
+                        randomUUID(),
+                        "node-" + i,
+                        NetworkAddress.from("127.0.0.1:" + (11_000 + i))
+                ))
+                .collect(Collectors.toList());
+
+        
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new 
LogicalTopologySnapshot(1, currentTopologyNodes)));
+
+        Map<ZonePartitionId, Set<Assignment>> stableAssignmentsByGroup = new 
LinkedHashMap<>();
+        Map<ReplicationGroupId, Lease> staleLeasesByGroup = new 
LinkedHashMap<>();
+
+        List<LogicalNode> leaseholdersByGroup = List.of(0, 0, 1, 2, 3, 4, 5, 
6, 7).stream()
+                .map(currentTopologyNodes::get)
+                .collect(Collectors.toList());
+
+        long now = System.currentTimeMillis();
+        UUID staleNode0Id = randomUUID();
+
+        for (int i = 0; i < leaseholdersByGroup.size(); i++) {
+            ZonePartitionId grpId = replicationGroupId(1, i);
+            LogicalNode logicalNode = leaseholdersByGroup.get(i);
+
+            stableAssignmentsByGroup.put(grpId, 
Set.of(Assignment.forPeer(logicalNode.name())));
+
+            boolean expiredLease = i == 0;
+            UUID leaseholderId = i == 1
+                    ? staleNode0Id
+                    : logicalNode.id();
+            HybridTimestamp expirationTime = new HybridTimestamp(expiredLease 
? now - 1_000 : now + 60_000, 0);
+
+            staleLeasesByGroup.put(
+                    grpId,
+                    new Lease(
+                            logicalNode.name(),
+                            leaseholderId,
+                            new HybridTimestamp(now - 10_000, 0),
+                            expirationTime,
+                            true,
+                            true,
+                            null,
+                            grpId
+                    )
+            );
+        }
+
+        mockStableAssignments(stableAssignmentsByGroup);
+        mockPendingAssignments(Map.of());
+
+        Leases currentLeases = new Leases(staleLeasesByGroup, 
BYTE_EMPTY_ARRAY);
+
+        lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
+        
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation
 ->
+                
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), 
Lease.emptyLease(invocation.getArgument(0))));
+
+        initAndActivateLeaseUpdater();
+
+        LeaseBatch renewedBatch = awaitForLeaseBatch(5_000);
+
+        assertEquals(leaseholdersByGroup.size(), renewedBatch.leases().size());
+        assertEquals(8, 
renewedBatch.leases().stream().map(Lease::getLeaseholder).distinct().count());
+        assertEquals(9, 
renewedBatch.leases().stream().map(Lease::getLeaseholderId).distinct().count());
+
+        Map<ReplicationGroupId, Lease> renewedLeaseByGroup = 
renewedBatch.leases().stream()
+                .collect(Collectors.toMap(Lease::replicationGroupId, lease -> 
lease));
+
+        Lease expiredLeaseWithCurrentId = 
renewedLeaseByGroup.get(replicationGroupId(1, 0));
+        assertEquals(leaseholdersByGroup.get(0).id(), 
expiredLeaseWithCurrentId.getLeaseholderId());
+
+        Lease nonExpiredLeaseWithStaleId = 
renewedLeaseByGroup.get(replicationGroupId(1, 1));
+        assertEquals(staleNode0Id, 
nonExpiredLeaseWithStaleId.getLeaseholderId());
+    }
+
+    @Test
+    public void testNonExpiredAcceptedLeasesKeepLeaseholderIdentity() throws 
Exception {
+        List<LogicalNode> currentTopologyNodes = IntStream.range(0, 3)
+                .mapToObj(i -> new LogicalNode(
+                        randomUUID(),
+                        "node-" + i,
+                        NetworkAddress.from("127.0.0.1:" + (11_000 + i))
+                ))
+                .collect(Collectors.toList());
+
+        
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new 
LogicalTopologySnapshot(1, currentTopologyNodes)));
+
+        long now = System.currentTimeMillis();
+        UUID staleNode1Id = randomUUID();
+
+        ZonePartitionId expiredGrpId = replicationGroupId(1, 0);
+        ZonePartitionId nonExpiredStaleIdGrpId = replicationGroupId(1, 1);
+        ZonePartitionId nonExpiredCurrentIdGrpId = replicationGroupId(1, 2);
+
+        Map<ZonePartitionId, Set<Assignment>> stableAssignmentsByGroup = 
Map.of(
+                expiredGrpId, 
Set.of(Assignment.forPeer(currentTopologyNodes.get(0).name())),
+                nonExpiredStaleIdGrpId, 
Set.of(Assignment.forPeer(currentTopologyNodes.get(1).name())),
+                nonExpiredCurrentIdGrpId, 
Set.of(Assignment.forPeer(currentTopologyNodes.get(2).name()))
+        );
+
+        mockStableAssignments(stableAssignmentsByGroup);
+        mockPendingAssignments(Map.of());
+
+        Lease expiredLease = new Lease(
+                currentTopologyNodes.get(0).name(),
+                currentTopologyNodes.get(0).id(),
+                new HybridTimestamp(now - 10_000, 0),
+                new HybridTimestamp(now - 1_000, 0),
+                true,
+                true,
+                null,
+                expiredGrpId
+        );
+
+        Lease nonExpiredLeaseWithStaleId = new Lease(
+                currentTopologyNodes.get(1).name(),
+                staleNode1Id,
+                new HybridTimestamp(now - 10_000, 0),
+                new HybridTimestamp(now + 60_000, 0),
+                true,
+                true,
+                null,
+                nonExpiredStaleIdGrpId
+        );
+
+        Lease nonExpiredLeaseWithCurrentId = new Lease(
+                currentTopologyNodes.get(2).name(),
+                currentTopologyNodes.get(2).id(),
+                new HybridTimestamp(now - 10_000, 0),
+                new HybridTimestamp(now + 60_000, 0),
+                true,
+                true,
+                null,
+                nonExpiredCurrentIdGrpId
+        );
+
+        Map<ReplicationGroupId, Lease> leasesByGroup = Map.of(
+                expiredGrpId, expiredLease,
+                nonExpiredStaleIdGrpId, nonExpiredLeaseWithStaleId,
+                nonExpiredCurrentIdGrpId, nonExpiredLeaseWithCurrentId
+        );
+
+        Leases currentLeases = new Leases(leasesByGroup, BYTE_EMPTY_ARRAY);
+
+        lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
+        
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation
 ->
+                
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0), 
Lease.emptyLease(invocation.getArgument(0))));
+
+        initAndActivateLeaseUpdater();
+
+        LeaseBatch renewedBatch = awaitForLeaseBatch(5_000);
+
+        Map<ReplicationGroupId, Lease> renewedLeaseByGroup = 
renewedBatch.leases().stream()
+                .collect(Collectors.toMap(Lease::replicationGroupId, lease -> 
lease));
+
+        Lease renewedNonExpiredWithStaleId = 
renewedLeaseByGroup.get(nonExpiredStaleIdGrpId);
+        assertEquals(nonExpiredLeaseWithStaleId.getLeaseholder(), 
renewedNonExpiredWithStaleId.getLeaseholder());
+        assertEquals(nonExpiredLeaseWithStaleId.getLeaseholderId(), 
renewedNonExpiredWithStaleId.getLeaseholderId());
+
+        Lease renewedNonExpiredWithCurrentId = 
renewedLeaseByGroup.get(nonExpiredCurrentIdGrpId);
+        assertEquals(nonExpiredLeaseWithCurrentId.getLeaseholder(), 
renewedNonExpiredWithCurrentId.getLeaseholder());
+        assertEquals(nonExpiredLeaseWithCurrentId.getLeaseholderId(), 
renewedNonExpiredWithCurrentId.getLeaseholderId());
+    }
+
     @Test
     public void testLeaseAmongPendings() throws Exception {
         
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new 
LogicalTopologySnapshot(1, List.of(pendingNode))));
@@ -382,29 +565,43 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
     }
 
     private void mockPendingAssignments(Set<Assignment> assignments) {
-        Entry pendingEntry = new EntryImpl(
-                pendingAssignmentsQueueKey(replicationGroupId(1, 0)).bytes(),
-                
AssignmentsQueue.toBytes(Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), 
assignments.toArray(Assignment[]::new))),
-                1,
-                new HybridClockImpl().now()
-        );
+        mockPendingAssignments(Map.of(replicationGroupId(1, 0), assignments));
+    }
+
+    private void mockPendingAssignments(Map<ZonePartitionId, Set<Assignment>> 
assignmentsByGroup) {
+        List<Entry> pendingEntries = assignmentsByGroup.entrySet().stream()
+                .map(entry -> new EntryImpl(
+                        pendingAssignmentsQueueKey(entry.getKey()).bytes(),
+                        AssignmentsQueue.toBytes(
+                                
Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), 
entry.getValue().toArray(Assignment[]::new))
+                        ),
+                        1,
+                        new HybridClockImpl().now()
+                ))
+                .collect(Collectors.toList());
 
         byte[] prefixBytes = 
ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES;
         when(metaStorageManager.prefixLocally(eq(new ByteArray(prefixBytes)), 
anyLong()))
-                .thenReturn(Cursor.fromIterable(List.of(pendingEntry)));
+                .thenReturn(Cursor.fromIterable(pendingEntries));
     }
 
     private void mockStableAssignments(Set<Assignment> assignments) {
-        Entry stableEntry = new EntryImpl(
-                stableAssignmentsKey(replicationGroupId(1, 0)).bytes(),
-                Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), 
assignments.toArray(Assignment[]::new)).toBytes(),
-                1,
-                new HybridClockImpl().now()
-        );
+        mockStableAssignments(Map.of(replicationGroupId(1, 0), assignments));
+    }
+
+    private void mockStableAssignments(Map<ZonePartitionId, Set<Assignment>> 
assignmentsByGroup) {
+        List<Entry> stableEntries = assignmentsByGroup.entrySet().stream()
+                .map(entry -> new EntryImpl(
+                        stableAssignmentsKey(entry.getKey()).bytes(),
+                        Assignments.of(HybridTimestamp.MIN_VALUE.longValue(), 
entry.getValue().toArray(Assignment[]::new)).toBytes(),
+                        1,
+                        new HybridClockImpl().now()
+                ))
+                .collect(Collectors.toList());
 
         byte[] prefixBytes = ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
         when(metaStorageManager.prefixLocally(eq(new ByteArray(prefixBytes)), 
anyLong()))
-                .thenReturn(Cursor.fromIterable(List.of(stableEntry)));
+                .thenReturn(Cursor.fromIterable(stableEntries));
     }
 
     private void initAndActivateLeaseUpdater() {
@@ -453,8 +650,8 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
      * Waits for lease write to Meta storage.
      *
      * @param needAccepted Whether to wait only for accepted lease.
-     * @param previousLease Previous lease. If not null, then wait for any 
lease having expiration time other than the previous has (i.e.
-     *      either another lease or prolonged lease).
+     * @param previousLease Previous lease. If not null, then wait for any 
lease having expiration time other than the previous has
+     *         (i.e. either another lease or prolonged lease).
      * @param timeoutMillis Timeout in milliseconds to wait for lease.
      * @return A lease.
      * @throws InterruptedException if the wait is interrupted.
@@ -481,6 +678,19 @@ public class LeaseUpdaterTest extends 
BaseIgniteAbstractTest {
         return renewedLease.get();
     }
 
+    private LeaseBatch awaitForLeaseBatch(long timeoutMillis) throws 
InterruptedException {
+        AtomicReference<LeaseBatch> renewedBatch = new AtomicReference<>();
+
+        renewLeaseBatchConsumer = batch -> {
+            renewedBatch.set(batch);
+            renewLeaseBatchConsumer = null;
+        };
+
+        assertTrue(IgniteTestUtils.waitForCondition(() -> renewedBatch.get() 
!= null, timeoutMillis));
+
+        return renewedBatch.get();
+    }
+
     /**
      * Gets a lease updater tread.
      *
diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
index e359c152fc3..a6ee8fe24e0 100644
--- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
+++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
@@ -25,9 +25,11 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
+import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.Month;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.UUID;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.PartitionGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
 import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -202,6 +205,22 @@ class LeaseBatchSerializerTest {
         verifySerializationAndDeserializationGivesSameResult(originalBatch);
     }
 
+    @Test
+    void batchWithExactly8NodeNamesAndMoreThan8NodeIds() {
+        List<Lease> originalLeases = IntStream.range(0, 8)
+                .mapToObj(n -> {
+                    String nodeName = "node" + n;
+                    return tableLease(nodeName, randomUUID(), nodeName, n);
+                })
+                .collect(toList());
+
+        originalLeases.add(tableLease("node0", randomUUID(), "node0", 8));
+
+        LeaseBatch originalBatch = new LeaseBatch(originalLeases);
+
+        verifySerializationAndDeserializationGivesSameResult(originalBatch);
+    }
+
     @Test
     void batchWithMoreThan8NodeNames() {
         List<Lease> originalLeases = IntStream.range(0, 9)
@@ -300,6 +319,105 @@ class LeaseBatchSerializerTest {
         );
     }
 
+    @Test
+    void v2CanBeDeserialized() {
+        LeaseBatch originalBatch = new LeaseBatch(createLeases(baseTs(), 
TablePartitionId::new));
+
+        byte[] bytes = VersionedSerialization.toBytes(originalBatch, 
serializer);
+
+        // VersionedSerializer header stores protocol version in the first 
byte.
+        assertEquals(2, bytes[0] & 0xFF);
+
+        LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, 
serializer);
+
+        assertThat(restoredBatch.leases(), 
containsInAnyOrder(originalBatch.leases().toArray()));
+        assertEquals(
+                
originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()),
+                
restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList())
+        );
+    }
+
+    @Test
+    void 
v2WithExactly8NamesAndMoreThan8NodeIdsDoesNotUseCompactNodesInfoEncoding() {
+        List<Lease> originalLeases = new ArrayList<>();
+
+        for (int i = 0; i < 8; i++) {
+            originalLeases.add(tableLease("node" + i, new UUID(0, i + 1), 
null, i));
+        }
+
+        originalLeases.add(tableLease("node2", new UUID(0, 9), "node3", 8));
+
+        LeaseBatch originalBatch = new LeaseBatch(originalLeases);
+        byte[] bytes = VersionedSerialization.toBytes(originalBatch, 
serializer);
+
+        LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, 
serializer);
+
+        assertThat(restoredBatch.leases(), 
containsInAnyOrder(originalBatch.leases().toArray()));
+        assertEquals(
+                
originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()),
+                
restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList())
+        );
+    }
+
+    @Test
+    void v1WithExactly8NamesAndMoreThan8NodeIdsCanBeDeserialized() throws 
IOException {
+        byte[] bytes = 
v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding();
+
+        LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes, 
serializer);
+
+        Lease expectedLease = new Lease(
+                "node0",
+                new UUID(0, 1),
+                new HybridTimestamp(900, 0),
+                new HybridTimestamp(1000, 0),
+                true,
+                true,
+                "node1",
+                new TablePartitionId(1, 0)
+        );
+
+        assertThat(restoredBatch.leases(), containsInAnyOrder(expectedLease));
+        assertEquals(List.of("node1"), 
restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()));
+    }
+
+    private static byte[] 
v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding() throws 
IOException {
+        // Crafted manually to reproduce a V1-only layout: V1 allowed compact 
nodes info based only on nameCount,
+        // while the current serializer writes V2 and would not emit compact 
encoding when nodeCount > 8.
+        try (IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(256)) {
+            // Header written by VersionedSerializer.writeExternal(): 
MAGIC(0x43BEEF00) + protocolVersion(1).
+            out.writeInt(0x43BEEF01);
+
+            // Lease batch header.
+            out.writeVarInt(1000); // minExpirationTimePhysical
+            out.writeVarInt(0); // commonExpirationTimePhysicalDelta
+            out.writeVarInt(0); // commonExpirationTimeLogical
+
+            // Nodes dictionary: 8 names.
+            out.writeVarInt(8);
+            for (int i = 0; i < 8; i++) {
+                out.writeUTF("node" + i);
+            }
+
+            // Nodes dictionary: 9 node IDs, with names cycling over 8 entries.
+            out.writeVarInt(9);
+            for (int i = 0; i < 9; i++) {
+                out.writeUuid(new UUID(0, i + 1));
+                out.writeVarInt(i % 8);
+            }
+
+            // Table section with one object and one lease.
+            out.writeVarInt(1); // tableCount
+            out.writeVarInt(1); // objectId delta
+            out.writeVarInt(1); // leaseCount
+            out.write(0x07); // accepted + prolongable + hasProposedCandidate
+            out.writeVarInt(8); // compact nodes info: holder=0, 
proposedCandidate=1
+            out.writeVarInt(100); // period (expirationPhysical - 
startPhysical)
+            out.writeVarInt(0); // startLogical
+
+            return out.array();
+        }
+    }
+
     @SuppressWarnings("unused")
     private String v1LeaseBatchAsBase64WithTablePartitions() {
         HybridTimestamp baseTs = baseTs();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index 582bee31800..6d4e941551e 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -254,7 +254,6 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
      * @throws Exception If failed.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316")
     void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops() 
throws Exception {
         for (int i = 1; i < 8; i++) {
             startNode(i, CUSTOM_NODES_CONFIG);
@@ -316,7 +315,6 @@ public class 
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
      * @throws Exception If failed.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316")
     void 
testNodesWaitForNodesFromGracefulChainToComeBackOnlineAfterMajorityStops() 
throws Exception {
         for (int i = 1; i < 8; i++) {
             startNode(i, CUSTOM_NODES_CONFIG);

Reply via email to