This is an automated email from the ASF dual-hosted git repository.
petrov-mg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new bf1327c6a21 IGNITE-26776 Operation Context propagation is integrated
into GridFutureAdapter and CompletableFuture (#12455)
bf1327c6a21 is described below
commit bf1327c6a21a20d28852d6be4c2a5909712df9fa
Author: Mikhail Petrov <[email protected]>
AuthorDate: Tue Apr 28 17:38:53 2026 +0300
IGNITE-26776 Operation Context propagation is integrated into
GridFutureAdapter and CompletableFuture (#12455)
---
checkstyle/checkstyle.xml | 6 +
.../concurrent/IgniteCompletableFuture.java | 408 +++++++++++++++++++++
.../function/OperationContextAwareBiConsumer.java | 49 +++
...r.java => OperationContextAwareBiFunction.java} | 45 +--
.../function/OperationContextAwareConsumer.java | 49 +++
.../function/OperationContextAwareFunction.java | 54 +++
.../function/OperationContextAwareInClosure.java} | 36 +-
.../function/OperationContextAwareSupplier.java | 43 +++
.../function/OperationContextAwareWrapper.java | 2 +-
.../internal/jdbc/thin/JdbcThinConnection.java | 10 +-
.../snapshot/AbstractCreateSnapshotFutureTask.java | 18 +-
.../persistence/snapshot/SnapshotCheckProcess.java | 22 +-
.../persistence/snapshot/SnapshotChecker.java | 18 +-
.../persistence/snapshot/SnapshotFutureTask.java | 16 +-
.../snapshot/SnapshotResponseRemoteFutureTask.java | 4 +-
.../snapshot/SnapshotRestoreProcess.java | 19 +-
.../snapshot/dump/CreateDumpFutureTask.java | 14 +-
.../query/GridCacheDistributedQueryFuture.java | 6 +-
.../cache/query/GridCacheQueryManager.java | 6 +-
.../cache/query/reducer/CacheQueryReducer.java | 4 +-
.../cache/query/reducer/IndexQueryReducer.java | 8 +-
.../query/reducer/MergeSortCacheQueryReducer.java | 4 +-
.../cache/query/reducer/NodePageStream.java | 8 +-
.../cache/query/reducer/TextQueryReducer.java | 6 +-
.../query/reducer/UnsortedCacheQueryReducer.java | 14 +-
.../persistence/DmsDataWriterWorker.java | 6 +-
.../processors/query/stat/BusyExecutor.java | 6 +-
.../stat/LocalStatisticsGatheringContext.java | 8 +-
.../internal/util/future/GridFutureAdapter.java | 5 +-
.../context/OperationContextAttributesTest.java | 274 ++++++++++++--
.../processors/query/stat/BusyExecutorTest.java | 6 +-
31 files changed, 1005 insertions(+), 169 deletions(-)
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 3b248ed7919..6775611e16c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -218,6 +218,12 @@
<property name="className"
value="java.util.concurrent.ScheduledThreadPoolExecutor"/>
<property name="substitutionClassName"
value="org.apache.ignite.internal.thread.pool.IgniteScheduledThreadPoolExecutor"/>
</module>
+
+ <module
name="org.apache.ignite.tools.checkstyle.ClassUsageRestrictionRule">
+ <property name="className"
value="java.util.concurrent.CompletableFuture"/>
+ <property name="factoryMethods" value="allOf, anyOf, supplyAsync,
runAsync, completedFuture"/>
+ <property name="substitutionClassName"
value="org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture"/>
+ </module>
</module>
<!--
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/IgniteCompletableFuture.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/IgniteCompletableFuture.java
new file mode 100644
index 00000000000..496d5adfd3d
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/concurrent/IgniteCompletableFuture.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.concurrent;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareBiConsumer;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareBiFunction;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareConsumer;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareFunction;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareSupplier;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper;
+import org.jetbrains.annotations.NotNull;
+
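+/** {@link CompletableFuture} wrapper whose callbacks are wrapped in operation-context-aware adapters before being passed to the delegate. */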
+public class IgniteCompletableFuture<T> implements Future<T>,
CompletionStage<T> {
+ /** */
+ private final CompletableFuture<T> delegate;
+
+ /** */
+ public IgniteCompletableFuture() {
+ delegate = new CompletableFuture<>();
+ }
+
+ /** */
+ private IgniteCompletableFuture(CompletableFuture<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> thenApply(Function<? super
T, ? extends U> fn) {
+ return
wrap(delegate.thenApply(OperationContextAwareFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> thenApplyAsync(Function<?
super T, ? extends U> fn) {
+ return
wrap(delegate.thenApplyAsync(OperationContextAwareFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> thenApplyAsync(Function<?
super T, ? extends U> fn, Executor executor) {
+ return
wrap(delegate.thenApplyAsync(OperationContextAwareFunction.wrap(fn), executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void> thenAccept(Consumer<? super
T> action) {
+ return
wrap(delegate.thenAccept(OperationContextAwareConsumer.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void> thenAcceptAsync(Consumer<?
super T> action) {
+ return
wrap(delegate.thenAcceptAsync(OperationContextAwareConsumer.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void> thenAcceptAsync(Consumer<?
super T> action, Executor executor) {
+ return
wrap(delegate.thenAcceptAsync(OperationContextAwareConsumer.wrap(action),
executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void> thenRun(Runnable action) {
+ return
wrap(delegate.thenRun(OperationContextAwareRunnable.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void> thenRunAsync(Runnable
action) {
+ return
wrap(delegate.thenRunAsync(OperationContextAwareRunnable.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void> thenRunAsync(Runnable
action, Executor executor) {
+ return
wrap(delegate.thenRunAsync(OperationContextAwareRunnable.wrap(action),
executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U, V> IgniteCompletableFuture<V> thenCombine(
+ CompletionStage<? extends U> other,
+ BiFunction<? super T, ? super U, ? extends V> fn
+ ) {
+ return wrap(delegate.thenCombine(unwrap(other),
OperationContextAwareBiFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U, V> IgniteCompletableFuture<V> thenCombineAsync(
+ CompletionStage<? extends U> other,
+ BiFunction<? super T, ? super U, ? extends V> fn
+ ) {
+ return wrap(delegate.thenCombineAsync(unwrap(other),
OperationContextAwareBiFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U, V> IgniteCompletableFuture<V> thenCombineAsync(
+ CompletionStage<? extends U> other,
+ BiFunction<? super T, ? super U, ? extends V> fn,
+ Executor executor
+ ) {
+ return wrap(delegate.thenCombineAsync(unwrap(other),
OperationContextAwareBiFunction.wrap(fn), executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<Void> thenAcceptBoth(
+ CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action
+ ) {
+ return wrap(delegate.thenAcceptBoth(unwrap(other),
OperationContextAwareBiConsumer.wrap(action)));
+ }
+
+ /** {@inheritDoc} Delegates to {@link CompletableFuture#thenAcceptBothAsync(CompletionStage, BiConsumer)} with the context-aware {@code action}. */
+ @Override public <U> IgniteCompletableFuture<Void> thenAcceptBothAsync(
+ CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action
+ ) {
+ return wrap(delegate.thenAcceptBothAsync(unwrap(other),
OperationContextAwareBiConsumer.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<Void> thenAcceptBothAsync(
+ CompletionStage<? extends U> other,
+ BiConsumer<? super T, ? super U> action,
+ Executor executor
+ ) {
+ return wrap(delegate.thenAcceptBothAsync(unwrap(other),
OperationContextAwareBiConsumer.wrap(action), executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void>
runAfterBoth(CompletionStage<?> other, Runnable action) {
+ return wrap(delegate.runAfterBoth(unwrap(other),
OperationContextAwareRunnable.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void>
runAfterBothAsync(CompletionStage<?> other, Runnable action) {
+ return wrap(delegate.runAfterBothAsync(unwrap(other),
OperationContextAwareRunnable.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void>
runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
{
+ return wrap(delegate.runAfterBothAsync(unwrap(other),
OperationContextAwareRunnable.wrap(action), executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U>
applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
+ return wrap(delegate.applyToEither(unwrap(other),
OperationContextAwareFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U>
applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U>
fn) {
+ return wrap(delegate.applyToEitherAsync(unwrap(other),
OperationContextAwareFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> applyToEitherAsync(
+ CompletionStage<? extends T> other,
+ Function<? super T, U> fn,
+ Executor executor
+ ) {
+ return wrap(delegate.applyToEitherAsync(unwrap(other),
OperationContextAwareFunction.wrap(fn), executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void>
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
+ return wrap(delegate.acceptEither(unwrap(other),
OperationContextAwareConsumer.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void>
acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T>
action) {
+ return wrap(delegate.acceptEitherAsync(unwrap(other),
OperationContextAwareConsumer.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void> acceptEitherAsync(
+ CompletionStage<? extends T> other,
+ Consumer<? super T> action,
+ Executor executor
+ ) {
+ return wrap(delegate.acceptEitherAsync(unwrap(other),
OperationContextAwareConsumer.wrap(action), executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void>
runAfterEither(CompletionStage<?> other, Runnable action) {
+ return wrap(delegate.runAfterEither(unwrap(other),
OperationContextAwareRunnable.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void>
runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
+ return wrap(delegate.runAfterEitherAsync(unwrap(other),
OperationContextAwareRunnable.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<Void>
runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor
executor) {
+ return wrap(delegate.runAfterEitherAsync(unwrap(other),
OperationContextAwareRunnable.wrap(action), executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> thenCompose(Function<?
super T, ? extends CompletionStage<U>> fn) {
+ return
wrap(delegate.thenCompose(OperationContextAwareCompletionStageFactory.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U>
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
+ return
wrap(delegate.thenComposeAsync(OperationContextAwareCompletionStageFactory.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> thenComposeAsync(
+ Function<? super T, ? extends CompletionStage<U>> fn,
+ Executor executor
+ ) {
+ return
wrap(delegate.thenComposeAsync(OperationContextAwareCompletionStageFactory.wrap(fn),
executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<T> whenComplete(BiConsumer<?
super T, ? super Throwable> action) {
+ return
wrap(delegate.whenComplete(OperationContextAwareBiConsumer.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<T> whenCompleteAsync(BiConsumer<?
super T, ? super Throwable> action) {
+ return
wrap(delegate.whenCompleteAsync(OperationContextAwareBiConsumer.wrap(action)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<T> whenCompleteAsync(BiConsumer<?
super T, ? super Throwable> action, Executor executor) {
+ return
wrap(delegate.whenCompleteAsync(OperationContextAwareBiConsumer.wrap(action),
executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> handle(BiFunction<? super
T, Throwable, ? extends U> fn) {
+ return wrap(delegate.handle(OperationContextAwareBiFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> handleAsync(BiFunction<?
super T, Throwable, ? extends U> fn) {
+ return
wrap(delegate.handleAsync(OperationContextAwareBiFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public <U> IgniteCompletableFuture<U> handleAsync(BiFunction<?
super T, Throwable, ? extends U> fn, Executor executor) {
+ return
wrap(delegate.handleAsync(OperationContextAwareBiFunction.wrap(fn), executor));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompletableFuture<T>
exceptionally(Function<Throwable, ? extends T> fn) {
+ return
wrap(delegate.exceptionally(OperationContextAwareFunction.wrap(fn)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<T> toCompletableFuture() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel(boolean mayInterruptIfRunning) {
+ return delegate.cancel(mayInterruptIfRunning);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isCancelled() {
+ return delegate.isCancelled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDone() {
+ return delegate.isDone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T get() throws InterruptedException, ExecutionException {
+ return delegate.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T get(long timeout, @NotNull TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException {
+ return delegate.get(timeout, unit);
+ }
+
+ /** */
+ public T join() {
+ return delegate.join();
+ }
+
+ /** */
+ public T getNow(T valueIfAbsent) {
+ return delegate.getNow(valueIfAbsent);
+ }
+
+ /** */
+ public boolean complete(T value) {
+ return delegate.complete(value);
+ }
+
+ /** */
+ public boolean completeExceptionally(Throwable ex) {
+ return delegate.completeExceptionally(ex);
+ }
+
+ /** */
+ public boolean isCompletedExceptionally() {
+ return delegate.isCompletedExceptionally();
+ }
+
+ /** */
+ public static IgniteCompletableFuture<Void>
allOf(IgniteCompletableFuture<?>... cfs) {
+ return wrap(CompletableFuture.allOf(Arrays.stream(cfs).map(f ->
f.delegate).toArray(CompletableFuture[]::new)));
+ }
+
+ /** */
+ public static IgniteCompletableFuture<Object>
anyOf(IgniteCompletableFuture<?>... cfs) {
+ return wrap(CompletableFuture.anyOf(Arrays.stream(cfs).map(f ->
f.delegate).toArray(CompletableFuture[]::new)));
+ }
+
+ /** */
+ public static <U> IgniteCompletableFuture<U> supplyAsync(Supplier<U>
supplier) {
+ return
wrap(CompletableFuture.supplyAsync(OperationContextAwareSupplier.wrap(supplier)));
+ }
+
+ /** */
+ public static <U> IgniteCompletableFuture<U> supplyAsync(Supplier<U>
supplier, Executor executor) {
+ return
wrap(CompletableFuture.supplyAsync(OperationContextAwareSupplier.wrap(supplier),
executor));
+ }
+
+ /** */
+ public static IgniteCompletableFuture<Void> runAsync(Runnable runnable) {
+ return
wrap(CompletableFuture.runAsync(OperationContextAwareRunnable.wrap(runnable)));
+ }
+
+ /** */
+ public static IgniteCompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
+ return
wrap(CompletableFuture.runAsync(OperationContextAwareRunnable.wrap(runnable),
executor));
+ }
+
+ /** */
+ public static <U> IgniteCompletableFuture<U> completedFuture(U value) {
+ return wrap(CompletableFuture.completedFuture(value));
+ }
+
+ /** */
+ private static <T> IgniteCompletableFuture<T> wrap(CompletableFuture<T>
delegate) {
+ return new IgniteCompletableFuture<>(delegate);
+ }
+
+ /** */
+ private static <T> CompletionStage<T> unwrap(CompletionStage<T>
completionStage) {
+ return completionStage instanceof IgniteCompletableFuture
+ ? ((IgniteCompletableFuture<T>)completionStage).delegate
+ : completionStage;
+ }
+
+ /** */
+ private static class OperationContextAwareCompletionStageFactory<T, U>
+ extends OperationContextAwareWrapper<Function<? super T, ? extends
CompletionStage<U>>>
+ implements Function<T, CompletionStage<U>> {
+ /** */
+ public OperationContextAwareCompletionStageFactory(
+ Function<? super T, ? extends CompletionStage<U>> delegate,
+ OperationContextSnapshot snapshot
+ ) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletionStage<U> apply(T t) {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ return unwrap(delegate.apply(t));
+ }
+ }
+
+ /** */
+ public static <T, R> Function<? super T, ? extends CompletionStage<R>>
wrap(
+ Function<? super T, ? extends CompletionStage<R>> delegate
+ ) {
+ if (delegate == null || delegate instanceof
OperationContextAwareWrapper)
+ return delegate;
+
+ return new OperationContextAwareCompletionStageFactory<>(delegate,
OperationContext.createSnapshot());
+ }
+ }
+}
+
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiConsumer.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiConsumer.java
new file mode 100644
index 00000000000..6870dde9be8
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiConsumer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.BiConsumer;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.jetbrains.annotations.NotNull;
+
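+/** {@link BiConsumer} wrapper that restores the supplied {@link OperationContextSnapshot} for the duration of each {@code accept} call. */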
+public class OperationContextAwareBiConsumer<T, U> extends
OperationContextAwareWrapper<BiConsumer<T, U>> implements BiConsumer<T, U> {
+ /** */
+ public OperationContextAwareBiConsumer(BiConsumer<T, U> delegate,
OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(T t, U u) {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ delegate.accept(t, u);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public BiConsumer<T, U> andThen(@NotNull BiConsumer<?
super T, ? super U> after) {
+ return BiConsumer.super.andThen(wrap(after));
+ }
+
+ /** */
+ public static <T, U> BiConsumer<T, U> wrap(BiConsumer<T, U> delegate) {
+ return wrap(delegate, OperationContextAwareBiConsumer::new);
+ }
+}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiFunction.java
similarity index 50%
copy from
modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
copy to
modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiFunction.java
index a741d3b6d3c..bdf17e0042b 100644
---
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareBiFunction.java
@@ -18,44 +18,35 @@
package org.apache.ignite.internal.thread.context.function;
import java.util.function.BiFunction;
-import org.apache.ignite.internal.IgniteInternalWrapper;
+import java.util.function.Function;
import org.apache.ignite.internal.thread.context.OperationContext;
import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.jetbrains.annotations.NotNull;
/** */
-abstract class OperationContextAwareWrapper<T> implements
IgniteInternalWrapper<T> {
+public class OperationContextAwareBiFunction<T, U, R>
+ extends OperationContextAwareWrapper<BiFunction<T, U, R>>
+ implements BiFunction<T, U, R> {
/** */
- protected final T delegate;
-
- /** */
- protected final OperationContextSnapshot snapshot;
-
- /** */
- @Override public T delegate() {
- return delegate;
+ public OperationContextAwareBiFunction(BiFunction<T, U, R> delegate,
OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
}
- /** */
- protected OperationContextAwareWrapper(T delegate,
OperationContextSnapshot snapshot) {
- this.delegate = delegate;
- this.snapshot = snapshot;
+ /** {@inheritDoc} */
+ @Override public R apply(T t, U u) {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ return delegate.apply(t, u);
+ }
}
- /** */
- protected static <T> T wrap(T delegate, BiFunction<T,
OperationContextSnapshot, T> wrapper) {
- return wrap(delegate, wrapper, false);
+ /** {@inheritDoc} */
+ @NotNull @Override public <V> BiFunction<T, U, V> andThen(@NotNull
Function<? super R, ? extends V> after) {
+ return
BiFunction.super.andThen(OperationContextAwareFunction.wrap(after));
}
/** */
- protected static <T> T wrap(T delegate, BiFunction<T,
OperationContextSnapshot, T> wrapper, boolean ignoreEmptyContext) {
- if (delegate == null || delegate instanceof
OperationContextAwareWrapper)
- return delegate;
-
- OperationContextSnapshot snapshot = OperationContext.createSnapshot();
-
- if (ignoreEmptyContext && snapshot == null)
- return delegate;
-
- return wrapper.apply(delegate, snapshot);
+ public static <T, U, R> BiFunction<T, U, R> wrap(BiFunction<T, U, R>
delegate) {
+ return wrap(delegate, OperationContextAwareBiFunction::new);
}
}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareConsumer.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareConsumer.java
new file mode 100644
index 00000000000..f2f0290f2f4
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareConsumer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.Consumer;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.jetbrains.annotations.NotNull;
+
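+/** {@link Consumer} wrapper that restores the supplied {@link OperationContextSnapshot} for the duration of each {@code accept} call. */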
+public class OperationContextAwareConsumer<T> extends
OperationContextAwareWrapper<Consumer<T>> implements Consumer<T> {
+ /** */
+ public OperationContextAwareConsumer(Consumer<T> delegate,
OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public Consumer<T> andThen(@NotNull Consumer<? super T>
after) {
+ return Consumer.super.andThen(wrap(after));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void accept(T t) {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ delegate.accept(t);
+ }
+ }
+
+ /** */
+ public static <T> Consumer<T> wrap(Consumer<T> delegate) {
+ return wrap(delegate, OperationContextAwareConsumer::new);
+ }
+}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareFunction.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareFunction.java
new file mode 100644
index 00000000000..e12b64b9389
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.Function;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.jetbrains.annotations.NotNull;
+
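+/** {@link Function} wrapper that restores the supplied {@link OperationContextSnapshot} for the duration of each {@code apply} call. */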
+public class OperationContextAwareFunction<T, R> extends
OperationContextAwareWrapper<Function<T, R>> implements Function<T, R> {
+ /** */
+ private OperationContextAwareFunction(Function<T, R> delegate,
OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public R apply(T t) {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ return delegate.apply(t);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public <V> Function<V, R> compose(@NotNull Function<?
super V, ? extends T> before) {
+ return Function.super.compose(wrap(before));
+ }
+
+ /** {@inheritDoc} */
+ @NotNull @Override public <V> Function<T, V> andThen(@NotNull Function<?
super R, ? extends V> after) {
+ return Function.super.andThen(wrap(after));
+ }
+
+ /** */
+ public static <T, R> Function<T, R> wrap(Function<T, R> delegate) {
+ return wrap(delegate, OperationContextAwareFunction::new);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java
similarity index 50%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
copy to
modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java
index 5ad5a04fb89..197cc8dcd7a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareInClosure.java
@@ -15,33 +15,33 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.query.reducer;
+package org.apache.ignite.internal.thread.context.function;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.processors.cache.query.ScoredCacheEntry;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+import org.apache.ignite.lang.IgniteInClosure;
-/**
- * Reducer for {@code TextQuery} results.
- */
-public class TextQueryReducer<R> extends MergeSortCacheQueryReducer<R> {
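+/** {@link IgniteInClosure} wrapper that restores the supplied {@link OperationContextSnapshot} for the duration of each {@code apply} call. */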
+public class OperationContextAwareInClosure<E> extends
OperationContextAwareWrapper<IgniteInClosure<E>> implements IgniteInClosure<E> {
/** */
private static final long serialVersionUID = 0L;
/** */
- public TextQueryReducer(final Map<UUID, NodePageStream<R>> pageStreams) {
- super(pageStreams);
+ public OperationContextAwareInClosure(IgniteInClosure<E> delegate,
OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
}
/** {@inheritDoc} */
- @Override protected CompletableFuture<Comparator<NodePage<R>>>
pageComparator() {
- CompletableFuture<Comparator<NodePage<R>>> f = new
CompletableFuture<>();
-
- f.complete((o1, o2) -> -Float.compare(
- ((ScoredCacheEntry<?, ?>)o1.head()).score(), ((ScoredCacheEntry<?,
?>)o2.head()).score()));
+ @Override public void apply(E e) {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ delegate.apply(e);
+ }
+ }
- return f;
+ /** Delegates to the inherited two-argument {@code wrap}, passing this class's constructor reference. */
+ public static <E> IgniteInClosure<E> wrap(IgniteInClosure<E> delegate) {
+ return wrap(delegate, OperationContextAwareInClosure::new);
}
+
}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareSupplier.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareSupplier.java
new file mode 100644
index 00000000000..30e816bb534
--- /dev/null
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareSupplier.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread.context.function;
+
+import java.util.function.Supplier;
+import org.apache.ignite.internal.thread.context.OperationContext;
+import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
+import org.apache.ignite.internal.thread.context.Scope;
+
+/** {@link Supplier} that calls its delegate inside a {@link Scope} opened by {@code OperationContext.restoreSnapshot}. */
+public class OperationContextAwareSupplier<T> extends
OperationContextAwareWrapper<Supplier<T>> implements Supplier<T> {
+ /** Creates a wrapper around {@code delegate} that restores {@code snapshot} on each {@link #get()} call. */
+ public OperationContextAwareSupplier(Supplier<T> delegate,
OperationContextSnapshot snapshot) {
+ super(delegate, snapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public T get() {
+ try (Scope ignored = OperationContext.restoreSnapshot(snapshot)) {
+ return delegate.get();
+ }
+ }
+
+ /** Delegates to the inherited two-argument {@code wrap}, passing this class's constructor reference. */
+ public static <T> Supplier<T> wrap(Supplier<T> delegate) {
+ return wrap(delegate, OperationContextAwareSupplier::new);
+ }
+}
diff --git
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
index a741d3b6d3c..381eecfca09 100644
---
a/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
+++
b/modules/commons/src/main/java/org/apache/ignite/internal/thread/context/function/OperationContextAwareWrapper.java
@@ -23,7 +23,7 @@ import
org.apache.ignite.internal.thread.context.OperationContext;
import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
/** */
-abstract class OperationContextAwareWrapper<T> implements
IgniteInternalWrapper<T> {
+public abstract class OperationContextAwareWrapper<T> implements
IgniteInternalWrapper<T> {
/** */
protected final T delegate;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index f60ab5835ad..e3771824b5e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -53,7 +53,6 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
@@ -120,6 +119,7 @@ import
org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
import
org.apache.ignite.internal.sql.optimizer.affinity.PartitionClientContext;
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -2599,7 +2599,7 @@ public class JdbcThinConnection implements Connection {
*/
private abstract class BlockingJdbcChannel {
/** Request ID -> Jdbc result map. */
- private Map<Long, CompletableFuture<JdbcResult>> results = new
ConcurrentHashMap<>();
+ private Map<Long, IgniteCompletableFuture<JdbcResult>> results = new
ConcurrentHashMap<>();
/**
* Do request in blocking style. It just call
@@ -2614,9 +2614,9 @@ public class JdbcThinConnection implements Connection {
R res;
if (isStream()) {
- CompletableFuture<JdbcResult> resFut = new
CompletableFuture<>();
+ IgniteCompletableFuture<JdbcResult> resFut = new
IgniteCompletableFuture<>();
- CompletableFuture<JdbcResult> oldFut =
results.put(req.requestId(), resFut);
+ IgniteCompletableFuture<JdbcResult> oldFut =
results.put(req.requestId(), resFut);
assert oldFut == null : "Another request with the same id is
waiting for result.";
@@ -2639,7 +2639,7 @@ public class JdbcThinConnection implements Connection {
boolean handleResult(long reqId, JdbcResult res) {
boolean handled = false;
- CompletableFuture<JdbcResult> fut = results.remove(reqId);
+ IgniteCompletableFuture<JdbcResult> fut = results.remove(reqId);
if (fut != null) {
fut.complete(res);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
index d72d8b1e301..2a22e286e53 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryType;
@@ -38,6 +37,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.processors.marshaller.MappedName;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -55,7 +55,7 @@ public abstract class AbstractCreateSnapshotFutureTask
extends AbstractSnapshotF
protected final Map<Integer, Set<Integer>> processed = new HashMap<>();
/** Future which will be completed when task requested to be closed. Will
be executed on system pool. */
- protected volatile CompletableFuture<Void> closeFut;
+ protected volatile IgniteCompletableFuture<Void> closeFut;
/** Snapshot file tree. */
protected final SnapshotFileTree sft;
@@ -81,10 +81,10 @@ public abstract class AbstractCreateSnapshotFutureTask
extends AbstractSnapshotF
}
/** */
- protected abstract List<CompletableFuture<Void>> saveCacheConfigs();
+ protected abstract List<IgniteCompletableFuture<Void>> saveCacheConfigs();
/** */
- protected abstract List<CompletableFuture<Void>> saveGroup(int grpId,
Set<Integer> grpParts) throws IgniteCheckedException;
+ protected abstract List<IgniteCompletableFuture<Void>> saveGroup(int
grpId, Set<Integer> grpParts) throws IgniteCheckedException;
/** {@inheritDoc} */
@Override public boolean cancel() {
@@ -103,7 +103,7 @@ public abstract class AbstractCreateSnapshotFutureTask
extends AbstractSnapshotF
}
/** @return Future which will be completed when operations truly stopped.
*/
- protected abstract CompletableFuture<Void> closeAsync();
+ protected abstract IgniteCompletableFuture<Void> closeAsync();
/**
* @return {@code true} if current task requested to be stopped.
@@ -180,7 +180,7 @@ public abstract class AbstractCreateSnapshotFutureTask
extends AbstractSnapshotF
protected void saveSnapshotData() {
try {
// Submit all tasks for partitions and deltas processing.
- List<CompletableFuture<Void>> futs = new ArrayList<>();
+ List<IgniteCompletableFuture<Void>> futs = new ArrayList<>();
Collection<BinaryType> binTypesCopy = cctx.kernalContext()
.cacheObjects()
@@ -201,7 +201,7 @@ public abstract class AbstractCreateSnapshotFutureTask
extends AbstractSnapshotF
int futsSize = futs.size();
- CompletableFuture.allOf(futs.toArray(new
CompletableFuture[futsSize])).whenComplete((res, t) -> {
+ IgniteCompletableFuture.allOf(futs.toArray(new
IgniteCompletableFuture[futsSize])).whenComplete((res, t) -> {
assert t == null : "Exception must never be thrown since a
wrapper is used " +
"for each snapshot task: " + t;
@@ -244,8 +244,8 @@ public abstract class AbstractCreateSnapshotFutureTask
extends AbstractSnapshotF
}
/** */
- protected CompletableFuture<Void> runAsync(IgniteThrowableRunner task) {
- return CompletableFuture.runAsync(
+ protected IgniteCompletableFuture<Void> runAsync(IgniteThrowableRunner
task) {
+ return IgniteCompletableFuture.runAsync(
wrapExceptionIfStarted(task),
snpSndr.executor()
);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
index ab3e1a96029..b612861517a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcess.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -45,6 +44,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.filename.Snapshot
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -326,7 +326,7 @@ public class SnapshotCheckProcess {
GridFutureAdapter<SnapshotCheckResponse> phaseFut = new
GridFutureAdapter<>();
- CompletableFuture<SnapshotCheckResponse> workingFut;
+ IgniteCompletableFuture<SnapshotCheckResponse> workingFut;
if (req.incrementalIndex() > 0) {
assert !req.allRestoreHandlers() : "Snapshot handlers aren't
supported for incremental snapshot.";
@@ -349,13 +349,13 @@ public class SnapshotCheckProcess {
}
/** @return A composed future of increment checks for each consistent id
regarding {@link SnapshotCheckContext#metas}. */
- private CompletableFuture<SnapshotCheckResponse>
incrementalFuture(SnapshotCheckContext ctx) {
+ private IgniteCompletableFuture<SnapshotCheckResponse>
incrementalFuture(SnapshotCheckContext ctx) {
// Incremental snapshots do not support working on other topology.
Only single meta and snapshot part can be processed.
SnapshotMetadata meta = ctx.metas.get(0);
- CompletableFuture<SnapshotCheckResponse> resFut = new
CompletableFuture<>();
+ IgniteCompletableFuture<SnapshotCheckResponse> resFut = new
IgniteCompletableFuture<>();
- CompletableFuture<IncrementalSnapshotVerifyResult> workingFut =
snpChecker.checkIncrementalSnapshot(
+ IgniteCompletableFuture<IncrementalSnapshotVerifyResult> workingFut =
snpChecker.checkIncrementalSnapshot(
ctx.locFileTree.get(meta.consistentId()),
ctx.req.incrementalIndex(),
ctx.totalCounter::addAndGet,
@@ -373,18 +373,18 @@ public class SnapshotCheckProcess {
}
/** @return A composed future of partitions checks for each consistent id
regarding {@link SnapshotCheckContext#metas}. */
- private CompletableFuture<SnapshotCheckResponse>
partitionsHashesFuture(SnapshotCheckContext ctx) {
+ private IgniteCompletableFuture<SnapshotCheckResponse>
partitionsHashesFuture(SnapshotCheckContext ctx) {
// Per metas result: consistent id -> check results per partition key.
Map<String, Map<PartitionKey, PartitionHashRecord>> perMetaResults =
new ConcurrentHashMap<>(ctx.metas.size(), 1.0f);
// Per consistent id.
Map<String, Throwable> exceptions = new
ConcurrentHashMap<>(ctx.metas.size(), 1.0f);
- CompletableFuture<SnapshotCheckResponse> composedFut = new
CompletableFuture<>();
+ IgniteCompletableFuture<SnapshotCheckResponse> composedFut = new
IgniteCompletableFuture<>();
AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());
for (SnapshotMetadata meta : ctx.metas) {
// Run asynchronously to calculate the metric 'total partitions'
faster.
kctx.pools().getSnapshotExecutorService().submit(() -> {
- CompletableFuture<SnapshotPartitionsVerifyHandlerResponse>
metaFut = snpChecker.checkPartitions(
+
IgniteCompletableFuture<SnapshotPartitionsVerifyHandlerResponse> metaFut =
snpChecker.checkPartitions(
meta,
ctx.locFileTree.get(meta.consistentId()),
ctx.req.groups(),
@@ -417,18 +417,18 @@ public class SnapshotCheckProcess {
* @return A composed future of all the snapshot handlers for each
consistent id regarding {@link SnapshotCheckContext#metas}.
* @see IgniteSnapshotManager#handlers()
*/
- private CompletableFuture<SnapshotCheckResponse>
allHandlersFuture(SnapshotCheckContext ctx) {
+ private IgniteCompletableFuture<SnapshotCheckResponse>
allHandlersFuture(SnapshotCheckContext ctx) {
// Per metas result: snapshot part's consistent id -> check result per
handler name.
Map<String, Map<String, SnapshotHandlerResult<Message>>>
perMetaResults = new ConcurrentHashMap<>(ctx.metas.size(), 1.0f);
// Per consistent id.
Map<String, Throwable> exceptions = new
ConcurrentHashMap<>(ctx.metas.size(), 1.0f);
- CompletableFuture<SnapshotCheckResponse> composedFut = new
CompletableFuture<>();
+ IgniteCompletableFuture<SnapshotCheckResponse> composedFut = new
IgniteCompletableFuture<>();
AtomicInteger metasProcessed = new AtomicInteger(ctx.metas.size());
for (SnapshotMetadata meta : ctx.metas) {
// Run asynchronously to calculate the metric 'total partitions'
faster.
kctx.pools().getSnapshotExecutorService().submit(() -> {
- CompletableFuture<Map<String, SnapshotHandlerResult<Message>>>
metaFut = snpChecker.invokeCustomHandlers(
+ IgniteCompletableFuture<Map<String,
SnapshotHandlerResult<Message>>> metaFut = snpChecker.invokeCustomHandlers(
meta,
ctx.locFileTree.get(meta.consistentId()),
ctx.req.groups(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
index 4acb019b7cd..da9aba4ec0d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java
@@ -23,7 +23,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -38,6 +37,7 @@ import
org.apache.ignite.internal.management.cache.PartitionKey;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
@@ -63,19 +63,19 @@ public class SnapshotChecker {
}
/** Launches local metas checking. */
- public CompletableFuture<List<SnapshotMetadata>> checkLocalMetas(
+ public IgniteCompletableFuture<List<SnapshotMetadata>> checkLocalMetas(
SnapshotFileTree sft,
int incIdx,
@Nullable Collection<Integer> grpIds
) {
- return CompletableFuture.supplyAsync(
+ return IgniteCompletableFuture.supplyAsync(
new SnapshotMetadataVerificationTask(kctx.grid(), log, sft,
incIdx, grpIds),
executor
);
}
/** */
- public CompletableFuture<IncrementalSnapshotVerifyResult>
checkIncrementalSnapshot(
+ public IgniteCompletableFuture<IncrementalSnapshotVerifyResult>
checkIncrementalSnapshot(
SnapshotFileTree sft,
int incIdx,
@Nullable Consumer<Integer> totalCnsmr,
@@ -83,7 +83,7 @@ public class SnapshotChecker {
) {
assert incIdx > 0;
- return CompletableFuture.supplyAsync(
+ return IgniteCompletableFuture.supplyAsync(
new IncrementalSnapshotVerify(kctx.grid(), log, sft, incIdx,
totalCnsmr, checkedCnsmr),
executor
);
@@ -169,7 +169,7 @@ public class SnapshotChecker {
*
* @see IgniteSnapshotManager#handlers()
*/
- public CompletableFuture<Map<String, SnapshotHandlerResult<Message>>>
invokeCustomHandlers(
+ public IgniteCompletableFuture<Map<String,
SnapshotHandlerResult<Message>>> invokeCustomHandlers(
SnapshotMetadata meta,
SnapshotFileTree sft,
@Nullable Collection<String> grps,
@@ -179,7 +179,7 @@ public class SnapshotChecker {
) {
// The handlers use or may use the same snapshot pool. If it is
configured with 1 thread, launching waiting task in
// the same pool might block it.
- return CompletableFuture.supplyAsync(() -> {
+ return IgniteCompletableFuture.supplyAsync(() -> {
try {
SnapshotHandlerContext hndCnt = new SnapshotHandlerContext(
meta,
@@ -202,7 +202,7 @@ public class SnapshotChecker {
}
/** Launches local partitions checking. */
- public CompletableFuture<SnapshotPartitionsVerifyHandlerResponse>
checkPartitions(
+ public IgniteCompletableFuture<SnapshotPartitionsVerifyHandlerResponse>
checkPartitions(
SnapshotMetadata meta,
SnapshotFileTree sft,
@Nullable Collection<String> grps,
@@ -211,7 +211,7 @@ public class SnapshotChecker {
@Nullable Consumer<Integer> totalCnsmr,
@Nullable Consumer<Integer> checkedPartCnsmr
) {
- return CompletableFuture.supplyAsync(() -> {
+ return IgniteCompletableFuture.supplyAsync(() -> {
SnapshotHandlerContext hctx = new SnapshotHandlerContext(
meta,
grps,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 14a7d4df628..bca42981010 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
@@ -67,6 +66,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -125,7 +125,7 @@ class SnapshotFutureTask extends
AbstractCreateSnapshotFutureTask implements Che
private final boolean withMetaStorage;
/** Checkpoint end future. */
- private final CompletableFuture<Boolean> cpEndFut = new
CompletableFuture<>();
+ private final IgniteCompletableFuture<Boolean> cpEndFut = new
IgniteCompletableFuture<>();
/** Future to wait until checkpoint mark phase will be finished and
snapshot tasks scheduled. */
private final GridFutureAdapter<Void> startedFut = new
GridFutureAdapter<>();
@@ -379,7 +379,7 @@ class SnapshotFutureTask extends
AbstractCreateSnapshotFutureTask implements Che
}
/** {@inheritDoc} */
- @Override protected List<CompletableFuture<Void>> saveGroup(int grpId,
Set<Integer> grpParts) {
+ @Override protected List<IgniteCompletableFuture<Void>> saveGroup(int
grpId, Set<Integer> grpParts) {
// Process partitions for a particular cache group.
return grpParts.stream().map(partId -> {
GroupPartitionId pair = new GroupPartitionId(grpId, partId);
@@ -438,7 +438,7 @@ class SnapshotFutureTask extends
AbstractCreateSnapshotFutureTask implements Che
}
/** {@inheritDoc} */
- @Override protected List<CompletableFuture<Void>> saveCacheConfigs() {
+ @Override protected List<IgniteCompletableFuture<Void>> saveCacheConfigs()
{
// Send configuration files of all cache groups.
return ccfgSndrs.stream()
.map(ccfgSndr -> runAsync(ccfgSndr::sendCacheConfig))
@@ -467,7 +467,7 @@ class SnapshotFutureTask extends
AbstractCreateSnapshotFutureTask implements Che
}
/** {@inheritDoc} */
- @Override public synchronized CompletableFuture<Void> closeAsync() {
+ @Override public synchronized IgniteCompletableFuture<Void> closeAsync() {
if (closeFut == null) {
Throwable err0 = err.get();
@@ -477,8 +477,10 @@ class SnapshotFutureTask extends
AbstractCreateSnapshotFutureTask implements Che
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
- closeFut = CompletableFuture.runAsync(() -> onDone(new
SnapshotFutureTaskResult(taken, snpPtr), err0),
- cctx.kernalContext().pools().getSystemExecutorService());
+ closeFut = IgniteCompletableFuture.runAsync(
+ () -> onDone(new SnapshotFutureTaskResult(taken, snpPtr),
err0),
+ cctx.kernalContext().pools().getSystemExecutorService()
+ );
}
return closeFut;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
index 12b589f1d6b..bd2aaf85537 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
@@ -38,6 +37,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.filename.FileTree
import
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
@@ -106,7 +106,7 @@ public class SnapshotResponseRemoteFutureTask extends
AbstractSnapshotFutureTask
snpSndr.init(partsToSend.size());
- CompletableFuture.runAsync(() -> partsToSend.forEach((gp, sinfo)
-> {
+ IgniteCompletableFuture.runAsync(() -> partsToSend.forEach((gp,
sinfo) -> {
if (err.get() != null)
return;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 036dcdc1312..531971a00fe 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -36,7 +36,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -84,6 +83,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSn
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -947,8 +947,8 @@ public class SnapshotRestoreProcess {
", caches=" + F.transform(opCtx0.dirs.values(), s ->
NodeFileTree.cacheName(s.get(0))) + ']');
}
- CompletableFuture<Void> metaFut =
ctx.localNodeId().equals(opCtx0.opNodeId) ?
- CompletableFuture.runAsync(
+ IgniteCompletableFuture<Void> metaFut =
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+ IgniteCompletableFuture.runAsync(
() -> {
try {
SnapshotMetadata meta =
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
@@ -969,7 +969,7 @@ public class SnapshotRestoreProcess {
opCtx0.errHnd.accept(t);
}
- }, snpMgr.snapshotExecutorService()) :
CompletableFuture.completedFuture(null);
+ }, snpMgr.snapshotExecutorService()) :
IgniteCompletableFuture.completedFuture(null);
Map<String, GridAffinityAssignmentCache> affCache = new
HashMap<>();
@@ -1139,7 +1139,7 @@ public class SnapshotRestoreProcess {
return;
}
- CompletableFuture.runAsync(
+ IgniteCompletableFuture.runAsync(
() -> {
try {
punchHole(grpId, partId, snpFile);
@@ -1169,7 +1169,7 @@ public class SnapshotRestoreProcess {
opCtx0.totalParts = size;
- CompletableFuture.allOf(allPartFuts.toArray(new
CompletableFuture[size]))
+ IgniteCompletableFuture.allOf(allPartFuts.toArray(new
IgniteCompletableFuture[size]))
.runAfterBothAsync(metaFut, () -> {
try {
if (opCtx0.stopChecker.getAsBoolean())
@@ -1754,7 +1754,7 @@ public class SnapshotRestoreProcess {
) {
IgniteSnapshotManager snapMgr = ctx.cache().context().snapshotMgr();
- CompletableFuture<Path> copyPartFut = CompletableFuture.supplyAsync(()
-> {
+ IgniteCompletableFuture<Path> copyPartFut =
IgniteCompletableFuture.supplyAsync(() -> {
if (opCtx.stopChecker.getAsBoolean())
throw new IgniteInterruptedException("The operation has been
stopped on copy file: " + snpFile.getAbsolutePath());
@@ -1774,7 +1774,8 @@ public class SnapshotRestoreProcess {
if (opCtx.isGroupCompressed(grpId)) {
copyPartFut = copyPartFut.thenComposeAsync(
p -> {
- CompletableFuture<Path> result = new CompletableFuture<>();
+ IgniteCompletableFuture<Path> result = new
IgniteCompletableFuture<>();
+
try {
punchHole(grpId, partFut.partId, tmpPartFile);
@@ -1970,7 +1971,7 @@ public class SnapshotRestoreProcess {
}
/** Future will be completed when partition processing ends. */
- private static class PartitionRestoreFuture extends
CompletableFuture<Path> {
+ private static class PartitionRestoreFuture extends
IgniteCompletableFuture<Path> {
/** Partition id. */
private final int partId;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
index 041ccf7b1f8..674b664f169 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -68,6 +67,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.BasicRateLimiter;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -242,7 +242,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
}
/** {@inheritDoc} */
- @Override protected List<CompletableFuture<Void>> saveCacheConfigs() {
+ @Override protected List<IgniteCompletableFuture<Void>> saveCacheConfigs()
{
return processed.keySet().stream().map(grp -> runAsync(() -> {
CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
@@ -263,7 +263,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
}
/** {@inheritDoc} */
- @Override protected List<CompletableFuture<Void>> saveGroup(int grp,
Set<Integer> grpParts) {
+ @Override protected List<IgniteCompletableFuture<Void>> saveGroup(int grp,
Set<Integer> grpParts) {
long start = System.currentTimeMillis();
AtomicLong entriesCnt = new AtomicLong();
@@ -277,7 +277,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
if (log.isInfoEnabled())
log.info("Start group dump [name=" + name + ", id=" + grp + ']');
- List<CompletableFuture<Void>> futs = grpParts.stream().map(part ->
runAsync(() -> {
+ List<IgniteCompletableFuture<Void>> futs = grpParts.stream().map(part
-> runAsync(() -> {
long entriesCnt0 = 0;
long writtenEntriesCnt0 = 0;
@@ -330,7 +330,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
int futsSize = futs.size();
- CompletableFuture.allOf(futs.toArray(new
CompletableFuture[futsSize])).whenComplete((res, t) -> {
+ IgniteCompletableFuture.allOf(futs.toArray(new
IgniteCompletableFuture[futsSize])).whenComplete((res, t) -> {
clearDumpListener(gctx);
if (log.isInfoEnabled()) {
@@ -365,7 +365,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
}
/** {@inheritDoc} */
- @Override protected synchronized CompletableFuture<Void> closeAsync() {
+ @Override protected synchronized IgniteCompletableFuture<Void>
closeAsync() {
if (closeFut == null) {
dumpCtxs.values().forEach(PartitionDumpContext::close);
@@ -382,7 +382,7 @@ public class CreateDumpFutureTask extends
AbstractCreateSnapshotFutureTask imple
taken.add(new GroupPartitionId(grp, part));
}
- closeFut = CompletableFuture.runAsync(
+ closeFut = IgniteCompletableFuture.runAsync(
() -> {
thLocBufs.clear();
if (encThLocBufs != null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 58a58914824..09143f88b22 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
@@ -38,6 +37,7 @@ import
org.apache.ignite.internal.processors.cache.query.reducer.IndexQueryReduc
import
org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
import
org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer;
import
org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -69,7 +69,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends
GridCacheQueryFutu
private Set<UUID> rcvdFirstPage = ConcurrentHashMap.newKeySet();
/** Metadata for IndexQuery. */
- private final CompletableFuture<IndexQueryResultMeta> idxQryMetaFut;
+ private final IgniteCompletableFuture<IndexQueryResultMeta> idxQryMetaFut;
/** Query start time in nanoseconds to measure duration. */
private final long startTimeNanos;
@@ -106,7 +106,7 @@ public class GridCacheDistributedQueryFuture<K, V, R>
extends GridCacheQueryFutu
Map<UUID, NodePageStream<R>> streamsMap =
Collections.unmodifiableMap(streams);
if (qry.query().type() == INDEX) {
- idxQryMetaFut = new CompletableFuture<>();
+ idxQryMetaFut = new IgniteCompletableFuture<>();
reducer = new
IndexQueryReducer<>(qry.query().idxQryDesc().valType(), streamsMap, cctx,
idxQryMetaFut);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 2db3ed5263e..bbfa9960908 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -36,7 +36,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
@@ -88,6 +87,7 @@ import
org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.task.GridInternal;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -1996,7 +1996,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
private final GridCacheQueryType type;
/** Future of query result metadata. Completed when query actually
started. */
- private final CompletableFuture<IndexQueryResultMeta> metadata;
+ private final IgniteCompletableFuture<IndexQueryResultMeta> metadata;
/** Flag shows whether first result page was delivered to user. */
private volatile boolean sentFirst;
@@ -2010,7 +2010,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
this.type = type;
- metadata = type == INDEX ? new CompletableFuture<>() : null;
+ metadata = type == INDEX ? new IgniteCompletableFuture<>() : null;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
index a8a5fefa43d..07d4fd6b9d6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
@@ -19,9 +19,9 @@ package
org.apache.ignite.internal.processors.cache.query.reducer;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.IgniteCheckedException;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
/**
@@ -50,7 +50,7 @@ public abstract class CacheQueryReducer<T> extends
GridIteratorAdapter<T> {
* @return Object that completed the specified future.
* @throws IgniteCheckedException for all failures.
*/
- public static <T> T get(CompletableFuture<?> fut) throws
IgniteCheckedException {
+ public static <T> T get(IgniteCompletableFuture<?> fut) throws
IgniteCheckedException {
try {
return (T)fut.get();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/IndexQueryReducer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/IndexQueryReducer.java
index 35d069f3852..f8a9162cc43 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/IndexQueryReducer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/IndexQueryReducer.java
@@ -22,7 +22,6 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
@@ -36,6 +35,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.lang.IgniteBiTuple;
/**
@@ -46,7 +46,7 @@ public class IndexQueryReducer<R> extends
MergeSortCacheQueryReducer<R> {
private static final long serialVersionUID = 0L;
/** Future that will be completed with first page response. */
- private final CompletableFuture<IndexQueryResultMeta> metaFut;
+ private final IgniteCompletableFuture<IndexQueryResultMeta> metaFut;
/** */
private final String valType;
@@ -59,7 +59,7 @@ public class IndexQueryReducer<R> extends
MergeSortCacheQueryReducer<R> {
final String valType,
final Map<UUID, NodePageStream<R>> pageStreams,
final GridCacheContext<?, ?> cctx,
- final CompletableFuture<IndexQueryResultMeta> meta
+ final IgniteCompletableFuture<IndexQueryResultMeta> meta
) {
super(pageStreams);
@@ -69,7 +69,7 @@ public class IndexQueryReducer<R> extends
MergeSortCacheQueryReducer<R> {
}
/** {@inheritDoc} */
- @Override protected CompletableFuture<Comparator<NodePage<R>>>
pageComparator() {
+ @Override protected IgniteCompletableFuture<Comparator<NodePage<R>>>
pageComparator() {
return metaFut.thenApply(m -> {
Map<String, IndexKeyDefinition> keyDefs = m.keyDefinitions();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
index 3024a07d31a..b3a5a7e4ad2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
@@ -22,8 +22,8 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.IgniteCheckedException;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
/**
* Reducer of cache query results that sort result through all nodes. Note
that it's assumed that every node
@@ -48,7 +48,7 @@ abstract class MergeSortCacheQueryReducer<R> extends
CacheQueryReducer<R> {
}
/** @return Comparator for pages from nodes. */
- protected abstract CompletableFuture<Comparator<NodePage<R>>>
pageComparator();
+ protected abstract IgniteCompletableFuture<Comparator<NodePage<R>>>
pageComparator();
/** {@inheritDoc} */
@Override public boolean hasNextX() throws IgniteCheckedException {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
index b6cfb08fbbe..76b524afad3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
@@ -19,7 +19,7 @@ package
org.apache.ignite.internal.processors.cache.query.reducer;
import java.util.Collection;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
/**
* This class provides an interface {@link #headPage()} that returns a future
will be completed with {@link NodePage}
@@ -39,7 +39,7 @@ public class NodePageStream<R> {
private boolean hasRemotePages = true;
/** Promise to notify the stream consumer about delivering new page. */
- private CompletableFuture<NodePage<R>> head = new CompletableFuture<>();
+ private IgniteCompletableFuture<NodePage<R>> head = new
IgniteCompletableFuture<>();
/** */
public NodePageStream(UUID nodeId, Runnable reqPages, Runnable
cancelPages) {
@@ -58,7 +58,7 @@ public class NodePageStream<R> {
*
* @return Future that will be completed with query result page.
*/
- public synchronized CompletableFuture<NodePage<R>> headPage() {
+ public synchronized IgniteCompletableFuture<NodePage<R>> headPage() {
return head;
}
@@ -78,7 +78,7 @@ public class NodePageStream<R> {
if (!reqNext) {
synchronized (NodePageStream.this) {
if (hasRemotePages) {
- head = new CompletableFuture<>();
+ head = new IgniteCompletableFuture<>();
reqPages.run();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
index 5ad5a04fb89..f4f24384ff1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/TextQueryReducer.java
@@ -20,8 +20,8 @@ package
org.apache.ignite.internal.processors.cache.query.reducer;
import java.util.Comparator;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.processors.cache.query.ScoredCacheEntry;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
/**
* Reducer for {@code TextQuery} results.
@@ -36,8 +36,8 @@ public class TextQueryReducer<R> extends
MergeSortCacheQueryReducer<R> {
}
/** {@inheritDoc} */
- @Override protected CompletableFuture<Comparator<NodePage<R>>>
pageComparator() {
- CompletableFuture<Comparator<NodePage<R>>> f = new
CompletableFuture<>();
+ @Override protected IgniteCompletableFuture<Comparator<NodePage<R>>>
pageComparator() {
+ IgniteCompletableFuture<Comparator<NodePage<R>>> f = new
IgniteCompletableFuture<>();
f.complete((o1, o2) -> -Float.compare(
((ScoredCacheEntry<?, ?>)o1.head()).score(), ((ScoredCacheEntry<?,
?>)o2.head()).score()));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
index 20ff484b8ee..4e502d95c84 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
@@ -20,8 +20,8 @@ package
org.apache.ignite.internal.processors.cache.query.reducer;
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.IgniteCheckedException;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
/**
* Reducer of cache query results, no ordering of results is provided.
@@ -33,14 +33,14 @@ public class UnsortedCacheQueryReducer<R> extends
CacheQueryReducer<R> {
/** Current page to return data to user. */
private NodePage<R> page;
- /** Pending futures for requeseted pages. */
- private final CompletableFuture<NodePage<R>>[] futs;
+ /** Pending futures for requested pages. */
+ private final IgniteCompletableFuture<NodePage<R>>[] futs;
/** */
public UnsortedCacheQueryReducer(Map<UUID, NodePageStream<R>> pageStreams)
{
super(pageStreams);
- futs = new CompletableFuture[pageStreams.size()];
+ futs = new IgniteCompletableFuture[pageStreams.size()];
}
/** {@inheritDoc} */
@@ -52,7 +52,7 @@ public class UnsortedCacheQueryReducer<R> extends
CacheQueryReducer<R> {
if (s.closed())
continue;
- CompletableFuture<NodePage<R>> f = s.headPage();
+ IgniteCompletableFuture<NodePage<R>> f = s.headPage();
if (f.isDone()) {
page = get(f);
@@ -67,11 +67,11 @@ public class UnsortedCacheQueryReducer<R> extends
CacheQueryReducer<R> {
if (pendingNodesCnt == 0)
return false;
- CompletableFuture[] pendingFuts = Arrays.copyOf(futs,
pendingNodesCnt);
+ IgniteCompletableFuture<?>[] pendingFuts = Arrays.copyOf(futs,
pendingNodesCnt);
Arrays.fill(futs, 0, pendingNodesCnt, null);
- page = get(CompletableFuture.anyOf(pendingFuts));
+ page = get(IgniteCompletableFuture.anyOf(pendingFuts));
}
return true;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
index 2fa811ea071..7d3db0d5d19 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
@@ -20,7 +20,6 @@ package
org.apache.ignite.internal.processors.metastorage.persistence;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
@@ -31,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -76,7 +76,7 @@ public class DmsDataWriterWorker extends GridWorker {
* This task is used to pause processing of the {@code updateQueue}. If
this task completed it means that all the updates
* prior to it already flushed to the local metastorage.
*/
- private volatile Future<?> suspendFut =
CompletableFuture.completedFuture(AWAIT);
+ private volatile Future<?> suspendFut =
IgniteCompletableFuture.completedFuture(AWAIT);
/** */
public DmsDataWriterWorker(
@@ -117,7 +117,7 @@ public class DmsDataWriterWorker extends GridWorker {
*/
public void suspend(IgniteInternalFuture<?> compFut) {
if (isCancelled())
- suspendFut = CompletableFuture.completedFuture(AWAIT);
+ suspendFut = IgniteCompletableFuture.completedFuture(AWAIT);
else {
latch = new CountDownLatch(1);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
index 0c1021a4bcf..2b0b7295524 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/BusyExecutor.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.processors.query.stat;
-import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteLogger;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -152,10 +152,10 @@ public class BusyExecutor {
* @param r Task to execute.
* @return Completable future with executed flag in result.
*/
- public CompletableFuture<Boolean> submit(Runnable r) {
+ public IgniteCompletableFuture<Boolean> submit(Runnable r) {
GridBusyLock lock = busyLock;
- CompletableFuture<Boolean> res = new CompletableFuture<>();
+ IgniteCompletableFuture<Boolean> res = new IgniteCompletableFuture<>();
if (r instanceof CancellableTask) {
CancellableTask ct = (CancellableTask)r;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/LocalStatisticsGatheringContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/LocalStatisticsGatheringContext.java
index 4df521d39d0..77ae9837533 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/LocalStatisticsGatheringContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/LocalStatisticsGatheringContext.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.query.stat;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import
org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
@@ -52,7 +52,7 @@ public class LocalStatisticsGatheringContext {
private final AffinityTopologyVersion topVer;
/** Future with success status as a result. */
- private final CompletableFuture<Void> future;
+ private final IgniteCompletableFuture<Void> future;
/** Context cancelled flag. */
private volatile boolean cancelled;
@@ -81,7 +81,7 @@ public class LocalStatisticsGatheringContext {
this.remainingParts = new HashSet<>(remainingParts);
this.allParts = (forceRecollect) ? null : new
HashSet<>(remainingParts);
this.topVer = topVer;
- this.future = new CompletableFuture<>();
+ this.future = new IgniteCompletableFuture<>();
}
/**
@@ -173,7 +173,7 @@ public class LocalStatisticsGatheringContext {
/**
* @return Gathering completable future.
*/
- public CompletableFuture<Void> future() {
+ public IgniteCompletableFuture<Void> future() {
return future;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index f477c8b4da9..c932a9c3f8f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -28,6 +28,7 @@ import
org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import
org.apache.ignite.internal.thread.context.function.OperationContextAwareInClosure;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -79,7 +80,9 @@ public class GridFutureAdapter<R> implements
IgniteInternalFuture<R> {
* @param val Node value.
*/
Node(Object val) {
- this.val = val;
+ this.val = val instanceof Thread
+ ? val
+ : OperationContextAwareInClosure.wrap((IgniteInClosure<?>)val);
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
index c6d1a7376bd..e67b307884f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/thread/context/OperationContextAttributesTest.java
@@ -18,19 +18,35 @@
package org.apache.ignite.internal.thread.context;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.thread.pool.IgniteForkJoinPool;
import
org.apache.ignite.internal.thread.pool.IgniteScheduledThreadPoolExecutor;
import org.apache.ignite.internal.thread.pool.IgniteStripedExecutor;
import org.apache.ignite.internal.thread.pool.IgniteStripedThreadPoolExecutor;
import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -486,7 +502,7 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
BiConsumerX<String, Integer> checks = (s, i) -> pool.execute(new
AttributeValueChecker(s, i), 1);
- createAttributeChecks(checks);
+ execute(checks);
AttributeValueChecker.assertAllCreatedChecksPassed();
}
@@ -506,18 +522,18 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
));
BiConsumerX<String, Integer> checks = (s, i) -> {
- pool.execute( new AttributeValueChecker(s, i));
+ pool.execute(new AttributeValueChecker(s, i));
pool.execute(1, new AttributeValueChecker(s, i));
};
- createAttributeChecks(checks);
+ execute(checks);
AttributeValueChecker.assertAllCreatedChecksPassed();
}
/** */
@Test
- public void testThreadContextAwareScheduledThreadPoolExecutor() throws
Exception {
+ public void testOperationContextAwareScheduledThreadPoolExecutor() throws
Exception {
IgniteScheduledThreadPoolExecutor pool = deferShutdown(new
IgniteScheduledThreadPoolExecutor("test", "test", 1));
doContextAwareExecutorServiceTest(pool);
@@ -531,7 +547,7 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
pool.scheduleWithFixedDelay(new AttributeValueChecker(s, i), 100,
100, MILLISECONDS);
};
- createAttributeChecks(checks);
+ execute(checks);
poolUnblockedLatch.countDown();
@@ -540,16 +556,126 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
/** */
@Test
- public void testThreadContextAwareForkJoinCommonPool() throws Exception {
+ public void testOperationContextAwareForkJoinCommonPool() throws Exception
{
doContextAwareExecutorServiceTest(IgniteForkJoinPool.commonPool());
}
/** */
@Test
- public void testThreadContextAwareForkJoinPool() throws Exception {
+ public void testOperationContextAwareForkJoinPool() throws Exception {
doContextAwareExecutorServiceTest(deferShutdown(new
IgniteForkJoinPool("test", "test", 2, null, false)));
}
+ /** */
+ @Test
+ public void testGridFutureAdapterContextPropagation() throws Exception {
+ GridFutureAdapter<Integer> fut = new GridFutureAdapter<>();
+
+ BiConsumerX<String, Integer> checks = (s, i) -> {
+ fut.listen(new AttributeValueChecker(s, i));
+ fut.listen(AttributeValueChecker.createInClosure(s, i));
+ fut.chain(AttributeValueChecker.createClosure(s, i))
+ .chain(AttributeValueChecker.createClosure(s, i),
IgniteForkJoinPool.commonPool())
+ .chain(AttributeValueChecker.createOutClosure(s, i))
+ .chain(AttributeValueChecker.createOutClosure(s, i),
IgniteForkJoinPool.commonPool())
+ .chainCompose(AttributeValueChecker.createComposeClosure(s, i))
+ .chainCompose(AttributeValueChecker.createComposeClosure(s,
i), IgniteForkJoinPool.commonPool());
+ };
+
+ execute(checks);
+
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test", INT_ATTR,
5)) {
+ checkAttributeValues("test", 5);
+
+ fut.onDone(0);
+
+ checkAttributeValues("test", 5);
+ }
+
+ AttributeValueChecker.assertAllCreatedChecksPassed();
+ }
+
+ /** */
+ @Test
+ public void testCompletableFutureContextPropagation() throws Exception {
+ IgniteCompletableFuture<Integer> fut = new IgniteCompletableFuture<>();
+ IgniteCompletableFuture<Integer> failedFut = new
IgniteCompletableFuture<>();
+ IgniteCompletableFuture<Integer> testCompletionStage = new
IgniteCompletableFuture<>();
+
+ IgniteCompletableFuture<Void> allFut =
IgniteCompletableFuture.allOf(fut, testCompletionStage);
+ IgniteCompletableFuture<Object> anyFut =
IgniteCompletableFuture.anyOf(fut, testCompletionStage);
+
+ BiConsumerX<String, Integer> checks = (s, i) -> {
+
fut.thenCompose(AttributeValueChecker.createCompletableStageFactory(s, i))
+
.thenComposeAsync(AttributeValueChecker.createCompletableStageFactory(s, i))
+
.thenComposeAsync(AttributeValueChecker.createCompletableStageFactory(s, i),
ForkJoinPool.commonPool())
+ .thenApply(AttributeValueChecker.createFunction(s, i))
+ .thenApplyAsync(AttributeValueChecker.createFunction(s, i))
+ .thenApplyAsync(AttributeValueChecker.createFunction(s, i),
ForkJoinPool.commonPool())
+ .whenComplete(AttributeValueChecker.createBiConsumer(s, i))
+ .whenCompleteAsync(AttributeValueChecker.createBiConsumer(s,
i))
+ .whenCompleteAsync(AttributeValueChecker.createBiConsumer(s,
i), ForkJoinPool.commonPool())
+ .thenCombine(testCompletionStage,
AttributeValueChecker.createBiFunction(s, i))
+ .thenCombineAsync(testCompletionStage,
AttributeValueChecker.createBiFunction(s, i))
+ .thenCombineAsync(testCompletionStage,
AttributeValueChecker.createBiFunction(s, i), ForkJoinPool.commonPool())
+ .applyToEither(testCompletionStage,
AttributeValueChecker.createFunction(s, i))
+ .applyToEitherAsync(testCompletionStage,
AttributeValueChecker.createFunction(s, i))
+ .applyToEitherAsync(testCompletionStage,
AttributeValueChecker.createFunction(s, i), ForkJoinPool.commonPool())
+ .handle(AttributeValueChecker.createBiFunction(s, i))
+ .handleAsync(AttributeValueChecker.createBiFunction(s, i))
+ .handleAsync(AttributeValueChecker.createBiFunction(s, i),
ForkJoinPool.commonPool());
+
+ fut.thenAccept(AttributeValueChecker.createConsumer(s, i));
+ fut.thenAcceptAsync(AttributeValueChecker.createConsumer(s, i));
+ fut.thenAcceptAsync(AttributeValueChecker.createConsumer(s, i),
ForkJoinPool.commonPool());
+
+ fut.thenRun(new AttributeValueChecker(s, i));
+ fut.thenRunAsync(new AttributeValueChecker(s, i));
+ fut.thenRunAsync(new AttributeValueChecker(s, i),
ForkJoinPool.commonPool());
+
+ fut.thenAcceptBoth(testCompletionStage,
AttributeValueChecker.createBiConsumer(s, i));
+ fut.thenAcceptBothAsync(testCompletionStage,
AttributeValueChecker.createBiConsumer(s, i));
+ fut.thenAcceptBothAsync(testCompletionStage,
AttributeValueChecker.createBiConsumer(s, i), ForkJoinPool.commonPool());
+
+ fut.runAfterBoth(testCompletionStage, new AttributeValueChecker(s,
i));
+ fut.runAfterBothAsync(testCompletionStage, new
AttributeValueChecker(s, i));
+ fut.runAfterBothAsync(testCompletionStage, new
AttributeValueChecker(s, i), ForkJoinPool.commonPool());
+
+ fut.acceptEither(testCompletionStage,
AttributeValueChecker.createConsumer(s, i));
+ fut.acceptEitherAsync(testCompletionStage,
AttributeValueChecker.createConsumer(s, i));
+ fut.acceptEitherAsync(testCompletionStage,
AttributeValueChecker.createConsumer(s, i), ForkJoinPool.commonPool());
+
+ fut.runAfterEither(testCompletionStage, new
AttributeValueChecker(s, i));
+ fut.runAfterEitherAsync(testCompletionStage, new
AttributeValueChecker(s, i));
+ fut.runAfterEitherAsync(testCompletionStage, new
AttributeValueChecker(s, i), ForkJoinPool.commonPool());
+
+ failedFut.exceptionally(AttributeValueChecker.createFunction(s,
i));
+
+ IgniteCompletableFuture.runAsync(new AttributeValueChecker(s, i));
+ IgniteCompletableFuture.runAsync(new AttributeValueChecker(s, i),
ForkJoinPool.commonPool());
+
+
IgniteCompletableFuture.supplyAsync(AttributeValueChecker.createSupplier(s, i));
+
IgniteCompletableFuture.supplyAsync(AttributeValueChecker.createSupplier(s, i),
ForkJoinPool.commonPool());
+ };
+
+ execute(checks);
+
+ try (Scope ignored = OperationContext.set(STR_ATTR, "test", INT_ATTR,
5)) {
+ checkAttributeValues("test", 5);
+
+ fut.complete(0);
+ failedFut.completeExceptionally(new IgniteException());
+ testCompletionStage.complete(0);
+
+ checkAttributeValues("test", 5);
+ }
+
+ AttributeValueChecker.assertAllCreatedChecksPassed();
+
+ anyFut.get(getTestTimeout(), MILLISECONDS);
+ allFut.get(getTestTimeout(), MILLISECONDS);
+ }
+
/** */
private void doContextAwareExecutorServiceTest(ExecutorService pool)
throws Exception {
CountDownLatch poolUnblockedLatch = blockPool(pool);
@@ -567,11 +693,11 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
pool.invokeAll(List.of((Callable<Integer>)new
AttributeValueChecker(s, i)), 1000, MILLISECONDS);
};
- createAttributeChecks(asyncChecks);
+ execute(asyncChecks);
poolUnblockedLatch.countDown();
- createAttributeChecks(syncChecks);
+ execute(syncChecks);
AttributeValueChecker.assertAllCreatedChecksPassed();
}
@@ -600,20 +726,20 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
}
/** */
- private void createAttributeChecks(BiConsumerX<String, Integer>
checkGenerator) throws Exception {
+ private void execute(BiConsumerX<String, Integer> checks) throws Exception
{ // Invokes the checks under four attribute setups: both set, only the int one, only the string one, none.
try (Scope ignored = OperationContext.set(STR_ATTR, "test1", INT_ATTR,
1)) {
- checkGenerator.accept("test1", 1);
+ checks.accept("test1", 1); // Both attributes are set explicitly in the enclosing scope.
}
try (Scope ignored = OperationContext.set(INT_ATTR, 2)) {
- checkGenerator.accept(DFLT_STR_VAL, 2);
+ checks.accept(DFLT_STR_VAL, 2); // Only INT_ATTR is set, so the string attribute is expected to be DFLT_STR_VAL.
}
try (Scope ignored = OperationContext.set(STR_ATTR, "test2")) {
- checkGenerator.accept("test2", DFLT_INT_VAL);
+ checks.accept("test2", DFLT_INT_VAL); // Only STR_ATTR is set, so the int attribute is expected to be DFLT_INT_VAL.
}
- checkGenerator.accept(DFLT_STR_VAL, DFLT_INT_VAL);
+ checks.accept(DFLT_STR_VAL, DFLT_INT_VAL); // No scope is opened here: both default values are expected.
}
/** */
@@ -623,20 +749,23 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
}
/** */
- private static class AttributeValueChecker extends CompletableFuture<Void>
implements Runnable, Callable<Integer> {
+ private static class AttributeValueChecker extends CompletableFuture<Void>
implements IgniteRunnable, Callable<Integer> {
/** */
- static final List<AttributeValueChecker> CHECKS = new ArrayList<>();
+ private static final long serialVersionUID = 0L;
/** */
- private final String strAttrVal;
+ static final List<AttributeValueChecker> CHECKS =
Collections.synchronizedList(new ArrayList<>());
/** */
- private final Integer intAttrVal;
+ private final String expStrAttrVal;
/** */
- public AttributeValueChecker(String strAttrVal, Integer intAttrVal) {
- this.strAttrVal = strAttrVal;
- this.intAttrVal = intAttrVal;
+ private final Integer expIntAttrVal;
+
+ /** */
+ public AttributeValueChecker(String expStrAttrVal, Integer
expIntAttrVal) {
+ this.expStrAttrVal = expStrAttrVal;
+ this.expIntAttrVal = expIntAttrVal;
CHECKS.add(this);
}
@@ -644,7 +773,7 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public void run() {
try {
- checkAttributeValues(strAttrVal, intAttrVal);
+ checkAttributeValues(expStrAttrVal, expIntAttrVal);
complete(null);
}
@@ -666,6 +795,107 @@ public class OperationContextAttributesTest extends
GridCommonAbstractTest {
check.get(1000, MILLISECONDS);
}
}
+
+ /** Creates a checker expecting the given attribute values (it is added to CHECKS) and returns a closure that runs it and returns {@code 0}. */
+ static IgniteClosure<IgniteInternalFuture<Integer>, Integer>
createClosure(String strAttrVal, int intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return fut -> {
+ checker.run(); // The future argument is not inspected; only the attribute check is performed.
+
+ return 0;
+ };
+ }
+
+ /** Creates a checker expecting the given attribute values and returns a closure that runs it and returns the future it was called with. */
+ static IgniteClosure<IgniteInternalFuture<Integer>,
IgniteInternalFuture<Integer>> createComposeClosure(
+ String strAttrVal,
+ int intAttrVal
+ ) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return fut -> {
+ checker.run();
+
+ return fut; // The incoming future is passed through unchanged.
+ };
+ }
+
+ /** Creates a checker expecting the given attribute values and returns an in-closure that runs it, ignoring the passed future. */
+ static IgniteInClosure<IgniteInternalFuture<Integer>>
createInClosure(String strAttrVal, int intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return fut -> checker.run();
+ }
+
+ /** Creates a checker expecting the given attribute values and returns an out-closure that runs it and returns {@code 0}. */
+ static IgniteOutClosure<Integer> createOutClosure(String strAttrVal,
int intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return () -> {
+ checker.run();
+
+ return 0;
+ };
+ }
+
+ /** Creates a checker expecting the given attribute values and returns a bi-function that runs it, ignores both arguments and returns {@code 0}. */
+ static <T> BiFunction<Integer, T, Integer> createBiFunction(String
strAttrVal, int intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return (r, t) -> {
+ checker.run();
+
+ return 0;
+ };
+ }
+
+ /** Creates a checker expecting the given attribute values and returns a function that runs it, ignores its argument and returns {@code 0}. */
+ static <T> Function<T, Integer> createFunction(String strAttrVal, int
intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return a -> {
+ checker.run();
+
+ return 0;
+ };
+ }
+
+ /** Creates a checker expecting the given attribute values and returns a function that runs it and returns {@code IgniteCompletableFuture.completedFuture(0)}. */
+ static Function<Integer, CompletionStage<Integer>>
createCompletableStageFactory(String strAttrVal, int intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return a -> {
+ checker.run();
+
+ return IgniteCompletableFuture.completedFuture(0);
+ };
+ }
+
+ /** Creates a checker expecting the given attribute values and returns a supplier that runs it and returns {@code 0}. */
+ static Supplier<Integer> createSupplier(String strAttrVal, int
intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return () -> {
+ checker.run();
+
+ return 0;
+ };
+ }
+
+ /** Creates a checker expecting the given attribute values and returns a consumer that runs it, ignoring the consumed value. */
+ static Consumer<Integer> createConsumer(String strAttrVal, int
intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return a -> checker.run();
+ }
+
+ /** Creates a checker expecting the given attribute values and returns a bi-consumer that runs it, ignoring both consumed values. */
+ static <T, R> BiConsumer<T, R> createBiConsumer(String strAttrVal, int
intAttrVal) {
+ AttributeValueChecker checker = new
AttributeValueChecker(strAttrVal, intAttrVal);
+
+ return (r, t) -> checker.run();
+ }
}
/** */
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/BusyExecutorTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/BusyExecutorTest.java
index 7a9faef81ba..141b7a34401 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/BusyExecutorTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/BusyExecutorTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.stat;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -26,6 +25,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import
org.apache.ignite.internal.thread.context.concurrent.IgniteCompletableFuture;
import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.testframework.GridTestUtils;
@@ -125,7 +125,7 @@ public class BusyExecutorTest extends
GridCommonAbstractTest {
be.activate();
be.execute(taskExec);
- CompletableFuture<Boolean> submitFut = be.submit(taskSubmit);
+ IgniteCompletableFuture<Boolean> submitFut = be.submit(taskSubmit);
be.execute(cancellableTask);
Thread.sleep(TIME_TO_START_THREAD);
@@ -155,7 +155,7 @@ public class BusyExecutorTest extends
GridCommonAbstractTest {
BusyExecutor be = new BusyExecutor("testActivateDeactivate", pool, ()
-> false, c -> log);
be.activate();
- CompletableFuture<Boolean> futures[] = new CompletableFuture[100];
+ IgniteCompletableFuture<Boolean> futures[] = new
IgniteCompletableFuture[100];
CountDownLatch executed = new CountDownLatch(futures.length * 2);
for (int i = 0; i < futures.length; i++) {