This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fa238da3a IGNITE-16863 Implement compound VersisionedValue
fa238da3a is described below

commit fa238da3a209344b13deddd4152b1209b3a93921
Author: Denis Chudov <moongll...@gmail.com>
AuthorDate: Fri May 13 10:04:33 2022 +0300

    IGNITE-16863 Implement compound VersisionedValue
---
 .../ignite/internal/causality/VersionedValue.java  | 437 ++++++++++++---------
 .../org/apache/ignite/lang/IgniteTriConsumer.java  |  57 +++
 .../internal/causality/VersionedValueTest.java     | 352 ++++++++++++++---
 .../runner/app/ItIgniteNodeRestartTest.java        |  12 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  10 +-
 .../internal/sql/engine/SqlQueryProcessor.java     |   5 +-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    | 113 +++---
 .../internal/sql/engine/StopCalciteModuleTest.java |  21 +-
 .../sql/engine/exec/MockedStructuresTest.java      |  19 +-
 .../engine/exec/schema/SqlSchemaManagerTest.java   |  21 +-
 .../internal/table/distributed/TableManager.java   |  81 ++--
 .../ignite/internal/table/TableManagerTest.java    |  11 +-
 12 files changed, 784 insertions(+), 355 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
index 9e7b8d8ed..4109f2a00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
@@ -17,20 +17,28 @@
 
 package org.apache.ignite.internal.causality;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.lang.IgniteTriConsumer;
 
 /**
  * Parametrized type to store several versions of the value.
@@ -45,14 +53,11 @@ public class VersionedValue<T> {
     /** Default history size. */
     private static final int DEFAULT_HISTORY_SIZE = 2;
 
-    /** Last applied causality token. */
-    private volatile long actualToken = NOT_INITIALIZED;
-
     /** Size of stored history. */
     private final int historySize;
 
-    /** Closure applied on storage revision update. */
-    private final BiConsumer<VersionedValue<T>, Long> 
storageRevisionUpdateCallback;
+    /** List of completion listeners, see {@link 
#whenComplete(IgniteTriConsumer)}. */
+    private final List<IgniteTriConsumer<Long, T, Throwable>> 
completionListeners = new CopyOnWriteArrayList<>();
 
     /** Versioned value storage. */
     private final ConcurrentNavigableMap<Long, CompletableFuture<T>> history = 
