This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 89094b9dcf IGNITE-23019 Adjust tests in order to explicitly refresh
raft leader on client when needed. (#4477)
89094b9dcf is described below
commit 89094b9dcf795a3acae3865952692decdcc9551e
Author: Mikhail Efremov <[email protected]>
AuthorDate: Fri Oct 11 17:27:51 2024 +0600
IGNITE-23019 Adjust tests in order to explicitly refresh raft leader on
client when needed. (#4477)
---
.../ignite/internal/raft/ItLearnersTest.java | 39 +++++++++--
.../apache/ignite/internal/raft/ItLozaTest.java | 8 ++-
.../internal/raft/ItRaftGroupServiceTest.java | 4 +-
.../ignite/internal/raft/RaftGroupServiceImpl.java | 2 +
.../raft/storage/impl/RocksDbSharedLogStorage.java | 2 +
.../ignite/internal/raft/RaftGroupServiceTest.java | 75 +++++++++++-----------
.../ItPlacementDriverReplicaSideTest.java | 16 +++--
.../AbstractTopologyAwareGroupServiceTest.java | 14 ++--
.../disaster/ItDisasterRecoverySystemViewTest.java | 44 ++++++++++++-
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 22 +++++++
10 files changed, 166 insertions(+), 60 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
index 03cb4690ef..7db4ad2d05 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLearnersTest.java
@@ -228,10 +228,18 @@ public class ItLearnersTest extends IgniteAbstractTest {
PeersAndLearners configuration =
createConfiguration(List.of(follower), List.of());
- CompletableFuture<RaftGroupService> service1 =
- startRaftGroup(follower,
configuration.peer(follower.consistentId()), configuration, new
TestRaftGroupListener());
+ CompletableFuture<RaftGroupService> service1 = startRaftGroup(
+ follower,
+ configuration.peer(follower.consistentId()),
+ configuration,
+ new TestRaftGroupListener()
+ );
- assertThat(service1.thenApply(RaftGroupService::leader),
willBe(follower.asPeer()));
+ assertThat(
+ service1.thenCompose(service -> service.refreshLeader()
+ .thenApply(v -> service.leader())),
+ willBe(follower.asPeer())
+ );
assertThat(service1.thenApply(RaftGroupService::learners),
willBe(empty()));
CompletableFuture<Void> addLearners = service1
@@ -330,10 +338,27 @@ public class ItLearnersTest extends IgniteAbstractTest {
CompletableFuture<RaftGroupService> peerService = startRaftGroup(node,
peer, configuration, peerListener);
CompletableFuture<RaftGroupService> learnerService =
startRaftGroup(node, learner, configuration, learnerListener);
- assertThat(peerService.thenApply(RaftGroupService::leader),
willBe(peer));
- assertThat(peerService.thenApply(RaftGroupService::leader),
willBe(not(learner)));
- assertThat(learnerService.thenApply(RaftGroupService::leader),
willBe(peer));
- assertThat(learnerService.thenApply(RaftGroupService::leader),
willBe(not(learner)));
+ assertThat(peerService.thenCompose(
+ service -> service.refreshLeader()
+ .thenApply(v -> service.leader())),
+ willBe(peer)
+ );
+ assertThat(
+ // the leader is already refreshed
+ peerService.thenApply(RaftGroupService::leader),
+ willBe(not(learner))
+ );
+
+ assertThat(learnerService.thenCompose(
+ service -> service.refreshLeader()
+ .thenApply(v -> service.leader())),
+ willBe(peer)
+ );
+ assertThat(
+ // the leader is already refreshed
+ learnerService.thenApply(RaftGroupService::leader),
+ willBe(not(learner))
+ );
// Test writing data.
CompletableFuture<?> writeFuture = peerService
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 818c7638d1..76d8e82bac 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -196,7 +196,13 @@ public class ItLozaTest extends IgniteAbstractTest {
.doCallRealMethod()
.when(messagingServiceMock).invoke(any(ClusterNode.class),
any(), anyLong());
- startClient(new TestReplicationGroupId(Integer.toString(i)),
spyService.topologyService().localMember(), partitionsConfigurer);
+ RaftGroupService client = startClient(
+ new TestReplicationGroupId(Integer.toString(i)),
+ spyService.topologyService().localMember(),
+ partitionsConfigurer
+ );
+
+ assertThat(client.refreshLeader(), willCompleteSuccessfully());
verify(messagingServiceMock, times(3 * (i + 1)))
.invoke(any(ClusterNode.class), any(), anyLong());
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index bc41eaad81..124dbe4bec 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -129,7 +129,9 @@ public class ItRaftGroupServiceTest extends
IgniteAbstractTest {
public void testTransferLeadership() {
assertThat(nodes.get(0).raftGroupService, willCompleteSuccessfully());
- Peer leader = nodes.get(0).raftGroupService.join().leader();
+ Peer leader = nodes.get(0).raftGroupService
+ .thenCompose(service -> service.refreshLeader().thenApply(v ->
service.leader()))
+ .join();
TestNode oldLeaderNode = nodes.stream()
.filter(node -> node.name().equals(leader.consistentId()))
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 3d92e3feb9..e6f6a8eec7 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -207,6 +207,8 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
);
}
+ getLeader = false;
+
if (!getLeader) {
return completedFuture(service);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
index 036233ac44..20e250be78 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
@@ -380,6 +380,8 @@ public class RocksDbSharedLogStorage implements LogStorage,
Describer {
}
protected byte[] getValueFromRocksDb(byte[] keyBytes) throws
RocksDBException {
+ assert !db.isClosed() : "RocksDB is already closed.";
+
return this.db.get(this.dataHandle, keyBytes);
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index 88bd49cd36..38ee0fda36 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -180,7 +180,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
public void testRefreshLeaderStable() {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertNull(service.leader());
@@ -196,7 +196,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
// Simulate running elections.
leader = null;
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertNull(service.leader());
@@ -212,7 +212,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
executor.schedule((Runnable) () -> leader = NODES.get(0), 500,
TimeUnit.MILLISECONDS);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertNull(service.leader());
@@ -225,7 +225,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
public void testRefreshLeaderWithTimeout() {
mockLeaderRequest(true);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertThat(service.refreshLeader(), willThrow(TimeoutException.class,
500, TimeUnit.MILLISECONDS));
}
@@ -235,7 +235,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
mockUserInput(false, null);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertThat(service.refreshLeader(), willCompleteSuccessfully());
@@ -247,7 +247,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
mockUserInput(false, null);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertNull(service.leader());
@@ -261,17 +261,16 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
mockUserInput(true, null);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertThat(service.run(testWriteCommand()),
willThrow(TimeoutException.class, 500, TimeUnit.MILLISECONDS));
}
@Test
public void testUserRequestLeaderNotElected() {
- mockLeaderRequest(false);
mockUserInput(false, null);
- RaftGroupService service = startRaftGroupService(NODES, true);
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
Peer leader = this.leader;
@@ -286,10 +285,9 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
@Test
public void testUserRequestLeaderElectedAfterDelay() {
- mockLeaderRequest(false);
mockUserInput(false, null);
- RaftGroupService service = startRaftGroupService(NODES, true);
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
Peer leader = this.leader;
@@ -308,14 +306,13 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
@Test
public void testUserRequestLeaderElectedAfterDelayWithFailedNode() {
- mockLeaderRequest(false);
mockUserInput(false, NODES.get(0));
CompletableFuture<Void> confUpdateFuture =
raftConfiguration.retryTimeout().update(TIMEOUT * 3);
assertThat(confUpdateFuture, willCompleteSuccessfully());
- RaftGroupService service = startRaftGroupService(NODES, true);
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
Peer leader = this.leader;
@@ -341,10 +338,9 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
@Test
public void testUserRequestLeaderChanged() {
- mockLeaderRequest(false);
mockUserInput(false, null);
- RaftGroupService service = startRaftGroupService(NODES, true);
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
Peer leader = this.leader;
@@ -365,18 +361,18 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
@Test
public void testSnapshotExecutionException() {
- mockSnapshotRequest(1);
+ mockSnapshotRequest(false);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertThat(service.snapshot(new Peer("localhost-8082")),
willThrow(IgniteInternalException.class));
}
@Test
public void testSnapshotExecutionFailedResponse() {
- mockSnapshotRequest(0);
+ mockSnapshotRequest(true);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertThat(service.snapshot(new Peer("localhost-8082")),
willThrow(RaftException.class));
}
@@ -391,7 +387,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES, true);
+ RaftGroupService service = startRaftGroupService(NODES);
assertThat(service.peers(), containsInAnyOrder(NODES.toArray()));
assertThat(service.learners(), is(empty()));
@@ -411,7 +407,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES.subList(0, 2),
true);
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 2));
assertThat(service.peers(), containsInAnyOrder(NODES.subList(0,
2).toArray()));
assertThat(service.learners(), is(empty()));
@@ -431,7 +427,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES, true);
+ RaftGroupService service = startRaftGroupService(NODES);
assertThat(service.peers(), containsInAnyOrder(NODES.toArray()));
assertThat(service.learners(), is(empty()));
@@ -469,7 +465,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES.subList(0, 2),
true);
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 2));
assertThat(service.peers(), containsInAnyOrder(NODES.subList(0,
2).toArray()));
assertThat(service.learners(), is(empty()));
@@ -505,9 +501,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
when(messagingService.invoke(any(ClusterNode.class),
any(TransferLeaderRequest.class), anyLong()))
.then(invocation ->
completedFuture(RaftRpcFactory.DEFAULT.newResponse(FACTORY, Status.OK())));
- mockLeaderRequest(false);
-
- RaftGroupService service = startRaftGroupService(NODES, true);
+ RaftGroupService service =
startRaftGroupServiceWithRefreshLeader(NODES);
assertEquals(NODES.get(0), service.leader());
@@ -525,7 +519,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES.subList(0, 1),
true);
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 1));
assertThat(service.peers(), containsInAnyOrder(NODES.subList(0,
1).toArray()));
assertThat(service.learners(), is(empty()));
@@ -549,7 +543,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES.subList(0, 1),
true);
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 1));
assertThat(service.addLearners(NODES.subList(1, 3)),
willCompleteSuccessfully());
@@ -575,7 +569,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES.subList(0, 1),
true);
+ RaftGroupService service = startRaftGroupService(NODES.subList(0, 1));
assertThat(service.addLearners(NODES.subList(1, 3)),
willCompleteSuccessfully());
@@ -592,7 +586,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
public void testGetLeaderRequest() {
mockLeaderRequest(false);
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
assertNull(service.leader());
@@ -609,7 +603,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
@Test
public void testReadIndex() {
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
mockReadIndex(false);
CompletableFuture<Long> fut = service.readIndex();
@@ -621,7 +615,7 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
@Test
public void testReadIndexWithMessageSendTimeout() {
- RaftGroupService service = startRaftGroupService(NODES, false);
+ RaftGroupService service = startRaftGroupService(NODES);
mockReadIndex(true);
CompletableFuture<Long> fut = service.readIndex();
@@ -629,13 +623,13 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
assertThat(fut, willThrowFast(TimeoutException.class));
}
- private RaftGroupService startRaftGroupService(List<Peer> peers, boolean
getLeader) {
+ private RaftGroupService startRaftGroupService(List<Peer> peers) {
PeersAndLearners memberConfiguration =
PeersAndLearners.fromPeers(peers, Set.of());
var commandsSerializer = new
ThreadLocalOptimizedMarshaller(cluster.serializationRegistry());
CompletableFuture<RaftGroupService> service =
RaftGroupServiceImpl.start(
- TEST_GRP, cluster, FACTORY, raftConfiguration,
memberConfiguration, getLeader, executor, commandsSerializer
+ TEST_GRP, cluster, FACTORY, raftConfiguration,
memberConfiguration, false, executor, commandsSerializer
);
assertThat(service, willCompleteSuccessfully());
@@ -643,6 +637,15 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
return service.join();
}
+ private RaftGroupService startRaftGroupServiceWithRefreshLeader(List<Peer>
peers) { // Test helper: start a client for the given peers, then refresh its leader explicitly.
+ RaftGroupService service = startRaftGroupService(peers); // This helper starts the service with getLeader == false.
+
+ mockLeaderRequest(false); // Stub the leader request before refreshing; NOTE(review): flag meaning is defined outside this hunk - confirm.
+ service.refreshLeader().join(); // Block until the leader refresh has finished.
+
+ return service;
+ }
+
/**
* Mock read index request.
*/
@@ -738,10 +741,10 @@ public class RaftGroupServiceTest extends
BaseIgniteAbstractTest {
});
}
- private void mockSnapshotRequest(int mode) {
+ private void mockSnapshotRequest(boolean returnResponseWithError) {
when(messagingService.invoke(any(ClusterNode.class),
any(CliRequests.SnapshotRequest.class), anyLong()))
.then(invocation -> {
- if (mode == 0) {
+ if (returnResponseWithError) {
ErrorResponse response = FACTORY.errorResponse()
.errorCode(RaftError.UNKNOWN.getNumber())
.errorMsg("Failed to create a snapshot")
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 1148c48b07..1868b5b143 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -125,7 +125,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
private static final TestReplicaMessagesFactory
TEST_REPLICA_MESSAGES_FACTORY = new TestReplicaMessagesFactory();
- @InjectConfiguration("mock {retryTimeout=2000, responseTimeout=1000}")
+ @InjectConfiguration("mock {retryTimeout=4000, responseTimeout=1000}")
private RaftConfiguration raftConfiguration;
@InjectConfiguration
@@ -153,6 +153,8 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
/** List of services to have to close before the test will be completed. */
private final List<Closeable> servicesToClose = new ArrayList<>();
+ private Set<String> grpNodes = null;
+
private BiFunction<ReplicaRequest, UUID, CompletableFuture<ReplicaResult>>
replicaListener = null;
@BeforeEach
@@ -257,13 +259,19 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
() ->
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10,
TimeUnit.SECONDS)
));
}
+
+ grpNodes = chooseRandomNodes(3);
}
@AfterEach
public void afterTest() throws Exception {
+ stopReplicationGroup(GROUP_ID, grpNodes);
+
closeAll(servicesToClose);
replicaListener = null;
+
+ grpNodes = null;
}
/**
@@ -316,8 +324,6 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
@Test
public void testNotificationToPlacementDriverAboutConnectivityProblem()
throws Exception {
- Set<String> grpNodes = chooseRandomNodes(3);
-
log.info("Replication group is based on {}", grpNodes);
var raftClientFut = createReplicationGroup(GROUP_ID, grpNodes);
@@ -367,8 +373,6 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
@Test
public void testNotificationToPlacementDriverAboutMajorityLoss() throws
Exception {
- Set<String> grpNodes = chooseRandomNodes(3);
-
log.info("Replication group is based on {}", grpNodes);
var raftClientFut = createReplicationGroup(GROUP_ID, grpNodes);
@@ -429,8 +433,6 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
assertTrue(placementDriverNodeNames.contains(nodeName));
}
-
- stopReplicationGroup(GROUP_ID, grpNodes);
}
/**
diff --git
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
index eb1c89774e..192f69d542 100644
---
a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
+++
b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java
@@ -234,6 +234,8 @@ public abstract class AbstractTopologyAwareGroupServiceTest
extends IgniteAbstra
false
);
+ raftClientNoInitialNotify.refreshLeader().get();
+
List<NetworkAddress> clientAddress = findLocalAddresses(clientPort,
clientPort + 1);
assertEquals(1, clientAddress.size());
clusterServices.put(clientAddress.get(0), clientClusterService);
@@ -289,10 +291,10 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
assertNull(leaderRefNoInitialNotify.get());
// Forcing the leader change by stopping the actual leader.
- var raftServiceToStop = raftServers.remove(new
NetworkAddress("localhost", leader.address().port()));
- raftServiceToStop.stopRaftNodes(GROUP_ID);
+ var raftServerToStop = raftServers.remove(new
NetworkAddress("localhost", leader.address().port()));
+ raftServerToStop.stopRaftNodes(GROUP_ID);
ComponentContext componentContext = new ComponentContext();
- assertThat(raftServiceToStop.stopAsync(componentContext),
willCompleteSuccessfully());
+ assertThat(raftServerToStop.stopAsync(componentContext),
willCompleteSuccessfully());
afterNodeStop(leader.name());
@@ -305,16 +307,16 @@ public abstract class
AbstractTopologyAwareGroupServiceTest extends IgniteAbstra
assertThat(stopFuture, willCompleteSuccessfully());
// Waiting for the notifications to check.
- if (!leader.address().equals(new NetworkAddress("localhost",
PORT_BASE))) {
+ if (leader.address().port() != PORT_BASE) {
// leaderRef is updated through raftClient hosted on PORT_BASE,
thus if corresponding node was stopped (and it will be stopped
// if it occurred to be a leader) leaderRef won't be updated.
assertTrue(waitForCondition(() -> !leader.equals(leaderRef.get()),
WAIT_TIMEOUT_MILLIS));
}
assertTrue(waitForCondition(() ->
!leader.equals(leaderRefNoInitialNotify.get()), WAIT_TIMEOUT_MILLIS));
- log.info("New Leader: " + leaderRef.get());
+ log.info("New Leader: " + leaderRefNoInitialNotify.get());
- afterLeaderChange(leaderRef.get().name());
+ afterLeaderChange(leaderRefNoInitialNotify.get().name());
raftClientNoInitialNotify.refreshLeader().get();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
index e07ec149ff..85348fd70c 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoverySystemViewTest.java
@@ -26,8 +26,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
+import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.restart.RestartProofIgnite;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.PublicApiThreadingTable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -64,7 +71,11 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
@Test
void testGlobalPartitionStatesSystemView() {
- createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 2);
+ int partitionsCount = 2;
+
+ createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
+
+ waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
assertQuery(globalPartitionStatesSystemViewSql())
.returns(ZONE_NAME, TABLE_NAME, 0, AVAILABLE.name())
@@ -76,7 +87,11 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
void testLocalPartitionStatesSystemView() {
assertEquals(2, initialNodes());
- createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(), 2);
+ int partitionsCount = 2;
+
+ createZoneAndTable(ZONE_NAME, TABLE_NAME, initialNodes(),
partitionsCount);
+
+ waitLeaderOnAllPartitions(TABLE_NAME, partitionsCount);
List<String> nodeNames =
CLUSTER.runningNodes().map(Ignite::name).sorted().collect(toList());
@@ -91,6 +106,31 @@ public class ItDisasterRecoverySystemViewTest extends
BaseSqlIntegrationTest {
.check();
}
+ /**
+ * Refreshes the Raft leader on every partition of the given table and asserts
+ * that each refresh completes successfully. The tests later expect the partitions
+ * to be in the AVAILABLE state. Without the refresh there is no log update
+ * (see {@link LocalPartitionStateEnumWithLogIndex#of}), and
+ * SYSTEM.*_PARTITION_STATES reports the UNAVAILABLE state instead of the desired one.
+ * That is why {@link #testGlobalPartitionStatesSystemView()} and
+ * {@link #testLocalPartitionStatesSystemView()} must manually trigger
+ * {@link RaftGroupService#refreshLeader()}, which brings the partitions to the
+ * proper states.
+ *
+ * @param tableName Name of the table whose partitions get a leader refresh.
+ * @param partitionsCount Expected number of partitions of the table; partition ids
+ * from 0 (inclusive) to this value (exclusive) are iterated.
+ */
+ private static void waitLeaderOnAllPartitions(String tableName, int
partitionsCount) {
+ IgniteImpl node = ((RestartProofIgnite)
CLUSTER.node(0)).unwrap(IgniteImpl.class); // Unwrap the public wrapper of node 0 to reach the internal API.
+
+ TableImpl table = ((PublicApiThreadingTable)
node.tables().table(tableName)).unwrap(TableImpl.class);
+
+ int tableId = table.tableId();
+
+ IntStream.range(0, partitionsCount).forEach(partId -> assertThat(
+ node.replicaManager()
+ .replica(new TablePartitionId(tableId, partId))
+ .thenCompose(replica ->
replica.raftClient().refreshLeader()), // Chain the refresh so the assertion covers its completion too.
+ willCompleteSuccessfully()
+ ));
+ }
+
private static String globalPartitionStatesSystemViewSql() {
return "SELECT ZONE_NAME, TABLE_NAME, PARTITION_ID, STATE FROM
SYSTEM.GLOBAL_PARTITION_STATES";
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index e1447cf36c..42e56854c1 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -28,6 +28,7 @@ import static
org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -62,6 +63,7 @@ import
org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.network.NetworkMessage;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -102,6 +104,7 @@ import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
@@ -115,9 +118,12 @@ import org.junit.jupiter.params.provider.ValueSource;
@SuppressWarnings("resource")
@Timeout(90)
@ExtendWith(WorkDirectoryExtension.class)
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-23379")
class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
private static final IgniteLogger LOG =
Loggers.forClass(ItTableRaftSnapshotsTest.class);
+ private static final int AWAIT_PRIMARY_REPLICA_SECONDS = 10;
+
/**
* Nodes bootstrap configuration pattern.
*
@@ -257,6 +263,20 @@ class ItTableRaftSnapshotsTest extends
BaseIgniteAbstractTest {
causeLogTruncationOnSolePartitionLeader(0);
}
+ private void waitForPrimaryReplica() { // Test helper: assert that a primary replica appears for the sole partition.
+ IgniteImpl node = unwrapIgniteImpl(cluster.node(0));
+
+ CompletableFuture<ReplicaMeta> primary =
node.placementDriver().awaitPrimaryReplica( // Ask the placement driver of node 0 for the primary replica.
+ cluster.solePartitionId(),
+ node.clockService().now(), // Timestamp passed to the call is the node's current clock reading.
+ AWAIT_PRIMARY_REPLICA_SECONDS,
+ TimeUnit.SECONDS);
+
+ assertThat(primary, willCompleteSuccessfully());
+
+ LOG.info("Lease is accepted by [nodeConsistentId={}].",
primary.join().getLeaseholder()); // join() follows the assertion above that the future completes.
+ }
+
private void startAndInitCluster() {
cluster.startAndInit(3, IntStream.range(0, 3).toArray());
}
@@ -293,6 +313,8 @@ class ItTableRaftSnapshotsTest extends
BaseIgniteAbstractTest {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
});
+
+ waitForPrimaryReplica();
}
/**