ivanzlenko commented on code in PR #6693:
URL: https://github.com/apache/ignite-3/pull/6693#discussion_r2402027758
##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java:
##########
@@ -213,6 +223,137 @@ private static void changePrimaryReplica(IgniteImpl
currentPrimary) throws Inter
assertThat(sendBuildIndexCommandFuture, willSucceedFast());
}
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546")
+ void writeIntentFromTxAbandonedBeforeShouldNotBeIndexed() throws Exception
{
+ createTable(1, 1);
+
+ disableWriteIntentSwitchExecution();
+
+ // Create and abandon a transaction.
+ int txCoordinatorOrdinal = 2;
+ Transaction tx =
CLUSTER.node(txCoordinatorOrdinal).transactions().begin();
+ insertDataInTransaction(tx, TABLE_NAME, List.of("I0", "I1"), new
Object[]{1, 1});
+
+ CLUSTER.restartNode(txCoordinatorOrdinal);
+
+ createIndex(INDEX_NAME);
+ assertTrue(
+ waitForCondition(() ->
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME), 10_000),
+ "Index did not become available in time"
+ );
+
+ verifyNoNodesHaveAnythingInIndex();
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546")
+ void
writeIntentFromTxAbandonedWhileWaitingForTransactionsToFinishShouldNotBeIndexed()
throws Exception {
+ createTable(1, 1);
+
+ // Both disable write intent switch execution and track when we start
waiting for transactions to finish before index build.
+ CompletableFuture<Void> startedWaitForPreIndexTxsToFinish = new
CompletableFuture<>();
+ CLUSTER.nodes().forEach(node -> {
+ unwrapIgniteImpl(node).dropMessages((recipientId, message) -> {
+ if (message instanceof WriteIntentSwitchReplicaRequest) {
+ return true;
+ }
+
+ if (message instanceof
IsNodeFinishedRwTransactionsStartedBeforeRequest) {
+ startedWaitForPreIndexTxsToFinish.complete(null);
+ }
+
+ return false;
+ });
+ });
+
+ // Create and abandon a transaction.
+ int txCoordinatorOrdinal = 2;
+ Transaction tx =
CLUSTER.node(txCoordinatorOrdinal).transactions().begin();
+ insertDataInTransaction(tx, TABLE_NAME, List.of("I0", "I1"), new
Object[]{1, 1});
+
+ createIndex(INDEX_NAME);
+ assertThat(startedWaitForPreIndexTxsToFinish,
willCompleteSuccessfully());
+
+ // The index pre-build wait has started, let's restart the coordinator
to abandon the transaction and abruptly terminate
+ // the pre-build wait.
+ CLUSTER.restartNode(txCoordinatorOrdinal);
+
+ assertTrue(
+ waitForCondition(() ->
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME), 30_000),
+ "Index did not become available in time"
+ );
+
+ verifyNoNodesHaveAnythingInIndex();
+ }
+
+ private void verifyNoNodesHaveAnythingInIndex() {
+ int nodesHavingSomethingInIndex = 0;
+ for (int nodeIndex = 0; nodeIndex < initialNodes(); nodeIndex++) {
+ IgniteImpl ignite = unwrapIgniteImpl(node(nodeIndex));
+
+ CatalogIndexDescriptor indexDescriptor =
indexDescriptor(INDEX_NAME, ignite);
+ SortedIndexStorage indexStorage = (SortedIndexStorage)
indexStorage(indexDescriptor, 0, ignite);
+
+ if (indexStorage != null) {
+ try (Cursor<IndexRow> indexRows =
indexStorage.readOnlyScan(null, null, 0)) {
+ if (indexRows.hasNext()) {
+ nodesHavingSomethingInIndex++;
+ }
+ }
+ }
+ }
+
+ assertThat("Nothing should have been put to the index",
nodesHavingSomethingInIndex, is(0));
+ }
+
+ private static void disableWriteIntentSwitchExecution() {
+ CLUSTER.runningNodes().forEach(ignite -> {
+ unwrapIgniteImpl(ignite).dropMessages((recipientId, message) ->
message instanceof WriteIntentSwitchReplicaRequest);
+ });
+ }
+
+ private static CatalogIndexDescriptor indexDescriptor(String indexName,
IgniteImpl ignite) {
+ return getIndexStrict(ignite.catalogManager(), indexName,
ignite.clock().nowLong());
+ }
+
+ private static @Nullable IndexStorage indexStorage(CatalogIndexDescriptor
indexDescriptor, int partitionId, IgniteImpl ignite) {
+ TableViewInternal tableViewInternal =
tableViewInternal(indexDescriptor.tableId(), ignite);
+
+ int indexId = indexDescriptor.id();
+
+ IndexStorage indexStorage;
+ try {
+ indexStorage =
tableViewInternal.internalTable().storage().getIndex(partitionId, indexId);
+ } catch (StorageException e) {
+ if (e.getMessage().contains("Partition ID " + partitionId + " does
not exist")) {
Review Comment:
While we could not find use for this exception in our code base, but having
different exception vastly improve easiness of working with application logs in
the future. But okay.
##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java:
##########
@@ -230,19 +380,19 @@ private static void createAndPopulateTable(int replicas,
int partitions) {
"CREATE TABLE {} (i0 INTEGER PRIMARY KEY, i1 INTEGER) ZONE {}",
TABLE_NAME, ZONE_NAME
));
-
- sql(format(
- "INSERT INTO {} VALUES {}",
- TABLE_NAME, toValuesString(List.of(1, 1), List.of(2, 2),
List.of(3, 3), List.of(4, 4), List.of(5, 5))
- ));
}
- private static void createIndex(String indexName) throws Exception {
+ private void createIndex(String indexName) throws Exception {
// We execute this operation asynchronously, because some tests block
network messages, which makes the underlying code
// stuck with timeouts. We don't need to wait for the operation to
complete, as we wait for the necessary invariants further
// below.
CLUSTER.aliveNode().sql()
- .executeAsync(null, format("CREATE INDEX {} ON {} (i1)",
indexName, TABLE_NAME));
+ .executeAsync(null, format("CREATE INDEX {} ON {} (i1)",
indexName, TABLE_NAME))
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ log.error("Failed to create index", ex);
Review Comment:
It is at least not an obvious pattern to use a logger. And one day someone
will add private static final IgniteLogger into this class. But okay.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]