new ConcurrentSkipListMap<>();
@@ -63,101 +68,88 @@ public class VersionedValue<T> {
      */
     private final ReadWriteLock trimHistoryLock = new ReentrantReadWriteLock();
 
-    /** Temporary value for {@link #update(long, Function, Function)}. */
-    private volatile T tempValue = null;
-
-    /** Whether {@link #update(long, Function, Function)} was called since the 
last revision update. */
-    private volatile boolean isUpdating = false;
-
-    /** Update mutex. */
-    private final Object updateMutex = new Object();
-
     /** Initial future. The future will be completed when {@link 
VersionedValue} sets a first value. */
     private final CompletableFuture<T> initFut = new CompletableFuture<>();
 
     /** The supplier may provide a value which will used as a default. */
     private final Supplier<T> defaultValSupplier;
 
+    /** Update mutex. */
+    private final Object updateMutex = new Object();
+
     /** Value that can be used as default. */
-    private volatile T defaultVal;
+    private final AtomicReference<T> defaultValRef;
+
+    /** Last applied causality token. */
+    private volatile long actualToken = NOT_INITIALIZED;
+
+    /**
+     * Future that will be completed after all updates of the value in the 
context of the current causality token have been performed.
+     * This {@code updaterFuture} is {@code null} if no updates in the context 
of the current causality token have been initiated.
+     * See {@link #update(long, BiFunction)}.
+     */
+    private volatile CompletableFuture<T> updaterFuture = null;
 
     /**
      * Constructor.
      *
-     * @param storageRevisionUpdateCallback Closure applied on storage 
revision update (see {@link #onStorageRevisionUpdate(long)}).
      * @param observableRevisionUpdater     A closure intended to connect this 
VersionedValue with a revision updater, that this
      *                                      VersionedValue should be able to 
listen to, for receiving storage revision updates.
      *                                      This closure is called once on a 
construction of this VersionedValue and accepts a
-     *                                      {@code Consumer<Long>} that should 
be called on every update of storage revision as a
-     *                                      listener. IMPORTANT: Revision 
update shouldn't happen concurrently with {@link #set(long, T)}
-     *                                      operations.
+     *                                      {@code Function<Long, 
CompletableFuture<?>>} that should be called on every update of
+     *                                      storage revision as a listener. 
IMPORTANT: Revision update shouldn't happen
+     *                                      concurrently with {@link 
#complete(long, T)} operations.
      * @param historySize                   Size of the history of changes to 
store, including last applied token.
-     * @param defaultVal                    Supplier of the default value, 
that is used on {@link #update(long, Function, Function)} to
-     *                                      evaluate the default value if the 
value is not initialized yet.
+     * @param defaultVal                    Supplier of the default value, 
that is used on {@link #update(long, BiFunction)} to
+     *                                      evaluate the default value if the 
value is not initialized yet. It is not guaranteed to
+     *                                      execute only once.
      */
     public VersionedValue(
-            @Nullable BiConsumer<VersionedValue<T>, Long> 
storageRevisionUpdateCallback,
-            Consumer<Consumer<Long>> observableRevisionUpdater,
+            Consumer<Function<Long, CompletableFuture<?>>> 
observableRevisionUpdater,
             int historySize,
             Supplier<T> defaultVal
     ) {
-        this.storageRevisionUpdateCallback = storageRevisionUpdateCallback;
-
         this.historySize = historySize;
 
         this.defaultValSupplier = defaultVal;
 
-        observableRevisionUpdater.accept(this::onStorageRevisionUpdate);
+        this.defaultValRef = defaultValSupplier == null ? null : new 
AtomicReference<>();
+
+        observableRevisionUpdater.accept(this::completeOnRevision);
     }
 
     /**
      * Constructor.
      *
-     * @param observableRevisionUpdater A closure intended to connect this 
VersionedValue with a revision updater, that this
-     *                                  VersionedValue should be able to 
listen to, for receiving storage revision updates.
-     *                                  This closure is called once on a 
construction of this VersionedValue and accepts a
-     *                                  {@code Consumer<Long>} that should be 
called on every update of storage revision as a
-     *                                  listener. IMPORTANT: Revision update 
shouldn't happen concurrently with {@link #set(long, T)}
-     *                                  operations.
-     * @param defaultVal                Supplier of the default value, that is 
used on {@link #update(long, Function, Function)} to
-     *                                  evaluate the default value if the 
value is not initialized yet.
+     * @param observableRevisionUpdater     A closure intended to connect this 
VersionedValue with a revision updater, that this
+     *                                      VersionedValue should be able to 
listen to, for receiving storage revision updates.
+     *                                      This closure is called once on a 
construction of this VersionedValue and accepts a
+     *                                      {@code Function<Long, 
CompletableFuture<?>>} that should be called on every update of
+     *                                      storage revision as a listener. 
IMPORTANT: Revision update shouldn't happen
+     *                                      concurrently with {@link 
#complete(long, T)} operations.
+     * @param defaultVal                    Supplier of the default value, 
that is used on {@link #update(long, BiFunction)} to
+     *                                      evaluate the default value if the 
value is not initialized yet. It is not guaranteed to
+     *                                      execute only once.
      */
     public VersionedValue(
-            Consumer<Consumer<Long>> observableRevisionUpdater,
+            Consumer<Function<Long, CompletableFuture<?>>> 
observableRevisionUpdater,
             Supplier<T> defaultVal
     ) {
-        this(null, observableRevisionUpdater, DEFAULT_HISTORY_SIZE, 
defaultVal);
+        this(observableRevisionUpdater, DEFAULT_HISTORY_SIZE, defaultVal);
     }
 
     /**
-     * Constructor with default history size that equals 2. See {@link 
#VersionedValue(BiConsumer, Consumer, int, Supplier)}.
+     * Constructor with default history size that equals 2. See {@link 
#VersionedValue(Consumer, int, Supplier)}.
      *
-     * @param storageRevisionUpdateCallback Closure applied on storage 
revision update (see {@link #onStorageRevisionUpdate(long)}.
      * @param observableRevisionUpdater     A closure intended to connect this 
VersionedValue with a revision updater, that this
      *                                      VersionedValue should be able to 
listen to, for receiving storage revision updates.
      *                                      This closure is called once on a 
construction of this VersionedValue and accepts a
-     *                                      {@code Consumer<Long>} that should 
be called on every update of storage revision as a
-     *                                      listener. IMPORTANT: Revision 
update shouldn't happen concurrently with
-     *                                      {@link #set(long, T)} and {@link 
#update(long, Function, Function)} operations.
+     *                                      {@code Function<Long, 
CompletableFuture<?>>} that should be called on every update of
+     *                                      storage revision as a listener. 
IMPORTANT: Revision update shouldn't happen
+     *                                      concurrently with {@link 
#complete(long, T)} operations.
      */
-    public VersionedValue(
-            @Nullable BiConsumer<VersionedValue<T>, Long> 
storageRevisionUpdateCallback,
-            Consumer<Consumer<Long>> observableRevisionUpdater
-    ) {
-        this(storageRevisionUpdateCallback, observableRevisionUpdater, 
DEFAULT_HISTORY_SIZE, null);
-    }
-
-    /**
-     * Constructor with default history size that equals 2 and no closure. See 
{@link #VersionedValue(BiConsumer, Consumer, int, Supplier)}.
-     *
-     * @param observableRevisionUpdater A closure intended to connect this 
VersionedValue with a revision updater, that this VersionedValue
-     *                                  should be able to listen to, for 
receiving storage revision updates. This closure is called once on
-     *                                  a construction of this VersionedValue 
and accepts a {@code Consumer<Long>} that should be called
-     *                                  on every update of storage revision as 
a listener. IMPORTANT: Revision update shouldn't happen
-     *                                  concurrently with {@link #set(long, 
T)} operations.
-     */
-    public VersionedValue(Consumer<Consumer<Long>> observableRevisionUpdater) {
-        this(null, observableRevisionUpdater);
+    public VersionedValue(Consumer<Function<Long, CompletableFuture<?>>> 
observableRevisionUpdater) {
+        this(observableRevisionUpdater, DEFAULT_HISTORY_SIZE, null);
     }
 
     /**
@@ -190,18 +182,20 @@ public class VersionedValue<T> {
      * @return The future.
      */
     private CompletableFuture<T> getInternal(long causalityToken) {
+        long actualToken0 = this.actualToken;
+
         if (history.floorEntry(causalityToken) == null) {
-            throw new OutdatedTokenException(causalityToken, actualToken, 
historySize);
+            throw new OutdatedTokenException(causalityToken, actualToken0, 
historySize);
         }
 
-        if (causalityToken <= actualToken) {
+        if (causalityToken <= actualToken0) {
             return getValueForPreviousToken(causalityToken);
         }
 
         trimHistoryLock.readLock().lock();
 
         try {
-            if (causalityToken <= actualToken) {
+            if (causalityToken <= actualToken0) {
                 return getValueForPreviousToken(causalityToken);
             }
 
@@ -225,31 +219,31 @@ public class VersionedValue<T> {
             }
         }
 
-        synchronized (updateMutex) {
-            return getDefault();
-        }
+        return getDefault();
     }
 
     /**
-     * Returns a default value.
+     * Creates (if needed) and returns a default value.
      *
      * @return The value.
      */
     private T getDefault() {
-        // It is thread safe, as it's protected by either updateMutex or 
exclusiveness of #onStorageRevisionUpdate
-        // in all usages of getDefault().
-        if (defaultValSupplier != null && defaultVal == null) {
-            defaultVal = defaultValSupplier.get();
+        if (defaultValSupplier != null && defaultValRef.get() == null) {
+            T defaultVal = defaultValSupplier.get();
+
+            assert defaultVal != null : "Default value can't be null.";
+
+            defaultValRef.compareAndSet(null, defaultVal);
         }
 
-        return defaultVal;
+        return defaultValRef == null ? null : defaultValRef.get();
     }
 
     /**
      * Gets a value for less or equal token than the actual {@link 
#actualToken}.
      *
      * @param causalityToken Causality token.
-     * @return A completed future that contained a value.
+     * @return A completed future that contains a value.
      * @throws OutdatedTokenException If outdated token is passed as an 
argument.
      */
     private CompletableFuture<T> getValueForPreviousToken(long causalityToken) 
{
@@ -262,6 +256,16 @@ public class VersionedValue<T> {
         return histEntry.getValue();
     }
 
+    /**
+     * Save the version of the value associated with the given causality 
token. If someone has got a future to await the value associated
+     * with the given causality token (see {@link #get(long)}), then the future 
will be completed.
+     *
+     * @param causalityToken Causality token.
+     */
+    public void complete(long causalityToken) {
+        completeOnRevision(causalityToken);
+    }
+
     /**
      * Save the version of the value associated with the given causality 
token. If someone has got a future to await the value associated
      * with the given causality token (see {@link #get(long)}, then the future 
will be completed.
@@ -269,48 +273,103 @@ public class VersionedValue<T> {
      * @param causalityToken Causality token.
      * @param value          Current value.
      */
-    public void set(long causalityToken, T value) {
+    public void complete(long causalityToken, T value) {
         long actualToken0 = actualToken;
 
         if (actualToken0 == NOT_INITIALIZED) {
             history.put(causalityToken, initFut);
         }
 
-        assert actualToken0 == NOT_INITIALIZED || actualToken0 + 1 == 
causalityToken : IgniteStringFormatter.format(
-                "Token must be greater than actual by exactly 1 [token={}, 
actual={}]", causalityToken, actualToken0);
+        checkToken(actualToken0, causalityToken);
 
-        setValueInternal(causalityToken, value);
+        completeInternal(causalityToken, value, null);
     }
 
     /**
-     * Comparisons an exception to the causality token.
+     * Save the exception associated with the given causality token. If 
someone has got a future to await the value associated
+     * with the given causality token (see {@link #get(long)}), then the future 
will be completed.
      *
      * @param causalityToken Causality token.
      * @param throwable An exception.
      */
-    public void fail(long causalityToken, Throwable throwable) {
+    public void completeExceptionally(long causalityToken, Throwable 
throwable) {
         long actualToken0 = actualToken;
 
         if (actualToken0 == NOT_INITIALIZED) {
             history.put(causalityToken, initFut);
         }
 
-        assert actualToken0 == NOT_INITIALIZED || actualToken0 + 1 == 
causalityToken : IgniteStringFormatter.format(
-                "Token must be greater than actual by exactly 1 [token={}, 
actual={}]", causalityToken, actualToken0);
+        checkToken(actualToken0, causalityToken);
+
+        completeInternal(causalityToken, null, throwable);
+    }
+
+    /**
+     * This internal method assigns either value or exception according to 
specific token.
+     *
+     * @param causalityToken Causality token.
+     * @param value          Value to set.
+     * @param throwable      An exception.
+     */
+    private void completeInternal(long causalityToken, T value, Throwable 
throwable) {
+        CompletableFuture<T> res = history.putIfAbsent(
+                causalityToken,
+                throwable == null ? completedFuture(value) : 
failedFuture(throwable)
+        );
+
+        if (res == null) {
+            notifyCompletionListeners(causalityToken, value, throwable);
+
+            return;
+        }
+
+        assert !res.isDone() : completeInternalConflictErrorMessage(res, 
causalityToken, value, throwable);
+
+        if (throwable == null) {
+            res.complete(value);
+        } else {
+            res.completeExceptionally(throwable);
+        }
+
+        notifyCompletionListeners(causalityToken, value, throwable);
+    }
 
-        failInternal(causalityToken, throwable);
+    /**
+     * Builds an error message for the case when there is a conflict between 
history and a value or exception that is going to be
+     * saved.
+     *
+     * @param future Future.
+     * @param token Token.
+     * @param value Value.
+     * @param throwable Throwable.
+     * @return Error message.
+     */
+    private String completeInternalConflictErrorMessage(CompletableFuture<T> 
future, long token, T value, Throwable throwable) {
+        return future.handle(
+            (prevValue, prevThrowable) ->
+                IgniteStringFormatter.format(
+                    "Different values associated with the token [token={}, 
value={}, exception={}, prevValue={}, prevException={}]",
+                    token,
+                    value,
+                    throwable,
+                    prevValue,
+                    prevThrowable
+                )
+        )
+        .join();
     }
 
     /**
      * Updates the value using the given updater. The updater receives the 
value on previous token, or default value
      * (see constructor) if the value isn't initialized, or current 
intermediate value, if this method has been already
      * called for the same token; and returns a new value.<br>
+     * The updater will be called after updaters that had been passed to 
previous calls of this method complete.
      * If an exception ({@link CancellationException} or {@link 
CompletionException}) was thrown when calculating the value for previous
-     * token, then {@code fail} updater is used to process the exception and 
calculate a new value.<br>
+     * token, then the updater is used to process the exception and calculate a 
new value.<br>
      * This method can be called multiple times for the same token, and 
doesn't complete the future created for this token.
-     * The future is supposed to be completed by storage revision update in 
this case. If this method has been called at least
-     * once on the given token, the updater will receive a value that was 
evaluated by updater on previous call, as intermediate
-     * result.<br>
+     * The future is supposed to be completed by storage revision update or a 
call of {@link #complete(long)} in this case.
+     * If this method has been called at least once on the given token, the 
updater will receive a value that was evaluated
+     * by updater on previous call, as intermediate result.<br>
      * As the order of multiple calls of this method on the same token is 
unknown, operations done by the updater must be
      * commutative. For example:
      * <ul>
@@ -322,107 +381,95 @@ public class VersionedValue<T> {
      * </ul>
      * Regardless of order in which this method's calls are made, V3 should be 
the final result.
      * <br>
-     * The method should return previous value (previous intermediate value, 
or a value for previous token, if this method
-     * is called for first time for given token).
+     * The method returns a future that will be completed when {@code 
updater} completes.
      *
      * @param causalityToken Causality token.
-     * @param complete       The function is invoked if the previous future 
completed successfully.
-     * @param fail           The function is invoked if the previous future 
completed with an exception.
-     * @return               Updated value.
+     * @param updater        The binary function that accepts the previous value 
and exception, if present, and updates it to compute
+     *                       the new value.
+     * @return               Future for updated value.
      */
-    public T update(long causalityToken, Function<T, T> complete, 
Function<Throwable, T> fail) {
-        long actualToken0 = actualToken;
-
-        assert actualToken0 == NOT_INITIALIZED || actualToken0 + 1 == 
causalityToken : IgniteStringFormatter.format(
-                "Token must be greater than actual by exactly 1 [token={}, 
actual={}]", causalityToken, actualToken0);
-
-        try {
-            synchronized (updateMutex) {
-                T previousValue;
-
-                if (isUpdating) {
-                    previousValue = tempValue;
-                } else {
-                    Entry<Long, CompletableFuture<T>> histEntry = 
history.floorEntry(actualToken0);
-
-                    if (histEntry == null) {
-                        previousValue = getDefault();
-                    } else {
-                        assert histEntry.getValue().isDone() : "Previous value 
should be ready.";
-
-                        previousValue = histEntry.getValue().join();
-                    }
-                }
+    public CompletableFuture<T> update(
+            long causalityToken,
+            BiFunction<T, Throwable, CompletableFuture<T>> updater
+    ) {
+        long actualToken0 = this.actualToken;
 
-                isUpdating = true;
+        checkToken(actualToken0, causalityToken);
 
-                T res = complete.apply(previousValue);
+        synchronized (updateMutex) {
+            CompletableFuture<T> updaterFuture = this.updaterFuture;
 
-                tempValue = res;
+            CompletableFuture<T> future = updaterFuture == null ? 
previousOrDefaultValueFuture(actualToken0) : updaterFuture;
 
-                return res;
-            }
-        } catch (CancellationException | CompletionException e) {
-            synchronized (updateMutex) {
-                isUpdating = true;
+            CompletableFuture<CompletableFuture<T>> f0 = future
+                    .handle(updater::apply)
+                    .handle((fut, e) -> e == null ? fut : failedFuture(e));
 
-                T res = fail.apply(e);
+            updaterFuture = f0.thenCompose(Function.identity());
 
-                tempValue = res;
+            this.updaterFuture = updaterFuture;
 
-                return res;
-            }
+            return updaterFuture;
         }
     }
 
     /**
-     * This internal method assigns value according to specific token without 
additional checks.
+     * Add listener for completions of this versioned value on every token. It 
will be called on every {@link #complete(long)},
+     * {@link #complete(long, Object)}, {@link #completeExceptionally(long, 
Throwable)} and also, if none of mentioned methods was
+     * called explicitly, on storage revision update.
      *
-     * @param causalityToken Causality token.
-     * @param value          Value to set.
+     * @param action Action to perform.
      */
-    private void setValueInternal(long causalityToken, T value) {
-        CompletableFuture<T> res = history.putIfAbsent(causalityToken, 
CompletableFuture.completedFuture(value));
-
-        if (res == null || res.isCompletedExceptionally()) {
-            return;
-        }
-
-        assert !res.isDone() : IgniteStringFormatter.format("Different values 
associated with the token "
-            + "[token={}, value={}, prevValue={}]", causalityToken, value, 
res.join());
+    public void whenComplete(IgniteTriConsumer<Long, T, Throwable> action) {
+        completionListeners.add(action);
+    }
 
-        res.complete(value);
+    /**
+     * Removes a completion listener, see {@link 
#whenComplete(IgniteTriConsumer)}.
+     *
+     * @param action Action to remove.
+     */
+    public void removeWhenComplete(IgniteTriConsumer<Long, T, Throwable> 
action) {
+        completionListeners.remove(action);
     }
 
     /**
-     * Fails a future associated with this causality token.
+     * Notify completion listeners.
      *
-     * @param causalityToken Causality token.
-     * @param throwable      An exception.
+     * @param causalityToken Token.
+     * @param value Value.
+     * @param throwable Throwable.
      */
-    private void failInternal(long causalityToken, Throwable throwable) {
-        CompletableFuture<T> res = history.putIfAbsent(causalityToken, 
CompletableFuture.failedFuture(throwable));
+    private void notifyCompletionListeners(long causalityToken, T value, 
Throwable throwable) {
+        Throwable unpackedThrowable = throwable instanceof CompletionException 
? throwable.getCause() : throwable;
 
-        if (res == null || res.isCompletedExceptionally()) {
-            return;
+        List<Exception> exceptions = new ArrayList<>();
+
+        for (IgniteTriConsumer<Long, T, Throwable> listener : 
completionListeners) {
+            try {
+                listener.accept(causalityToken, value, unpackedThrowable);
+            } catch (Exception e) {
+                exceptions.add(e);
+            }
         }
 
-        assert !res.isDone() : IgniteStringFormatter.format("A value already 
has associated with the token "
-                + "[token={}, ex={}, value={}]", causalityToken, throwable, 
res.join());
+        if (!exceptions.isEmpty()) {
+            IgniteInternalException ex = new IgniteInternalException();
+
+            exceptions.forEach(ex::addSuppressed);
 
-        res.completeExceptionally(throwable);
+            throw ex;
+        }
     }
 
     /**
-     * Should be called on a storage revision update. This also triggers 
completion of a future created for the given causality token. It
-     * implies that all possible updates associated with this token have been 
already applied to the component.
-     * <br>
-     * This method should not be called concurrently with {@link #update(long, 
Function, Function)} and {@link #set(long, Object)}
-     * methods, as the storage revision update listener is supposed to be 
called after all other configuration listeners.
+     * Complete because of explicit token update. This also triggers 
completion of a future created for the given causality token.
+     * This future completes after all updaters are complete (see {@link 
#update(long, BiFunction)}).
      *
-     * @param causalityToken Causality token.
+     * @param causalityToken Token.
+     * @return Future.
      */
-    private void onStorageRevisionUpdate(long causalityToken) {
+    private CompletableFuture<?> completeOnRevision(long causalityToken) {
         long actualToken0 = actualToken;
 
         assert causalityToken > actualToken0 : IgniteStringFormatter.format(
@@ -432,32 +479,34 @@ public class VersionedValue<T> {
             history.put(causalityToken, initFut);
         }
 
-        if (isUpdating) {
-            setValueInternal(causalityToken, tempValue);
+        synchronized (updateMutex) {
+            CompletableFuture<T> updaterFuture0 = updaterFuture;
 
-            isUpdating = false;
-        }
+            CompletableFuture<?> completeUpdatesFuture = updaterFuture0 == null
+                    ? completedFuture(null)
+                    : updaterFuture0.whenComplete((v, t) -> 
completeInternal(causalityToken, v, t));
 
-        if (storageRevisionUpdateCallback != null) {
-            storageRevisionUpdateCallback.accept(this, causalityToken);
-        }
+            updaterFuture = null;
 
-        completeRelatedFuture(causalityToken);
+            actualToken = causalityToken;
 
-        if (history.size() > 1 && causalityToken - history.firstKey() >= 
historySize) {
-            trimToSize(causalityToken);
-        }
+            return completeUpdatesFuture.thenRun(() -> {
+                completeRelatedFuture(causalityToken);
 
-        Entry<Long, CompletableFuture<T>> entry = 
history.floorEntry(causalityToken);
+                if (history.size() > 1 && causalityToken - history.firstKey() 
>= historySize) {
+                    trimToSize(causalityToken);
+                }
 
-        assert entry != null && entry.getValue().isDone() : 
IgniteStringFormatter.format(
-                "Future for the token is not completed [token={}]", 
causalityToken);
+                Entry<Long, CompletableFuture<T>> entry = 
history.floorEntry(causalityToken);
 
-        actualToken = causalityToken;
+                assert entry != null && entry.getValue().isDone() : 
IgniteStringFormatter.format(
+                    "Future for the token is not completed [token={}]", 
causalityToken);
+            });
+        }
     }
 
     /**
-     * Completes a future related with a specific causality token.
+     * Completes a future related with a specific causality token. It is 
called only on storage revision update.
      *
      * @param causalityToken The token which is becoming an actual.
      */
@@ -469,18 +518,45 @@ public class VersionedValue<T> {
         if (!future.isDone()) {
             Entry<Long, CompletableFuture<T>> entryBefore = 
history.headMap(causalityToken).lastEntry();
 
-            CompletableFuture<T> previousFuture =
-                    entryBefore == null ? 
CompletableFuture.completedFuture(getDefault()) : entryBefore.getValue();
+            CompletableFuture<T> previousFuture = entryBefore == null ? 
completedFuture(getDefault()) : entryBefore.getValue();
 
             assert previousFuture.isDone() : IgniteStringFormatter.format("No 
future for token [token={}]", causalityToken);
 
             previousFuture.whenComplete((t, throwable) -> {
                 if (throwable != null) {
                     future.completeExceptionally(throwable);
+
+                    notifyCompletionListeners(causalityToken, null, throwable);
                 } else {
                     future.complete(t);
+
+                    notifyCompletionListeners(causalityToken, t, null);
                 }
             });
+        } else if (entry.getKey() < causalityToken) {
+            // Notifying listeners when there were neither updates nor 
explicit completions.
+            // This future belongs to an earlier token, so it is always done.
+            future.whenComplete((v, e) -> 
notifyCompletionListeners(causalityToken, v, e));
+        }
+    }
+
+    /**
+     * Return a future for previous or default value.
+     *
+     * @param actualToken Token.
+     * @return Future.
+     */
+    private CompletableFuture<T> previousOrDefaultValueFuture(long 
actualToken) {
+        Entry<Long, CompletableFuture<T>> histEntry = 
history.floorEntry(actualToken);
+
+        if (histEntry == null) {
+            return completedFuture(getDefault());
+        } else {
+            CompletableFuture<T> prevFuture = histEntry.getValue();
+
+            assert prevFuture.isDone() : "Previous value should be ready.";
+
+            return prevFuture;
         }
     }
 
@@ -504,4 +580,15 @@ public class VersionedValue<T> {
             trimHistoryLock.writeLock().unlock();
         }
     }
+
+    /**
+     * Check that the given causality token is correct according to the actual 
token.
+     *
+     * @param actualToken Actual token.
+     * @param causalityToken Causality token.
+     */
+    private static void checkToken(long actualToken, long causalityToken) {
+        assert actualToken == NOT_INITIALIZED || actualToken + 1 == 
causalityToken : IgniteStringFormatter.format(
+            "Token must be greater than actual by exactly 1 [token={}, 
actual={}]", causalityToken, actualToken);
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/lang/IgniteTriConsumer.java 
b/modules/core/src/main/java/org/apache/ignite/lang/IgniteTriConsumer.java
new file mode 100644
index 000000000..8d56b0da6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteTriConsumer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.Objects;
+
+/**
+ * Represents a closure that accepts three input parameters and returns no 
result.
+ *
+ * @param <T> The type of the first parameter to the closure.
+ * @param <U> The type of the second parameter to the closure.
+ * @param <V> The type of the third parameter to the closure.
+ */
+@FunctionalInterface
+public interface IgniteTriConsumer<T, U, V> {
+    /**
+     * Performs the operation on the given arguments.
+     *
+     * @param t The first argument.
+     * @param u The second argument.
+     * @param v The third argument.
+     */
+    void accept(T t, U u, V v);
+
+    /**
+     * Returns a composed {@code IgniteTriConsumer} that performs, in 
sequence, this operation followed by the {@code after} closure.
+     * If {@code after} throws an exception, it should be handled by the 
caller of the closure. If performing this closure throws
+     * an exception, the {@code after} closure will not be performed.
+     *
+     * @param after Closure to execute after this {@code IgniteTriConsumer} 
completes.
+     * @return Composed {@code IgniteTriConsumer}.
+     */
+    default IgniteTriConsumer<T, U, V> andThen(IgniteTriConsumer<? super T, ? 
super U, ? super V> after) {
+        Objects.requireNonNull(after);
+
+        return (t, u, v) -> {
+            accept(t, u, v);
+
+            after.accept(t, u, v);
+        };
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
index 904aa81b4..791e1138c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/causality/VersionedValueTest.java
@@ -17,17 +17,32 @@
 
 package org.apache.ignite.internal.causality;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteTriConsumer;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -41,6 +56,11 @@ public class VersionedValueTest {
     /** The test revision register is used to move the revision forward. */
     public static final TestRevisionRegister REGISTER = new 
TestRevisionRegister();
 
+    @BeforeEach
+    public void clearRegister() {
+        REGISTER.clear();
+    }
+
     /**
      * The test gets a value for {@link VersionedValue} before the value is 
calculated.
      *
@@ -48,26 +68,21 @@ public class VersionedValueTest {
      */
     @Test
     public void testGetValueBeforeReady() throws OutdatedTokenException {
-        VersionedValue<Integer> longVersionedValue = new VersionedValue<>(
-                (integerVersionedValue, token) -> {
-                    integerVersionedValue.set(token, TEST_VALUE);
-                },
-                REGISTER,
-                2,
-                null
-        );
+        VersionedValue<Integer> intVersionedValue = new 
VersionedValue<>(REGISTER, 2, null);
 
-        CompletableFuture<Integer> fut = longVersionedValue.get(0);
+        CompletableFuture<Integer> fut = intVersionedValue.get(0);
 
         assertFalse(fut.isDone());
 
-        REGISTER.moveRevision.accept(0L);
+        intVersionedValue.complete(0L, TEST_VALUE);
+
+        REGISTER.moveRevision(0L).join();
 
         assertTrue(fut.isDone());
 
         assertEquals(TEST_VALUE, fut.join());
 
-        assertSame(fut.join(), longVersionedValue.get(0).join());
+        assertSame(fut.join(), intVersionedValue.get(0).join());
     }
 
     /**
@@ -83,7 +98,7 @@ public class VersionedValueTest {
 
         assertFalse(fut.isDone());
 
-        longVersionedValue.set(0, TEST_VALUE);
+        longVersionedValue.complete(0, TEST_VALUE);
 
         assertTrue(fut.isDone());
 
@@ -102,15 +117,15 @@ public class VersionedValueTest {
     public void testMissValueUpdateBeforeReady() throws OutdatedTokenException 
{
         VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
 
-        longVersionedValue.set(0, TEST_VALUE);
+        longVersionedValue.complete(0, TEST_VALUE);
 
-        REGISTER.moveRevision.accept(0L);
+        REGISTER.moveRevision(0L).join();
 
         CompletableFuture<Integer> fut = longVersionedValue.get(1);
 
         assertFalse(fut.isDone());
 
-        REGISTER.moveRevision.accept(1L);
+        REGISTER.moveRevision(1L).join();
 
         assertTrue(fut.isDone());
 
@@ -129,10 +144,10 @@ public class VersionedValueTest {
     public void testMissValueUpdate() throws OutdatedTokenException {
         VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
 
-        longVersionedValue.set(0, TEST_VALUE);
+        longVersionedValue.complete(0, TEST_VALUE);
 
-        REGISTER.moveRevision.accept(0L);
-        REGISTER.moveRevision.accept(1L);
+        REGISTER.moveRevision(0L).join();
+        REGISTER.moveRevision(1L).join();
 
         CompletableFuture<Integer> fut = longVersionedValue.get(1);
 
@@ -150,14 +165,14 @@ public class VersionedValueTest {
     public void testObsoleteToken() {
         VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
 
-        longVersionedValue.set(0, TEST_VALUE);
+        longVersionedValue.complete(0, TEST_VALUE);
 
-        REGISTER.moveRevision.accept(0L);
+        REGISTER.moveRevision(0L).join();
 
-        longVersionedValue.set(1, TEST_VALUE);
+        longVersionedValue.complete(1, TEST_VALUE);
 
-        REGISTER.moveRevision.accept(1L);
-        REGISTER.moveRevision.accept(2L);
+        REGISTER.moveRevision(1L).join();
+        REGISTER.moveRevision(2L).join();
 
         assertThrowsExactly(OutdatedTokenException.class, () -> 
longVersionedValue.get(0));
     }
@@ -167,19 +182,18 @@ public class VersionedValueTest {
      */
     @Test
     public void testAutocompleteFuture() throws OutdatedTokenException {
-        VersionedValue<Integer> longVersionedValue = new VersionedValue<>((b, 
r) -> {
-        }, REGISTER);
+        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
 
-        longVersionedValue.set(0, TEST_VALUE);
+        longVersionedValue.complete(0, TEST_VALUE);
 
-        REGISTER.moveRevision.accept(0L);
+        REGISTER.moveRevision(0L).join();
 
         CompletableFuture<Integer> fut = longVersionedValue.get(1);
 
         assertFalse(fut.isDone());
 
-        REGISTER.moveRevision.accept(1L);
-        REGISTER.moveRevision.accept(2L);
+        REGISTER.moveRevision(1L).join();
+        REGISTER.moveRevision(2L).join();
 
         assertTrue(fut.isDone());
         assertTrue(longVersionedValue.get(2).isDone());
@@ -192,12 +206,11 @@ public class VersionedValueTest {
      */
     @Test
     public void testUpdate() throws Exception {
-        VersionedValue<Integer> longVersionedValue = new VersionedValue<>((b, 
r) -> {
-        }, REGISTER);
+        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
 
-        longVersionedValue.set(0, TEST_VALUE);
+        longVersionedValue.complete(0, TEST_VALUE);
 
-        REGISTER.moveRevision.accept(0L);
+        REGISTER.moveRevision(0L).join();
 
         CompletableFuture<Integer> fut = longVersionedValue.get(1);
 
@@ -206,16 +219,18 @@ public class VersionedValueTest {
         int incrementCount = 10;
 
         for (int i = 0; i < incrementCount; i++) {
-            longVersionedValue.update(1, previous -> ++previous, ex -> null);
+            longVersionedValue.update(1, (previous, e) -> 
completedFuture(++previous));
 
             assertFalse(fut.isDone());
         }
 
-        REGISTER.moveRevision.accept(1L);
+        REGISTER.moveRevision(1L).join();
 
         assertTrue(fut.isDone());
 
         assertEquals(TEST_VALUE + incrementCount, fut.get());
+
+        assertThrows(AssertionError.class, () -> longVersionedValue.update(1L, 
(i, t) -> completedFuture(null)));
     }
 
     /**
@@ -225,28 +240,152 @@ public class VersionedValueTest {
      */
     @Test
     public void testUpdatePredefined() throws Exception {
-        VersionedValue<Integer> longVersionedValue = new VersionedValue<>((b, 
r) -> {
-        }, REGISTER);
+        VersionedValue<Integer> longVersionedValue = new 
VersionedValue<>(REGISTER);
 
         CompletableFuture<Integer> fut = longVersionedValue.get(0);
 
         assertFalse(fut.isDone());
 
-        longVersionedValue.update(0, previous -> {
+        longVersionedValue.update(0, (previous, e) -> {
             assertNull(previous);
 
-            return TEST_VALUE;
-        }, ex -> null);
+            return completedFuture(TEST_VALUE);
+        });
 
         assertFalse(fut.isDone());
 
-        REGISTER.moveRevision.accept(0L);
+        REGISTER.moveRevision(0L).join();
 
         assertTrue(fut.isDone());
 
         assertEquals(TEST_VALUE, fut.get());
     }
 
+    /**
+     * Test asynchronous update closure.
+     */
+    @Test
+    public void testAsyncUpdate() {
+        VersionedValue<Integer> vv = new VersionedValue<>(REGISTER);
+
+        CompletableFuture<Integer> fut = new CompletableFuture<>();
+
+        vv.update(0L, (v, e) -> fut);
+
+        CompletableFuture<Integer> vvFut = vv.get(0L);
+
+        CompletableFuture<?> revFut = REGISTER.moveRevision(0L);
+
+        assertFalse(fut.isDone());
+        assertFalse(vvFut.isDone());
+        assertFalse(revFut.isDone());
+
+        fut.complete(1);
+
+        revFut.join();
+
+        assertTrue(vvFut.isDone());
+    }
+
+    /**
+     * Test the case when exception happens in updater.
+     */
+    @Test
+    public void testExceptionOnUpdate() {
+        VersionedValue<Integer> vv = new VersionedValue<>(REGISTER, () -> 0);
+
+        final int count = 4;
+        final int successfulCompletionsCount = count / 2;
+
+        AtomicInteger actualSuccessfulCompletionsCount = new AtomicInteger();
+
+        final String exceptionMsg = "test msg";
+
+        for (int i = 0; i < count; i++) {
+            vv.update(0L, (v, e) -> {
+                if (e != null) {
+                    return failedFuture(e);
+                }
+
+                if (v == successfulCompletionsCount) {
+                    throw new IgniteInternalException(exceptionMsg);
+                }
+
+                actualSuccessfulCompletionsCount.incrementAndGet();
+
+                return completedFuture(++v);
+            });
+        }
+
+        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
+
+        vv.whenComplete((t, v, e) -> exceptionRef.set(e));
+
+        vv.complete(0L);
+
+        assertThrowsWithCause(() -> vv.get(0L).join(), 
IgniteInternalException.class);
+
+        assertEquals(exceptionMsg, exceptionRef.get().getMessage());
+        assertEquals(successfulCompletionsCount, 
actualSuccessfulCompletionsCount.get());
+    }
+
+    /**
+     * Test with multiple versioned values and asynchronous completion.
+     */
+    @Test
+    public void testAsyncMultiVv() {
+        final String registryName = "Registry";
+        final String assignmentName = "Assignment";
+        final String tableName = "T1_";
+
+        VersionedValue<Map<UUID, String>> tablesVv = new VersionedValue<>(f -> 
{}, HashMap::new);
+        VersionedValue<Map<UUID, String>> schemasVv = new 
VersionedValue<>(REGISTER, HashMap::new);
+        VersionedValue<Map<UUID, String>> assignmentsVv = new 
VersionedValue<>(REGISTER, HashMap::new);
+
+        schemasVv.whenComplete((token, value, ex) -> tablesVv.complete(token));
+
+        BiFunction<Long, UUID, CompletableFuture<String>> schemaRegistry =
+                (token, uuid) -> schemasVv.get(token).thenApply(schemas -> 
schemas.get(uuid));
+
+        // Adding table.
+        long token = 0L;
+        UUID tableId = UUID.randomUUID();
+
+        CompletableFuture<String> tableFut = schemaRegistry.apply(token, 
tableId)
+                .thenCombine(assignmentsVv.get(token), (registry, assignments) 
-> tableName + registry + assignments.get(tableId));
+
+        tablesVv.update(token, (old, e) -> tableFut.thenApply(table -> {
+            Map<UUID, String> val = new HashMap<>(old);
+
+            val.put(tableId, table);
+
+            return val;
+        }));
+
+        CompletableFuture<String> userFut = tablesVv.get(token).thenApply(map 
-> map.get(tableId));
+
+        schemasVv.update(token, (old, e) -> {
+            old.put(tableId, registryName);
+
+            return completedFuture(old);
+        });
+
+        assignmentsVv.update(token, (old, e) -> {
+            old.put(tableId, assignmentName);
+
+            return completedFuture(old);
+        });
+
+        assertFalse(tableFut.isDone());
+        assertFalse(userFut.isDone());
+
+        REGISTER.moveRevision(token).join();
+
+        tableFut.join();
+
+        assertEquals(tableName + registryName + assignmentName, 
userFut.join());
+    }
+
     /**
      * Checks a behavior when {@link VersionedValue} has not initialized yet, 
but someone already tries to get a value.
      *
@@ -264,7 +403,7 @@ public class VersionedValueTest {
 
         assertNull(longVersionedValue.latest());
 
-        longVersionedValue.set(2, TEST_VALUE);
+        longVersionedValue.complete(2, TEST_VALUE);
 
         assertTrue(fut1.isDone());
         assertTrue(fut2.isDone());
@@ -299,12 +438,10 @@ public class VersionedValueTest {
     public void checkDefaultValue(VersionedValue<Integer> vv, Integer 
expectedDefault) {
         assertEquals(expectedDefault, vv.latest());
 
-        vv.update(0, a -> {
+        vv.update(0, (a, e) -> {
                     assertEquals(expectedDefault, vv.latest());
 
-                    return a == null ? null : a + 1;
-                }, e -> {
-                    throw new IgniteInternalException(e);
+                    return completedFuture(a == null ? null : a + 1);
                 }
         );
 
@@ -314,9 +451,9 @@ public class VersionedValueTest {
 
         assertFalse(f.isDone());
 
-        vv.update(0, a -> a == null ? null : a + 1, e -> null);
+        vv.update(0, (a, e) -> completedFuture(a == null ? null : a + 1));
 
-        REGISTER.moveRevision.accept(0L);
+        REGISTER.moveRevision(0L).join();
 
         assertTrue(f.isDone());
 
@@ -324,17 +461,126 @@ public class VersionedValueTest {
     }
 
     /**
-     * Test revision register.
+     * Test {@link VersionedValue#whenComplete(IgniteTriConsumer)}.
      */
-    private static class TestRevisionRegister implements 
Consumer<Consumer<Long>> {
+    @Test
+    public void testWhenComplete() {
+        VersionedValue<Integer> vv = new VersionedValue<>(REGISTER);
+
+        AtomicInteger a = new AtomicInteger();
+        AtomicInteger cntr = new AtomicInteger(-1);
+
+        IgniteTriConsumer<Long, Integer, Throwable> listener = (t, v, e) -> {
+            if (e == null) {
+                a.set(v);
+            } else {
+                a.set(-1);
+            }
+
+            cntr.incrementAndGet();
+        };
+
+        vv.whenComplete(listener);
+
+        // Test complete.
+        long token = 0;
+
+        final long finalToken0 = token;
+
+        vv.complete(token, TEST_VALUE);
+
+        assertThrows(AssertionError.class, () -> vv.complete(finalToken0, 0));
+        assertThrows(AssertionError.class, () -> 
vv.completeExceptionally(finalToken0, new Exception()));
+
+        assertEquals(TEST_VALUE, a.get());
+        assertEquals(token, cntr.get());
+
+        REGISTER.moveRevision(token).join();
+
+        // Test update.
+        token = 1;
+
+        vv.update(token, (v, e) -> completedFuture(++v));
+
+        assertEquals(TEST_VALUE, a.get());
+
+        REGISTER.moveRevision(token).join();
 
+        assertEquals(TEST_VALUE + 1, a.get());
+        assertEquals(token, cntr.get());
+
+        // Test move revision.
+        token = 2;
+
+        REGISTER.moveRevision(token).join();
+
+        assertEquals(TEST_VALUE + 1, a.get());
+        assertEquals(token, cntr.get());
+
+        // Test complete exceptionally.
+        token = 3;
+
+        final long finalToken3 = token;
+
+        vv.completeExceptionally(token, new Exception());
+
+        assertThrows(AssertionError.class, () -> vv.complete(finalToken3, 0));
+        assertThrows(AssertionError.class, () -> 
vv.completeExceptionally(finalToken3, new Exception()));
+
+        assertEquals(-1, a.get());
+        assertEquals(token, cntr.get());
+
+        REGISTER.moveRevision(token).join();
+
+        assertEquals(token, cntr.get());
+
+        // Test remove listener.
+        token = 4;
+
+        vv.removeWhenComplete(listener);
+
+        a.set(0);
+
+        vv.complete(token, TEST_VALUE);
+
+        assertEquals(0, a.get());
+        assertEquals(token - 1, cntr.get());
+
+        REGISTER.moveRevision(token).join();
+    }
+
+    /**
+     * Test revision register.
+     */
+    private static class TestRevisionRegister implements 
Consumer<Function<Long, CompletableFuture<?>>> {
         /** Revision consumer. */
-        Consumer<Long> moveRevision;
+        List<Function<Long, CompletableFuture<?>>> moveRevisionList = new 
ArrayList<>();
 
         /** {@inheritDoc} */
         @Override
-        public void accept(Consumer<Long> consumer) {
-            moveRevision = consumer;
+        public void accept(Function<Long, CompletableFuture<?>> function) {
+            moveRevisionList.add(function);
+        }
+
+        /**
+         * Clear list.
+         */
+        public void clear() {
+            moveRevisionList.clear();
+        }
+
+        /**
+         * Move revision.
+         *
+         * @param revision Revision.
+         * @return Future for all listeners.
+         */
+        public CompletableFuture<?> moveRevision(long revision) {
+            List<CompletableFuture<?>> futures = new ArrayList<>();
+
+            moveRevisionList.forEach(m -> futures.add(m.apply(revision)));
+
+            return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[] {}));
         }
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index db7c9c88f..49ec18799 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -42,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.IntFunction;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
@@ -246,13 +247,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 modules.distributed().polymorphicSchemaExtensions()
         );
 
-        Consumer<Consumer<Long>> registry = (c) -> {
-            
clusterCfgMgr.configurationRegistry().listenUpdateStorageRevision(newStorageRevision
 -> {
-                c.accept(newStorageRevision);
-
-                return CompletableFuture.completedFuture(null);
-            });
-        };
+        Consumer<Function<Long, CompletableFuture<?>>> registry = (c) -> 
clusterCfgMgr.configurationRegistry()
+                .listenUpdateStorageRevision(newStorageRevision -> 
c.apply(newStorageRevision));
 
         DataStorageModules dataStorageModules = new 
DataStorageModules(ServiceLoader.load(DataStorageModule.class));
 
@@ -337,7 +333,7 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
         // Deploy all registered watches because all components are ready and 
have registered their listeners.
         metaStorageMgr.deployWatches();
 
-        configurationCatchUpFuture.join();
+        assertThat(configurationCatchUpFuture, willCompleteSuccessfully());
 
         log.info("Completed recovery on partially started node, last revision 
applied: " + lastRevision.get()
                 + ", acceptableDifference: " + 
IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, 
100)
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 6c08c585d..47259d32e 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.client.handler.ClientHandlerModule;
@@ -271,13 +272,8 @@ public class IgniteImpl implements Ignite {
                 clusterSvc
         );
 
-        Consumer<Consumer<Long>> registry = (c) -> {
-            
clusterCfgMgr.configurationRegistry().listenUpdateStorageRevision(newStorageRevision
 -> {
-                c.accept(newStorageRevision);
-
-                return CompletableFuture.completedFuture(null);
-            });
-        };
+        Consumer<Function<Long, CompletableFuture<?>>> registry =
+                c -> 
clusterCfgMgr.configurationRegistry().listenUpdateStorageRevision(c::apply);
 
         DataStorageModules dataStorageModules = new DataStorageModules(
                 ServiceLoader.load(DataStorageModule.class, 
serviceProviderClassLoader)
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 66f3e8f80..f9f6dc3a8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -77,7 +78,7 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private final TableManager tableManager;
 
-    private final Consumer<Consumer<Long>> registry;
+    private final Consumer<Function<Long, CompletableFuture<?>>> registry;
 
     private final DataStorageManager dataStorageManager;
 
@@ -101,7 +102,7 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     /** Constructor. */
     public SqlQueryProcessor(
-            Consumer<Consumer<Long>> registry,
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
             ClusterService clusterSrvc,
             TableManager tableManager,
             DataStorageManager dataStorageManager,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 5e3b94aef..ffbce8f98 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -17,14 +17,19 @@
 
 package org.apache.ignite.internal.sql.engine.schema;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.Frameworks;
@@ -59,7 +64,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager 
{
      */
     public SqlSchemaManagerImpl(
             TableManager tableManager,
-            Consumer<Consumer<Long>> registry
+            Consumer<Function<Long, CompletableFuture<?>>> registry
     ) {
         this.tableManager = tableManager;
         schemasVv = new VersionedValue<>(registry, HashMap::new);
@@ -134,21 +139,22 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
      * @param causalityToken Causality token.
      */
     public synchronized void onSchemaCreated(String schemaName, long 
causalityToken) {
-        Map<String, IgniteSchema> schemasMap = schemasVv.update(
+        CompletableFuture<Map<String, IgniteSchema>> schemasMapFut = 
schemasVv.update(
                 causalityToken,
-                schemas -> {
+                (schemas, e) -> {
+                    if (e != null) {
+                        return failedFuture(e);
+                    }
+
                     Map<String, IgniteSchema> res =  new HashMap<>(schemas);
 
                     res.putIfAbsent(schemaName, new IgniteSchema(schemaName));
 
-                    return res;
-                },
-                e -> {
-                    throw new IgniteInternalException(e);
+                    return completedFuture(res);
                 }
         );
 
-        rebuild(causalityToken, schemasMap);
+        rebuild(causalityToken, schemasMapFut);
     }
 
     /**
@@ -158,21 +164,22 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
      * @param causalityToken Causality token.
      */
     public synchronized void onSchemaDropped(String schemaName, long 
causalityToken) {
-        Map<String, IgniteSchema> schemasMap = schemasVv.update(
+        CompletableFuture<Map<String, IgniteSchema>> schemasMapFut = 
schemasVv.update(
                     causalityToken,
-                    schemas -> {
+                    (schemas, e) -> {
+                        if (e != null) {
+                            return failedFuture(e);
+                        }
+
                         Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                         res.remove(schemaName);
 
-                        return res;
-                    },
-                    e -> {
-                        throw new IgniteInternalException(e);
+                        return completedFuture(res);
                     }
         );
 
-        rebuild(causalityToken, schemasMap);
+        rebuild(causalityToken, schemasMapFut);
     }
 
     /**
@@ -184,9 +191,13 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
             TableImpl table,
             long causalityToken
     ) {
-        Map<String, IgniteSchema> schemasMap = schemasVv.update(
+        CompletableFuture<Map<String, IgniteSchema>> schemasMapFut = 
schemasVv.update(
                 causalityToken,
-                schemas -> {
+                (schemas, e) -> {
+                    if (e != null) {
+                        return failedFuture(e);
+                    }
+
                     Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                     IgniteSchema schema = res.computeIfAbsent(schemaName, 
IgniteSchema::new);
@@ -195,28 +206,26 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
 
                     schema.addTable(removeSchema(schemaName, table.name()), 
igniteTable);
 
-                    tablesVv.update(
+                    return tablesVv
+                        .update(
                             causalityToken,
-                            tables -> {
+                            (tables, ex) -> {
+                                if (ex != null) {
+                                    return failedFuture(ex);
+                                }
+
                                 Map<UUID, IgniteTable> resTbls = new 
HashMap<>(tables);
 
                                 resTbls.put(igniteTable.id(), igniteTable);
 
-                                return resTbls;
-                            },
-                            e -> {
-                                throw new IgniteInternalException(e);
+                                return completedFuture(resTbls);
                             }
-                    );
-
-                    return res;
-                },
-                e -> {
-                    throw new IgniteInternalException(e);
+                        )
+                        .thenCompose(tables -> completedFuture(res));
                 }
         );
 
-        rebuild(causalityToken, schemasMap);
+        rebuild(causalityToken, schemasMapFut);
     }
 
     /**
@@ -240,8 +249,12 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
             String tableName,
             long causalityToken
     ) {
-        Map<String, IgniteSchema> schemasMap = schemasVv.update(causalityToken,
-                schemas -> {
+        CompletableFuture<Map<String, IgniteSchema>> schemasMapFut = 
schemasVv.update(causalityToken,
+                (schemas, e) -> {
+                    if (e != null) {
+                        return failedFuture(e);
+                    }
+
                     Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                     IgniteSchema schema = res.computeIfAbsent(schemaName, 
IgniteSchema::new);
@@ -253,39 +266,43 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
                     if (table != null) {
                         schema.removeTable(calciteTableName);
 
-                        tablesVv.update(causalityToken,
-                                tables -> {
+                        return tablesVv
+                            .update(causalityToken,
+                                (tables, ex) -> {
+                                    if (ex != null) {
+                                        return failedFuture(ex);
+                                    }
+
                                     Map<UUID, IgniteTable> resTbls = new 
HashMap<>(tables);
 
                                     resTbls.remove(table.id());
 
-                                    return resTbls;
-                                },
-                                e -> {
-                                    throw new IgniteInternalException(e);
+                                    return completedFuture(resTbls);
                                 }
-                        );
+                            )
+                            .thenCompose(tables -> completedFuture(res));
                     }
 
-                    return res;
-                },
-                e -> {
-                    throw new IgniteInternalException(e);
+                    return completedFuture(res);
                 }
         );
 
-        rebuild(causalityToken, schemasMap);
+        rebuild(causalityToken, schemasMapFut);
     }
 
-    private void rebuild(long causalityToken, Map<String, IgniteSchema> 
schemas) {
+    private void rebuild(long causalityToken, CompletableFuture<Map<String, 
IgniteSchema>> schemasFut) {
         SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
 
         newCalciteSchema.add("PUBLIC", new IgniteSchema("PUBLIC"));
 
-        schemas.forEach(newCalciteSchema::add);
+        schemasFut.join().forEach(newCalciteSchema::add);
 
-        calciteSchemaVv.update(causalityToken, s -> newCalciteSchema, e -> {
-            throw new IgniteInternalException(e);
+        calciteSchemaVv.update(causalityToken, (s, e) -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
+            return completedFuture(newCalciteSchema);
         });
 
         listeners.forEach(SchemaUpdateListener::onSchemaUpdated);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 79e23c0ea..99a5f60e4 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static 
org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -37,9 +38,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Flow;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.ignite.configuration.ConfigurationValue;
 import 
org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
 import 
org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
@@ -205,7 +208,7 @@ public class StopCalciteModuleTest {
 
         qryProc.start();
 
-        testRevisionRegister.moveRevision.accept(0L);
+        testRevisionRegister.moveRevision.apply(0L).join();
 
         var cursors = qryProc.queryAsync(
                 "PUBLIC",
@@ -259,18 +262,22 @@ public class StopCalciteModuleTest {
     /**
      * Test revision register.
      */
-    private static class TestRevisionRegister implements 
Consumer<Consumer<Long>> {
-
+    private static class TestRevisionRegister implements 
Consumer<Function<Long, CompletableFuture<?>>> {
         /** Revision consumer. */
-        Consumer<Long> moveRevision;
+        Function<Long, CompletableFuture<?>> moveRevision;
 
         /** {@inheritDoc} */
         @Override
-        public void accept(Consumer<Long> consumer) {
+        public void accept(Function<Long, CompletableFuture<?>> function) {
             if (moveRevision == null) {
-                moveRevision = consumer;
+                moveRevision = function;
             } else {
-                moveRevision = moveRevision.andThen(consumer);
+                Function<Long, CompletableFuture<?>> old = moveRevision;
+
+                moveRevision = rev -> allOf(
+                    old.apply(rev),
+                    function.apply(rev)
+                );
             }
         }
     }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 542f2ba77..51249d8bb 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -42,6 +42,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import 
org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
 import 
org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import 
org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
@@ -138,7 +139,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     private ConfigurationStorageRevisionListenerHolder 
fieldRevisionListenerHolder;
 
     /** Revision updater. */
-    private Consumer<Consumer<Long>> revisionUpdater;
+    private Consumer<Function<Long, CompletableFuture<?>>> revisionUpdater;
 
     /** Tables configuration. */
     @InjectConfiguration(
@@ -202,15 +203,13 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     /** Inner initialisation. */
     @BeforeEach
     void before() throws Exception {
-        revisionUpdater = (Consumer<Long> consumer) -> {
-            consumer.accept(0L);
+        revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
+            function.apply(0L).join();
 
             
fieldRevisionListenerHolder.listenUpdateStorageRevision(newStorageRevision -> {
                 log.info("Notify about revision: {}", newStorageRevision);
 
-                consumer.accept(newStorageRevision);
-
-                return CompletableFuture.completedFuture(null);
+                return function.apply(newStorageRevision);
             });
         };
 
@@ -628,6 +627,14 @@ public class MockedStructuresTest extends IgniteAbstractTest {
             return completedFuture(raftGrpSrvcMock);
         });
 
+        when(rm.updateRaftGroup(any(), any(), any(), any())).thenAnswer(mock 
-> {
+            RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
+
+            when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new 
NetworkAddress("localhost", 47500)));
+
+            return completedFuture(raftGrpSrvcMock);
+        });
+
         when(ts.getByAddress(any(NetworkAddress.class))).thenReturn(new 
ClusterNode(
                 UUID.randomUUID().toString(),
                 "node0",
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index ff085e11d..79dd5df1a 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec.schema;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,8 +31,10 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.calcite.schema.Table;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
@@ -227,18 +230,17 @@ public class SqlSchemaManagerTest {
     /**
      * Test revision register.
      */
-    private static class TestRevisionRegister implements 
Consumer<Consumer<Long>> {
+    private static class TestRevisionRegister implements 
Consumer<Function<Long, CompletableFuture<?>>> {
         AtomicLong token = new AtomicLong(-1);
 
-
         /** Revision consumer. */
-        private Consumer<Long> moveRevision;
+        private Function<Long, CompletableFuture<?>> moveRevision;
 
         /**
          * Moves forward token.
          */
         void moveForward() {
-            moveRevision.accept(token.incrementAndGet());
+            moveRevision.apply(token.incrementAndGet()).join();
         }
 
         /**
@@ -252,11 +254,16 @@ public class SqlSchemaManagerTest {
 
         /** {@inheritDoc} */
         @Override
-        public void accept(Consumer<Long> consumer) {
+        public void accept(Function<Long, CompletableFuture<?>> function) {
             if (moveRevision == null) {
-                moveRevision = consumer;
+                moveRevision = function;
             } else {
-                moveRevision = moveRevision.andThen(consumer);
+                Function<Long, CompletableFuture<?>> old = moveRevision;
+
+                moveRevision = rev -> allOf(
+                    old.apply(rev),
+                    function.apply(rev)
+                );
             }
         }
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 19209ea0d..32770d261 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.table.distributed;
 
 import static java.util.Collections.unmodifiableMap;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -82,7 +84,6 @@ import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteObjectName;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.IgniteSystemProperties;
@@ -163,7 +164,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @param dataStorageMgr Data storage manager.
      */
     public TableManager(
-            Consumer<Consumer<Long>> registry,
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
             TablesConfiguration tablesCfg,
             Loza raftMgr,
             BaselineManager baselineMgr,
@@ -242,7 +243,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     new NodeStoppingException()
             );
 
-            return CompletableFuture.failedFuture(new NodeStoppingException());
+            return failedFuture(new NodeStoppingException());
         }
 
         try {
@@ -276,7 +277,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     new NodeStoppingException()
             );
 
-            return CompletableFuture.failedFuture(new NodeStoppingException());
+            return failedFuture(new NodeStoppingException());
         }
 
         try {
@@ -301,7 +302,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      */
     private CompletableFuture<?> 
onUpdateAssignments(ConfigurationNotificationEvent<byte[]> assignmentsCtx) {
         if (!busyLock.enterBusy()) {
-            return CompletableFuture.failedFuture(new NodeStoppingException());
+            return failedFuture(new NodeStoppingException());
         }
 
         try {
@@ -339,7 +340,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 );
             }
 
-            return CompletableFuture.failedFuture(new NodeStoppingException());
+            return failedFuture(new NodeStoppingException());
         }
 
         try {
@@ -365,7 +366,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         SchemaDescriptor schemaDescriptor = 
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
 
-        tablesByIdVv.update(causalityToken, tablesById -> {
+        tablesByIdVv.update(causalityToken, (tablesById, e) -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
             TableImpl table = tablesById.get(tblId);
 
             ((SchemaRegistryImpl) 
table.schemaView()).onSchemaRegistered(schemaDescriptor);
@@ -374,10 +379,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 fireEvent(TableEvent.ALTER, new 
TableEventParameters(causalityToken, table), null);
             }
 
-            return tablesById;
-        }, th -> {
-            throw new 
IgniteInternalException(IgniteStringFormatter.format("Cannot create a schema 
for table"
-                    + " [tableId={}, schemaVer={}]", tblId, 
schemaDescriptor.version()), th);
+            return completedFuture(tablesById);
         });
     }
 
@@ -422,7 +424,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             toAdd.removeAll(oldPartitionAssignment);
 
             // Create new raft nodes according to new assignments.
-            tablesByIdVv.update(causalityToken, tablesById -> {
+            tablesByIdVv.update(causalityToken, (tablesById, e) -> {
+                if (e != null) {
+                    return failedFuture(e);
+                }
+
                 InternalTable internalTable = 
tablesById.get(tblId).internalTable();
 
                 try {
@@ -441,14 +447,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                         return null;
                     });
-                } catch (NodeStoppingException e) {
-                    throw new AssertionError("Loza was stopped before Table 
manager", e);
+                } catch (NodeStoppingException ex) {
+                    throw new AssertionError("Loza was stopped before Table 
manager", ex);
                 }
 
-                return tablesById;
-            }, th -> {
-                throw new 
IgniteInternalException(IgniteStringFormatter.format("Cannot start RAFT group 
for table"
-                        + " [tableId={}, part={}]", tblId, partId), th);
+                return completedFuture(tablesById);
             });
         }
 
@@ -522,24 +525,28 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         var table = new TableImpl(internalTable, schemaRegistry);
 
-        tablesVv.update(causalityToken, previous -> {
+        tablesVv.update(causalityToken, (previous, e) -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
             var val = new HashMap<>(previous);
 
             val.put(name, table);
 
-            return val;
-        }, th -> {
-            throw new 
IgniteInternalException(IgniteStringFormatter.format("Cannot create a table 
[name={}, id={}]", name, tblId), th);
+            return completedFuture(val);
         });
 
-        tablesByIdVv.update(causalityToken, previous -> {
+        tablesByIdVv.update(causalityToken, (previous, e) -> {
+            if (e != null) {
+                return failedFuture(e);
+            }
+
             var val = new HashMap<>(previous);
 
             val.put(tblId, table);
 
-            return val;
-        }, th -> {
-            throw new 
IgniteInternalException(IgniteStringFormatter.format("Cannot create a table 
[name={}, id={}]", name, tblId), th);
+            return completedFuture(val);
         });
 
         CompletableFuture.allOf(tablesByIdVv.get(causalityToken), 
tablesVv.get(causalityToken)).thenRun(() -> {
@@ -647,30 +654,32 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 raftMgr.stopRaftGroup(raftGroupName(tblId, p));
             }
 
-            tablesVv.update(causalityToken, previousVal -> {
+            tablesVv.update(causalityToken, (previousVal, e) -> {
+                if (e != null) {
+                    return failedFuture(e);
+                }
+
                 var map = new HashMap<>(previousVal);
 
                 map.remove(name);
 
-                return map;
-            }, th -> {
-                throw new 
IgniteInternalException(IgniteStringFormatter.format("Cannot drop a table 
[name={}, id={}]", name, tblId),
-                        th);
+                return completedFuture(map);
             });
 
             AtomicReference<TableImpl> tableHolder = new AtomicReference<>();
 
-            tablesByIdVv.update(causalityToken, previousVal -> {
+            tablesByIdVv.update(causalityToken, (previousVal, e) -> {
+                if (e != null) {
+                    return failedFuture(e);
+                }
+
                 var map = new HashMap<>(previousVal);
 
                 TableImpl table = map.remove(tblId);
 
                 tableHolder.set(table);
 
-                return map;
-            }, th -> {
-                throw new 
IgniteInternalException(IgniteStringFormatter.format("Cannot drop a table 
[name={}, id={}]", name, tblId),
-                    th);
+                return completedFuture(map);
             });
 
             TableImpl table = tableHolder.get();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 0c842aca5..a1b6b3597 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -45,6 +45,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Phaser;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.ignite.configuration.NamedListView;
 import 
org.apache.ignite.configuration.schemas.store.UnknownDataStorageConfigurationSchema;
 import 
org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
@@ -156,7 +157,7 @@ public class TableManagerTest extends IgniteAbstractTest {
     private ConfigurationStorageRevisionListenerHolder 
fieldRevisionListenerHolder;
 
     /** Revision updater. */
-    private Consumer<Consumer<Long>> revisionUpdater;
+    private Consumer<Function<Long, CompletableFuture<?>>> revisionUpdater;
 
     /** Tables configuration. */
     @InjectConfiguration(
@@ -192,15 +193,13 @@ public class TableManagerTest extends IgniteAbstractTest {
     /** Before all test scenarios. */
     @BeforeEach
     void before() {
-        revisionUpdater = (Consumer<Long> consumer) -> {
-            consumer.accept(0L);
+        revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
+            function.apply(0L).join();
 
             
fieldRevisionListenerHolder.listenUpdateStorageRevision(newStorageRevision -> {
                 log.info("Notify about revision: {}", newStorageRevision);
 
-                consumer.accept(newStorageRevision);
-
-                return CompletableFuture.completedFuture(null);
+                return function.apply(newStorageRevision);
             });
         };
 

Reply via email to