This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dcb7f1c7e4 IGNITE-24124 Fix pool starvation in
TableManager#beforeNodeStop() (#4981)
dcb7f1c7e4 is described below
commit dcb7f1c7e45b09f1b769d9dae0d217a8b18f640a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Dec 27 11:11:42 2024 +0400
IGNITE-24124 Fix pool starvation in TableManager#beforeNodeStop() (#4981)
---
.../internal/table/distributed/TableManager.java | 62 +++++++++++++---------
1 file changed, 36 insertions(+), 26 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 90fc3b4bb9..b5762c6786 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1515,40 +1515,50 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
var futures = new ArrayList<CompletableFuture<Void>>(tables.size());
for (TableImpl table : tables.values()) {
- futures.add(runAsync(() -> {
- Stream.Builder<ManuallyCloseable> stopping = Stream.builder();
+ futures.add(
+ supplyAsync(() -> tableStopFuture(table),
ioExecutor).thenCompose(identity())
+ );
+ }
- InternalTable internalTable = table.internalTable();
+ try {
+ allOf(futures.toArray(CompletableFuture[]::new)).get(30,
TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ LOG.error("Unable to clean table resources", e);
+ }
+ }
- stopping.add(() -> {
- var stopReplicaFutures = new
CompletableFuture<?>[internalTable.partitions()];
+ private CompletableFuture<Void> tableStopFuture(TableImpl table) {
+ InternalTable internalTable = table.internalTable();
- for (int p = 0; p < internalTable.partitions(); p++) {
- TablePartitionId replicationGroupId = new
TablePartitionId(table.tableId(), p);
+ var stopReplicaFutures = new
CompletableFuture<?>[internalTable.partitions()];
- stopReplicaFutures[p] =
stopPartition(replicationGroupId, table);
- }
+ for (int p = 0; p < internalTable.partitions(); p++) {
+ TablePartitionId replicationGroupId = new
TablePartitionId(table.tableId(), p);
- allOf(stopReplicaFutures).get(10, TimeUnit.SECONDS);
- });
+ stopReplicaFutures[p] = stopPartition(replicationGroupId, table);
+ }
- stopping.add(internalTable.storage());
- stopping.add(internalTable.txStateStorage());
- stopping.add(internalTable);
+ CompletableFuture<Void> stopPartitionReplicasFuture =
allOf(stopReplicaFutures).orTimeout(10, TimeUnit.SECONDS);
- try {
- IgniteUtils.closeAllManually(stopping.build());
- } catch (Throwable t) {
- LOG.error("Unable to stop table [name={}, tableId={}]", t,
table.name(), table.tableId());
- }
- }, ioExecutor));
- }
+ return stopPartitionReplicasFuture
+ .whenCompleteAsync((res, ex) -> {
+ Stream.Builder<ManuallyCloseable> stopping =
Stream.builder();
- try {
- allOf(futures.toArray(CompletableFuture[]::new)).get(30,
TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException
e) {
- LOG.error("Unable to clean table resources", e);
- }
+ stopping.add(internalTable.storage());
+ stopping.add(internalTable.txStateStorage());
+ stopping.add(internalTable);
+
+ try {
+ IgniteUtils.closeAllManually(stopping.build());
+ } catch (Throwable e) {
+ throw new CompletionException(e);
+ }
+ }, ioExecutor)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.error("Unable to stop table [name={},
tableId={}]", ex, table.name(), table.tableId());
+ }
+ });
}
/**