This is an automated email from the ASF dual-hosted git repository.

petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new bf1327c6a21 IGNITE-26776 Operation Context propagation is integrated 
into GridFutureAdapter and CompletableFuture (#12455)
bf1327c6a21 is described below

commit bf1327c6a21a20d28852d6be4c2a5909712df9fa
Author: Mikhail Petrov <[email protected]>
AuthorDate: Tue Apr 28 17:38:53 2026 +0300

    IGNITE-26776 Operation Context propagation is integrated into 
GridFutureAdapter and CompletableFuture (#12455)
---
 checkstyle/checkstyle.xml                          |   6 +
 .../concurrent/IgniteCompletableFuture.java        | 408 +++++++++++++++++++++
 .../function/OperationContextAwareBiConsumer.java  |  49 +++
 ...r.java => OperationContextAwareBiFunction.java} |  45 +--
 .../function/OperationContextAwareConsumer.java    |  49 +++
 .../function/OperationContextAwareFunction.java    |  54 +++
 .../function/OperationContextAwareInClosure.java}  |  36 +-
 .../function/OperationContextAwareSupplier.java    |  43 +++
 .../function/OperationContextAwareWrapper.java     |   2 +-
 .../internal/jdbc/thin/JdbcThinConnection.java     |  10 +-
 .../snapshot/AbstractCreateSnapshotFutureTask.java |  18 +-
 .../persistence/snapshot/SnapshotCheckProcess.java |  22 +-
 .../persistence/snapshot/SnapshotChecker.java      |  18 +-
 .../persistence/snapshot/SnapshotFutureTask.java   |  16 +-
 .../snapshot/SnapshotResponseRemoteFutureTask.java |   4 +-
 .../snapshot/SnapshotRestoreProcess.java           |  19 +-
 .../snapshot/dump/CreateDumpFutureTask.java        |  14 +-
 .../query/GridCacheDistributedQueryFuture.java     |   6 +-
 .../cache/query/GridCacheQueryManager.java         |   6 +-
 .../cache/query/reducer/CacheQueryReducer.java     |   4 +-
 .../cache/query/reducer/IndexQueryReducer.java     |   8 +-
 .../query/reducer/MergeSortCacheQueryReducer.java  |   4 +-
 .../cache/query/reducer/NodePageStream.java        |   8 +-
 .../cache/query/reducer/TextQueryReducer.java      |   6 +-
 .../query/reducer/UnsortedCacheQueryReducer.java   |  14 +-
 .../persistence/DmsDataWriterWorker.java           |   6 +-
 .../processors/query/stat/BusyExecutor.java        |   6 +-
 .../stat/LocalStatisticsGatheringContext.java      |   8 +-
 .../internal/util/future/GridFutureAdapter.java    |   5 +-
 .../context/OperationContextAttributesTest.java    | 274 ++++++++++++--
 .../processors/query/stat/BusyExecutorTest.java    |   6 +-
 31 files changed, 1005 insertions(+), 169 deletions(-)

diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 3b248ed7919..6775611e16c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -218,6 +218,12 @@
             <property name="className" 
value="java.util.concurrent.ScheduledThreadPoolExecutor"/>
             <property name="substitutionClassName" 
value="org.apache.ignite.internal.thread.pool.IgniteScheduledThreadPoolExecutor"/>
         </module>
