This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 918becac48 IGNITE-20525 Fix
ItBuildIndexTest#testChangePrimaryReplicaOnMiddleBuildIndex (#4112)
918becac48 is described below
commit 918becac48b95ae73980ff7de97223d0c43d438c
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Fri Jul 19 15:59:12 2024 +0300
IGNITE-20525 Fix
ItBuildIndexTest#testChangePrimaryReplicaOnMiddleBuildIndex (#4112)
---
modules/index/build.gradle | 1 +
.../ignite/internal/index/ItBuildIndexTest.java | 285 ++++++++++-----------
2 files changed, 139 insertions(+), 147 deletions(-)
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index a2b0a66c47..0693fbe8c8 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -74,6 +74,7 @@ dependencies {
integrationTestImplementation project(':ignite-storage-rocksdb')
integrationTestImplementation project(':ignite-page-memory')
integrationTestImplementation project(':ignite-partition-replicator')
+ integrationTestImplementation project(':ignite-placement-driver-api')
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation testFixtures(project(':ignite-sql-engine'))
diff --git
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
index e873d3e895..29cc42012e 100644
---
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
+++
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
@@ -17,61 +17,61 @@
package org.apache.ignite.internal.index;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
-import static
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.TokenizedAssignments;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.NetworkMessage;
-import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.Command;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.NodeUtils;
import org.apache.ignite.internal.table.TableViewInternal;
-import
org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshallerImpl;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.table.Table;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
/** Integration test of index building. */
public class ItBuildIndexTest extends BaseSqlIntegrationTest {
@@ -90,7 +90,7 @@ public class ItBuildIndexTest extends BaseSqlIntegrationTest {
}
@ParameterizedTest(name = "replicas : {0}")
- @MethodSource("replicas")
+ @ValueSource(ints = {1, 2, 3})
void testBuildIndexOnStableTopology(int replicas) throws Exception {
int partitions = 2;
@@ -111,31 +111,32 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20525")
void testChangePrimaryReplicaOnMiddleBuildIndex() throws Exception {
- prepareBuildIndexToChangePrimaryReplica();
+ IgniteImpl currentPrimary = prepareBuildIndexToChangePrimaryReplica();
- // Let's change the primary replica for partition 0.
- List<Peer> peers = collectPeers(0);
+ changePrimaryReplica(currentPrimary);
- Peer newPrimaryPeer = peers.get(1);
+ // Let's make sure that the indexes are eventually built.
+ checkIndexBuild(1, initialNodes(), INDEX_NAME);
+ }
- IgniteImpl newPrimary =
findByConsistentId(newPrimaryPeer.consistentId());
- assertNotNull(newPrimary);
+ private static IgniteImpl primaryReplica(ReplicationGroupId groupId) {
+ IgniteImpl node = CLUSTER.aliveNode();
- CompletableFuture<Integer> sendBuildIndexCommandFuture = new
CompletableFuture<>();
+ CompletableFuture<ReplicaMeta> primaryReplicaMetaFuture =
node.placementDriver()
+ .awaitPrimaryReplica(groupId, node.clock().now(),
AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
-
newPrimary.dropMessages(waitSendBuildIndexCommand(sendBuildIndexCommandFuture,
false));
+ assertThat(primaryReplicaMetaFuture, willCompleteSuccessfully());
- RaftGroupService raftClient = getRaftClient(newPrimary, 0);
+ String primaryReplicaName =
primaryReplicaMetaFuture.join().getLeaseholder();
- assertThat(raftClient.transferLeadership(newPrimaryPeer),
willSucceedFast());
+ assertNotNull(primaryReplicaName);
- // Make sure that the index build command will be sent from the new
primary replica.
- assertThat(sendBuildIndexCommandFuture, willSucceedFast());
+ IgniteImpl primaryReplicaNode = findByConsistentId(primaryReplicaName);
- // Let's make sure that the indexes are eventually built.
- checkIndexBuild(1, initialNodes(), INDEX_NAME);
+ assertNotNull(primaryReplicaNode, String.format("Node %s not found",
primaryReplicaName));
+
+ return primaryReplicaNode;
}
/**
@@ -146,33 +147,49 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
* <li>Drop send {@link BuildIndexCommand} from the primary
replica.</li>
* </ul>
*/
- private void prepareBuildIndexToChangePrimaryReplica() throws Exception {
+ private IgniteImpl prepareBuildIndexToChangePrimaryReplica() throws
Exception {
int nodes = initialNodes();
assertThat(nodes, greaterThanOrEqualTo(2));
createAndPopulateTable(nodes, 1);
- List<Peer> peers = collectPeers(0);
- assertThat(peers, hasSize(nodes));
+ var tableGroupId = new TablePartitionId(tableId(TABLE_NAME), 0);
- IgniteImpl primary = findByConsistentId(peers.get(0).consistentId());
- assertNotNull(primary);
+ IgniteImpl primary = primaryReplica(tableGroupId);
CompletableFuture<Integer> sendBuildIndexCommandFuture = new
CompletableFuture<>();
+
primary.dropMessages(waitSendBuildIndexCommand(sendBuildIndexCommandFuture,
true));
createIndex(INDEX_NAME);
- Integer indexId = indexId(primary, INDEX_NAME);
- assertNotNull(indexId);
+ assertThat(sendBuildIndexCommandFuture, willBe(indexId(INDEX_NAME)));
- assertThat(sendBuildIndexCommandFuture, willBe(indexId));
+ return primary;
}
- private static int[] replicas() {
- return new int[]{1, 2, 3};
+ private static void changePrimaryReplica(IgniteImpl currentPrimary) throws
InterruptedException {
+ IgniteImpl nextPrimary = CLUSTER.runningNodes()
+ .filter(n -> n != currentPrimary)
+ .findAny()
+ .orElseThrow();
+
+ CompletableFuture<Integer> sendBuildIndexCommandFuture = new
CompletableFuture<>();
+
+
nextPrimary.dropMessages(waitSendBuildIndexCommand(sendBuildIndexCommandFuture,
false));
+
+ // Let's change the primary replica for partition 0.
+ NodeUtils.transferPrimary(
+ CLUSTER.runningNodes().collect(toList()),
+ new TablePartitionId(tableId(TABLE_NAME), 0),
+ nextPrimary.name()
+ );
+
+ // Make sure that the index build command will be sent from the new
primary replica.
+ assertThat(sendBuildIndexCommandFuture, willSucceedFast());
}
+ @SafeVarargs
private static String toValuesString(List<Object>... values) {
return Stream.of(values)
.peek(Assertions::assertNotNull)
@@ -197,7 +214,11 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
}
private static void createIndex(String indexName) throws Exception {
- sql(format("CREATE INDEX {} ON {} (i1)", indexName, TABLE_NAME));
+ // We execute this operation asynchronously, because some tests block
network messages, which makes the underlying code
+ // stuck with timeouts. We don't need to wait for the operation to
complete, as we wait for the necessary invariants further
+ // below.
+ CLUSTER.aliveNode().sql()
+ .executeAsync(null, format("CREATE INDEX {} ON {} (i1)",
indexName, TABLE_NAME));
waitForIndex(indexName);
}
@@ -214,36 +235,6 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
);
}
- private static RaftGroupService getRaftClient(Ignite node, int
partitionId) {
- TableViewInternal table = getTableView(node, TABLE_NAME);
- assertNotNull(table);
-
- return
table.internalTable().tableRaftService().partitionRaftGroupService(partitionId);
- }
-
- /**
- * Collects peers for a partition, the first in the list is primary.
- *
- * @param partitionId Partition ID.
- */
- private static List<Peer> collectPeers(int partitionId) {
- RaftGroupService raftGroupService = getRaftClient(CLUSTER.aliveNode(),
partitionId);
-
- List<Peer> peers = raftGroupService.peers();
- assertNotNull(peers);
-
- Peer leader = raftGroupService.leader();
- assertNotNull(leader);
-
- List<Peer> result = new ArrayList<>(peers);
-
- assertTrue(result.remove(leader));
-
- result.add(0, leader);
-
- return result;
- }
-
/**
* Creates a drop {@link BuildIndexCommand} predicate for the node and
also allows you to track when this command will be sent and for
* which index.
@@ -252,17 +243,15 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
* the command was sent.
* @param dropBuildIndexCommand {@code True} to drop {@link
BuildIndexCommand}.
*/
- private BiPredicate<String, NetworkMessage> waitSendBuildIndexCommand(
+ private static BiPredicate<String, NetworkMessage>
waitSendBuildIndexCommand(
CompletableFuture<Integer> sendBuildIndexCommandFuture,
boolean dropBuildIndexCommand
) {
- IgniteImpl node = CLUSTER.node(0);
- MessageSerializationRegistry serializationRegistry =
node.raftManager().service().serializationRegistry();
- var commandsMarshaller = new
PartitionCommandsMarshallerImpl(serializationRegistry, NO_POOL);
-
return (nodeConsistentId, networkMessage) -> {
if (networkMessage instanceof WriteActionRequest) {
- Command command =
commandsMarshaller.unmarshall(((WriteActionRequest) networkMessage).command());
+ Command command = ((WriteActionRequest)
networkMessage).deserializedCommand();
+
+ assertNotNull(command);
if (command instanceof BuildIndexCommand) {
sendBuildIndexCommandFuture.complete(((BuildIndexCommand)
command).indexId());
@@ -276,13 +265,12 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
}
private static void checkIndexBuild(int partitions, int replicas, String
indexName) throws Exception {
- // TODO: IGNITE-20525 We are waiting for schema synchronization to
avoid races to create and destroy indexes
- Map<Integer, List<Ignite>> nodesWithBuiltIndexesByPartitionId =
waitForIndexBuild(TABLE_NAME, indexName);
+ Map<Integer, Set<String>> nodesWithBuiltIndexesByPartitionId =
waitForIndexBuild(TABLE_NAME, indexName);
// Check that the number of nodes with built indexes is equal to the
number of replicas.
assertEquals(partitions, nodesWithBuiltIndexesByPartitionId.size());
- for (Entry<Integer, List<Ignite>> entry :
nodesWithBuiltIndexesByPartitionId.entrySet()) {
+ for (Entry<Integer, Set<String>> entry :
nodesWithBuiltIndexesByPartitionId.entrySet()) {
assertEquals(
replicas,
entry.getValue().size(),
@@ -290,21 +278,37 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
);
}
- assertTrue(waitForCondition(() -> isIndexAvailable(INDEX_NAME),
10_000));
+ assertTrue(waitForCondition(() ->
isIndexAvailable(CLUSTER.aliveNode(), INDEX_NAME), 10_000));
waitForReadTimestampThatObservesMostRecentCatalog();
}
/**
- * Returns the index ID from the catalog, {@code null} if there is no
index.
+ * Returns the index ID from the catalog.
*
- * @param node Node.
* @param indexName Index name.
*/
- private static @Nullable Integer indexId(Ignite node, String indexName) {
- CatalogIndexDescriptor indexDescriptor = getIndexDescriptor(node,
indexName);
+ private static Integer indexId(String indexName) {
+ CatalogIndexDescriptor indexDescriptor =
getIndexDescriptor(CLUSTER.aliveNode(), indexName);
- return indexDescriptor == null ? null : indexDescriptor.id();
+ assertNotNull(indexDescriptor, String.format("Index %s not found",
indexName));
+
+ return indexDescriptor.id();
+ }
+
+ /**
+ * Returns the table ID from the catalog.
+ *
+ * @param tableName Table name.
+ */
+ private static int tableId(String tableName) {
+ IgniteImpl node = CLUSTER.aliveNode();
+
+ CatalogTableDescriptor tableDescriptor =
node.catalogManager().table(tableName, node.clock().nowLong());
+
+ assertNotNull(tableDescriptor, String.format("Table %s not found",
tableName));
+
+ return tableDescriptor.id();
}
/**
@@ -312,73 +316,81 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
*
* @param tableName Table name.
* @param indexName Index name.
- * @return Nodes on which the partition index was built.
+ * @return Node names on which the partition index was built.
*/
- private static Map<Integer, List<Ignite>> waitForIndexBuild(String
tableName, String indexName) {
- Map<Integer, List<Ignite>> partitionIdToNodes = new HashMap<>();
+ private static Map<Integer, Set<String>> waitForIndexBuild(String
tableName, String indexName) {
+ Map<Integer, Set<String>> partitionIdToNodes =
collectAssignments(tableName);
- CLUSTER.runningNodes().forEach(clusterNode -> {
- try {
- InternalTable internalTable = getTableView(clusterNode,
tableName).internalTable();
- CatalogIndexDescriptor indexDescriptor =
getIndexDescriptor(clusterNode, indexName);
-
- assertNotNull(indexDescriptor);
-
- for (int partitionId = 0; partitionId <
internalTable.partitions(); partitionId++) {
- // Excluding partitions on the node outside of replication
group
- // TODO: will be replaced with replica usage in
https://issues.apache.org/jira/browse/IGNITE-22218
- RaftGroupService raftGroupService;
- try {
- raftGroupService =
internalTable.tableRaftService().partitionRaftGroupService(partitionId);
- } catch (IgniteInternalException e) {
- continue;
- }
+ int indexId = indexId(indexName);
- List<Peer> allPeers = raftGroupService.peers();
+ CLUSTER.runningNodes().forEach(node -> {
+ try {
+ InternalTable internalTable = internalTable(node, tableName);
+ for (Entry<Integer, Set<String>> entry :
partitionIdToNodes.entrySet()) {
// Let's check if there is a node in the partition
assignments.
- if
(allPeers.stream().map(Peer::consistentId).noneMatch(clusterNode.name()::equals))
{
+ if (!entry.getValue().contains(node.name())) {
continue;
}
- IndexStorage index =
internalTable.storage().getIndex(partitionId, indexDescriptor.id());
+ IndexStorage index =
internalTable.storage().getIndex(entry.getKey(), indexId);
- assertTrue(waitForCondition(() ->
index.getNextRowIdToBuild() == null, 10, TimeUnit.SECONDS.toMillis(10)));
+ assertNotNull(index, String.format("No index %d for
partition %d", indexId, entry.getKey()));
- partitionIdToNodes.computeIfAbsent(partitionId, p -> new
ArrayList<>()).add(clusterNode);
+ assertTrue(waitForCondition(() ->
index.getNextRowIdToBuild() == null, 10, SECONDS.toMillis(10)));
}
} catch (InterruptedException e) {
- throw new RuntimeException("Node operation failed: node=" +
clusterNode.name(), e);
+ throw new RuntimeException("Node operation failed: node=" +
node.name(), e);
}
});
return partitionIdToNodes;
}
- /**
- * Returns table descriptor of the given table at the given node, or
{@code null} if no such table exists.
- *
- * @param node Node.
- * @param tableName Table name.
- */
- private static @Nullable CatalogTableDescriptor getTableDescriptor(Ignite
node, String tableName) {
- IgniteImpl nodeImpl = (IgniteImpl) node;
+ private static Map<Integer, Set<String>> collectAssignments(String
tableName) {
+ Map<Integer, Set<String>> partitionIdToNodes = new HashMap<>();
+
+ IgniteImpl node = CLUSTER.aliveNode();
+
+ InternalTable internalTable = internalTable(node, tableName);
+
+ PlacementDriver placementDriver = node.placementDriver();
- return TableTestUtils.getTable(nodeImpl.catalogManager(), tableName,
nodeImpl.clock().nowLong());
+ HybridTimestamp now = node.clock().now();
+
+ for (int partitionId = 0; partitionId < internalTable.partitions();
partitionId++) {
+ var tableGroupId = new TablePartitionId(internalTable.tableId(),
partitionId);
+
+ CompletableFuture<TokenizedAssignments> assignmentsFuture =
placementDriver.getAssignments(tableGroupId, now);
+
+ assertThat(assignmentsFuture, willCompleteSuccessfully());
+
+ Set<String> assignments = assignmentsFuture.join()
+ .nodes()
+ .stream()
+ .map(Assignment::consistentId)
+ .collect(toSet());
+
+ partitionIdToNodes.put(partitionId, assignments);
+ }
+
+ return partitionIdToNodes;
}
/**
- * Returns the table by name, {@code null} if absent.
+ * Returns the table by name.
*
* @param node Node.
* @param tableName Table name.
*/
- private static @Nullable TableViewInternal getTableView(Ignite node,
String tableName) {
+ private static InternalTable internalTable(Ignite node, String tableName) {
CompletableFuture<Table> tableFuture =
node.tables().tableAsync(tableName);
assertThat(tableFuture, willSucceedFast());
- return unwrapTableViewInternal(tableFuture.join());
+ TableViewInternal tableViewInternal =
unwrapTableViewInternal(tableFuture.join());
+
+ return tableViewInternal.internalTable();
}
/**
@@ -387,28 +399,7 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
* @param node Node.
* @param indexName Index name.
*/
- private static @Nullable CatalogIndexDescriptor getIndexDescriptor(Ignite
node, String indexName) {
- IgniteImpl nodeImpl = (IgniteImpl) node;
-
- return nodeImpl.catalogManager().aliveIndex(indexName,
nodeImpl.clock().nowLong());
- }
-
- /**
- * Returns {@code true} if index with the given name is available.
- *
- * @param indexName Index nane.
- * @return True if index is available or false if index does not exist or
is not available.
- */
- private static boolean isIndexAvailable(String indexName) {
- IgniteImpl ignite = CLUSTER.runningNodes()
- .findAny()
- .orElseThrow(() -> new IllegalStateException("No running
nodes"));
-
- CatalogManager catalogManager = ignite.catalogManager();
- HybridClock clock = ignite.clock();
-
- CatalogIndexDescriptor indexDescriptor =
catalogManager.aliveIndex(indexName, clock.nowLong());
-
- return indexDescriptor != null && indexDescriptor.status() ==
AVAILABLE;
+ private static @Nullable CatalogIndexDescriptor
getIndexDescriptor(IgniteImpl node, String indexName) {
+ return node.catalogManager().aliveIndex(indexName,
node.clock().nowLong());
}
}