This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 2d597d6738 IGNITE-20635 Cleanup code wrt IGNITE-18733 mentions (#2686) 2d597d6738 is described below commit 2d597d6738781c7e3a08e2a3231c49ccf5cc1682 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Fri Oct 13 16:32:30 2023 +0400 IGNITE-20635 Cleanup code wrt IGNITE-18733 mentions (#2686) --- .../Table/SchemaSynchronizationTest.cs | 40 -------------- .../runner/app/ItIgniteNodeRestartTest.java | 62 ++++++++++++++-------- .../sql/engine/ClusterPerClassIntegrationTest.java | 1 - .../internal/sql/engine/ItBuildIndexTest.java | 6 +-- 4 files changed, 41 insertions(+), 68 deletions(-) diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs index bdcc689cb9..c7abd1a87d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs @@ -73,7 +73,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase // Modify table, insert data - client will use old schema, receive error, retry with new schema. // The process is transparent for the user: updated schema is in effect immediately. await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} DROP COLUMN NAME"); - await WaitForNewSchemaOnAllNodes(TestTableName, 2); var rec2 = new IgniteTuple { @@ -132,7 +131,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase // then force reload schema and retry. // The process is transparent for the user: updated schema is in effect immediately. await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN NAME VARCHAR NOT NULL DEFAULT 'name2'"); - await WaitForNewSchemaOnAllNodes(TestTableName, 2); var rec2 = new IgniteTuple { @@ -181,7 +179,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase // Modify table, insert data - client will use old schema, receive error, retry with new schema. // The process is transparent for the user: updated schema is in effect immediately. await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'"); - await WaitForNewSchemaOnAllNodes(TestTableName, 2); switch (testMode) { @@ -228,7 +225,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase await view.InsertAsync(null, rec); await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'"); - await WaitForNewSchemaOnAllNodes(TestTableName, 2); var pocoView = table.GetRecordView<Poco>(); var poco = new Poco(1, string.Empty); @@ -279,7 +275,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase await view.InsertAsync(null, rec); await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'"); - await WaitForNewSchemaOnAllNodes(TestTableName, 2); var pocoView = table.GetRecordView<Poco>(); @@ -328,7 +323,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase await view.InsertAsync(null, rec); await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN NAME VARCHAR NOT NULL DEFAULT 'name1'"); - await WaitForNewSchemaOnAllNodes(TestTableName, 2); var pocoView = table.GetKeyValueView<int, string>(); var res = await pocoView.GetAsync(null, 1); @@ -373,7 +367,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase // Update schema. // New schema has a new column with a default value, so it is not required to provide it in the streamed data. await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN VAL varchar DEFAULT 'FOO'"); - await WaitForNewSchemaOnAllNodes(TestTableName, 2); for (int i = 10; i < 20; i++) { @@ -395,7 +388,6 @@ public class SchemaSynchronizationTest : IgniteTestsBase // Update schema. await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN VAL varchar DEFAULT 'FOO'"); - await WaitForNewSchemaOnAllNodes(TestTableName, 2); // Stream data with new schema. Client does not yet know about the new schema, // but unmapped column exception will trigger schema reload. @@ -410,37 +402,5 @@ public class SchemaSynchronizationTest : IgniteTestsBase Assert.AreEqual("BAR", res2.Value["VAL"]); } - private async Task WaitForNewSchemaOnAllNodes(string tableName, int schemaVer, int timeoutMs = 5000) - { - // TODO IGNITE-18733, IGNITE-18449: remove this workaround when issues are fixed. - // Currently new schema version is not immediately available on all nodes. - // Use separate client to check schema sync without affecting the system under test. - var configs = Client.Configuration.Endpoints.Select(e => new IgniteClientConfiguration(e)).ToList(); - - foreach (var cfg in configs) - { - using var client = await IgniteClient.StartAsync(cfg); - var table = await client.Tables.GetTableAsync(tableName); - var tableImpl = (Table)table!; - var sw = Stopwatch.StartNew(); - - while (true) - { - var schema = await tableImpl.GetSchemaAsync(Apache.Ignite.Internal.Table.Table.SchemaVersionForceLatest); - if (schema.Version >= schemaVer) - { - break; - } - - if (sw.Elapsed > TimeSpan.FromMilliseconds(timeoutMs)) - { - Assert.Fail($"Schema version {schema.Version} is not available on node {cfg.Endpoints[0]} after {timeoutMs}ms"); - } - - await Task.Delay(50); - } - } - } - private record Poco(int Id, string Name); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 93df66ac85..3344d97704 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -17,8 +17,9 @@ package org.apache.ignite.internal.runner.app; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; import static org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry; @@ -59,6 +60,9 @@ import org.apache.ignite.internal.BaseIgniteRestartTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.catalog.ClockWaiter; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.events.CatalogEvent; +import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters; import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; @@ -637,11 +641,11 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { int intRes; try (Session session1 = ignite1.sql().createSession(); Session session2 = ignite2.sql().createSession()) { - session1.execute(null, "CREATE INDEX idx1 ON " + TABLE_NAME + "(id)"); + createTableWithData(List.of(ignite1), TABLE_NAME, 2, 1); - waitForIndex(List.of(ignite1, ignite2), "idx1"); + session1.execute(null, "CREATE INDEX idx1 ON " + TABLE_NAME + "(id)"); - createTableWithData(List.of(ignite1), TABLE_NAME, 2, 1); + waitForIndexToBecomeAvailable(List.of(ignite1, ignite2), "idx1"); ResultSet<SqlRow> plan = session1.execute(null, "EXPLAIN PLAN FOR " + sql); @@ -743,7 +747,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * Restarts the node which stores some data. */ @Test - public void nodeWithDataTest() throws InterruptedException { + public void nodeWithDataTest() { IgniteImpl ignite = startNode(0); createTableWithData(List.of(ignite), TABLE_NAME, 1); @@ -759,7 +763,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * Restarts the node which stores some data. */ @ParameterizedTest - @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733") @ValueSource(booleans = {true, false}) public void metastorageRecoveryTest(boolean useSnapshot) throws InterruptedException { List<IgniteImpl> nodes = startNodes(2); @@ -881,7 +884,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * Starts two nodes and checks that the data are storing through restarts. Nodes restart in the same order when they started at first. */ @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733") public void testTwoNodesRestartDirect() throws InterruptedException { twoNodesRestart(true); } @@ -890,7 +892,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * Starts two nodes and checks that the data are storing through restarts. Nodes restart in reverse order when they started at first. */ @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733") public void testTwoNodesRestartReverse() throws InterruptedException { twoNodesRestart(false); } @@ -900,7 +901,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * * @param directOrder When the parameter is true, nodes restart in direct order, otherwise they restart in reverse order. */ - private void twoNodesRestart(boolean directOrder) throws InterruptedException { + private void twoNodesRestart(boolean directOrder) { List<IgniteImpl> nodes = startNodes(2); createTableWithData(nodes, TABLE_NAME, 2); @@ -958,7 +959,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { */ @Test @Disabled("https://issues.apache.org/jira/browse/IGNITE-20137") - public void testOneNodeRestartWithGap() throws InterruptedException { + public void testOneNodeRestartWithGap() { IgniteImpl ignite = startNode(0); startNode(1); @@ -1011,7 +1012,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * Checks that a cluster is able to restart when some changes were made in configuration. */ @Test - public void testRestartDiffConfig() throws InterruptedException { + public void testRestartDiffConfig() { List<IgniteImpl> ignites = startNodes(2); createTableWithData(ignites, TABLE_NAME, 2); @@ -1039,7 +1040,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * The test for node restart when there is a gap between the node local configuration and distributed configuration. */ @Test - public void testCfgGapWithoutData() throws InterruptedException { + public void testCfgGapWithoutData() { List<IgniteImpl> nodes = startNodes(3); createTableWithData(nodes, TABLE_NAME, nodes.size()); @@ -1120,8 +1121,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { * The test for node restart when there is a gap between the node local configuration and distributed configuration. */ @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733") - public void testCfgGap() throws InterruptedException { + public void testCfgGap() { List<IgniteImpl> nodes = startNodes(4); createTableWithData(nodes, "t1", nodes.size()); @@ -1236,15 +1236,31 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { } } - private void waitForIndex(Collection<IgniteImpl> nodes, String indexName) throws InterruptedException { - // FIXME: Wait for the index to be created on all nodes, - // this is a workaround for https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to the PK index. - assertTrue(waitForCondition( - () -> nodes.stream() - .map(nodeImpl -> nodeImpl.catalogManager().index(indexName.toUpperCase(), nodeImpl.clock().nowLong())) - .allMatch(Objects::nonNull), - TIMEOUT_MILLIS - )); + private void waitForIndexToBecomeAvailable(Collection<IgniteImpl> nodes, String indexName) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(nodes.size()); + + nodes.forEach(node -> node.catalogManager().listen(CatalogEvent.INDEX_AVAILABLE, (event, ex) -> { + if (ex != null) { + return failedFuture(ex); + } + + MakeIndexAvailableEventParameters availableEvent = (MakeIndexAvailableEventParameters) event; + + CatalogIndexDescriptor index = node.catalogManager().index(availableEvent.indexId(), event.catalogVersion()); + + assertNotNull(index, "Cannot find an index by ID=" + availableEvent.indexId()); + + if (index.name().equalsIgnoreCase(indexName)) { + // That's our index. + latch.countDown(); + + return completedFuture(true); + } + + return completedFuture(false); + })); + + assertTrue(latch.await(10, TimeUnit.SECONDS)); } /** diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java index 8ed419cb29..f91270958a 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java @@ -450,7 +450,6 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe protected static Map<Integer, List<Ignite>> waitForIndexBuild(String tableName, String indexName) throws Exception { Map<Integer, List<Ignite>> partitionIdToNodes = new HashMap<>(); - // TODO: IGNITE-18733 We are waiting for the synchronization of schemes for (Ignite clusterNode : CLUSTER_NODES) { TableImpl tableImpl = getTableImpl(clusterNode, tableName); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java index 2425e508af..bb0da64ae2 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java @@ -188,13 +188,11 @@ public class ItBuildIndexTest extends ClusterPerClassIntegrationTest { } /** - * Waits for all nodes in the cluster to have the given index in the configuration. + * Waits for all nodes in the cluster to have the given index in the Catalog. * - * @param indexName An index. + * @param indexName Name of an index to wait for. */ private static void waitForIndex(String indexName) throws InterruptedException { - // FIXME: Wait for the index to be created on all nodes, - // this is a workaround for https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to the index. assertFalse(nullOrEmpty(CLUSTER_NODES)); assertTrue(waitForCondition( () -> CLUSTER_NODES.stream().map(node -> getIndexDescriptor(node, indexName)).allMatch(Objects::nonNull),