This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new edb86b038e6 IGNITE-25443 Fix tablesPerZoneLocks releasing (#5872)
edb86b038e6 is described below

commit edb86b038e6b28533b335be27bb8e310c93dec31
Author: Slava Koptilin <[email protected]>
AuthorDate: Thu May 22 15:10:16 2025 +0300

    IGNITE-25443 Fix tablesPerZoneLocks releasing (#5872)
---
 .../internal/table/distributed/TableManager.java   | 79 ++++++++++++----------
 1 file changed, 44 insertions(+), 35 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 91b3b148d1c..d979ce6ecd2 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -712,37 +712,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 return readLockAcquisitionFuture.thenCompose(stamp -> {
                     Set<TableImpl> zoneTables = 
zoneTablesRawSet(zonePartitionId.zoneId());
 
-                    int partitionIndex = zonePartitionId.partitionId();
-
-                    PartitionSet singlePartitionIdSet = 
PartitionSet.of(partitionIndex);
-
-                    CompletableFuture<?>[] futures = zoneTables.stream()
-                            .map(tbl -> inBusyLockAsync(busyLock, () -> {
-                                return getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
-                                        .thenRunAsync(() -> 
inBusyLock(busyLock, () -> {
-                                            localPartsByTableId.compute(
-                                                    tbl.tableId(),
-                                                    (tableId, oldPartitionSet) 
-> extendPartitionSet(oldPartitionSet, partitionIndex)
-                                            );
-
-                                            
lowWatermark.getLowWatermarkSafe(lwm ->
-                                                    registerIndexesToTable(
-                                                            tbl,
-                                                            catalogService,
-                                                            
singlePartitionIdSet,
-                                                            tbl.schemaView(),
-                                                            lwm
-                                                    )
-                                            );
-
-                                            
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, 
parameters.onRecovery());
-                                        }), ioExecutor)
-                                        // If the table is already closed, 
it's not a problem (probably the node is stopping).
-                                        
.exceptionally(ignoreTableClosedException());
-                            }))
-                            .toArray(CompletableFuture[]::new);
-
-                    return allOf(futures).whenComplete((unused, t) -> 
zoneLock.unlockRead(stamp));
+                    return createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables, parameters.onRecovery())
+                            .whenComplete((unused, t) -> zoneLock.unlockRead(stamp));
                 });
             } catch (Throwable t) {
                 readLockAcquisitionFuture.whenComplete((stamp, ex) -> zoneLock.unlockRead(stamp));
@@ -752,6 +723,44 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         });
     }
 
+    private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(
+            ZonePartitionId zonePartitionId,
+            Set<TableImpl> zoneTables,
+            boolean onRecovery
+    ) {
+        int partitionIndex = zonePartitionId.partitionId();
+
+        PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
+
+        CompletableFuture<?>[] futures = zoneTables.stream()
+                .map(tbl -> inBusyLockAsync(busyLock, () -> {
+                    return getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
+                            .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+                                localPartsByTableId.compute(
+                                        tbl.tableId(),
+                                        (tableId, oldPartitionSet) -> 
extendPartitionSet(oldPartitionSet, partitionIndex)
+                                );
+
+                                lowWatermark.getLowWatermarkSafe(lwm ->
+                                        registerIndexesToTable(
+                                                tbl,
+                                                catalogService,
+                                                singlePartitionIdSet,
+                                                tbl.schemaView(),
+                                                lwm
+                                        )
+                                );
+
+                                
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, onRecovery);
+                            }), ioExecutor)
+                            // If the table is already closed, it's not a problem (probably the node is stopping).
+                            .exceptionally(ignoreTableClosedException());
+                }))
+                .toArray(CompletableFuture[]::new);
+
+        return allOf(futures);
+    }
+
     private static Function<Throwable, Void> ignoreTableClosedException() {
         return ex -> {
             if (hasCause(ex, TableClosedException.class)) {
@@ -780,8 +789,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                         .map(this::stopTablePartitions)
                         .toArray(CompletableFuture[]::new);
 
-                return allOf(futures).whenComplete((v, t) -> zoneLock.unlockRead(stamp)).thenApply(v -> false);
-            });
+                return allOf(futures);
+            }).whenComplete((v, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply(v -> false);
         } catch (Throwable t) {
             readLockAcquisitionFuture.whenComplete((stamp, ex) -> zoneLock.unlockRead(stamp));
 
@@ -817,8 +826,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                                     ioExecutor))
                             .toArray(CompletableFuture[]::new);
 
-                    return allOf(futures).whenComplete((v, t) -> zoneLock.unlockRead(stamp));
-                }).thenApply((unused) -> false);
+                    return allOf(futures);
+                }).whenComplete((v, t) -> readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply((unused) -> false);
             });
         } catch (Throwable t) {
             readLockAcquisitionFuture.whenComplete((stamp, ex) -> zoneLock.unlockRead(stamp));

Reply via email to