This is an automated email from the ASF dual-hosted git repository.
ashapkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d3c739b9e6c IGNITE-28013 Fix stale leaseholder ID handling in
LeaseUpdater and lease batch serialization (#7769)
d3c739b9e6c is described below
commit d3c739b9e6c71d2e859b94cee1fa94331ade242a
Author: Anton Laletin <[email protected]>
AuthorDate: Wed Apr 8 17:13:01 2026 +0400
IGNITE-28013 Fix stale leaseholder ID handling in LeaseUpdater and lease
batch serialization (#7769)
---
.../internal/placementdriver/LeaseUpdater.java | 1 -
.../leases/LeaseBatchSerializer.java | 33 ++-
.../placementdriver/leases/NodesDictionary.java | 4 +
.../internal/placementdriver/LeaseUpdaterTest.java | 246 +++++++++++++++++++--
.../leases/LeaseBatchSerializerTest.java | 118 ++++++++++
...ilablePartitionsRecoveryByFilterUpdateTest.java | 2 -
6 files changed, 381 insertions(+), 23 deletions(-)
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index b5cbf650144..6816eed83f5 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -486,7 +486,6 @@ public class LeaseUpdater {
: lease.proposedCandidate();
InternalClusterNode candidate =
nextLeaseHolder(stableAssignments, pendingAssignments, grpId,
proposedLeaseholder);
-
boolean canBeProlonged = lease.isAccepted()
&& lease.isProlongable()
&& candidate != null &&
candidate.id().equals(lease.getLeaseholderId());
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
index 6359bd2b527..11c82e47946 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializer.java
@@ -141,6 +141,15 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
/** Mask to extract lease holder index from compact representation. */
private static final int COMPACT_HOLDER_INDEX_MASK = (1 <<
BIT_WIDTH_TO_FIT_IN_HALF_BYTE) - 1;
+ private static final byte PROTOCOL_V1 = 1;
+
+ private static final byte PROTOCOL_V2 = 2;
+
+ @Override
+ protected byte getProtocolVersion() {
+ return PROTOCOL_V2;
+ }
+
@Override
protected void writeExternalData(LeaseBatch batch, IgniteDataOutput out)
throws IOException {
long minExpirationTimePhysical = minExpirationTimePhysicalPart(batch);
@@ -356,7 +365,9 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
private static boolean
holderIdAndProposedCandidateFitIn1Byte(NodesDictionary dictionary) {
// Up to 8 names means that for name index it's enough to have 3 bits,
same for node index, so, in sum, they
// require up to 6 bits, and we have 7 bits in a varint byte.
- return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE;
+ // We need to check both: name count (for proposed candidate index)
and node count (for holder node index),
+ // as these can diverge when nodes restart with new UUIDs but the same
name.
+ return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE &&
dictionary.nodeCount() <= MAX_NODES_FOR_COMPACT_MODE;
}
private static int flags(
@@ -378,6 +389,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
long minExpirationTimePhysical = in.readVarInt();
HybridTimestamp commonExpirationTime = new
HybridTimestamp(minExpirationTimePhysical + in.readVarInt(),
in.readVarIntAsInt());
NodesDictionary nodesDictionary = NodesDictionary.readFrom(in);
+ boolean canReadNodesInfoCompactly =
holderIdAndProposedCandidateFitIn1ByteForRead(protoVer, nodesDictionary);
List<Lease> leases = new ArrayList<>();
@@ -385,6 +397,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
minExpirationTimePhysical,
commonExpirationTime,
nodesDictionary,
+ canReadNodesInfoCompactly,
leases,
in,
TablePartitionId::new
@@ -395,6 +408,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
minExpirationTimePhysical,
commonExpirationTime,
nodesDictionary,
+ canReadNodesInfoCompactly,
leases,
in,
ZonePartitionId::new
@@ -408,6 +422,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
long minExpirationTimePhysical,
HybridTimestamp commonExpirationTime,
NodesDictionary nodesDictionary,
+ boolean canReadNodesInfoCompactly,
List<Lease> leases,
IgniteDataInput in,
GroupIdFactory groupIdFactory
@@ -420,6 +435,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
minExpirationTimePhysical,
commonExpirationTime,
nodesDictionary,
+ canReadNodesInfoCompactly,
leases,
in,
groupIdFactory,
@@ -432,6 +448,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
long minExpirationTimePhysical,
HybridTimestamp commonExpirationTime,
NodesDictionary nodesDictionary,
+ boolean canReadNodesInfoCompactly,
List<Lease> leases,
IgniteDataInput in,
GroupIdFactory groupIdFactory,
@@ -447,6 +464,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
minExpirationTimePhysical,
commonExpirationTime,
nodesDictionary,
+ canReadNodesInfoCompactly,
in,
groupIdFactory
);
@@ -464,6 +482,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
long minExpirationTimePhysical,
HybridTimestamp commonExpirationTime,
NodesDictionary nodesDictionary,
+ boolean canReadNodesInfoCompactly,
IgniteDataInput in,
GroupIdFactory groupIdFactory
) throws IOException {
@@ -477,7 +496,7 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
int holderNodeIndex;
int proposedCandidateNodeIndex = -1;
- if (holderIdAndProposedCandidateFitIn1Byte(nodesDictionary)) {
+ if (canReadNodesInfoCompactly) {
int nodesInfo = in.readVarIntAsInt();
holderNodeIndex = unpackHolderNodeIndex(nodesInfo);
@@ -538,6 +557,16 @@ public class LeaseBatchSerializer extends
VersionedSerializer<LeaseBatch> {
return (flags & mask) != 0;
}
+ private static boolean holderIdAndProposedCandidateFitIn1ByteForRead(byte
protoVer, NodesDictionary dictionary) {
+ if (protoVer == PROTOCOL_V1) {
+ // In V1 format, we assumed that name and node tables have the
same size,
+ // so compact-mode eligibility was determined only by the name
table size.
+ return dictionary.nameCount() <= MAX_NODES_FOR_COMPACT_MODE;
+ }
+
+ return holderIdAndProposedCandidateFitIn1Byte(dictionary);
+ }
+
@FunctionalInterface
private interface GroupIdFactory {
PartitionGroupId create(int objectId, int partitionId);
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
index 967a0a4fa27..f1913bf8c2c 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/NodesDictionary.java
@@ -132,6 +132,10 @@ final class NodesDictionary {
return nameIndexToName.size();
}
+ int nodeCount() {
+ return nodeIndexToId.size();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
index e23632ddef1..58181434cbe 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseUpdaterTest.java
@@ -38,9 +38,12 @@ import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
@@ -49,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -119,6 +123,9 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
@Mock
MetaStorageManager metaStorageManager;
+ @Mock
+ private LeaseTracker leaseTracker;
+
/** Lease updater for tests. */
private LeaseUpdater leaseUpdater;
@@ -127,6 +134,9 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
/** Closure to get a lease that is passed in Meta storage. */
private volatile Consumer<Lease> renewLeaseConsumer = null;
+ /** Closure to get a lease batch that is passed in Meta storage. */
+ private volatile Consumer<LeaseBatch> renewLeaseBatchConsumer = null;
+
private static ZonePartitionId replicationGroupId(int objectId, int
partId) {
return new ZonePartitionId(objectId, partId);
}
@@ -142,13 +152,12 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
@BeforeEach
void setUp(
@Mock ClusterService clusterService,
- @Mock LeaseTracker leaseTracker,
@Mock MessagingService messagingService
) {
mockStableAssignments(Set.of(Assignment.forPeer(stableNode.name())));
mockPendingAssignments(Set.of(Assignment.forPeer(pendingNode.name())));
- when(messagingService.invoke(anyString(), any(), anyLong()))
+ lenient().when(messagingService.invoke(anyString(), any(), anyLong()))
.then(i ->
completedFuture(PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse().accepted(true).build()));
when(clusterService.messagingService()).thenReturn(messagingService);
@@ -163,6 +172,7 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
lenient().when(metaStorageManager.invoke(any(Condition.class),
any(Operation.class), any(Operation.class)))
.thenAnswer(invocation -> {
Consumer<Lease> leaseConsumer = renewLeaseConsumer;
+ Consumer<LeaseBatch> leaseBatchConsumer =
renewLeaseBatchConsumer;
if (leaseConsumer != null) {
OperationImpl op = invocation.getArgument(1);
@@ -173,6 +183,14 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
leaseConsumer.accept(lease);
}
+ if (leaseBatchConsumer != null) {
+ OperationImpl op = invocation.getArgument(1);
+
+ LeaseBatch batch =
LeaseBatch.fromBytes(toByteArray(op.value()));
+
+ leaseBatchConsumer.accept(batch);
+ }
+
return trueCompletedFuture();
});
@@ -338,6 +356,171 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
assertEquals(lease.getLeaseholder(), renewedLease.getLeaseholder());
}
+ @Test
+ public void testStaleLeaseholderIdCanCoexistWithCurrentNodeIdsInBatch()
throws Exception {
+ List<LogicalNode> currentTopologyNodes = IntStream.range(0, 8)
+ .mapToObj(i -> new LogicalNode(
+ randomUUID(),
+ "node-" + i,
+ NetworkAddress.from("127.0.0.1:" + (11_000 + i))
+ ))
+ .collect(Collectors.toList());
+
+
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(1, currentTopologyNodes)));
+
+ Map<ZonePartitionId, Set<Assignment>> stableAssignmentsByGroup = new
LinkedHashMap<>();
+ Map<ReplicationGroupId, Lease> staleLeasesByGroup = new
LinkedHashMap<>();
+
+ List<LogicalNode> leaseholdersByGroup = List.of(0, 0, 1, 2, 3, 4, 5,
6, 7).stream()
+ .map(currentTopologyNodes::get)
+ .collect(Collectors.toList());
+
+ long now = System.currentTimeMillis();
+ UUID staleNode0Id = randomUUID();
+
+ for (int i = 0; i < leaseholdersByGroup.size(); i++) {
+ ZonePartitionId grpId = replicationGroupId(1, i);
+ LogicalNode logicalNode = leaseholdersByGroup.get(i);
+
+ stableAssignmentsByGroup.put(grpId,
Set.of(Assignment.forPeer(logicalNode.name())));
+
+ boolean expiredLease = i == 0;
+ UUID leaseholderId = i == 1
+ ? staleNode0Id
+ : logicalNode.id();
+ HybridTimestamp expirationTime = new HybridTimestamp(expiredLease
? now - 1_000 : now + 60_000, 0);
+
+ staleLeasesByGroup.put(
+ grpId,
+ new Lease(
+ logicalNode.name(),
+ leaseholderId,
+ new HybridTimestamp(now - 10_000, 0),
+ expirationTime,
+ true,
+ true,
+ null,
+ grpId
+ )
+ );
+ }
+
+ mockStableAssignments(stableAssignmentsByGroup);
+ mockPendingAssignments(Map.of());
+
+ Leases currentLeases = new Leases(staleLeasesByGroup,
BYTE_EMPTY_ARRAY);
+
+ lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
+
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation
->
+
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0),
Lease.emptyLease(invocation.getArgument(0))));
+
+ initAndActivateLeaseUpdater();
+
+ LeaseBatch renewedBatch = awaitForLeaseBatch(5_000);
+
+ assertEquals(leaseholdersByGroup.size(), renewedBatch.leases().size());
+ assertEquals(8,
renewedBatch.leases().stream().map(Lease::getLeaseholder).distinct().count());
+ assertEquals(9,
renewedBatch.leases().stream().map(Lease::getLeaseholderId).distinct().count());
+
+ Map<ReplicationGroupId, Lease> renewedLeaseByGroup =
renewedBatch.leases().stream()
+ .collect(Collectors.toMap(Lease::replicationGroupId, lease ->
lease));
+
+ Lease expiredLeaseWithCurrentId =
renewedLeaseByGroup.get(replicationGroupId(1, 0));
+ assertEquals(leaseholdersByGroup.get(0).id(),
expiredLeaseWithCurrentId.getLeaseholderId());
+
+ Lease nonExpiredLeaseWithStaleId =
renewedLeaseByGroup.get(replicationGroupId(1, 1));
+ assertEquals(staleNode0Id,
nonExpiredLeaseWithStaleId.getLeaseholderId());
+ }
+
+ @Test
+ public void testNonExpiredAcceptedLeasesKeepLeaseholderIdentity() throws
Exception {
+ List<LogicalNode> currentTopologyNodes = IntStream.range(0, 3)
+ .mapToObj(i -> new LogicalNode(
+ randomUUID(),
+ "node-" + i,
+ NetworkAddress.from("127.0.0.1:" + (11_000 + i))
+ ))
+ .collect(Collectors.toList());
+
+
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(1, currentTopologyNodes)));
+
+ long now = System.currentTimeMillis();
+ UUID staleNode1Id = randomUUID();
+
+ ZonePartitionId expiredGrpId = replicationGroupId(1, 0);
+ ZonePartitionId nonExpiredStaleIdGrpId = replicationGroupId(1, 1);
+ ZonePartitionId nonExpiredCurrentIdGrpId = replicationGroupId(1, 2);
+
+ Map<ZonePartitionId, Set<Assignment>> stableAssignmentsByGroup =
Map.of(
+ expiredGrpId,
Set.of(Assignment.forPeer(currentTopologyNodes.get(0).name())),
+ nonExpiredStaleIdGrpId,
Set.of(Assignment.forPeer(currentTopologyNodes.get(1).name())),
+ nonExpiredCurrentIdGrpId,
Set.of(Assignment.forPeer(currentTopologyNodes.get(2).name()))
+ );
+
+ mockStableAssignments(stableAssignmentsByGroup);
+ mockPendingAssignments(Map.of());
+
+ Lease expiredLease = new Lease(
+ currentTopologyNodes.get(0).name(),
+ currentTopologyNodes.get(0).id(),
+ new HybridTimestamp(now - 10_000, 0),
+ new HybridTimestamp(now - 1_000, 0),
+ true,
+ true,
+ null,
+ expiredGrpId
+ );
+
+ Lease nonExpiredLeaseWithStaleId = new Lease(
+ currentTopologyNodes.get(1).name(),
+ staleNode1Id,
+ new HybridTimestamp(now - 10_000, 0),
+ new HybridTimestamp(now + 60_000, 0),
+ true,
+ true,
+ null,
+ nonExpiredStaleIdGrpId
+ );
+
+ Lease nonExpiredLeaseWithCurrentId = new Lease(
+ currentTopologyNodes.get(2).name(),
+ currentTopologyNodes.get(2).id(),
+ new HybridTimestamp(now - 10_000, 0),
+ new HybridTimestamp(now + 60_000, 0),
+ true,
+ true,
+ null,
+ nonExpiredCurrentIdGrpId
+ );
+
+ Map<ReplicationGroupId, Lease> leasesByGroup = Map.of(
+ expiredGrpId, expiredLease,
+ nonExpiredStaleIdGrpId, nonExpiredLeaseWithStaleId,
+ nonExpiredCurrentIdGrpId, nonExpiredLeaseWithCurrentId
+ );
+
+ Leases currentLeases = new Leases(leasesByGroup, BYTE_EMPTY_ARRAY);
+
+ lenient().when(leaseTracker.leasesLatest()).thenReturn(currentLeases);
+
lenient().when(leaseTracker.getLease(any(ReplicationGroupId.class))).thenAnswer(invocation
->
+
currentLeases.leaseByGroupId().getOrDefault(invocation.getArgument(0),
Lease.emptyLease(invocation.getArgument(0))));
+
+ initAndActivateLeaseUpdater();
+
+ LeaseBatch renewedBatch = awaitForLeaseBatch(5_000);
+
+ Map<ReplicationGroupId, Lease> renewedLeaseByGroup =
renewedBatch.leases().stream()
+ .collect(Collectors.toMap(Lease::replicationGroupId, lease ->
lease));
+
+ Lease renewedNonExpiredWithStaleId =
renewedLeaseByGroup.get(nonExpiredStaleIdGrpId);
+ assertEquals(nonExpiredLeaseWithStaleId.getLeaseholder(),
renewedNonExpiredWithStaleId.getLeaseholder());
+ assertEquals(nonExpiredLeaseWithStaleId.getLeaseholderId(),
renewedNonExpiredWithStaleId.getLeaseholderId());
+
+ Lease renewedNonExpiredWithCurrentId =
renewedLeaseByGroup.get(nonExpiredCurrentIdGrpId);
+ assertEquals(nonExpiredLeaseWithCurrentId.getLeaseholder(),
renewedNonExpiredWithCurrentId.getLeaseholder());
+ assertEquals(nonExpiredLeaseWithCurrentId.getLeaseholderId(),
renewedNonExpiredWithCurrentId.getLeaseholderId());
+ }
+
@Test
public void testLeaseAmongPendings() throws Exception {
when(topologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(1, List.of(pendingNode))));
@@ -382,29 +565,43 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
}
private void mockPendingAssignments(Set<Assignment> assignments) {
- Entry pendingEntry = new EntryImpl(
- pendingAssignmentsQueueKey(replicationGroupId(1, 0)).bytes(),
-
AssignmentsQueue.toBytes(Assignments.of(HybridTimestamp.MIN_VALUE.longValue(),
assignments.toArray(Assignment[]::new))),
- 1,
- new HybridClockImpl().now()
- );
+ mockPendingAssignments(Map.of(replicationGroupId(1, 0), assignments));
+ }
+
+ private void mockPendingAssignments(Map<ZonePartitionId, Set<Assignment>>
assignmentsByGroup) {
+ List<Entry> pendingEntries = assignmentsByGroup.entrySet().stream()
+ .map(entry -> new EntryImpl(
+ pendingAssignmentsQueueKey(entry.getKey()).bytes(),
+ AssignmentsQueue.toBytes(
+
Assignments.of(HybridTimestamp.MIN_VALUE.longValue(),
entry.getValue().toArray(Assignment[]::new))
+ ),
+ 1,
+ new HybridClockImpl().now()
+ ))
+ .collect(Collectors.toList());
byte[] prefixBytes =
ZoneRebalanceUtil.PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES;
when(metaStorageManager.prefixLocally(eq(new ByteArray(prefixBytes)),
anyLong()))
- .thenReturn(Cursor.fromIterable(List.of(pendingEntry)));
+ .thenReturn(Cursor.fromIterable(pendingEntries));
}
private void mockStableAssignments(Set<Assignment> assignments) {
- Entry stableEntry = new EntryImpl(
- stableAssignmentsKey(replicationGroupId(1, 0)).bytes(),
- Assignments.of(HybridTimestamp.MIN_VALUE.longValue(),
assignments.toArray(Assignment[]::new)).toBytes(),
- 1,
- new HybridClockImpl().now()
- );
+ mockStableAssignments(Map.of(replicationGroupId(1, 0), assignments));
+ }
+
+ private void mockStableAssignments(Map<ZonePartitionId, Set<Assignment>>
assignmentsByGroup) {
+ List<Entry> stableEntries = assignmentsByGroup.entrySet().stream()
+ .map(entry -> new EntryImpl(
+ stableAssignmentsKey(entry.getKey()).bytes(),
+ Assignments.of(HybridTimestamp.MIN_VALUE.longValue(),
entry.getValue().toArray(Assignment[]::new)).toBytes(),
+ 1,
+ new HybridClockImpl().now()
+ ))
+ .collect(Collectors.toList());
byte[] prefixBytes = ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
when(metaStorageManager.prefixLocally(eq(new ByteArray(prefixBytes)),
anyLong()))
- .thenReturn(Cursor.fromIterable(List.of(stableEntry)));
+ .thenReturn(Cursor.fromIterable(stableEntries));
}
private void initAndActivateLeaseUpdater() {
@@ -453,8 +650,8 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
* Waits for lease write to Meta storage.
*
     * @param needAccepted Whether to wait only for an accepted lease.
- * @param previousLease Previous lease. If not null, then wait for any
lease having expiration time other than the previous has (i.e.
- * either another lease or prolonged lease).
+ * @param previousLease Previous lease. If not null, then wait for any
lease having expiration time other than the previous has
+ * (i.e. either another lease or prolonged lease).
* @param timeoutMillis Timeout in milliseconds to wait for lease.
* @return A lease.
* @throws InterruptedException if the wait is interrupted.
@@ -481,6 +678,19 @@ public class LeaseUpdaterTest extends
BaseIgniteAbstractTest {
return renewedLease.get();
}
+ private LeaseBatch awaitForLeaseBatch(long timeoutMillis) throws
InterruptedException {
+ AtomicReference<LeaseBatch> renewedBatch = new AtomicReference<>();
+
+ renewLeaseBatchConsumer = batch -> {
+ renewedBatch.set(batch);
+ renewLeaseBatchConsumer = null;
+ };
+
+ assertTrue(IgniteTestUtils.waitForCondition(() -> renewedBatch.get()
!= null, timeoutMillis));
+
+ return renewedBatch.get();
+ }
+
/**
     * Gets a lease updater thread.
*
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
index e359c152fc3..a6ee8fe24e0 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/leases/LeaseBatchSerializerTest.java
@@ -25,9 +25,11 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;
+import java.io.IOException;
import java.time.LocalDateTime;
import java.time.Month;
import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.UUID;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.util.io.IgniteUnsafeDataOutput;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -202,6 +205,22 @@ class LeaseBatchSerializerTest {
verifySerializationAndDeserializationGivesSameResult(originalBatch);
}
+ @Test
+ void batchWithExactly8NodeNamesAndMoreThan8NodeIds() {
+ List<Lease> originalLeases = IntStream.range(0, 8)
+ .mapToObj(n -> {
+ String nodeName = "node" + n;
+ return tableLease(nodeName, randomUUID(), nodeName, n);
+ })
+ .collect(toList());
+
+ originalLeases.add(tableLease("node0", randomUUID(), "node0", 8));
+
+ LeaseBatch originalBatch = new LeaseBatch(originalLeases);
+
+ verifySerializationAndDeserializationGivesSameResult(originalBatch);
+ }
+
@Test
void batchWithMoreThan8NodeNames() {
List<Lease> originalLeases = IntStream.range(0, 9)
@@ -300,6 +319,105 @@ class LeaseBatchSerializerTest {
);
}
+ @Test
+ void v2CanBeDeserialized() {
+ LeaseBatch originalBatch = new LeaseBatch(createLeases(baseTs(),
TablePartitionId::new));
+
+ byte[] bytes = VersionedSerialization.toBytes(originalBatch,
serializer);
+
+ // VersionedSerializer header stores protocol version in the first
byte.
+ assertEquals(2, bytes[0] & 0xFF);
+
+ LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredBatch.leases(),
containsInAnyOrder(originalBatch.leases().toArray()));
+ assertEquals(
+
originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()),
+
restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList())
+ );
+ }
+
+ @Test
+ void
v2WithExactly8NamesAndMoreThan8NodeIdsDoesNotUseCompactNodesInfoEncoding() {
+ List<Lease> originalLeases = new ArrayList<>();
+
+ for (int i = 0; i < 8; i++) {
+ originalLeases.add(tableLease("node" + i, new UUID(0, i + 1),
null, i));
+ }
+
+ originalLeases.add(tableLease("node2", new UUID(0, 9), "node3", 8));
+
+ LeaseBatch originalBatch = new LeaseBatch(originalLeases);
+ byte[] bytes = VersionedSerialization.toBytes(originalBatch,
serializer);
+
+ LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ assertThat(restoredBatch.leases(),
containsInAnyOrder(originalBatch.leases().toArray()));
+ assertEquals(
+
originalBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()),
+
restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList())
+ );
+ }
+
+ @Test
+ void v1WithExactly8NamesAndMoreThan8NodeIdsCanBeDeserialized() throws
IOException {
+ byte[] bytes =
v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding();
+
+ LeaseBatch restoredBatch = VersionedSerialization.fromBytes(bytes,
serializer);
+
+ Lease expectedLease = new Lease(
+ "node0",
+ new UUID(0, 1),
+ new HybridTimestamp(900, 0),
+ new HybridTimestamp(1000, 0),
+ true,
+ true,
+ "node1",
+ new TablePartitionId(1, 0)
+ );
+
+ assertThat(restoredBatch.leases(), containsInAnyOrder(expectedLease));
+ assertEquals(List.of("node1"),
restoredBatch.leases().stream().map(Lease::proposedCandidate).collect(toList()));
+ }
+
+ private static byte[]
v1BytesWithExactly8NamesAndMoreThan8NodeIdsUsingCompactEncoding() throws
IOException {
+ // Crafted manually to reproduce a V1-only layout: V1 allowed compact
nodes info based only on nameCount,
+ // while the current serializer writes V2 and would not emit compact
encoding when nodeCount > 8.
+ try (IgniteUnsafeDataOutput out = new IgniteUnsafeDataOutput(256)) {
+ // Header written by VersionedSerializer.writeExternal():
MAGIC(0x43BEEF00) + protocolVersion(1).
+ out.writeInt(0x43BEEF01);
+
+ // Lease batch header.
+ out.writeVarInt(1000); // minExpirationTimePhysical
+ out.writeVarInt(0); // commonExpirationTimePhysicalDelta
+ out.writeVarInt(0); // commonExpirationTimeLogical
+
+ // Nodes dictionary: 8 names.
+ out.writeVarInt(8);
+ for (int i = 0; i < 8; i++) {
+ out.writeUTF("node" + i);
+ }
+
+ // Nodes dictionary: 9 node IDs, with names cycling over 8 entries.
+ out.writeVarInt(9);
+ for (int i = 0; i < 9; i++) {
+ out.writeUuid(new UUID(0, i + 1));
+ out.writeVarInt(i % 8);
+ }
+
+ // Table section with one object and one lease.
+ out.writeVarInt(1); // tableCount
+ out.writeVarInt(1); // objectId delta
+ out.writeVarInt(1); // leaseCount
+ out.write(0x07); // accepted + prolongable + hasProposedCandidate
+ out.writeVarInt(8); // compact nodes info: holder=0,
proposedCandidate=1
+ out.writeVarInt(100); // period (expirationPhysical -
startPhysical)
+ out.writeVarInt(0); // startLogical
+
+ return out.array();
+ }
+ }
+
@SuppressWarnings("unused")
private String v1LeaseBatchAsBase64WithTablePartitions() {
HybridTimestamp baseTs = baseTs();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index 582bee31800..6d4e941551e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -254,7 +254,6 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
* @throws Exception If failed.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316")
void testNodesWaitForLastNodeFromChainToComeBackOnlineAfterMajorityStops()
throws Exception {
for (int i = 1; i < 8; i++) {
startNode(i, CUSTOM_NODES_CONFIG);
@@ -316,7 +315,6 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
* @throws Exception If failed.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-28316")
void
testNodesWaitForNodesFromGracefulChainToComeBackOnlineAfterMajorityStops()
throws Exception {
for (int i = 1; i < 8; i++) {
startNode(i, CUSTOM_NODES_CONFIG);