This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 473b54814c IGNITE-19979 Table start might fail due to an outdated causality token (#2316) 473b54814c is described below commit 473b54814c067fd41956941bf73eec4c5b3eec8a Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Fri Jul 14 12:32:13 2023 +0400 IGNITE-19979 Table start might fail due to an outdated causality token (#2316) --- .../internal/table/distributed/TableManager.java | 34 +++++++++++----------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index e7bfde3fd0..aa419d043f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -909,27 +909,27 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp return allOf(futures); }; - return localPartsByTableIdVv.update(causalityToken, (previous, throwable) -> inBusyLock(busyLock, () -> { - return getOrCreatePartitionStorages(table, parts).thenApply(u -> { - var newValue = new HashMap<>(previous); + // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!). + CompletableFuture<?> localPartsUpdateFuture = localPartsByTableIdVv.update(causalityToken, + (previous, throwable) -> inBusyLock(busyLock, () -> { + return getOrCreatePartitionStorages(table, parts).thenApply(u -> { + var newValue = new HashMap<>(previous); - newValue.put(tableId, parts); + newValue.put(tableId, parts); - return newValue; - }); - })).thenCompose(unused -> { - CompletableFuture<Void> updateAssignmentsFuture = tablesByIdVv.get(causalityToken).thenComposeAsync( - tablesById -> inBusyLock(busyLock, updateAssignmentsClosure), - ioExecutor - ); + return newValue; + }); + })); - return assignmentsUpdatedVv.update(causalityToken, (token, e) -> { - if (e != null) { - return failedFuture(e); - } + return assignmentsUpdatedVv.update(causalityToken, (token, e) -> { + if (e != null) { + return failedFuture(e); + } - return updateAssignmentsFuture; - }); + return localPartsUpdateFuture.thenCompose(unused -> + tablesByIdVv.get(causalityToken) + .thenComposeAsync(tablesById -> inBusyLock(busyLock, updateAssignmentsClosure), ioExecutor) + ); }); }