This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new e06d6142d3 IGNITE-20883 ItDataSchemaSyncTest.checkSchemasCorrectlyRestore() is flaky with The query was cancelled while executing (#3010) e06d6142d3 is described below commit e06d6142d32da1684c777a1e9b38b4d92db9ac91 Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Mon Jan 8 14:03:03 2024 +0300 IGNITE-20883 ItDataSchemaSyncTest.checkSchemasCorrectlyRestore() is flaky with The query was cancelled while executing (#3010) --- .../handler/ClientPrimaryReplicaTracker.java | 5 +- .../handler/ClientPrimaryReplicaTrackerTest.java | 3 +- .../ignite/client/handler/FakePlacementDriver.java | 9 +- .../ignite/client/TestClientHandlerModule.java | 6 +- .../internal/index/IndexBuildController.java | 6 +- .../internal/index/IndexBuildControllerTest.java | 8 +- .../event/PrimaryReplicaEventParameters.java | 18 ++- .../PlacementDriverManagerTest.java | 123 ++++++++++++++++++--- .../placementdriver/leases/LeaseTracker.java | 20 +++- .../placementdriver/PlacementDriverTest.java | 2 +- .../replicator/PartitionReplicaListener.java | 4 +- .../PartitionReplicaListenerDurableUnlockTest.java | 16 ++- 12 files changed, 185 insertions(+), 35 deletions(-) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java index 89d44e71dc..a607b41620 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java @@ -98,7 +98,8 @@ public class ClientPrimaryReplicaTracker implements EventListener<EventParameter PlacementDriver placementDriver, CatalogService catalogService, HybridClock clock, - SchemaSyncService schemaSyncService) { + SchemaSyncService schemaSyncService + ) { this.placementDriver = placementDriver; this.catalogService = catalogService; this.clock = clock; @@ -302,6 +303,8 @@ public class ClientPrimaryReplicaTracker implements EventListener<EventParameter } TablePartitionId tablePartitionId = (TablePartitionId) primaryReplicaEvent.groupId(); + + // TODO: IGNITE-21202 Use the leaseholder ID for thin clients as well. updatePrimaryReplica(tablePartitionId, primaryReplicaEvent.startTime(), primaryReplicaEvent.leaseholder()); return falseCompletedFuture(); // false: don't remove listener. diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java index 416bde7435..08e7d31107 100644 --- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java +++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTrackerTest.java @@ -68,7 +68,8 @@ class ClientPrimaryReplicaTrackerTest extends BaseIgniteAbstractTest { driver, new FakeCatalogService(PARTITIONS), new TestHybridClock(currentTime::get), - new AlwaysSyncedSchemaSyncService()); + new AlwaysSyncedSchemaSyncService() + ); } @Test diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java index 9cd149bbc7..5853de37f1 100644 --- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java +++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java @@ -73,7 +73,12 @@ public class FakePlacementDriver extends AbstractEventProducer<PrimaryReplicaEve TablePartitionId groupId = new TablePartitionId(tableId, partition); PrimaryReplicaEventParameters params = new PrimaryReplicaEventParameters( - 0, groupId, replica, HybridTimestamp.hybridTimestamp(leaseStartTime)); + 0, + groupId, + replica, + replica, + HybridTimestamp.hybridTimestamp(leaseStartTime) + ); fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, params); } @@ -108,7 +113,7 @@ public class FakePlacementDriver extends AbstractEventProducer<PrimaryReplicaEve @Override public String getLeaseholderId() { - return null; + return leaseholder; } @Override diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java index b161f4df7b..bd40a832ee 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java @@ -221,7 +221,11 @@ public class TestClientHandlerModule implements IgniteComponent { catalogService, connectionIdGen.incrementAndGet(), new ClientPrimaryReplicaTracker( - placementDriver, catalogService, clock, new AlwaysSyncedSchemaSyncService()) + placementDriver, + catalogService, + clock, + new AlwaysSyncedSchemaSyncService() + ) ) ); } diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java index 81ae939949..e560aff1df 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java @@ -187,7 +187,7 @@ class IndexBuildController implements ManuallyCloseable { return inBusyLockAsync(busyLock, () -> { TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); - if (isLocalNode(parameters.leaseholder())) { + if (isLocalNode(parameters.leaseholderId())) { primaryReplicaIds.add(primaryReplicaId); // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the @@ -316,8 +316,8 @@ class IndexBuildController implements ManuallyCloseable { ); } - private boolean isLocalNode(String nodeConsistentId) { - return nodeConsistentId.equals(localNode().name()); + private boolean isLocalNode(String nodeId) { + return nodeId.equals(localNode().id()); } private ClusterNode localNode() { diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java index 4ed34ea4e4..1b5d6f1169 100644 --- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java @@ -308,7 +308,13 @@ public class IndexBuildControllerTest extends BaseIgniteAbstractTest { return replicaMetaFuture.thenCompose(replicaMeta -> fireEvent( PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, - new PrimaryReplicaEventParameters(causalityToken, replicaId, replicaMeta.getLeaseholder(), replicaMeta.getStartTime()) + new PrimaryReplicaEventParameters( + causalityToken, + replicaId, + replicaMeta.getLeaseholderId(), + replicaMeta.getLeaseholder(), + replicaMeta.getStartTime() + ) )); } } diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java index 8133ceeb56..ba377f043e 100644 --- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEventParameters.java @@ -25,6 +25,8 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId; public class PrimaryReplicaEventParameters extends CausalEventParameters { private final ReplicationGroupId groupId; + private final String leaseholderId; + private final String leaseholder; private final HybridTimestamp startTime; @@ -34,13 +36,21 @@ public class PrimaryReplicaEventParameters extends CausalEventParameters { * * @param causalityToken Causality token. * @param groupId Replication group ID. + * @param leaseholderId Leaseholder node ID. * @param leaseholder Leaseholder node consistent ID. * @param startTime Lease start timestamp. */ - public PrimaryReplicaEventParameters(long causalityToken, ReplicationGroupId groupId, String leaseholder, HybridTimestamp startTime) { + public PrimaryReplicaEventParameters( + long causalityToken, + ReplicationGroupId groupId, + String leaseholderId, + String leaseholder, + HybridTimestamp startTime + ) { super(causalityToken); this.groupId = groupId; + this.leaseholderId = leaseholderId; this.leaseholder = leaseholder; this.startTime = startTime; } @@ -50,7 +60,13 @@ public class PrimaryReplicaEventParameters extends CausalEventParameters { return groupId; } + /** Returns leaseholder node ID. */ + public String leaseholderId() { + return leaseholderId; + } + /** Returns leaseholder node consistent ID. */ + @Deprecated public String leaseholder() { return leaseholder; } diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java index 05ed583589..89fc69306d 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -60,6 +61,7 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; @@ -113,6 +115,8 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { private ClusterService clusterService; + private LogicalTopologyServiceTestImpl logicalTopologyService; + /** This service is used to redirect a lease proposal. */ private ClusterService anotherClusterService; @@ -168,7 +172,7 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { RaftGroupEventsClientListener eventsClientListener = new RaftGroupEventsClientListener(); - LogicalTopologyService logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService); + logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService); TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( clusterService, @@ -340,40 +344,122 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { } @Test - public void testPrimaryReplicaExpired() throws Exception { - AtomicBoolean leaseExpired = new AtomicBoolean(); - + public void testPrimaryReplicaEvents() throws Exception { TablePartitionId grpPart0 = createTableAssignment(metaStorageManager, nextTableId.incrementAndGet(), List.of(nodeName)); - placementDriverManager.placementDriver().listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (evt, e) -> { - log.info("Primary replica is expired [grp={}]", grpPart0); + Lease lease1 = checkLeaseCreated(grpPart0, true); + + ConcurrentHashMap<String, HybridTimestamp> electedEvts = new ConcurrentHashMap<>(2); + ConcurrentHashMap<String, HybridTimestamp> expiredEvts = new ConcurrentHashMap<>(2); + + placementDriverManager.placementDriver().listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, (evt, e) -> { + log.info("Primary replica is elected [grp={}]", evt.groupId()); - leaseExpired.set(true); + electedEvts.put(evt.leaseholderId(), evt.startTime()); return falseCompletedFuture(); }); - Lease lease1 = checkLeaseCreated(grpPart0, true); + placementDriverManager.placementDriver().listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (evt, e) -> { + log.info("Primary replica is expired [grp={}]", evt.groupId()); + + expiredEvts.put(evt.leaseholderId(), evt.startTime()); - assertFalse(leaseExpired.get()); + return falseCompletedFuture(); + }); Set<Assignment> assignments = calculateAssignmentForPartition(Collections.singleton(anotherNodeName), 1, 1); metaStorageManager.put(fromString(STABLE_ASSIGNMENTS_PREFIX + grpPart0), ByteUtils.toBytes(assignments)); assertTrue(waitForCondition(() -> { - var fut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY); + CompletableFuture<ReplicaMeta> fut = placementDriverManager.placementDriver() + .getPrimaryReplica(grpPart0, lease1.getExpirationTime()); - Lease lease = leaseFromBytes(fut.join().value(), grpPart0); + ReplicaMeta meta = fut.join(); - return lease.getLeaseholder().equals(anotherNodeName); + return meta != null && meta.getLeaseholder().equals(anotherNodeName); }, 10_000)); Lease lease2 = checkLeaseCreated(grpPart0, true); assertNotEquals(lease1.getLeaseholder(), lease2.getLeaseholder()); - assertTrue(leaseExpired.get()); + assertEquals(1, electedEvts.size()); + assertEquals(1, expiredEvts.size()); + + assertTrue(electedEvts.containsKey(lease2.getLeaseholderId())); + assertTrue(expiredEvts.containsKey(lease1.getLeaseholderId())); + + stopAnotherNode(anotherClusterService); + anotherClusterService = startAnotherNode(anotherNodeName, PORT + 1); + + assertTrue(waitForCondition(() -> { + CompletableFuture<ReplicaMeta> fut = placementDriverManager.placementDriver() + .getPrimaryReplica(grpPart0, lease2.getExpirationTime()); + + ReplicaMeta meta = fut.join(); + + return meta != null && meta.getLeaseholderId().equals(anotherClusterService.topologyService().localMember().id()); + }, 10_000)); + + Lease lease3 = checkLeaseCreated(grpPart0, true); + + assertEquals(2, electedEvts.size()); + assertEquals(2, expiredEvts.size()); + + assertTrue(electedEvts.containsKey(lease3.getLeaseholderId())); + assertTrue(expiredEvts.containsKey(lease2.getLeaseholderId())); + } + + /** + * Stops another node. + * + * @param nodeClusterService Node service to stop. + * @throws Exception If failed. + */ + private void stopAnotherNode(ClusterService nodeClusterService) throws Exception { + nodeClusterService.beforeNodeStop(); + nodeClusterService.stop(); + + assertTrue(waitForCondition( + () -> !clusterService.topologyService().allMembers().contains(nodeClusterService.topologyService().localMember()), + 10_000 + )); + + logicalTopologyService.updateTopology(); + } + + /** + * Starts another node. + * + * @param nodeName Node name. + * @param port Node port. + * @return Cluster service for the newly started node. + * @throws Exception If failed. + */ + private ClusterService startAnotherNode(String nodeName, int port) throws Exception { + ClusterService nodeClusterService = ClusterServiceTestUtils.clusterService( + testInfo, + port, + new StaticNodeFinder(Collections.singletonList(new NetworkAddress("localhost", PORT))) + ); + + nodeClusterService.messagingService().addMessageHandler( + PlacementDriverMessageGroup.class, + leaseGrantMessageHandler(nodeName) + ); + + nodeClusterService.start(); + + assertTrue(waitForCondition( + () -> clusterService.topologyService().allMembers().contains(nodeClusterService.topologyService().localMember()), + 10_000 + )); + + logicalTopologyService.updateTopology(); + + return nodeClusterService; } @Test @@ -566,6 +652,8 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { private List<LogicalTopologyEventListener> listeners; + private int ver = 1; + public LogicalTopologyServiceTestImpl(ClusterService clusterService) { this.clusterService = clusterService; this.listeners = new ArrayList<>(); @@ -586,16 +674,19 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { */ public void updateTopology() { if (listeners != null) { - var top = clusterService.topologyService().allMembers().stream().map(LogicalNode::new).collect(toSet()); + var topologySnapshot = new LogicalTopologySnapshot( + ++ver, + clusterService.topologyService().allMembers().stream().map(LogicalNode::new).collect(toSet()) + ); - listeners.forEach(lnsr -> lnsr.onTopologyLeap(new LogicalTopologySnapshot(2, top))); + listeners.forEach(lnsr -> lnsr.onTopologyLeap(topologySnapshot)); } } @Override public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() { return completedFuture(new LogicalTopologySnapshot( - 1, + ver, clusterService.topologyService().allMembers().stream().map(LogicalNode::new).collect(toSet())) ); } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index 42d8e1c120..d44d491fd7 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -323,7 +323,13 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri if (!sameLease) { CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId, fireEvent( PRIMARY_REPLICA_EXPIRED, - new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder(), currentLease.getStartTime()) + new PrimaryReplicaEventParameters( + causalityToken, + grpId, + currentLease.getLeaseholderId(), + currentLease.getLeaseholder(), + currentLease.getStartTime() + ) )); assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; @@ -332,13 +338,19 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri } private CompletableFuture<Void> fireEventReplicaBecomePrimary(long causalityToken, Lease lease) { - String leaseholder = lease.getLeaseholder(); + String leaseholderId = lease.getLeaseholderId(); - assert leaseholder != null : lease; + assert leaseholderId != null : lease; return fireEvent( PRIMARY_REPLICA_ELECTED, - new PrimaryReplicaEventParameters(causalityToken, lease.replicationGroupId(), leaseholder, lease.getStartTime()) + new PrimaryReplicaEventParameters( + causalityToken, + lease.replicationGroupId(), + leaseholderId, + lease.getLeaseholder(), + lease.getStartTime() + ) ); } diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java index b1dd5b98df..231c645518 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java @@ -539,7 +539,7 @@ public class PlacementDriverTest extends BaseIgniteAbstractTest { PrimaryReplicaEventParameters parameters ) { assertThat(parameters.groupId(), equalTo(expLease.replicationGroupId())); - assertThat(parameters.leaseholder(), equalTo(expLease.getLeaseholder())); + assertThat(parameters.leaseholderId(), equalTo(expLease.getLeaseholderId())); } private LeaseTracker createPlacementDriver() { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index fb765d00c9..7d38fb75ad 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -353,7 +353,7 @@ public class PartitionReplicaListener implements ReplicaListener { evt.groupId() ); - if (!localNode.name().equals(evt.leaseholder())) { + if (!localNode.id().equals(evt.leaseholderId())) { return falseCompletedFuture(); } @@ -439,7 +439,7 @@ public class PartitionReplicaListener implements ReplicaListener { evt.groupId() ); - if (!localNode.name().equals(evt.leaseholder())) { + if (!localNode.id().equals(evt.leaseholderId())) { return falseCompletedFuture(); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java index 5ea1c8a6d0..23f168bbcf 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java @@ -195,7 +195,13 @@ public class PartitionReplicaListenerDurableUnlockTest extends IgniteAbstractTes return nullCompletedFuture(); }; - PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name(), clock.now()); + PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters( + 0, + part0, + LOCAL_NODE.id(), + LOCAL_NODE.name(), + clock.now() + ); assertThat(partitionReplicaListener.onPrimaryElected(parameters, null), willSucceedIn(1, SECONDS)); @@ -225,7 +231,13 @@ public class PartitionReplicaListenerDurableUnlockTest extends IgniteAbstractTes return nullCompletedFuture(); }; - PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name(), clock.now()); + PrimaryReplicaEventParameters parameters = new PrimaryReplicaEventParameters( + 0, + part0, + LOCAL_NODE.id(), + LOCAL_NODE.name(), + clock.now() + ); assertThat(partitionReplicaListener.onPrimaryElected(parameters, null), willSucceedIn(1, SECONDS));