+
+        <module 
name="org.apache.ignite.tools.checkstyle.ClassUsageRestrictionRule">
+            <property name="className" 
value="java.util.concurrent.CompletableFuture"/>
+            <property name="factoryMethods" value="allOf, anyOf, supplyAsync, 
runAsync, completedFuture"/>
+            <property name="substitutionClassName" 
value="org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture"/>
+        </module>
     </module>
 
     <!--
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/IgniteCompletableFuture.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/IgniteCompletableFuture.java
new file mode 100644
index 00000000000..496d5adfd3d
--- /dev/null
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/IgniteCompletableFuture.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.concurrent;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import 
org.apache.ignite.internal.thread.context.function.OperationContextAwareBiConsumer;
+import 
org.apache.ignite.internal.thread.context.function.OperationContextAwareBiFunction;
+import 
org.apache.ignite.internal.thread.context.function.OperationContextAwareConsumer;
+import 
org.apache.ignite.internal.thread.context.function.OperationContextAwareFunction;
+import 
org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
+import 
org.apache.ignite.internal.thread.context.function.OperationContextAwareSupplier;
+import 
org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper;
+import org.jetbrains.annotations.NotNull;
+
+/** */
+public class IgniteCompletableFuture<T> implements Future<T>, 
CompletionStage<T> {
+    /** */
+    private final CompletableFuture<T> delegate;
+
+    /** */
+    public IgniteCompletableFuture() {
+        delegate = new CompletableFuture<>();
+    }
+
+    /** */
+    private IgniteCompletableFuture(CompletableFuture<T> delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> thenApply(Function<? super 
T, ? extends U> fn) {
+        return 
wrap(delegate.thenApply(OperationContextAwareFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> thenApplyAsync(Function<? 
super T, ? extends U> fn) {
+        return 
wrap(delegate.thenApplyAsync(OperationContextAwareFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> thenApplyAsync(Function<? 
super T, ? extends U> fn, Executor executor) {
+        return 
wrap(delegate.thenApplyAsync(OperationContextAwareFunction.wrap(fn), executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> thenAccept(Consumer<? super 
T> action) {
+        return 
wrap(delegate.thenAccept(OperationContextAwareConsumer.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> thenAcceptAsync(Consumer<? 
super T> action) {
+        return 
wrap(delegate.thenAcceptAsync(OperationContextAwareConsumer.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> thenAcceptAsync(Consumer<? 
super T> action, Executor executor) {
+        return 
wrap(delegate.thenAcceptAsync(OperationContextAwareConsumer.wrap(action), 
executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> thenRun(Runnable action) {
+        return 
wrap(delegate.thenRun(OperationContextAwareRunnable.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> thenRunAsync(Runnable 
action) {
+        return 
wrap(delegate.thenRunAsync(OperationContextAwareRunnable.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> thenRunAsync(Runnable 
action, Executor executor) {
+        return 
wrap(delegate.thenRunAsync(OperationContextAwareRunnable.wrap(action), 
executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U, V> IgniteCompletableFuture<V> thenCombine(
+        CompletionStage<? extends U> other,
+        BiFunction<? super T, ? super U, ? extends V> fn
+    ) {
+        return wrap(delegate.thenCombine(unwrap(other), 
OperationContextAwareBiFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U, V> IgniteCompletableFuture<V> thenCombineAsync(
+        CompletionStage<? extends U> other,
+        BiFunction<? super T, ? super U, ? extends V> fn
+    ) {
+        return wrap(delegate.thenCombineAsync(unwrap(other), 
OperationContextAwareBiFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U, V> IgniteCompletableFuture<V> thenCombineAsync(
+        CompletionStage<? extends U> other,
+        BiFunction<? super T, ? super U, ? extends V> fn,
+        Executor executor
+    ) {
+        return wrap(delegate.thenCombineAsync(unwrap(other), 
OperationContextAwareBiFunction.wrap(fn), executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<Void> thenAcceptBoth(
+        CompletionStage<? extends U> other,
+        BiConsumer<? super T, ? super U> action
+    ) {
+        return wrap(delegate.thenAcceptBoth(unwrap(other), 
OperationContextAwareBiConsumer.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<Void> thenAcceptBothAsync(
+        CompletionStage<? extends U> other,
+        BiConsumer<? super T, ? super U> action
+    ) {
+        return wrap(delegate.thenAcceptBoth(unwrap(other), 
OperationContextAwareBiConsumer.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<Void> thenAcceptBothAsync(
+        CompletionStage<? extends U> other,
+        BiConsumer<? super T, ? super U> action,
+        Executor executor
+    ) {
+        return wrap(delegate.thenAcceptBothAsync(unwrap(other), 
OperationContextAwareBiConsumer.wrap(action), executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> 
runAfterBoth(CompletionStage<?> other, Runnable action) {
+        return wrap(delegate.runAfterBoth(unwrap(other), 
OperationContextAwareRunnable.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> 
runAfterBothAsync(CompletionStage<?> other, Runnable action) {
+        return wrap(delegate.runAfterBothAsync(unwrap(other), 
OperationContextAwareRunnable.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> 
runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) 
{
+        return wrap(delegate.runAfterBothAsync(unwrap(other), 
OperationContextAwareRunnable.wrap(action), executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> 
applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
+        return wrap(delegate.applyToEither(unwrap(other), 
OperationContextAwareFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> 
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> 
fn) {
+        return wrap(delegate.applyToEitherAsync(unwrap(other), 
OperationContextAwareFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> applyToEitherAsync(
+        CompletionStage<? extends T> other,
+        Function<? super T, U> fn,
+        Executor executor
+    ) {
+        return wrap(delegate.applyToEitherAsync(unwrap(other), 
OperationContextAwareFunction.wrap(fn), executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> 
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
+        return wrap(delegate.acceptEither(unwrap(other), 
OperationContextAwareConsumer.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> 
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> 
action) {
+        return wrap(delegate.acceptEitherAsync(unwrap(other), 
OperationContextAwareConsumer.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> acceptEitherAsync(
+        CompletionStage<? extends T> other,
+        Consumer<? super T> action,
+        Executor executor
+    ) {
+        return wrap(delegate.acceptEitherAsync(unwrap(other), 
OperationContextAwareConsumer.wrap(action), executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> 
runAfterEither(CompletionStage<?> other, Runnable action) {
+        return wrap(delegate.runAfterEither(unwrap(other), 
OperationContextAwareRunnable.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> 
runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
+        return wrap(delegate.runAfterEitherAsync(unwrap(other), 
OperationContextAwareRunnable.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<Void> 
runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor 
executor) {
+        return wrap(delegate.runAfterEitherAsync(unwrap(other), 
OperationContextAwareRunnable.wrap(action), executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> thenCompose(Function<? 
super T, ? extends CompletionStage<U>> fn) {
+        return 
wrap(delegate.thenCompose(OperationContextAwareCompletionStageFactory.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> 
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
+        return 
wrap(delegate.thenComposeAsync(OperationContextAwareCompletionStageFactory.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> thenComposeAsync(
+        Function<? super T, ? extends CompletionStage<U>> fn,
+        Executor executor
+    ) {
+        return 
wrap(delegate.thenComposeAsync(OperationContextAwareCompletionStageFactory.wrap(fn),
 executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<T> whenComplete(BiConsumer<? 
super T, ? super Throwable> action) {
+        return 
wrap(delegate.whenComplete(OperationContextAwareBiConsumer.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<T> whenCompleteAsync(BiConsumer<? 
super T, ? super Throwable> action) {
+        return 
wrap(delegate.whenCompleteAsync(OperationContextAwareBiConsumer.wrap(action)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<T> whenCompleteAsync(BiConsumer<? 
super T, ? super Throwable> action, Executor executor) {
+        return 
wrap(delegate.whenCompleteAsync(OperationContextAwareBiConsumer.wrap(action), 
executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> handle(BiFunction<? super 
T, Throwable, ? extends U> fn) {
+        return wrap(delegate.handle(OperationContextAwareBiFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> handleAsync(BiFunction<? 
super T, Throwable, ? extends U> fn) {
+        return 
wrap(delegate.handleAsync(OperationContextAwareBiFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public <U> IgniteCompletableFuture<U> handleAsync(BiFunction<? 
super T, Throwable, ? extends U> fn, Executor executor) {
+        return 
wrap(delegate.handleAsync(OperationContextAwareBiFunction.wrap(fn), executor));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCompletableFuture<T> 
exceptionally(Function<Throwable, ? extends T> fn) {
+        return 
wrap(delegate.exceptionally(OperationContextAwareFunction.wrap(fn)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<T> toCompletableFuture() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cancel(boolean mayInterruptIfRunning) {
+        return delegate.cancel(mayInterruptIfRunning);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isCancelled() {
+        return delegate.isCancelled();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDone() {
+        return delegate.isDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public T get() throws InterruptedException, ExecutionException {
+        return delegate.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public T get(long timeout, @NotNull TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+        return delegate.get(timeout, unit);
+    }
+
+    /** */
+    public T join() {
+        return delegate.join();
+    }
+
+    /** */
+    public T getNow(T valueIfAbsent) {
+        return delegate.getNow(valueIfAbsent);
+    }
+
+    /** */
+    public boolean complete(T value) {
+        return delegate.complete(value);
+    }
+
+    /** */
+    public boolean completeExceptionally(Throwable ex) {
+        return delegate.completeExceptionally(ex);
+    }
+
+    /** */
+    public boolean isCompletedExceptionally() {
+        return delegate.isCompletedExceptionally();
+    }
+
+    /** */
+    public static IgniteCompletableFuture<Void> 
allOf(IgniteCompletableFuture<?>... cfs) {
+        return wrap(CompletableFuture.allOf(Arrays.stream(cfs).map(f -> 
f.delegate).toArray(CompletableFuture[]::new)));
+    }
+
+    /** */
+    public static IgniteCompletableFuture<Object> 
anyOf(IgniteCompletableFuture<?>... cfs) {
+        return wrap(CompletableFuture.anyOf(Arrays.stream(cfs).map(f -> 
f.delegate).toArray(CompletableFuture[]::new)));
+    }
+
+    /** */
+    public static <U> IgniteCompletableFuture<U> supplyAsync(Supplier<U> 
supplier) {
+        return 
wrap(CompletableFuture.supplyAsync(OperationContextAwareSupplier.wrap(supplier)));
+    }
+
+    /** */
+    public static <U> IgniteCompletableFuture<U> supplyAsync(Supplier<U> 
supplier, Executor executor) {
+        return 
wrap(CompletableFuture.supplyAsync(OperationContextAwareSupplier.wrap(supplier),
 executor));
+    }
+
+    /** */
+    public static IgniteCompletableFuture<Void> runAsync(Runnable runnable) {
+        return 
wrap(CompletableFuture.runAsync(OperationContextAwareRunnable.wrap(runnable)));
+    }
+
+    /** */
+    public static IgniteCompletableFuture<Void> runAsync(Runnable runnable, 
Executor executor) {
+        return 
wrap(CompletableFuture.runAsync(OperationContextAwareRunnable.wrap(runnable), 
executor));
+    }
+
+    /** */
+    public static <U> IgniteCompletableFuture<U> completedFuture(U value) {
+        return wrap(CompletableFuture.completedFuture(value));
+    }
+
+    /** */
+    private static <T> IgniteCompletableFuture<T> wrap(CompletableFuture<T> 
delegate) {
+        return new IgniteCompletableFuture<>(delegate);
+    }
+
+    /** */
+    private static <T> CompletionStage<T> unwrap(CompletionStage<T> 
completionStage) {
+        return completionStage instanceof IgniteCompletableFuture
+            ? ((IgniteCompletableFuture<T>)completionStage).delegate
+            : completionStage;
+    }
+
+    /** */
+    private static class OperationContextAwareCompletionStageFactory<T, U>
+        extends OperationContextAwareWrapper<Function<? super T, ? extends 
CompletionStage<U>>>
+        implements Function<T, CompletionStage<U>> {
+        /** */
+        public OperationContextAwareCompletionStageFactory(
+            Function<? super T, ? extends CompletionStage<U>> delegate,
+            OperationContextSnapshot snapshot
+        ) {
+            super(delegate, snapshot);
+        }
+
+        /** {@inheritDoc} */
+        @Override public CompletionStage<U> apply(T t) {
+            try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+                return unwrap(delegate.apply(t));
+            }
+        }
+
+        /** */
+        public static <T, R> Function<? super T, ? extends CompletionStage<R>> 
wrap(
+            Function<? super T, ? extends CompletionStage<R>> delegate
+        ) {
+            if (delegate == null || delegate instanceof 
OperationContextAwareWrapper)
+                return delegate;
+
+            return new OperationContextAwareCompletionStageFactory<>(delegate, 
OperationContext.createSnapshot());
+        }
+    }
+}
+
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiConsumer.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiConsumer.java
new file mode 100644
index 00000000000..6870dde9be8
--- /dev/null
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiConsumer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.BiConsumer;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.jetbrains.annotations.NotNull;
+
+/** */
+public class OperationContextAwareBiConsumer<T, U> extends 
OperationContextAwareWrapper<BiConsumer<T, U>> implements BiConsumer<T, U> {
+    /** */
+    public OperationContextAwareBiConsumer(BiConsumer<T, U> delegate, 
OperationContextSnapshot snapshot) {
+        super(delegate, snapshot);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(T t, U u) {
+        try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+            delegate.accept(t, u);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public BiConsumer<T, U> andThen(@NotNull BiConsumer<? 
super T, ? super U> after) {
+        return BiConsumer.super.andThen(wrap(after));
+    }
+
+    /** */
+    public static <T, U> BiConsumer<T, U> wrap(BiConsumer<T, U> delegate) {
+        return wrap(delegate, OperationContextAwareBiConsumer::new);
+    }
+}
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiFunction.java
similarity index 50%
copy from 
modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
copy to 
modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiFunction.java
index a741d3b6d3c..bdf17e0042b 100644
--- 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiFunction.java
@@ -18,44 +18,35 @@
 package org.apache.ignite.internal.thread.context.function;
 
 import java.util.function.BiFunction;
-import org.apache.ignite.internal.IgniteInternalWrapper;
+import java.util.function.Function;
 import org.apache.ignite.internal.thread.context.OperationContext;
 import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.jetbrains.annotations.NotNull;
 
 /** */
-abstract class OperationContextAwareWrapper<T> implements 
IgniteInternalWrapper<T> {
+public class OperationContextAwareBiFunction<T, U, R>
+    extends OperationContextAwareWrapper<BiFunction<T, U, R>>
+    implements BiFunction<T, U, R> {
     /** */
-    protected final T delegate;
-
-    /** */
-    protected final OperationContextSnapshot snapshot;
-
-    /** */
-    @Override public T delegate() {
-        return delegate;
+    public OperationContextAwareBiFunction(BiFunction<T, U, R> delegate, 
OperationContextSnapshot snapshot) {
+        super(delegate, snapshot);
     }
 
-    /** */
-    protected OperationContextAwareWrapper(T delegate, 
OperationContextSnapshot snapshot) {
-        this.delegate = delegate;
-        this.snapshot = snapshot;
+    /** {@inheritDoc} */
+    @Override public R apply(T t, U u) {
+        try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+            return delegate.apply(t, u);
+        }
     }
 
-    /** */
-    protected static <T> T wrap(T delegate, BiFunction<T, 
OperationContextSnapshot, T> wrapper) {
-        return wrap(delegate, wrapper, false);
+    /** {@inheritDoc} */
+    @NotNull @Override public <V> BiFunction<T, U, V> andThen(@NotNull 
Function<? super R, ? extends V> after) {
+        return 
BiFunction.super.andThen(OperationContextAwareFunction.wrap(after));
     }
 
     /** */
-    protected static <T> T wrap(T delegate, BiFunction<T, 
OperationContextSnapshot, T> wrapper, boolean ignoreEmptyContext) {
-        if (delegate == null || delegate instanceof 
OperationContextAwareWrapper)
-            return delegate;
-
-        OperationContextSnapshot snapshot = OperationContext.createSnapshot();
-
-        if (ignoreEmptyContext && snapshot == null)
-            return delegate;
-
-        return wrapper.apply(delegate, snapshot);
+    public static <T, U, R> BiFunction<T, U, R> wrap(BiFunction<T, U, R> 
delegate) {
+        return wrap(delegate, OperationContextAwareBiFunction::new);
     }
 }
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareConsumer.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareConsumer.java
new file mode 100644
index 00000000000..f2f0290f2f4
--- /dev/null
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareConsumer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.Consumer;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.jetbrains.annotations.NotNull;
+
+/** */
+public class OperationContextAwareConsumer<T> extends 
OperationContextAwareWrapper<Consumer<T>> implements Consumer<T> {
+    /** */
+    public OperationContextAwareConsumer(Consumer<T> delegate, 
OperationContextSnapshot snapshot) {
+        super(delegate, snapshot);
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public Consumer<T> andThen(@NotNull Consumer<? super T> 
after) {
+        return Consumer.super.andThen(wrap(after));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(T t) {
+        try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+            delegate.accept(t);
+        }
+    }
+
+    /** */
+    public static <T> Consumer<T> wrap(Consumer<T> delegate) {
+        return wrap(delegate, OperationContextAwareConsumer::new);
+    }
+}
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareFunction.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareFunction.java
new file mode 100644
index 00000000000..e12b64b9389
--- /dev/null
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.Function;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.jetbrains.annotations.NotNull;
+
+/** */
+public class OperationContextAwareFunction<T, R> extends 
OperationContextAwareWrapper<Function<T, R>> implements Function<T, R> {
+    /** */
+    private OperationContextAwareFunction(Function<T, R> delegate, 
OperationContextSnapshot snapshot) {
+        super(delegate, snapshot);
+    }
+
+    /** {@inheritDoc} */
+    @Override public R apply(T t) {
+        try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+            return delegate.apply(t);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public <V> Function<V, R> compose(@NotNull Function<? 
super V, ? extends T> before) {
+        return Function.super.compose(wrap(before));
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public <V> Function<T, V> andThen(@NotNull Function<? 
super R, ? extends V> after) {
+        return Function.super.andThen(wrap(after));
+    }
+
+    /** */
+    public static <T, R> Function<T, R> wrap(Function<T, R> delegate) {
+        return wrap(delegate, OperationContextAwareFunction::new);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java
similarity index 50%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
copy to 
modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java
index 5ad5a04fb89..197cc8dcd7a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java
@@ -15,33 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.cache.query.reducer;
+package org.apache.ignite.internal.thread.context.function;
 
-import java.util.Comparator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.processors.cache.query.ScoredCacheEntry;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.apache.ignite.lang.IgniteInClosure;
 
-/**
- * Reducer for {@code TextQuery} results.
- */
-public class TextQueryReducer<R> extends MergeSortCacheQueryReducer<R> {
+/** */
+public class OperationContextAwareInClosure<E> extends 
OperationContextAwareWrapper<IgniteInClosure<E>> implements IgniteInClosure<E> {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
-    public TextQueryReducer(final Map<UUID, NodePageStream<R>> pageStreams) {
-        super(pageStreams);
+    public OperationContextAwareInClosure(IgniteInClosure<E> delegate, 
OperationContextSnapshot snapshot) {
+        super(delegate, snapshot);
     }
 
     /** {@inheritDoc} */
-    @Override protected CompletableFuture<Comparator<NodePage<R>>> 
pageComparator() {
-        CompletableFuture<Comparator<NodePage<R>>> f = new 
CompletableFuture<>();
-
-        f.complete((o1, o2) -> -Float.compare(
-            ((ScoredCacheEntry<?, ?>)o1.head()).score(), ((ScoredCacheEntry<?, 
?>)o2.head()).score()));
+    @Override public void apply(E e) {
+        try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+            delegate.apply(e);
+        }
+    }
 
-        return f;
+    /** */
+    public static <E> IgniteInClosure<E> wrap(IgniteInClosure<E> delefate) {
+        return wrap(delefate, OperationContextAwareInClosure::new);
     }
+
 }
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareSupplier.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareSupplier.java
new file mode 100644
index 00000000000..30e816bb534
--- /dev/null
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareSupplier.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+
+/** */
+public class OperationContextAwareSupplier<T> extends 
OperationContextAwareWrapper<Supplier<T>> implements Supplier<T> {
+    /** */
+    public OperationContextAwareSupplier(Supplier<T> delegate, 
OperationContextSnapshot snapshot) {
+        super(delegate, snapshot);
+    }
+
+    /** {@inheritDoc} */
+    @Override public T get() {
+        try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+            return delegate.get();
+        }
+    }
+
+    /** */
+    public static <T> Supplier<T> wrap(Supplier<T> delegate) {
+        return wrap(delegate, OperationContextAwareSupplier::new);
+    }
+}
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
index a741d3b6d3c..381eecfca09 100644
--- 
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
@@ -23,7 +23,7 @@ import 
org.apache.ignite.internal.thread.context.OperationContext;
 import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
 
 /** */
-abstract class OperationContextAwareWrapper<T> implements 
IgniteInternalWrapper<T> {
+public abstract class OperationContextAwareWrapper<T> implements 
IgniteInternalWrapper<T> {
     /** */
     protected final T delegate;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index f60ab5835ad..e3771824b5e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -53,7 +53,6 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
@@ -120,6 +119,7 @@ import 
org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
 import 
org.apache.ignite.internal.sql.optimizer.affinity.PartitionClientContext;
 import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.HostAndPortRange;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -2599,7 +2599,7 @@ public class JdbcThinConnection implements Connection {
      */
     private abstract class BlockingJdbcChannel {
         /** Request ID -> Jdbc result map. */
-        private Map<Long, CompletableFuture<JdbcResult>> results = new 
ConcurrentHashMap<>();
+        private Map<Long, IgniteCompletableFuture<JdbcResult>> results = new 
ConcurrentHashMap<>();
 
         /**
          * Do request in blocking style. It just call
@@ -2614,9 +2614,9 @@ public class JdbcThinConnection implements Connection {
             R res;
 
             if (isStream()) {
-                CompletableFuture<JdbcResult> resFut = new 
CompletableFuture<>();
+                IgniteCompletableFuture<JdbcResult> resFut = new 
IgniteCompletableFuture<>();
 
-                CompletableFuture<JdbcResult> oldFut = 
results.put(req.requestId(), resFut);
+                IgniteCompletableFuture<JdbcResult> oldFut = 
results.put(req.requestId(), resFut);
 
                 assert oldFut == null : "Another request with the same id is 
waiting for result.";
 
@@ -2639,7 +2639,7 @@ public class JdbcThinConnection implements Connection {
         boolean handleResult(long reqId, JdbcResult res) {
             boolean handled = false;
 
-            CompletableFuture<JdbcResult> fut = results.remove(reqId);
+            IgniteCompletableFuture<JdbcResult> fut = results.remove(reqId);
 
             if (fut != null) {
                 fut.complete(res);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
index d72d8b1e301..2a22e286e53 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryType;
@@ -38,6 +37,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
 import org.apache.ignite.internal.processors.marshaller.MappedName;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -55,7 +55,7 @@ public abstract class AbstractCreateSnapshotFutureTask 
extends AbstractSnapshotF
     protected final Map<Integer, Set<Integer>> processed = new HashMap<>();
 
     /** Future which will be completed when task requested to be closed. Will 
be executed on system pool. */
-    protected volatile CompletableFuture<Void> closeFut;
+    protected volatile IgniteCompletableFuture<Void> closeFut;
 
     /** Snapshot file tree. */
     protected final SnapshotFileTree sft;
@@ -81,10 +81,10 @@ public abstract class AbstractCreateSnapshotFutureTask 
extends AbstractSnapshotF
     }
 
     /** */
-    protected abstract List<CompletableFuture<Void>> saveCacheConfigs();
+    protected abstract List<IgniteCompletableFuture<Void>> saveCacheConfigs();
 
     /** */
-    protected abstract List<CompletableFuture<Void>> saveGroup(int grpId, 
Set<Integer> grpParts) throws IgniteCheckedException;
+    protected abstract List<IgniteCompletableFuture<Void>> saveGroup(int 
grpId, Set<Integer> grpParts) throws IgniteCheckedException;
 
     /** {@inheritDoc} */
     @Override public boolean cancel() {
@@ -103,7 +103,7 @@ public abstract class AbstractCreateSnapshotFutureTask 
extends AbstractSnapshotF
     }
 
     /** @return Future which will be completed when operations truly stopped. 
*/
-    protected abstract CompletableFuture<Void> closeAsync();
+    protected abstract IgniteCompletableFuture<Void> closeAsync();
 
     /**
      * @return {@code true} if current task requested to be stopped.
@@ -180,7 +180,7 @@ public abstract class AbstractCreateSnapshotFutureTask 
extends AbstractSnapshotF
     protected void saveSnapshotData() {
         try {
             // Submit all tasks for partitions and deltas processing.
-            List<CompletableFuture<Void>> futs = new ArrayList<>();
+            List<IgniteCompletableFuture<Void>> futs = new ArrayList<>();
 
             Collection<BinaryType> binTypesCopy = cctx.kernalContext()
                 .cacheObjects()
@@ -201,7 +201,7 @@ public abstract class AbstractCreateSnapshotFutureTask 
extends AbstractSnapshotF
 
             int futsSize = futs.size();
 
-            CompletableFuture.allOf(futs.toArray(new 
CompletableFuture[futsSize])).whenComplete((res, t) -> {
+            IgniteCompletableFuture.allOf(futs.toArray(new 
IgniteCompletableFuture[futsSize])).whenComplete((res, t) -> {
                 assert t == null : "Exception must never be thrown since a 
wrapper is used " +
                     "for each snapshot task: " + t;
 
@@ -244,8 +244,8 @@ public abstract class AbstractCreateSnapshotFutureTask 
extends AbstractSnapshotF
     }
 
     /** */
-    protected CompletableFuture<Void> runAsync(IgniteThrowableRunner task) {
-        return CompletableFuture.runAsync(
+    protected IgniteCompletableFuture<Void> runAsync(IgniteThrowableRunner 
task) {
+        return IgniteCompletableFuture.runAsync(
             wrapExceptionIfStarted(task),
             snpSndr.executor()
         );
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index ab3e1a96029..b612861517a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -45,6 +44,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.filename.Snapshot
 import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
 import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
 import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -326,7 +326,7 @@ public class SnapshotCheckProcess {
 
         GridFutureAdapter<SnapshotCheckResponse> phaseFut = new 
GridFutureAdapter<>();
 
-        CompletableFuture<SnapshotCheckResponse> workingFut;
+        IgniteCompletableFuture<SnapshotCheckResponse> workingFut;
 
         if (req.incrementalIndex() > 0) {
             assert !req.allRestoreHandlers() : "Snapshot handlers aren't 
supported for incremental snapshot.";
@@ -349,13 +349,13 @@ public class SnapshotCheckProcess {
     }
 
     /** @return A composed future of increment checks for each consistent id 
regarding {@link SnapshotCheckContext#metas}. */
-    private CompletableFuture<SnapshotCheckResponse> 
incrementalFuture(SnapshotCheckContext ctx) {
+    private IgniteCompletableFuture<SnapshotCheckResponse> 
incrementalFuture(SnapshotCheckContext ctx) {
         // Incremental snapshots do not support working on other topology. 
Only single meta and snapshot part can be processed.
         SnapshotMetadata meta = ctx.metas.get(0);
 
-        CompletableFuture<SnapshotCheckResponse> resFut = new 
CompletableFuture<>();
+        IgniteCompletableFuture<SnapshotCheckResponse> resFut = new 
IgniteCompletableFuture<>();
 
-        CompletableFuture<IncrementalSnapshotVerifyResult> workingFut = 
snpChecker.checkIncrementalSnapshot(
+        IgniteCompletableFuture<IncrementalSnapshotVerifyResult> workingFut = 
snpChecker.checkIncrementalSnapshot(
             ctx.locFileTree.get(meta.consistentId()),
             ctx.req.incrementalIndex(),
             ctx.totalCounter::addAndGet,
@@ -373,18 +373,18 @@ public class SnapshotCheckProcess {
     }
 
     /** @return A composed future of partitions checks for each consistent id 
regarding {@link SnapshotCheckContext#metas}. */
-    private CompletableFuture<SnapshotCheckResponse> 
partitionsHashesFuture(SnapshotCheckContext ctx) {
+    private IgniteCompletableFuture<SnapshotCheckResponse> 
partitionsHashesFuture(SnapshotCheckContext ctx) {
         // Per metas result: consistent id -> check results per partition key.
         Map<String, Map<PartitionKey, PartitionHashRecord>> perMetaResults = 
new ConcurrentHashMap<>(ctx.metas.size(), 1.0f);
         // Per consistent id.
         Map<String, Throwable> exceptions = new 
ConcurrentHashMap<>(ctx.metas.size(), 1.0f);
-        CompletableFuture<SnapshotCheckResponse> composedFut = new 
CompletableFuture<>();
+        IgniteCompletableFuture<SnapshotCheckResponse> composedFut = new 
IgniteCompletableFuture<>();
         AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());
 
         for (SnapshotMetadata meta : ctx.metas) {
             // Run asynchronously to calculate the metric 'total partitions' 
faster.
             kctx.pools().getSnapshotExecutorService().submit(() -> {
-                CompletableFuture<SnapshotPartitionsVerifyHandlerResponse> 
metaFut = snpChecker.checkPartitions(
+                
IgniteCompletableFuture<SnapshotPartitionsVerifyHandlerResponse> metaFut = 
snpChecker.checkPartitions(
                     meta,
                     ctx.locFileTree.get(meta.consistentId()),
                     ctx.req.groups(),
@@ -417,18 +417,18 @@ public class SnapshotCheckProcess {
      * @return A composed future of all the snapshot handlers for each 
consistent id regarding {@link SnapshotCheckContext#metas}.
      * @see IgniteSnapshotManager#handlers()
      */
-    private CompletableFuture<SnapshotCheckResponse> 
allHandlersFuture(SnapshotCheckContext ctx) {
+    private IgniteCompletableFuture<SnapshotCheckResponse> 
allHandlersFuture(SnapshotCheckContext ctx) {
         // Per metas result: snapshot part's consistent id -> check result per 
handler name.
         Map<String, Map<String, SnapshotHandlerResult<Message>>> 
perMetaResults = new ConcurrentHashMap<>(ctx.metas.size(), 1.0f);
         // Per consistent id.
         Map<String, Throwable> exceptions = new 
ConcurrentHashMap<>(ctx.metas.size(), 1.0f);
-        CompletableFuture<SnapshotCheckResponse> composedFut = new 
CompletableFuture<>();
+        IgniteCompletableFuture<SnapshotCheckResponse> composedFut = new 
IgniteCompletableFuture<>();
         AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());
 
         for (SnapshotMetadata meta : ctx.metas) {
             // Run asynchronously to calculate the metric 'total partitions' 
faster.
             kctx.pools().getSnapshotExecutorService().submit(() -> {
-                CompletableFuture<Map<String, SnapshotHandlerResult<Message>>> 
metaFut = snpChecker.invokeCustomHandlers(
+                IgniteCompletableFuture<Map<String, 
SnapshotHandlerResult<Message>>> metaFut = snpChecker.invokeCustomHandlers(
                     meta,
                     ctx.locFileTree.get(meta.consistentId()),
                     ctx.req.groups(),
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 4acb019b7cd..da9aba4ec0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -38,6 +37,7 @@ import 
org.apache.ignite.internal.management.cache.PartitionKey;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
 import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
 import 
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
@@ -63,19 +63,19 @@ public class SnapshotChecker {
     }
 
     /** Launches local metas checking. */
-    public CompletableFuture<List<SnapshotMetadata>> checkLocalMetas(
+    public IgniteCompletableFuture<List<SnapshotMetadata>> checkLocalMetas(
         SnapshotFileTree sft,
         int incIdx,
         @Nullable Collection<Integer> grpIds
     ) {
-        return CompletableFuture.supplyAsync(
+        return IgniteCompletableFuture.supplyAsync(
             new SnapshotMetadataVerificationTask(kctx.grid(), log, sft, 
incIdx, grpIds),
             executor
         );
     }
 
     /** */
-    public CompletableFuture<IncrementalSnapshotVerifyResult> 
checkIncrementalSnapshot(
+    public IgniteCompletableFuture<IncrementalSnapshotVerifyResult> 
checkIncrementalSnapshot(
         SnapshotFileTree sft,
         int incIdx,
         @Nullable Consumer<Integer> totalCnsmr,
@@ -83,7 +83,7 @@ public class SnapshotChecker {
     ) {
         assert incIdx > 0;
 
-        return CompletableFuture.supplyAsync(
+        return IgniteCompletableFuture.supplyAsync(
             new IncrementalSnapshotVerify(kctx.grid(), log, sft, incIdx, 
totalCnsmr, checkedCnsmr),
             executor
         );
@@ -169,7 +169,7 @@ public class SnapshotChecker {
      *
      * @see IgniteSnapshotManager#handlers()
      */
-    public CompletableFuture<Map<String, SnapshotHandlerResult<Message>>> 
invokeCustomHandlers(
+    public IgniteCompletableFuture<Map<String, 
SnapshotHandlerResult<Message>>> invokeCustomHandlers(
         SnapshotMetadata meta,
         SnapshotFileTree sft,
         @Nullable Collection<String> grps,
@@ -179,7 +179,7 @@ public class SnapshotChecker {
     ) {
         // The handlers use or may use the same snapshot pool. If it is 
configured with 1 thread, launching waiting task in
         // the same pool might block it.
-        return CompletableFuture.supplyAsync(() -> {
+        return IgniteCompletableFuture.supplyAsync(() -> {
                 try {
                     SnapshotHandlerContext hndCnt = new SnapshotHandlerContext(
                         meta,
@@ -202,7 +202,7 @@ public class SnapshotChecker {
     }
 
     /** Launches local partitions checking. */
-    public CompletableFuture<SnapshotPartitionsVerifyHandlerResponse> 
checkPartitions(
+    public IgniteCompletableFuture<SnapshotPartitionsVerifyHandlerResponse> 
checkPartitions(
         SnapshotMetadata meta,
         SnapshotFileTree sft,
         @Nullable Collection<String> grps,
@@ -211,7 +211,7 @@ public class SnapshotChecker {
         @Nullable Consumer<Integer> totalCnsmr,
         @Nullable Consumer<Integer> checkedPartCnsmr
     ) {
-        return CompletableFuture.supplyAsync(() -> {
+        return IgniteCompletableFuture.supplyAsync(() -> {
             SnapshotHandlerContext hctx = new SnapshotHandlerContext(
                 meta,
                 grps,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 14a7d4df628..bca42981010 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerArray;
@@ -67,6 +66,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import 
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -125,7 +125,7 @@ class SnapshotFutureTask extends 
AbstractCreateSnapshotFutureTask implements Che
     private final boolean withMetaStorage;
 
     /** Checkpoint end future. */
-    private final CompletableFuture<Boolean> cpEndFut = new 
CompletableFuture<>();
+    private final IgniteCompletableFuture<Boolean> cpEndFut = new 
IgniteCompletableFuture<>();
 
     /** Future to wait until checkpoint mark phase will be finished and 
snapshot tasks scheduled. */
     private final GridFutureAdapter<Void> startedFut = new 
GridFutureAdapter<>();
@@ -379,7 +379,7 @@ class SnapshotFutureTask extends 
AbstractCreateSnapshotFutureTask implements Che
     }
 
     /** {@inheritDoc} */
-    @Override protected List<CompletableFuture<Void>> saveGroup(int grpId, 
Set<Integer> grpParts) {
+    @Override protected List<IgniteCompletableFuture<Void>> saveGroup(int 
grpId, Set<Integer> grpParts) {
         // Process partitions for a particular cache group.
         return grpParts.stream().map(partId -> {
             GroupPartitionId pair = new GroupPartitionId(grpId, partId);
@@ -438,7 +438,7 @@ class SnapshotFutureTask extends 
AbstractCreateSnapshotFutureTask implements Che
     }
 
     /** {@inheritDoc} */
-    @Override protected List<CompletableFuture<Void>> saveCacheConfigs() {
+    @Override protected List<IgniteCompletableFuture<Void>> saveCacheConfigs() 
{
         // Send configuration files of all cache groups.
         return ccfgSndrs.stream()
             .map(ccfgSndr -> runAsync(ccfgSndr::sendCacheConfig))
@@ -467,7 +467,7 @@ class SnapshotFutureTask extends 
AbstractCreateSnapshotFutureTask implements Che
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Void> closeAsync() {
+    @Override public synchronized IgniteCompletableFuture<Void> closeAsync() {
         if (closeFut == null) {
             Throwable err0 = err.get();
 
@@ -477,8 +477,10 @@ class SnapshotFutureTask extends 
AbstractCreateSnapshotFutureTask implements Che
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toSet());
 
-            closeFut = CompletableFuture.runAsync(() -> onDone(new 
SnapshotFutureTaskResult(taken, snpPtr), err0),
-                cctx.kernalContext().pools().getSystemExecutorService());
+            closeFut = IgniteCompletableFuture.runAsync(
+                () -> onDone(new SnapshotFutureTaskResult(taken, snpPtr), 
err0),
+                cctx.kernalContext().pools().getSystemExecutorService()
+            );
         }
 
         return closeFut;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
index 12b589f1d6b..bd2aaf85537 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
@@ -38,6 +37,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.filename.FileTree
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
 import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
@@ -106,7 +106,7 @@ public class SnapshotResponseRemoteFutureTask extends 
AbstractSnapshotFutureTask
 
             snpSndr.init(partsToSend.size());
 
-            CompletableFuture.runAsync(() -> partsToSend.forEach((gp, sinfo) 
-> {
+            IgniteCompletableFuture.runAsync(() -> partsToSend.forEach((gp, 
sinfo) -> {
                 if (err.get() != null)
                     return;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 036dcdc1312..531971a00fe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -36,7 +36,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -84,6 +83,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSn
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -947,8 +947,8 @@ public class SnapshotRestoreProcess {
                     ", caches=" + F.transform(opCtx0.dirs.values(), s -> 
NodeFileTree.cacheName(s.get(0))) + ']');
             }
 
-            CompletableFuture<Void> metaFut = 
ctx.localNodeId().equals(opCtx0.opNodeId) ?
-                CompletableFuture.runAsync(
+            IgniteCompletableFuture<Void> metaFut = 
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                IgniteCompletableFuture.runAsync(
                     () -> {
                         try {
                             SnapshotMetadata meta = 
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
@@ -969,7 +969,7 @@ public class SnapshotRestoreProcess {
 
                             opCtx0.errHnd.accept(t);
                         }
-                    }, snpMgr.snapshotExecutorService()) : 
CompletableFuture.completedFuture(null);
+                    }, snpMgr.snapshotExecutorService()) : 
IgniteCompletableFuture.completedFuture(null);
 
             Map<String, GridAffinityAssignmentCache> affCache = new 
HashMap<>();
 
@@ -1139,7 +1139,7 @@ public class SnapshotRestoreProcess {
                                     return;
                                 }
 
-                                CompletableFuture.runAsync(
+                                IgniteCompletableFuture.runAsync(
                                     () -> {
                                         try {
                                             punchHole(grpId, partId, snpFile);
@@ -1169,7 +1169,7 @@ public class SnapshotRestoreProcess {
 
             opCtx0.totalParts = size;
 
-            CompletableFuture.allOf(allPartFuts.toArray(new 
CompletableFuture[size]))
+            IgniteCompletableFuture.allOf(allPartFuts.toArray(new 
IgniteCompletableFuture[size]))
                 .runAfterBothAsync(metaFut, () -> {
                     try {
                         if (opCtx0.stopChecker.getAsBoolean())
@@ -1754,7 +1754,7 @@ public class SnapshotRestoreProcess {
     ) {
         IgniteSnapshotManager snapMgr = ctx.cache().context().snapshotMgr();
 
-        CompletableFuture<Path> copyPartFut = CompletableFuture.supplyAsync(() 
-> {
+        IgniteCompletableFuture<Path> copyPartFut = 
IgniteCompletableFuture.supplyAsync(() -> {
             if (opCtx.stopChecker.getAsBoolean())
                 throw new IgniteInterruptedException("The operation has been 
stopped on copy file: " + snpFile.getAbsolutePath());
 
@@ -1774,7 +1774,8 @@ public class SnapshotRestoreProcess {
         if (opCtx.isGroupCompressed(grpId)) {
             copyPartFut = copyPartFut.thenComposeAsync(
                 p -> {
-                    CompletableFuture<Path> result = new CompletableFuture<>();
+                    IgniteCompletableFuture<Path> result = new 
IgniteCompletableFuture<>();
+
                     try {
                         punchHole(grpId, partFut.partId, tmpPartFile);
 
@@ -1970,7 +1971,7 @@ public class SnapshotRestoreProcess {
     }
 
     /** Future will be completed when partition processing ends. */
-    private static class PartitionRestoreFuture extends 
CompletableFuture<Path> {
+    private static class PartitionRestoreFuture extends 
IgniteCompletableFuture<Path> {
         /** Partition id. */
         private final int partId;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index 041ccf7b1f8..674b664f169 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -68,6 +67,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.BasicRateLimiter;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -242,7 +242,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
     }
 
     /** {@inheritDoc} */
-    @Override protected List<CompletableFuture<Void>> saveCacheConfigs() {
+    @Override protected List<IgniteCompletableFuture<Void>> saveCacheConfigs() 
{
         return processed.keySet().stream().map(grp -> runAsync(() -> {
             CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
 
@@ -263,7 +263,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
     }
 
     /** {@inheritDoc} */
-    @Override protected List<CompletableFuture<Void>> saveGroup(int grp, 
Set<Integer> grpParts) {
+    @Override protected List<IgniteCompletableFuture<Void>> saveGroup(int grp, 
Set<Integer> grpParts) {
         long start = System.currentTimeMillis();
 
         AtomicLong entriesCnt = new AtomicLong();
@@ -277,7 +277,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
         if (log.isInfoEnabled())
             log.info("Start group dump [name=" + name + ", id=" + grp + ']');
 
-        List<CompletableFuture<Void>> futs = grpParts.stream().map(part -> 
runAsync(() -> {
+        List<IgniteCompletableFuture<Void>> futs = grpParts.stream().map(part 
-> runAsync(() -> {
             long entriesCnt0 = 0;
             long writtenEntriesCnt0 = 0;
 
@@ -330,7 +330,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
 
         int futsSize = futs.size();
 
-        CompletableFuture.allOf(futs.toArray(new 
CompletableFuture[futsSize])).whenComplete((res, t) -> {
+        IgniteCompletableFuture.allOf(futs.toArray(new 
IgniteCompletableFuture[futsSize])).whenComplete((res, t) -> {
             clearDumpListener(gctx);
 
             if (log.isInfoEnabled()) {
@@ -365,7 +365,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
     }
 
     /** {@inheritDoc} */
-    @Override protected synchronized CompletableFuture<Void> closeAsync() {
+    @Override protected synchronized IgniteCompletableFuture<Void> 
closeAsync() {
         if (closeFut == null) {
             dumpCtxs.values().forEach(PartitionDumpContext::close);
 
@@ -382,7 +382,7 @@ public class CreateDumpFutureTask extends 
AbstractCreateSnapshotFutureTask imple
                     taken.add(new GroupPartitionId(grp, part));
             }
 
-            closeFut = CompletableFuture.runAsync(
+            closeFut = IgniteCompletableFuture.runAsync(
                 () -> {
                     thLocBufs.clear();
                     if (encThLocBufs != null)
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 58a58914824..09143f88b22 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
@@ -38,6 +37,7 @@ import 
org.apache.ignite.internal.processors.cache.query.reducer.IndexQueryReduc
 import 
org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
 import 
org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer;
 import 
org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -69,7 +69,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends 
GridCacheQueryFutu
     private Set<UUID> rcvdFirstPage = ConcurrentHashMap.newKeySet();
 
     /** Metadata for IndexQuery. */
-    private final CompletableFuture<IndexQueryResultMeta> idxQryMetaFut;
+    private final IgniteCompletableFuture<IndexQueryResultMeta> idxQryMetaFut;
 
     /** Query start time in nanoseconds to measure duration. */
     private final long startTimeNanos;
@@ -106,7 +106,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> 
extends GridCacheQueryFutu
         Map<UUID, NodePageStream<R>> streamsMap = 
Collections.unmodifiableMap(streams);
 
         if (qry.query().type() == INDEX) {
-            idxQryMetaFut = new CompletableFuture<>();
+            idxQryMetaFut = new IgniteCompletableFuture<>();
 
             reducer = new 
IndexQueryReducer<>(qry.query().idxQryDesc().valType(), streamsMap, cctx, 
idxQryMetaFut);
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 2db3ed5263e..bbfa9960908 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -36,7 +36,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiFunction;
@@ -88,6 +87,7 @@ import 
org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.task.GridInternal;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -1996,7 +1996,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
         private final GridCacheQueryType type;
 
         /** Future of query result metadata. Completed when query actually 
started. */
-        private final CompletableFuture<IndexQueryResultMeta> metadata;
+        private final IgniteCompletableFuture<IndexQueryResultMeta> metadata;
 
         /** Flag shows whether first result page was delivered to user. */
         private volatile boolean sentFirst;
@@ -2010,7 +2010,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
             this.type = type;
 
-            metadata = type == INDEX ? new CompletableFuture<>() : null;
+            metadata = type == INDEX ? new IgniteCompletableFuture<>() : null;
         }
 
         /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
index a8a5fefa43d..07d4fd6b9d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
@@ -19,9 +19,9 @@ package 
org.apache.ignite.internal.processors.cache.query.reducer;
 
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
 
 /**
@@ -50,7 +50,7 @@ public abstract class CacheQueryReducer<T> extends 
GridIteratorAdapter<T> {
      * @return Object that completed the specified future.
      * @throws IgniteCheckedException for all failures.
      */
-    public static <T> T get(CompletableFuture<?> fut) throws 
IgniteCheckedException {
+    public static <T> T get(IgniteCompletableFuture<?> fut) throws 
IgniteCheckedException {
         try {
             return (T)fut.get();
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/IndexQueryReducer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/IndexQueryReducer.java
index 35d069f3852..f8a9162cc43 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/IndexQueryReducer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/IndexQueryReducer.java
@@ -22,7 +22,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
@@ -36,6 +35,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.lang.IgniteBiTuple;
 
 /**
@@ -46,7 +46,7 @@ public class IndexQueryReducer<R> extends 
MergeSortCacheQueryReducer<R> {
     private static final long serialVersionUID = 0L;
 
     /** Future that will be completed with first page response. */
-    private final CompletableFuture<IndexQueryResultMeta> metaFut;
+    private final IgniteCompletableFuture<IndexQueryResultMeta> metaFut;
 
     /** */
     private final String valType;
@@ -59,7 +59,7 @@ public class IndexQueryReducer<R> extends 
MergeSortCacheQueryReducer<R> {
         final String valType,
         final Map<UUID, NodePageStream<R>> pageStreams,
         final GridCacheContext<?, ?> cctx,
-        final CompletableFuture<IndexQueryResultMeta> meta
+        final IgniteCompletableFuture<IndexQueryResultMeta> meta
     ) {
         super(pageStreams);
 
@@ -69,7 +69,7 @@ public class IndexQueryReducer<R> extends 
MergeSortCacheQueryReducer<R> {
     }
 
     /** {@inheritDoc} */
-    @Override protected CompletableFuture<Comparator<NodePage<R>>> 
pageComparator() {
+    @Override protected IgniteCompletableFuture<Comparator<NodePage<R>>> 
pageComparator() {
         return metaFut.thenApply(m -> {
             Map<String, IndexKeyDefinition> keyDefs = m.keyDefinitions();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
index 3024a07d31a..b3a5a7e4ad2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
@@ -22,8 +22,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 
 /**
  * Reducer of cache query results that sort result through all nodes. Note 
that it's assumed that every node
@@ -48,7 +48,7 @@ abstract class MergeSortCacheQueryReducer<R> extends 
CacheQueryReducer<R> {
     }
 
     /** @return Comparator for pages from nodes. */
-    protected abstract CompletableFuture<Comparator<NodePage<R>>> 
pageComparator();
+    protected abstract IgniteCompletableFuture<Comparator<NodePage<R>>> 
pageComparator();
 
     /** {@inheritDoc} */
     @Override public boolean hasNextX() throws IgniteCheckedException {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
index b6cfb08fbbe..76b524afad3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
@@ -19,7 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.query.reducer;
 
 import java.util.Collection;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 
 /**
  * This class provides an interface {@link #headPage()} that returns a future 
will be completed with {@link NodePage}
@@ -39,7 +39,7 @@ public class NodePageStream<R> {
     private boolean hasRemotePages = true;
 
     /** Promise to notify the stream consumer about delivering new page. */
-    private CompletableFuture<NodePage<R>> head = new CompletableFuture<>();
+    private IgniteCompletableFuture<NodePage<R>> head = new 
IgniteCompletableFuture<>();
 
     /** */
     public NodePageStream(UUID nodeId, Runnable reqPages, Runnable 
cancelPages) {
@@ -58,7 +58,7 @@ public class NodePageStream<R> {
      *
      * @return Future that will be completed with query result page.
      */
-    public synchronized CompletableFuture<NodePage<R>> headPage() {
+    public synchronized IgniteCompletableFuture<NodePage<R>> headPage() {
         return head;
     }
 
@@ -78,7 +78,7 @@ public class NodePageStream<R> {
                 if (!reqNext) {
                     synchronized (NodePageStream.this) {
                         if (hasRemotePages) {
-                            head = new CompletableFuture<>();
+                            head = new IgniteCompletableFuture<>();
 
                             reqPages.run();
                         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
index 5ad5a04fb89..f4f24384ff1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
@@ -20,8 +20,8 @@ package 
org.apache.ignite.internal.processors.cache.query.reducer;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.processors.cache.query.ScoredCacheEntry;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 
 /**
  * Reducer for {@code TextQuery} results.
@@ -36,8 +36,8 @@ public class TextQueryReducer<R> extends 
MergeSortCacheQueryReducer<R> {
     }
 
     /** {@inheritDoc} */
-    @Override protected CompletableFuture<Comparator<NodePage<R>>> 
pageComparator() {
-        CompletableFuture<Comparator<NodePage<R>>> f = new 
CompletableFuture<>();
+    @Override protected IgniteCompletableFuture<Comparator<NodePage<R>>> 
pageComparator() {
+        IgniteCompletableFuture<Comparator<NodePage<R>>> f = new 
IgniteCompletableFuture<>();
 
         f.complete((o1, o2) -> -Float.compare(
             ((ScoredCacheEntry<?, ?>)o1.head()).score(), ((ScoredCacheEntry<?, 
?>)o2.head()).score()));
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
index 20ff484b8ee..4e502d95c84 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
@@ -20,8 +20,8 @@ package 
org.apache.ignite.internal.processors.cache.query.reducer;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 
 /**
  * Reducer of cache query results, no ordering of results is provided.
@@ -33,14 +33,14 @@ public class UnsortedCacheQueryReducer<R> extends 
CacheQueryReducer<R> {
     /** Current page to return data to user. */
     private NodePage<R> page;
 
-    /** Pending futures for requeseted pages. */
-    private final CompletableFuture<NodePage<R>>[] futs;
+    /** Pending futures for requested pages. */
+    private final IgniteCompletableFuture<NodePage<R>>[] futs;
 
     /** */
     public UnsortedCacheQueryReducer(Map<UUID, NodePageStream<R>> pageStreams) 
{
         super(pageStreams);
 
-        futs = new CompletableFuture[pageStreams.size()];
+        futs = new IgniteCompletableFuture[pageStreams.size()];
     }
 
     /** {@inheritDoc} */
@@ -52,7 +52,7 @@ public class UnsortedCacheQueryReducer<R> extends 
CacheQueryReducer<R> {
                 if (s.closed())
                     continue;
 
-                CompletableFuture<NodePage<R>> f = s.headPage();
+                IgniteCompletableFuture<NodePage<R>> f = s.headPage();
 
                 if (f.isDone()) {
                     page = get(f);
@@ -67,11 +67,11 @@ public class UnsortedCacheQueryReducer<R> extends 
CacheQueryReducer<R> {
             if (pendingNodesCnt == 0)
                 return false;
 
-            CompletableFuture[] pendingFuts = Arrays.copyOf(futs, 
pendingNodesCnt);
+            IgniteCompletableFuture<?>[] pendingFuts = Arrays.copyOf(futs, 
pendingNodesCnt);
 
             Arrays.fill(futs, 0, pendingNodesCnt, null);
 
-            page = get(CompletableFuture.anyOf(pendingFuts));
+            page = get(IgniteCompletableFuture.anyOf(pendingFuts));
         }
 
         return true;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
index 2fa811ea071..7d3db0d5d19 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
@@ -20,7 +20,6 @@ package 
org.apache.ignite.internal.processors.metastorage.persistence;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
@@ -31,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -76,7 +76,7 @@ public class DmsDataWriterWorker extends GridWorker {
      * This task is used to pause processing of the {@code updateQueue}. If 
this task completed it means that all the updates
      * prior to it already flushed to the local metastorage.
      */
-    private volatile Future<?> suspendFut = 
CompletableFuture.completedFuture(AWAIT);
+    private volatile Future<?> suspendFut = 
IgniteCompletableFuture.completedFuture(AWAIT);
 
     /** */
     public DmsDataWriterWorker(
@@ -117,7 +117,7 @@ public class DmsDataWriterWorker extends GridWorker {
      */
     public void suspend(IgniteInternalFuture<?> compFut) {
         if (isCancelled())
-            suspendFut = CompletableFuture.completedFuture(AWAIT);
+            suspendFut = IgniteCompletableFuture.completedFuture(AWAIT);
         else {
             latch = new CountDownLatch(1);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
index 0c1021a4bcf..2b0b7295524 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.stat;
 
-import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.IgniteLogger;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
 import org.apache.ignite.internal.util.GridBusyLock;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -152,10 +152,10 @@ public class BusyExecutor {
      * @param r Task to execute.
      * @return Completable future with executed flag in result.
      */
-    public CompletableFuture<Boolean> submit(Runnable r) {
+    public IgniteCompletableFuture<Boolean> submit(Runnable r) {
         GridBusyLock lock = busyLock;
 
-        CompletableFuture<Boolean> res = new CompletableFuture<>();
+        IgniteCompletableFuture<Boolean> res = new IgniteCompletableFuture<>();
 
         if (r instanceof CancellableTask) {
             CancellableTask ct = (CancellableTask)r;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/LocalStatisticsGatheringContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/LocalStatisticsGatheringContext.java
index 4df521d39d0..77ae9837533 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/LocalStatisticsGatheringContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/LocalStatisticsGatheringContext.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.query.stat;
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import 
org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -52,7 +52,7 @@ public class LocalStatisticsGatheringContext {
     private final AffinityTopologyVersion topVer;
 
     /** Future with success status as a result. */
-    private final CompletableFuture<Void> future;
+    private final IgniteCompletableFuture<Void> future;
 
     /** Context cancelled flag. */
     private volatile boolean cancelled;
@@ -81,7 +81,7 @@ public class LocalStatisticsGatheringContext {
         this.remainingParts = new HashSet<>(remainingParts);
         this.allParts = (forceRecollect) ? null : new 
HashSet<>(remainingParts);
         this.topVer = topVer;
-        this.future = new CompletableFuture<>();
+        this.future = new IgniteCompletableFuture<>();
     }
 
     /**
@@ -173,7 +173,7 @@ public class LocalStatisticsGatheringContext {
     /**
      * @return Gathering completable future.
      */
-    public CompletableFuture<Void> future() {
+    public IgniteCompletableFuture<Void> future() {
         return future;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index f477c8b4da9..c932a9c3f8f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -28,6 +28,7 @@ import 
org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import 
org.apache.ignite.internal.thread.context.function.OperationContextAwareInClosure;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -79,7 +80,9 @@ public class GridFutureAdapter<R> implements 
IgniteInternalFuture<R> {
          * @param val Node value.
          */
         Node(Object val) {
-            this.val = val;
+            this.val = val instanceof Thread
+                ? val
+                : OperationContextAwareInClosure.wrap((IgniteInClosure<?>)val);
         }
     }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
index c6d1a7376bd..e67b307884f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -18,19 +18,35 @@
 package org.apache.ignite.internal.thread.context;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.thread.pool.IgniteForkJoinPool;
 import 
org.apache.ignite.internal.thread.pool.IgniteScheduledThreadPoolExecutor;
 import org.apache.ignite.internal.thread.pool.IgniteStripedExecutor;
 import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor;
 import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -486,7 +502,7 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
 
         BiConsumerX<String, Integer> checks = (s, i) -> pool.execute(new 
AttributeValueChecker(s, i), 1);
 
-        createAttributeChecks(checks);
+        execute(checks);
 
         AttributeValueChecker.assertAllCreatedChecksPassed();
     }
@@ -506,18 +522,18 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
         ));
 
         BiConsumerX<String, Integer> checks = (s, i) -> {
-            pool.execute( new AttributeValueChecker(s, i));
+            pool.execute(new AttributeValueChecker(s, i));
             pool.execute(1, new AttributeValueChecker(s, i));
         };
 
-        createAttributeChecks(checks);
+        execute(checks);
 
         AttributeValueChecker.assertAllCreatedChecksPassed();
     }
 
     /** */
     @Test
-    public void testThreadContextAwareScheduledThreadPoolExecutor() throws 
Exception {
+    public void testOperationContextAwareScheduledThreadPoolExecutor() throws 
Exception {
         IgniteScheduledThreadPoolExecutor pool = deferShutdown(new 
IgniteScheduledThreadPoolExecutor("test", "test", 1));
 
         doContextAwareExecutorServiceTest(pool);
@@ -531,7 +547,7 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
             pool.scheduleWithFixedDelay(new AttributeValueChecker(s, i), 100, 
100, MILLISECONDS);
         };
 
-        createAttributeChecks(checks);
+        execute(checks);
 
         poolUnblockedLatch.countDown();
 
@@ -540,16 +556,126 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
 
     /** */
     @Test
-    public void testThreadContextAwareForkJoinCommonPool() throws Exception {
+    public void testOperationContextAwareForkJoinCommonPool() throws Exception 
{
         doContextAwareExecutorServiceTest(IgniteForkJoinPool.commonPool());
     }
 
     /** */
     @Test
-    public void testThreadContextAwareForkJoinPool() throws Exception {
+    public void testOperationContextAwareForkJoinPool() throws Exception {
         doContextAwareExecutorServiceTest(deferShutdown(new 
IgniteForkJoinPool("test", "test", 2, null, false)));
     }
 
+    /** */
+    @Test
+    public void testGridFutureAdapterContextPropagation() throws Exception {
+        GridFutureAdapter<Integer> fut = new GridFutureAdapter<>();
+
+        BiConsumerX<String, Integer> checks = (s, i) -> {
+            fut.listen(new AttributeValueChecker(s, i));
+            fut.listen(AttributeValueChecker.createInClosure(s, i));
+            fut.chain(AttributeValueChecker.createClosure(s, i))
+                .chain(AttributeValueChecker.createClosure(s, i), 
IgniteForkJoinPool.commonPool())
+                .chain(AttributeValueChecker.createOutClosure(s, i))
+                .chain(AttributeValueChecker.createOutClosure(s, i), 
IgniteForkJoinPool.commonPool())
+                .chainCompose(AttributeValueChecker.createComposeClosure(s, i))
+                .chainCompose(AttributeValueChecker.createComposeClosure(s, 
i), IgniteForkJoinPool.commonPool());
+        };
+
+        execute(checks);
+
+        try (Scope ignored = OperationContext.set(STR_ATTR, "test", INT_ATTR, 
5)) {
+            checkAttributeValues("test", 5);
+
+            fut.onDone(0);
+
+            checkAttributeValues("test", 5);
+        }
+
+        AttributeValueChecker.assertAllCreatedChecksPassed();
+    }
+
+    /** */
+    @Test
+    public void testCompletableFutureContextPropagation() throws Exception {
+        IgniteCompletableFuture<Integer> fut = new IgniteCompletableFuture<>();
+        IgniteCompletableFuture<Integer> failedFut = new 
IgniteCompletableFuture<>();
+        IgniteCompletableFuture<Integer> testCompletionStage = new 
IgniteCompletableFuture<>();
+
+        IgniteCompletableFuture<Void> allFut = 
IgniteCompletableFuture.allOf(fut, testCompletionStage);
+        IgniteCompletableFuture<Object> anyFut = 
IgniteCompletableFuture.anyOf(fut, testCompletionStage);
+
+        BiConsumerX<String, Integer> checks = (s, i) -> {
+            
fut.thenCompose(AttributeValueChecker.createCompletableStageFactory(s, i))
+                
.thenComposeAsync(AttributeValueChecker.createCompletableStageFactory(s, i))
+                
.thenComposeAsync(AttributeValueChecker.createCompletableStageFactory(s, i), 
ForkJoinPool.commonPool())
+                .thenApply(AttributeValueChecker.createFunction(s, i))
+                .thenApplyAsync(AttributeValueChecker.createFunction(s, i))
+                .thenApplyAsync(AttributeValueChecker.createFunction(s, i), 
ForkJoinPool.commonPool())
+                .whenComplete(AttributeValueChecker.createBiConsumer(s, i))
+                .whenCompleteAsync(AttributeValueChecker.createBiConsumer(s, 
i))
+                .whenCompleteAsync(AttributeValueChecker.createBiConsumer(s, 
i), ForkJoinPool.commonPool())
+                .thenCombine(testCompletionStage, 
AttributeValueChecker.createBiFunction(s, i))
+                .thenCombineAsync(testCompletionStage, 
AttributeValueChecker.createBiFunction(s, i))
+                .thenCombineAsync(testCompletionStage, 
AttributeValueChecker.createBiFunction(s, i), ForkJoinPool.commonPool())
+                .applyToEither(testCompletionStage, 
AttributeValueChecker.createFunction(s, i))
+                .applyToEitherAsync(testCompletionStage, 
AttributeValueChecker.createFunction(s, i))
+                .applyToEitherAsync(testCompletionStage, 
AttributeValueChecker.createFunction(s, i), ForkJoinPool.commonPool())
+                .handle(AttributeValueChecker.createBiFunction(s, i))
+                .handleAsync(AttributeValueChecker.createBiFunction(s, i))
+                .handleAsync(AttributeValueChecker.createBiFunction(s, i), 
ForkJoinPool.commonPool());
+
+            fut.thenAccept(AttributeValueChecker.createConsumer(s, i));
+            fut.thenAcceptAsync(AttributeValueChecker.createConsumer(s, i));
+            fut.thenAcceptAsync(AttributeValueChecker.createConsumer(s, i), 
ForkJoinPool.commonPool());
+
+            fut.thenRun(new AttributeValueChecker(s, i));
+            fut.thenRunAsync(new AttributeValueChecker(s, i));
+            fut.thenRunAsync(new AttributeValueChecker(s, i), 
ForkJoinPool.commonPool());
+
+            fut.thenAcceptBoth(testCompletionStage, 
AttributeValueChecker.createBiConsumer(s, i));
+            fut.thenAcceptBothAsync(testCompletionStage, 
AttributeValueChecker.createBiConsumer(s, i));
+            fut.thenAcceptBothAsync(testCompletionStage, 
AttributeValueChecker.createBiConsumer(s, i), ForkJoinPool.commonPool());
+
+            fut.runAfterBoth(testCompletionStage, new AttributeValueChecker(s, 
i));
+            fut.runAfterBothAsync(testCompletionStage, new 
AttributeValueChecker(s, i));
+            fut.runAfterBothAsync(testCompletionStage, new 
AttributeValueChecker(s, i), ForkJoinPool.commonPool());
+
+            fut.acceptEither(testCompletionStage, 
AttributeValueChecker.createConsumer(s, i));
+            fut.acceptEitherAsync(testCompletionStage, 
AttributeValueChecker.createConsumer(s, i));
+            fut.acceptEitherAsync(testCompletionStage, 
AttributeValueChecker.createConsumer(s, i), ForkJoinPool.commonPool());
+
+            fut.runAfterEither(testCompletionStage, new 
AttributeValueChecker(s, i));
+            fut.runAfterEitherAsync(testCompletionStage, new 
AttributeValueChecker(s, i));
+            fut.runAfterEitherAsync(testCompletionStage, new 
AttributeValueChecker(s, i), ForkJoinPool.commonPool());
+
+            failedFut.exceptionally(AttributeValueChecker.createFunction(s, 
i));
+
+            IgniteCompletableFuture.runAsync(new AttributeValueChecker(s, i));
+            IgniteCompletableFuture.runAsync(new AttributeValueChecker(s, i), 
ForkJoinPool.commonPool());
+
+            
IgniteCompletableFuture.supplyAsync(AttributeValueChecker.createSupplier(s, i));
+            
IgniteCompletableFuture.supplyAsync(AttributeValueChecker.createSupplier(s, i), 
ForkJoinPool.commonPool());
+        };
+
+        execute(checks);
+
+        try (Scope ignored = OperationContext.set(STR_ATTR, "test", INT_ATTR, 
5)) {
+            checkAttributeValues("test", 5);
+
+            fut.complete(0);
+            failedFut.completeExceptionally(new IgniteException());
+            testCompletionStage.complete(0);
+
+            checkAttributeValues("test", 5);
+        }
+
+        AttributeValueChecker.assertAllCreatedChecksPassed();
+
+        anyFut.get(getTestTimeout(), MILLISECONDS);
+        allFut.get(getTestTimeout(), MILLISECONDS);
+    }
+
     /** */
     private void doContextAwareExecutorServiceTest(ExecutorService pool) 
throws Exception {
         CountDownLatch poolUnblockedLatch = blockPool(pool);
@@ -567,11 +693,11 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
             pool.invokeAll(List.of((Callable<Integer>)new 
AttributeValueChecker(s, i)), 1000, MILLISECONDS);
         };
 
-        createAttributeChecks(asyncChecks);
+        execute(asyncChecks);
 
         poolUnblockedLatch.countDown();
 
-        createAttributeChecks(syncChecks);
+        execute(syncChecks);
 
         AttributeValueChecker.assertAllCreatedChecksPassed();
     }
@@ -600,20 +726,20 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    private void createAttributeChecks(BiConsumerX<String, Integer> 
checkGenerator) throws Exception {
+    private void execute(BiConsumerX<String, Integer> checks) throws Exception 
{
         try (Scope ignored = OperationContext.set(STR_ATTR, "test1", INT_ATTR, 
1)) {
-            checkGenerator.accept("test1", 1);
+            checks.accept("test1", 1);
         }
 
         try (Scope ignored = OperationContext.set(INT_ATTR, 2)) {
-            checkGenerator.accept(DFLT_STR_VAL, 2);
+            checks.accept(DFLT_STR_VAL, 2);
         }
 
         try (Scope ignored = OperationContext.set(STR_ATTR, "test2")) {
-            checkGenerator.accept("test2", DFLT_INT_VAL);
+            checks.accept("test2", DFLT_INT_VAL);
         }
 
-        checkGenerator.accept(DFLT_STR_VAL, DFLT_INT_VAL);
+        checks.accept(DFLT_STR_VAL, DFLT_INT_VAL);
     }
 
     /** */
@@ -623,20 +749,23 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    private static class AttributeValueChecker extends CompletableFuture<Void> 
implements Runnable, Callable<Integer> {
+    private static class AttributeValueChecker extends CompletableFuture<Void> 
implements IgniteRunnable, Callable<Integer> {
         /** */
-        static final List<AttributeValueChecker> CHECKS = new ArrayList<>();
+        private static final long serialVersionUID = 0L;
 
         /** */
-        private final String strAttrVal;
+        static final List<AttributeValueChecker> CHECKS = 
Collections.synchronizedList(new ArrayList<>());
 
         /** */
-        private final Integer intAttrVal;
+        private final String expStrAttrVal;
 
         /** */
-        public AttributeValueChecker(String strAttrVal, Integer intAttrVal) {
-            this.strAttrVal = strAttrVal;
-            this.intAttrVal = intAttrVal;
+        private final Integer expIntAttrVal;
+
+        /** */
+        public AttributeValueChecker(String expStrAttrVal, Integer 
expIntAttrVal) {
+            this.expStrAttrVal = expStrAttrVal;
+            this.expIntAttrVal = expIntAttrVal;
 
             CHECKS.add(this);
         }
@@ -644,7 +773,7 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override public void run() {
             try {
-                checkAttributeValues(strAttrVal, intAttrVal);
+                checkAttributeValues(expStrAttrVal, expIntAttrVal);
 
                 complete(null);
             }
@@ -666,6 +795,107 @@ public class OperationContextAttributesTest extends 
GridCommonAbstractTest {
                 check.get(1000, MILLISECONDS);
             }
         }
+
+        /** */
+        static IgniteClosure<IgniteInternalFuture<Integer>, Integer> 
createClosure(String strAttrVal, int intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return fut -> {
+                checker.run();
+
+                return 0;
+            };
+        }
+
+        /** */
+        static IgniteClosure<IgniteInternalFuture<Integer>, 
IgniteInternalFuture<Integer>> createComposeClosure(
+            String strAttrVal,
+            int intAttrVal
+        ) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return fut -> {
+                checker.run();
+
+                return fut;
+            };
+        }
+
+        /** */
+        static IgniteInClosure<IgniteInternalFuture<Integer>> 
createInClosure(String strAttrVal, int intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return fut -> checker.run();
+        }
+
+        /** */
+        static IgniteOutClosure<Integer> createOutClosure(String strAttrVal, 
int intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return () -> {
+                checker.run();
+
+                return 0;
+            };
+        }
+
+        /** */
+        static <T> BiFunction<Integer, T, Integer> createBiFunction(String 
strAttrVal, int intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return (r, t) -> {
+                checker.run();
+
+                return 0;
+            };
+        }
+
+        /** */
+        static <T> Function<T, Integer> createFunction(String strAttrVal, int 
intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return a -> {
+                checker.run();
+
+                return 0;
+            };
+        }
+
+        /** */
+        static Function<Integer, CompletionStage<Integer>> 
createCompletableStageFactory(String strAttrVal, int intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return a -> {
+                checker.run();
+
+                return IgniteCompletableFuture.completedFuture(0);
+            };
+        }
+
+        /** */
+        static Supplier<Integer> createSupplier(String strAttrVal, int 
intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return () -> {
+                checker.run();
+
+                return 0;
+            };
+        }
+
+        /** */
+        static Consumer<Integer> createConsumer(String strAttrVal, int 
intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return a -> checker.run();
+        }
+
+        /** */
+        static <T, R> BiConsumer<T, R> createBiConsumer(String strAttrVal, int 
intAttrVal) {
+            AttributeValueChecker checker = new 
AttributeValueChecker(strAttrVal, intAttrVal);
+
+            return (r, t) -> checker.run();
+        }
     }
 
     /** */
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/BusyExecutorTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/BusyExecutorTest.java
index 7a9faef81ba..141b7a34401 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/BusyExecutorTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/BusyExecutorTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.stat;
 
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -26,6 +25,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import 
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
 import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -125,7 +125,7 @@ public class BusyExecutorTest extends 
GridCommonAbstractTest {
         be.activate();
 
         be.execute(taskExec);
-        CompletableFuture<Boolean> submitFut = be.submit(taskSubmit);
+        IgniteCompletableFuture<Boolean> submitFut = be.submit(taskSubmit);
         be.execute(cancellableTask);
 
         Thread.sleep(TIME_TO_START_THREAD);
@@ -155,7 +155,7 @@ public class BusyExecutorTest extends 
GridCommonAbstractTest {
         BusyExecutor be = new BusyExecutor("testActivateDeactivate", pool, () 
-> false, c -> log);
         be.activate();
 
-        CompletableFuture<Boolean> futures[] = new CompletableFuture[100];
+        IgniteCompletableFuture<Boolean> futures[] = new 
IgniteCompletableFuture[100];
         CountDownLatch executed = new CountDownLatch(futures.length * 2);
 
         for (int i = 0; i < futures.length; i++) {

Reply via email to