This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 361714948e IGNITE-19238 ItDataTypesTest and ItCreateTableDdlTest fixed (#1908) 361714948e is described below commit 361714948eed537c51e8222f945b5a95618db4a3 Author: Alexander Lapin <lapin1...@gmail.com> AuthorDate: Fri Apr 7 12:46:02 2023 +0300 IGNITE-19238 ItDataTypesTest and ItCreateTableDdlTest fixed (#1908) --- .../runner/app/ItIgniteNodeRestartTest.java | 5 ++-- .../internal/table/distributed/TableManager.java | 27 +++++++++------------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index a338b6b790..36c44b39d2 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -782,8 +782,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { res2.close(); } - // TODO: Uncomment after IGNITE-18203 - /*stopNode(0); + stopNode(0); ignite1 = startNode(0); @@ -791,7 +790,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { ResultSet<SqlRow> res3 = session1.execute(null, sql); assertEquals(intRes, res3.next().intValue(0)); - }*/ + } } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index ea43056438..e7cef89955 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntSupplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.ignite.configuration.ConfigurationChangeException; import org.apache.ignite.configuration.ConfigurationProperty; @@ -249,9 +250,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp /** * {@link TableImpl} is created during update of tablesByIdVv, we store reference to it in case of updating of tablesByIdVv fails, so we - * can stop resources associated with the table. + * can stop resources associated with the table or to clean up table resources on {@code TableManager#stop()}. */ - private final Map<UUID, TableImpl> tablesToStopInCaseOfError = new ConcurrentHashMap<>(); + private final Map<UUID, TableImpl> pendingTables = new ConcurrentHashMap<>(); /** Resolver that resolves a node consistent ID to cluster node. */ private final Function<String, ClusterNode> clusterNodeResolver; @@ -382,12 +383,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new); - registry.accept(token -> { - tablesToStopInCaseOfError.clear(); - - return completedFuture(null); - }); - txStateStorageScheduledPool = Executors.newSingleThreadScheduledExecutor( NamedThreadFactory.create(nodeName, "tx-state-storage-scheduled-pool", LOG)); @@ -534,7 +529,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp return failedFuture(new NodeStoppingException()); } - try { return createTableLocally( ctx.storageRevision(), @@ -995,13 +989,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener); metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener); - Map<UUID, TableImpl> tables = tablesByIdVv.latest(); + Map<UUID, TableImpl> tablesToStop = Stream.concat(tablesByIdVv.latest().entrySet().stream(), pendingTables.entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> v1)); - cleanUpTablesResources(tables); - - cleanUpTablesResources(tablesToStopInCaseOfError); - - tablesToStopInCaseOfError.clear(); + cleanUpTablesResources(tablesToStop); shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS); shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS); @@ -1156,7 +1147,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp }); })); - tablesToStopInCaseOfError.put(tblId, table); + pendingTables.put(tblId, table); + + tablesByIdVv.get(causalityToken).thenAccept(ignored -> inBusyLock(busyLock, () -> { + pendingTables.remove(tblId); + })); tablesByIdVv.get(causalityToken) .thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));