This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new edb86b038e6 IGNITE-25443 Fix tablesPerZoneLocks releasing (#5872)
edb86b038e6 is described below
commit edb86b038e6b28533b335be27bb8e310c93dec31
Author: Slava Koptilin <[email protected]>
AuthorDate: Thu May 22 15:10:16 2025 +0300
IGNITE-25443 Fix tablesPerZoneLocks releasing (#5872)
---
.../internal/table/distributed/TableManager.java | 79 ++++++++++++----------
1 file changed, 44 insertions(+), 35 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 91b3b148d1c..d979ce6ecd2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -712,37 +712,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return readLockAcquisitionFuture.thenCompose(stamp -> {
Set<TableImpl> zoneTables =
zoneTablesRawSet(zonePartitionId.zoneId());
- int partitionIndex = zonePartitionId.partitionId();
-
- PartitionSet singlePartitionIdSet =
PartitionSet.of(partitionIndex);
-
- CompletableFuture<?>[] futures = zoneTables.stream()
- .map(tbl -> inBusyLockAsync(busyLock, () -> {
- return getOrCreatePartitionStorages(tbl,
singlePartitionIdSet)
- .thenRunAsync(() ->
inBusyLock(busyLock, () -> {
- localPartsByTableId.compute(
- tbl.tableId(),
- (tableId, oldPartitionSet)
-> extendPartitionSet(oldPartitionSet, partitionIndex)
- );
-
-
lowWatermark.getLowWatermarkSafe(lwm ->
- registerIndexesToTable(
- tbl,
- catalogService,
-
singlePartitionIdSet,
- tbl.schemaView(),
- lwm
- )
- );
-
-
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId,
parameters.onRecovery());
- }), ioExecutor)
- // If the table is already closed,
it's not a problem (probably the node is stopping).
-
.exceptionally(ignoreTableClosedException());
- }))
- .toArray(CompletableFuture[]::new);
-
- return allOf(futures).whenComplete((unused, t) ->
zoneLock.unlockRead(stamp));
+ return
createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables,
parameters.onRecovery())
+ .whenComplete((unused, t) ->
zoneLock.unlockRead(stamp));
});
} catch (Throwable t) {
readLockAcquisitionFuture.whenComplete((stamp, ex) ->
zoneLock.unlockRead(stamp));
@@ -752,6 +723,44 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
});
}
+ private CompletableFuture<Void>
createPartitionsAndLoadResourcesToZoneReplica(
+ ZonePartitionId zonePartitionId,
+ Set<TableImpl> zoneTables,
+ boolean onRecovery
+ ) {
+ int partitionIndex = zonePartitionId.partitionId();
+
+ PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
+
+ CompletableFuture<?>[] futures = zoneTables.stream()
+ .map(tbl -> inBusyLockAsync(busyLock, () -> {
+ return getOrCreatePartitionStorages(tbl,
singlePartitionIdSet)
+ .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+ localPartsByTableId.compute(
+ tbl.tableId(),
+ (tableId, oldPartitionSet) ->
extendPartitionSet(oldPartitionSet, partitionIndex)
+ );
+
+ lowWatermark.getLowWatermarkSafe(lwm ->
+ registerIndexesToTable(
+ tbl,
+ catalogService,
+ singlePartitionIdSet,
+ tbl.schemaView(),
+ lwm
+ )
+ );
+
+
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, onRecovery);
+ }), ioExecutor)
+ // If the table is already closed, it's not a
problem (probably the node is stopping).
+ .exceptionally(ignoreTableClosedException());
+ }))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(futures);
+ }
+
private static Function<Throwable, Void> ignoreTableClosedException() {
return ex -> {
if (hasCause(ex, TableClosedException.class)) {
@@ -780,8 +789,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
.map(this::stopTablePartitions)
.toArray(CompletableFuture[]::new);
- return allOf(futures).whenComplete((v, t) ->
zoneLock.unlockRead(stamp)).thenApply(v -> false);
- });
+ return allOf(futures);
+ }).whenComplete((v, t) ->
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply(v ->
false);
} catch (Throwable t) {
readLockAcquisitionFuture.whenComplete((stamp, ex) ->
zoneLock.unlockRead(stamp));
@@ -817,8 +826,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
ioExecutor))
.toArray(CompletableFuture[]::new);
- return allOf(futures).whenComplete((v, t) ->
zoneLock.unlockRead(stamp));
- }).thenApply((unused) -> false);
+ return allOf(futures);
+ }).whenComplete((v, t) ->
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead)).thenApply((unused)
-> false);
});
} catch (Throwable t) {
readLockAcquisitionFuture.whenComplete((stamp, ex) ->
zoneLock.unlockRead(stamp));