This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dcb7f1c7e4 IGNITE-24124 Fix pool starvation in TableManager#beforeNodeStop() (#4981)
dcb7f1c7e4 is described below

commit dcb7f1c7e45b09f1b769d9dae0d217a8b18f640a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Dec 27 11:11:42 2024 +0400

    IGNITE-24124 Fix pool starvation in TableManager#beforeNodeStop() (#4981)
---
 .../internal/table/distributed/TableManager.java   | 62 +++++++++++++---------
 1 file changed, 36 insertions(+), 26 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 90fc3b4bb9..b5762c6786 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1515,40 +1515,50 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         var futures = new ArrayList<CompletableFuture<Void>>(tables.size());
 
         for (TableImpl table : tables.values()) {
-            futures.add(runAsync(() -> {
-                Stream.Builder<ManuallyCloseable> stopping = Stream.builder();
+            futures.add(
+                    supplyAsync(() -> tableStopFuture(table), 
ioExecutor).thenCompose(identity())
+            );
+        }
 
-                InternalTable internalTable = table.internalTable();
+        try {
+            allOf(futures.toArray(CompletableFuture[]::new)).get(30, 
TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            LOG.error("Unable to clean table resources", e);
+        }
+    }
 
-                stopping.add(() -> {
-                    var stopReplicaFutures = new 
CompletableFuture<?>[internalTable.partitions()];
+    private CompletableFuture<Void> tableStopFuture(TableImpl table) {
+        InternalTable internalTable = table.internalTable();
 
-                    for (int p = 0; p < internalTable.partitions(); p++) {
-                        TablePartitionId replicationGroupId = new 
TablePartitionId(table.tableId(), p);
+        var stopReplicaFutures = new 
CompletableFuture<?>[internalTable.partitions()];
 
-                        stopReplicaFutures[p] = 
stopPartition(replicationGroupId, table);
-                    }
+        for (int p = 0; p < internalTable.partitions(); p++) {
+            TablePartitionId replicationGroupId = new 
TablePartitionId(table.tableId(), p);
 
-                    allOf(stopReplicaFutures).get(10, TimeUnit.SECONDS);
-                });
+            stopReplicaFutures[p] = stopPartition(replicationGroupId, table);
+        }
 
-                stopping.add(internalTable.storage());
-                stopping.add(internalTable.txStateStorage());
-                stopping.add(internalTable);
+        CompletableFuture<Void> stopPartitionReplicasFuture = 
allOf(stopReplicaFutures).orTimeout(10, TimeUnit.SECONDS);
 
-                try {
-                    IgniteUtils.closeAllManually(stopping.build());
-                } catch (Throwable t) {
-                    LOG.error("Unable to stop table [name={}, tableId={}]", t, 
table.name(), table.tableId());
-                }
-            }, ioExecutor));
-        }
+        return stopPartitionReplicasFuture
+                .whenCompleteAsync((res, ex) -> {
+                    Stream.Builder<ManuallyCloseable> stopping = 
Stream.builder();
 
-        try {
-            allOf(futures.toArray(CompletableFuture[]::new)).get(30, 
TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-            LOG.error("Unable to clean table resources", e);
-        }
+                    stopping.add(internalTable.storage());
+                    stopping.add(internalTable.txStateStorage());
+                    stopping.add(internalTable);
+
+                    try {
+                        IgniteUtils.closeAllManually(stopping.build());
+                    } catch (Throwable e) {
+                        throw new CompletionException(e);
+                    }
+                }, ioExecutor)
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        LOG.error("Unable to stop table [name={}, 
tableId={}]", ex, table.name(), table.tableId());
+                    }
+                });
     }
 
     /**

Reply via email to