This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 38d279a IGNITE-12033 Move async continuations away from striped pool 38d279a is described below commit 38d279afd18f21e6d26a6f3730999600372e039f Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Fri Apr 16 18:53:08 2021 +0300 IGNITE-12033 Move async continuations away from striped pool User continuations should not run on striped pool - this can cause unexpected, hard to diagnose performance issues and deadlocks. * Add `IgniteConfiguration#asyncContinuationExecutor`. Defaults to `null`, which means `ForkJoinPool#commonPool` should be used. * Use default executor for Cache async operations * Add .NET and Java tests IEP: https://cwiki.apache.org/confluence/display/IGNITE/IEP-70%3A+Async+Continuation+Executor --- .../jmh/cache/JmhCacheAsyncListenBenchmark.java | 162 +++++++++++++ .../ignite/configuration/IgniteConfiguration.java | 37 +++ .../apache/ignite/internal/GridKernalContext.java | 8 + .../ignite/internal/GridKernalContextImpl.java | 10 + .../processors/cache/IgniteCacheFutureImpl.java | 8 +- .../processors/cache/IgniteCacheProxyImpl.java | 26 ++- .../cache/IgniteFinishedCacheFutureImpl.java | 2 +- .../wal/reader/StandaloneGridKernalContext.java | 6 + .../processors/datastreamer/DataStreamerImpl.java | 4 +- .../platform/utils/PlatformConfigurationUtils.java | 40 ++++ .../internal/util/future/IgniteFutureImpl.java | 24 +- .../cache/CacheAsyncContinuationExecutorTest.java | 254 +++++++++++++++++++++ ...eAsyncContinuationSynchronousExecutorTest.java} | 26 ++- .../util/future/IgniteCacheFutureImplTest.java | 2 +- .../ignite/platform/PlatformTestExecutor.java} | 16 +- .../ignite/platform/PlatformThreadUtils.java | 9 + .../ignite/testsuites/IgniteCacheTestSuite.java | 4 + .../Cache/CacheTestAsyncAwait.cs | 115 ++++++++++ .../Client/Cache/CacheTestAsyncAwait.cs | 2 +- .../Compute/CancellationTest.cs | 2 +- .../Compute/ComputeApiTest.cs | 6 +- .../ComputeTestAsyncAwait.cs} | 29 ++- .../Config/full-config.xml | 2 +- .../Config/spring-test.xml | 4 + .../IgniteConfigurationSerializerTest.cs | 5 +- .../IgniteConfigurationTest.cs | 2 + .../Apache.Ignite.Core.Tests/ProjectFilesTest.cs | 16 +- .../Services/PlatformTestService.cs | 1 + .../Apache.Ignite.Core.Tests/TestUtilsJni.cs | 21 ++ .../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 1 + .../Configuration/AsyncContinuationExecutor.cs | 60 +++++ .../Apache.Ignite.Core/IgniteConfiguration.cs | 17 +- .../IgniteConfigurationSection.xsd | 12 + .../Impl/Compute/ComputeTaskHolder.cs | 20 +- .../Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs | 2 +- 35 files changed, 882 insertions(+), 73 deletions(-) diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java new file mode 100644 index 0000000..d494d39 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.cache; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner; +import org.apache.ignite.internal.benchmarks.model.IntValue; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Mode; + + +/** + * Cache async listen benchmark. + * Measures {@link IgniteFuture#listen(IgniteInClosure)} performance. + * + * Results on i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275: + * + * Without ForkJoinPool async continuation executor: + * Benchmark Mode Cnt Score Error Units + * JmhCacheAsyncListenBenchmark.get thrpt 10 82052.664 ± 2891.182 ops/s + * JmhCacheAsyncListenBenchmark.put thrpt 10 77859.584 ± 2071.196 ops/s + * + * With ForkJoinPool async continuation executor: + * Benchmark Mode Cnt Score Error Units + * JmhCacheAsyncListenBenchmark.get thrpt 10 76008.272 ± 1506.928 ops/s + * JmhCacheAsyncListenBenchmark.put thrpt 10 73393.986 ± 1336.420 ops/s + */ +@SuppressWarnings({"unchecked", "rawtypes"}) +public class JmhCacheAsyncListenBenchmark extends JmhCacheAbstractBenchmark { + /** {@inheritDoc} */ + @Override public void setup() throws Exception { + super.setup(); + + IgniteDataStreamer<Integer, IntValue> dataLdr = node.dataStreamer(cache.getName()); + + for (int i = 0; i < CNT; i++) + dataLdr.addData(i, new IntValue(i)); + + dataLdr.close(); + + System.out.println("Cache populated."); + } + + /** + * Test PUT operation. + * + * @throws Exception If failed. + */ + @Benchmark + public void put() throws Exception { + int key = ThreadLocalRandom.current().nextInt(CNT); + + blockingListen(cache.putAsync(key, new IntValue(key))); + } + + /** + * Test GET operation. + * + * @throws Exception If failed. + */ + @Benchmark + public void get() throws Exception { + int key = ThreadLocalRandom.current().nextInt(CNT); + + blockingListen(cache.getAsync(key)); + } + + /** + * Blocks until future completion using {@link IgniteFuture#listen(IgniteInClosure)}. + * + * @param future Future + */ + private static void blockingListen(IgniteFuture future) throws Exception { + AtomicBoolean ab = new AtomicBoolean(); + + future.listen(f -> { + try { + synchronized (ab) { + ab.set(true); + ab.notifyAll(); + } + } catch (Exception e) { + e.printStackTrace(); + } + }); + + synchronized (ab) { + while (!ab.get()) { + ab.wait(5000); + } + } + } + + /** + * Run benchmarks. + * + * @param args Arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + run(CacheAtomicityMode.ATOMIC); + } + + /** + * Run benchmarks for atomic cache. + * + * @param atomicityMode Atomicity mode. + * @throws Exception If failed. + */ + private static void run(CacheAtomicityMode atomicityMode) throws Exception { + run(4, true, atomicityMode, CacheWriteSynchronizationMode.PRIMARY_SYNC); + } + + /** + * Run benchmark. + * + * @param threads Amount of threads. + * @param client Client mode flag. + * @param atomicityMode Atomicity mode. + * @param writeSyncMode Write synchronization mode. + * @throws Exception If failed. + */ + private static void run(int threads, boolean client, CacheAtomicityMode atomicityMode, + CacheWriteSynchronizationMode writeSyncMode) throws Exception { + + JmhIdeBenchmarkRunner.create() + .forks(1) + .threads(threads) + .benchmarks(JmhCacheAsyncListenBenchmark.class.getSimpleName()) + .jvmArguments( + "-Xms4g", + "-Xmx4g", + JmhIdeBenchmarkRunner.createProperty(PROP_ATOMICITY_MODE, atomicityMode), + JmhIdeBenchmarkRunner.createProperty(PROP_WRITE_SYNC_MODE, writeSyncMode), + JmhIdeBenchmarkRunner.createProperty(PROP_DATA_NODES, 2), + JmhIdeBenchmarkRunner.createProperty(PROP_CLIENT_MODE, client)) + .benchmarkModes(Mode.Throughput) + .run(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 3801795..aa3109d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -21,6 +21,8 @@ import java.io.Serializable; import java.lang.management.ManagementFactory; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import java.util.zip.Deflater; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryListener; @@ -607,6 +609,9 @@ public class IgniteConfiguration { /** SQL configuration. */ private SqlConfiguration sqlCfg = new SqlConfiguration(); + /** Executor for async operations continuations. */ + private Executor asyncContinuationExecutor; + /** Shutdown policy for cluster. */ public ShutdownPolicy shutdown = DFLT_SHUTDOWN_POLICY; @@ -738,6 +743,7 @@ public class IgniteConfiguration { warmupClos = cfg.getWarmupClosure(); sqlCfg = cfg.getSqlConfiguration(); shutdown = cfg.getShutdownPolicy(); + asyncContinuationExecutor = cfg.getAsyncContinuationExecutor(); } /** @@ -3641,6 +3647,37 @@ public class IgniteConfiguration { return this; } + /** + * Gets the continuation executor for cache async APIs. + * <p /> + * When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used. + * <p /> + * When async operation completes, corresponding {@link org.apache.ignite.lang.IgniteFuture} listeners + * will be invoked using this executor. + * + * @return Executor for async continuations. + */ + public Executor getAsyncContinuationExecutor() { + return asyncContinuationExecutor; + } + + /** + * Sets the continuation executor for cache async APIs. + * <p /> + * When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used. + * <p /> + * When async operation completes, corresponding {@link org.apache.ignite.lang.IgniteFuture} listeners + * will be invoked using this executor. + * + * @param asyncContinuationExecutor Executor for async continuations. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setAsyncContinuationExecutor(Executor asyncContinuationExecutor) { + this.asyncContinuationExecutor = asyncContinuationExecutor; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteConfiguration.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 281b8a0..e0e51fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.IgniteConfiguration; @@ -784,4 +785,11 @@ public interface GridKernalContext extends Iterable<GridComponent> { * @return Performance statistics processor. */ public PerformanceStatisticsProcessor performanceStatistics(); + + /** + * Executor that is in charge of processing user async continuations. + * + * @return Executor that is in charge of processing user async continuations. + */ + public Executor getAsyncContinuationExecutor(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index bb8fcf8..25f4ed5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -30,7 +30,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -1333,4 +1336,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @Override public PerformanceStatisticsProcessor performanceStatistics() { return perfStatProc; } + + /** {@inheritDoc} */ + @Override public Executor getAsyncContinuationExecutor() { + return config().getAsyncContinuationExecutor() == null + ? ForkJoinPool.commonPool() + : config().getAsyncContinuationExecutor(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java index 6f6223f..c2dca1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java @@ -38,18 +38,18 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> { * * @param fut Internal future. */ - public IgniteCacheFutureImpl(IgniteInternalFuture<V> fut) { - super(fut); + public IgniteCacheFutureImpl(IgniteInternalFuture<V> fut, Executor defaultExecutor) { + super(fut, defaultExecutor); } /** {@inheritDoc} */ @Override public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb) { - return new IgniteCacheFutureImpl<>(chainInternal(doneCb, null)); + return new IgniteCacheFutureImpl<>(chainInternal(doneCb, null), defaultExecutor); } /** {@inheritDoc} */ @Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<V>, T> doneCb, Executor exec) { - return new IgniteCacheFutureImpl<>(chainInternal(doneCb, exec)); + return new IgniteCacheFutureImpl<>(chainInternal(doneCb, exec), defaultExecutor); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 6222f60..c481450 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -31,6 +31,7 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -1906,7 +1907,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< @Override public IgniteFuture<?> destroyAsync() { GridCacheContext<K, V> ctx = getContextSafe(); - return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(cacheName, false, true, false, null)); + return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(cacheName, false, true, false, null), exec()); } /** {@inheritDoc} */ @@ -1918,7 +1919,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< @Override public IgniteFuture<?> closeAsync() { GridCacheContext<K, V> ctx = getContextSafe(); - return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(cacheName)); + return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(cacheName), exec()); } /** {@inheritDoc} */ @@ -2064,7 +2065,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< assert restartFut != null; - throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), cacheName); + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut, exec()), cacheName); } else throw restartingException; @@ -2072,7 +2073,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< if (restartFut != null) { if (X.hasCause(e, CacheStoppedException.class) || X.hasSuppressed(e, CacheStoppedException.class)) - throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), "Cache is restarting: " + + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut, exec()), "Cache is restarting: " + cacheName, e); } @@ -2100,7 +2101,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< /** {@inheritDoc} */ @Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) { - return new IgniteCacheFutureImpl<>(fut); + return new IgniteCacheFutureImpl<>(fut, exec()); } /** @@ -2216,7 +2217,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< @Override public IgniteFuture<Boolean> rebalance() { GridCacheContext<K, V> ctx = getContextSafe(); - return new IgniteFutureImpl<>(ctx.preloader().forceRebalance()); + return new IgniteFutureImpl<>(ctx.preloader().forceRebalance(), exec()); } /** {@inheritDoc} */ @@ -2228,7 +2229,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< if (fut == null) return new IgniteFinishedFutureImpl<>(); - return new IgniteFutureImpl<>(fut); + return new IgniteFutureImpl<>(fut, exec()); } /** @@ -2256,7 +2257,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< //do nothing } - throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut), cacheName); + throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut, exec()), cacheName); } } @@ -2363,6 +2364,13 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< } /** + * Async continuation executor. + */ + private Executor exec() { + return context().kernalContext().getAsyncContinuationExecutor(); + } + + /** * */ private class RestartFuture extends GridFutureAdapter<Void> { @@ -2395,7 +2403,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< } throw new IgniteCacheRestartingException( - new IgniteFutureImpl<>(this), + new IgniteFutureImpl<>(this, exec()), "Cache is restarting: " + name ); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java index f7bc95b..f515811 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java @@ -27,6 +27,6 @@ public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> { * @param err Error. */ public IgniteFinishedCacheFutureImpl(Throwable err) { - super(new GridFinishedFuture<V>(err)); + super(new GridFinishedFuture<V>(err), null); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 54ae4ab..912876b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -746,4 +747,9 @@ public class StandaloneGridKernalContext implements GridKernalContext { @Override public PerformanceStatisticsProcessor performanceStatistics() { return null; } + + /** {@inheritDoc} */ + @Override public Executor getAsyncContinuationExecutor() { + return null; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 9b1f624..452789d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -370,7 +370,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed fut = new DataStreamerFuture(this); - publicFut = new IgniteCacheFutureImpl<>(fut); + publicFut = new IgniteCacheFutureImpl<>(fut, ctx.getAsyncContinuationExecutor()); GridCacheAdapter cache = ctx.cache().internalCache(cacheName); @@ -708,7 +708,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed @NotNull protected IgniteCacheFutureImpl createDataLoadFuture() { GridFutureAdapter internalFut0 = new GridFutureAdapter(); - IgniteCacheFutureImpl fut = new IgniteCacheFutureImpl(internalFut0); + IgniteCacheFutureImpl fut = new IgniteCacheFutureImpl(internalFut0, ctx.getAsyncContinuationExecutor()); internalFut0.listen(rmvActiveFut); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index fbe3218..5bead46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.Executor; import javax.cache.configuration.Factory; import javax.cache.expiry.ExpiryPolicy; import javax.net.ssl.SSLContext; @@ -122,6 +123,9 @@ import org.apache.ignite.util.AttributeNodeFilter; */ @SuppressWarnings({"unchecked", "TypeMayBeWeakened"}) public class PlatformConfigurationUtils { + /** */ + private static final Executor synchronousExecutor = Runnable::run; + /** * Write .Net configuration to the stream. * @@ -760,6 +764,8 @@ public class PlatformConfigurationUtils { cfg.setSqlQueryHistorySize(in.readInt()); if (in.readBoolean()) cfg.setPeerClassLoadingEnabled(in.readBoolean()); + if (in.readBoolean()) + cfg.setAsyncContinuationExecutor(getAsyncContinuationExecutor(in.readInt())); int sqlSchemasCnt = in.readInt(); @@ -1366,6 +1372,8 @@ public class PlatformConfigurationUtils { w.writeInt(cfg.getSqlQueryHistorySize()); w.writeBoolean(true); w.writeBoolean(cfg.isPeerClassLoadingEnabled()); + w.writeBoolean(true); + w.writeInt(getAsyncContinuationExecutorMode(cfg.getAsyncContinuationExecutor())); if (cfg.getSqlSchemas() == null) w.writeInt(0); @@ -2335,6 +2343,38 @@ public class PlatformConfigurationUtils { } /** + * Gets the executor. + * + * @param mode Mode. + * @return Executor. + */ + private static Executor getAsyncContinuationExecutor(int mode) { + switch (mode) { + case 0: return null; + case 1: return synchronousExecutor; + default: throw new IgniteException("Invalid AsyncContinuationExecutor mode: " + mode); + } + } + + /** + * Gets the executor mode. + * + * @param executor Executor. + * @return Mode. + */ + private static int getAsyncContinuationExecutorMode(Executor executor) { + if (executor == null) { + return 0; + } + + if (executor.equals(synchronousExecutor)) { + return 1; + } + + return 2; + } + + /** * Private constructor. */ private PlatformConfigurationUtils() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java index ab8aa7d..307a28f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java @@ -38,13 +38,25 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { /** */ protected final IgniteInternalFuture<V> fut; + /** */ + protected final Executor defaultExecutor; + /** * @param fut Future. */ public IgniteFutureImpl(IgniteInternalFuture<V> fut) { + this(fut, null); + } + + /** + * @param fut Future. + * @param defaultExecutor Default executor. + */ + public IgniteFutureImpl(IgniteInternalFuture<V> fut, @Nullable Executor defaultExecutor) { assert fut != null; this.fut = fut; + this.defaultExecutor = defaultExecutor; } /** @@ -68,7 +80,10 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { @Override public void listen(IgniteInClosure<? super IgniteFuture<V>> lsnr) { A.notNull(lsnr, "lsnr"); - fut.listen(new InternalFutureListener(lsnr)); + if (defaultExecutor != null && !isDone()) + fut.listen(new InternalFutureListener(new AsyncFutureListener<>(lsnr, defaultExecutor))); + else + fut.listen(new InternalFutureListener(lsnr)); } /** {@inheritDoc} */ @@ -81,7 +96,7 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { /** {@inheritDoc} */ @Override public <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<V>, T> doneCb) { - return new IgniteFutureImpl<>(chainInternal(doneCb, null)); + return new IgniteFutureImpl<>(chainInternal(doneCb, null), defaultExecutor); } /** {@inheritDoc} */ @@ -90,7 +105,7 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { A.notNull(doneCb, "doneCb"); A.notNull(exec, "exec"); - return new IgniteFutureImpl<>(chainInternal(doneCb, exec)); + return new IgniteFutureImpl<>(chainInternal(doneCb, exec), defaultExecutor); } /** @@ -115,6 +130,9 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> { if (exec != null) return fut.chain(clos, exec); + if (defaultExecutor != null && !isDone()) + return fut.chain(clos, defaultExecutor); + return fut.chain(clos); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java new file mode 100644 index 0000000..48bac86 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.junit.Test; + +/** + * Tests {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)}. + */ +@SuppressWarnings("rawtypes") +public class CacheAsyncContinuationExecutorTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int backups() { + return 0; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName); + + // Use cache store with a write delay to make sure future does not complete before we register a listener. + ccfg.setCacheStoreFactory(new DelayedStoreFactory()); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + + return ccfg; + } + + /** + * Gets the expected thread name prefix. + * @return Prefix. + */ + protected String expectedThreadNamePrefix() { + return "ForkJoinPool.commonPool-worker"; + } + + /** + * Gets a value indicating whether continuation thread can execute cache operations. + * @return Value indicating whether continuation thread can execute cache operations. + */ + protected boolean allowCacheOperationsInContinuation() { + return true; + } + + /** + * Tests future listen with default executor. + */ + @Test + public void testRemoteOperationListenContinuesOnDefaultExecutor() throws Exception { + testRemoteOperationContinuesOnDefaultExecutor(false); + } + + /** + * Tests future chain with default executor. + */ + @Test + public void testRemoteOperationChainContinuesOnDefaultExecutor() throws Exception { + testRemoteOperationContinuesOnDefaultExecutor(true); + } + + /** + * Tests that an operation on a local key executes synchronously, and listener is called immediately on the current + * thread. + */ + @Test + public void testLocalOperationListenerExecutesSynchronously() { + final String key = getPrimaryKey(0); + + IgniteCache<String, Integer> cache = jcache(0); + AtomicReference<String> listenThreadName = new AtomicReference<>(""); + + cache.putAsync(key, 1).listen(f -> listenThreadName.set(Thread.currentThread().getName())); + + assertEquals(Thread.currentThread().getName(), listenThreadName.get()); + } + + /** + * Tests that an operation on a remote key executes on striped pool directly when a syncronous executor is provided. + * This demonstrates that default safe behavior can be overridden with a faster, but unsafe old behavior + * for an individual operation. + */ + @Test + public void testRemoteOperationListenerExecutesOnStripedPoolWhenCustomExecutorIsProvided() throws Exception { + final String key = getPrimaryKey(1); + + IgniteCache<String, Integer> cache = jcache(0); + AtomicReference<String> listenThreadName = new AtomicReference<>(""); + CyclicBarrier barrier = new CyclicBarrier(2); + + cache.putAsync(key, 1).listenAsync(f -> { + listenThreadName.set(Thread.currentThread().getName()); + + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + }, Runnable::run); + + barrier.await(10, TimeUnit.SECONDS); + + assertTrue(listenThreadName.get(), listenThreadName.get().startsWith("sys-stripe-")); + } + + /** + * Tests that an operation on a local key executes synchronously, and chain is called immediately on the current + * thread. + */ + @Test + public void testLocalOperationChainExecutesSynchronously() { + final String key = getPrimaryKey(0); + + IgniteCache<String, Integer> cache = jcache(0); + AtomicReference<String> listenThreadName = new AtomicReference<>(""); + + cache.putAsync(key, 1).chain(f -> { + listenThreadName.set(Thread.currentThread().getName()); + + return new IgniteFinishedFutureImpl<>(); + }); + + assertEquals(Thread.currentThread().getName(), listenThreadName.get()); + } + + /** + * Tests future chain / listen with default executor. + * + * This test would hang before {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)} + * was introduced, or if we set {@link Runnable#run()} as the executor. + */ + private void testRemoteOperationContinuesOnDefaultExecutor(boolean chain) throws Exception { + final String key = getPrimaryKey(1); + + IgniteCache<String, Integer> cache = jcache(0); + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicReference<String> listenThreadName = new AtomicReference<>(""); + + IgniteInClosure<IgniteFuture<Void>> clos = f -> { + listenThreadName.set(Thread.currentThread().getName()); + + if (allowCacheOperationsInContinuation()) { + // Check that cache operations do not deadlock. + cache.replace(key, 2); + } + + try { + barrier.await(10, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + }; + + IgniteFuture<Void> fut = cache.putAsync(key, 1); + + if (chain) + fut.chain(f -> { + clos.apply(f); + return new IgniteFinishedFutureImpl<>(); + }); + else + fut.listen(clos); + + barrier.await(10, TimeUnit.SECONDS); + + assertEquals(allowCacheOperationsInContinuation() ? 2 : 1, cache.get(key).intValue()); + assertTrue(listenThreadName.get(), listenThreadName.get().startsWith(expectedThreadNamePrefix())); + } + + /** + * Gets the primary key. + * @param nodeIdx Node index. + * @return Key. + */ + @SuppressWarnings("OptionalGetWithoutIsPresent") + private String getPrimaryKey(int nodeIdx) { + return IntStream.range(0, 1000) + .mapToObj(String::valueOf) + .filter(x -> belongs(x, nodeIdx)) + .findFirst() + .get(); + } + + /** */ + private static class DelayedStoreFactory implements Factory<CacheStore> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new CacheStoreAdapter() { + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + // No-op. + } + }; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java similarity index 52% copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java copy to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java index f7bc95b..0651f6cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java @@ -17,16 +17,26 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.internal.util.future.GridFinishedFuture; +import java.util.concurrent.Executor; + +import org.apache.ignite.configuration.IgniteConfiguration; /** - * + * Tests {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)} with synchronous executor (old behavior). */ -public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> { - /** - * @param err Error. - */ - public IgniteFinishedCacheFutureImpl(Throwable err) { - super(new GridFinishedFuture<V>(err)); +public class CacheAsyncContinuationSynchronousExecutorTest extends CacheAsyncContinuationExecutorTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName).setAsyncContinuationExecutor(Runnable::run); + } + + /** {@inheritDoc} */ + @Override protected String expectedThreadNamePrefix() { + return "sys-stripe-"; + } + + /** {@inheritDoc} */ + @Override protected boolean allowCacheOperationsInContinuation() { + return false; } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java index 46f1706..b74a7fe 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl; public class IgniteCacheFutureImplTest extends IgniteFutureImplTest { /** {@inheritDoc} */ @Override protected <V> IgniteFutureImpl<V> createFuture(IgniteInternalFuture<V> fut) { - return new IgniteCacheFutureImpl<>(fut); + return new IgniteCacheFutureImpl<>(fut, Runnable::run); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java similarity index 70% copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java copy to modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java index f7bc95b..4bf26c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java @@ -15,18 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.cache; +package org.apache.ignite.platform; -import org.apache.ignite.internal.util.future.GridFinishedFuture; +import java.util.concurrent.Executor; /** - * + * Test executor. */ -public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> { - /** - * @param err Error. - */ - public IgniteFinishedCacheFutureImpl(Throwable err) { - super(new GridFinishedFuture<V>(err)); +public class PlatformTestExecutor implements Executor { + /** {@inheritDoc} */ + @Override public void execute(Runnable runnable) { + runnable.run(); } } diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java index 0029d1e..b60ebeb 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java @@ -48,4 +48,13 @@ public class PlatformThreadUtils { } } } + + /** + * Gets the thread name. + * + * @return Thread name. + */ + public static String getThreadName() { + return Thread.currentThread().getName(); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 31bc4d9..719232c 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -54,6 +54,8 @@ import org.apache.ignite.internal.managers.communication.MessageDirectTypeIdConf import org.apache.ignite.internal.processors.cache.BinaryMetadataRegistrationInsideEntryProcessorTest; import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest; import org.apache.ignite.internal.processors.cache.CacheAffinityKeyConfigurationMismatchTest; +import org.apache.ignite.internal.processors.cache.CacheAsyncContinuationExecutorTest; +import org.apache.ignite.internal.processors.cache.CacheAsyncContinuationSynchronousExecutorTest; import org.apache.ignite.internal.processors.cache.CacheAtomicSingleMessageCountSelfTest; import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteQueueTest; import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest; @@ -396,6 +398,8 @@ public class IgniteCacheTestSuite { GridTestUtils.addTestIfNeeded(suite, InterceptorWithKeepBinaryCacheFullApiTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, BinaryMetadataRegistrationInsideEntryProcessorTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheAsyncContinuationExecutorTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheAsyncContinuationSynchronousExecutorTest.class, ignoredTests); suite.add(IgniteGetNonPlainKeyReadThroughSelfTest.class); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs new file mode 100644 index 0000000..f27600e --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ReSharper disable MethodHasAsyncOverload +namespace Apache.Ignite.Core.Tests.Cache +{ + using System.Threading; + using System.Threading.Tasks; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Configuration; + using NUnit.Framework; + + /// <summary> + /// Tests thick cache operations with async/await. + /// </summary> + public class CacheTestAsyncAwait : TestBase + { + /// <summary> + /// Initializes a new instance of <see cref="CacheTestAsyncAwait"/> class. + /// </summary> + public CacheTestAsyncAwait() : base(2) + { + // No-op. + } + + /// <summary> + /// Tests that async continuations are executed on a ThreadPool thread, not on response handler thread. + /// </summary> + [Test] + public async Task TestAsyncAwaitContinuationIsExecutedWithConfiguredExecutor() + { + var cache = Ignite.GetOrCreateCache<int, int>(TestUtils.TestName); + var key = TestUtils.GetPrimaryKey(Ignite2, cache.Name); + + // This causes deadlock if async continuation is executed on the striped thread. + await cache.PutAsync(key, 1); + cache.Replace(key, 2); + + Assert.AreEqual(2, cache.Get(key)); + StringAssert.DoesNotContain("sys-stripe-", TestUtilsJni.GetJavaThreadName()); + } + + /// <summary> + /// Tests that local operation executes synchronously and completes on the same thread. + /// </summary> + [Test] + public async Task TestLocalOperationExecutesSynchronously() + { + var cache = Ignite.GetOrCreateCache<int, int>(TestUtils.TestName); + var key = TestUtils.GetPrimaryKey(Ignite, cache.Name); + var origThread = Thread.CurrentThread; + + await cache.PutAsync(key, key); + + Assert.AreEqual(origThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId); + } + + /// <summary> + /// Tests that explicitly configured synchronous executor runs continuations on the striped pool. + /// </summary> + [Test] + public async Task TestSynchronousExecutorRunsContinuationsOnStripedPool() + { + var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration(name: "client")) + { + AsyncContinuationExecutor = AsyncContinuationExecutor.UnsafeSynchronous, + ClientMode = true + }; + + using (var client = Ignition.Start(cfg)) + { + var cache = client.GetOrCreateCache<int, int>(TestUtils.TestName); + + await cache.PutAsync(1, 1); + + StringAssert.StartsWith("sys-stripe-", TestUtilsJni.GetJavaThreadName()); + + Assert.AreEqual(AsyncContinuationExecutor.UnsafeSynchronous, + client.GetConfiguration().AsyncContinuationExecutor); + + // Jump away from striped pool to avoid deadlock on node stop. + await Task.Yield(); + } + } + + /// <summary> + /// Tests that invalid executor configuration is rejected. + /// </summary> + [Test] + public void TestInvalidExecutorConfigurationFailsOnStart() + { + var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + AsyncContinuationExecutor = AsyncContinuationExecutor.Custom + }; + + var ex = Assert.Throws<IgniteException>(() => Ignition.Start(cfg)); + Assert.AreEqual("Invalid AsyncContinuationExecutor mode: 2", ex.Message); + } + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs index a3485d0..e254fc8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs @@ -22,7 +22,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache using NUnit.Framework; /// <summary> - /// Tests cache operations with async/await. + /// Tests thin cache operations with async/await. /// </summary> public class CacheTestAsyncAwait : ClientTestBase { diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs index 3e85d497..dbaa94b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs @@ -112,7 +112,7 @@ namespace Apache.Ignite.Core.Tests.Compute cts.Cancel(); - Assert.IsTrue(task.IsCanceled); + TestUtils.WaitForTrueCondition(() => task.IsCanceled); // Pass cancelled token Assert.IsTrue(runner(Compute, cts.Token).IsCanceled); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs index 820435f..ec5750d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs @@ -720,11 +720,11 @@ namespace Apache.Ignite.Core.Tests.Compute // Cancel while executing var task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token); cts.Cancel(); - Assert.IsTrue(task.IsCanceled); + TestUtils.WaitForTrueCondition(() => task.IsCanceled); // Use cancelled token - task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token); - Assert.IsTrue(task.IsCanceled); + var task2 = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token); + Assert.IsTrue(task2.IsCanceled); } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs similarity index 54% copy from modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs copy to modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs index a3485d0..06e4f66 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs @@ -15,29 +15,36 @@ * limitations under the License. */ -namespace Apache.Ignite.Core.Tests.Client.Cache +namespace Apache.Ignite.Core.Tests.Compute { using System.Threading.Tasks; - using Apache.Ignite.Core.Tests.Client; using NUnit.Framework; /// <summary> - /// Tests cache operations with async/await. + /// Tests compute async continuation behavior. /// </summary> - public class CacheTestAsyncAwait : ClientTestBase + public class ComputeTestAsyncAwait : TestBase { /// <summary> - /// Tests that async continuations are executed on a ThreadPool thread, not on response handler thread. + /// Tests that RunAsync continuation does not capture Ignite threads. /// </summary> [Test] - public async Task TestAsyncAwaitContinuationIsExecutedOnThreadPool() + public async Task TestComputeRunAsyncContinuation() { - var cache = GetClientCache<int>(); - await cache.PutAsync(1, 1).ConfigureAwait(false); + await Ignite.GetCompute().RunAsync(new ComputeAction()); - // This causes deadlock if async continuation is executed on response handler thread. - cache.PutAsync(2, 2).Wait(); - Assert.AreEqual(2, cache.Get(2)); + StringAssert.StartsWith("Thread-", TestUtilsJni.GetJavaThreadName()); + } + + /// <summary> + /// Tests that ExecuteAsync continuation does not capture Ignite threads. + /// </summary> + [Test] + public async Task TestComputeExecuteAsyncContinuation() + { + await Ignite.GetCompute().ExecuteAsync(new StringLengthEmptyTask(), "abc"); + + StringAssert.StartsWith("Thread-", TestUtilsJni.GetJavaThreadName()); } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml index bcc8347..f294afa 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml @@ -24,7 +24,7 @@ isLateAffinityAssignment='false' springConfigUrl='c:\myconfig.xml' autoGenerateIgniteInstanceName='true' peerAssemblyLoadingMode='CurrentAppDomain' longQueryWarningTimeout='1:2:3' isActiveOnStart='false' consistentId='someId012' redirectJavaConsoleOutput='false' authenticationEnabled='true' mvccVacuumFrequency='10000' mvccVacuumThreadCount='4' - sqlQueryHistorySize='123' javaPeerClassLoadingEnabled='true'> + sqlQueryHistorySize='123' javaPeerClassLoadingEnabled='true' asyncContinuationExecutor='UnsafeSynchronous'> <localhost>127.1.1.1</localhost> <binaryConfiguration compactFooter='false' keepDeserialized='true'> <nameMapper type='Apache.Ignite.Core.Tests.IgniteConfigurationSerializerTest+NameMapper' bar='testBar' /> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml index 4488f93..acd5a9b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml @@ -47,5 +47,9 @@ <property name="dataStorageConfiguration"> <bean class="org.apache.ignite.configuration.DataStorageConfiguration"/> </property> + + <property name="asyncContinuationExecutor"> + <bean class="org.apache.ignite.platform.PlatformTestExecutor"/> + </property> </bean> </beans> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs index a98cad9..74d2688 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs @@ -386,6 +386,8 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual(2, ec.Count); Assert.AreEqual(new[] {"exec1", "exec2"}, ec.Select(e => e.Name)); Assert.AreEqual(new[] {1, 2}, ec.Select(e => e.Size)); + + Assert.AreEqual(AsyncContinuationExecutor.UnsafeSynchronous, cfg.AsyncContinuationExecutor); } /// <summary> @@ -1081,7 +1083,8 @@ namespace Apache.Ignite.Core.Tests Name = "exec-1", Size = 11 } - } + }, + AsyncContinuationExecutor = AsyncContinuationExecutor.ThreadPool }; } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs index e49b6c2..5d57aa8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs @@ -315,6 +315,7 @@ namespace Apache.Ignite.Core.Tests CheckDefaultProperties(resCfg.ClientConnectorConfiguration); Assert.AreEqual(false, resCfg.JavaPeerClassLoadingEnabled); + Assert.AreEqual(AsyncContinuationExecutor.Custom, resCfg.AsyncContinuationExecutor); } } @@ -555,6 +556,7 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual(IgniteConfiguration.DefaultAuthenticationEnabled, cfg.AuthenticationEnabled); Assert.AreEqual(IgniteConfiguration.DefaultMvccVacuumFrequency, cfg.MvccVacuumFrequency); Assert.AreEqual(IgniteConfiguration.DefaultMvccVacuumThreadCount, cfg.MvccVacuumThreadCount); + Assert.AreEqual(AsyncContinuationExecutor.ThreadPool, cfg.AsyncContinuationExecutor); // Thread pools. Assert.AreEqual(IgniteConfiguration.DefaultManagementThreadPoolSize, cfg.ManagementThreadPoolSize); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs index 5fdf70d..2dcc94a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs @@ -37,7 +37,7 @@ namespace Apache.Ignite.Core.Tests { var projFiles = TestUtils.GetDotNetSourceDir() .GetFiles("*.csproj", SearchOption.AllDirectories) - .Where(x => !x.FullName.ToLower().Contains("dotnetcore") && + .Where(x => !x.FullName.ToLower().Contains("dotnetcore") && !x.FullName.Contains("Benchmark") && !x.FullName.Contains("templates") && !x.FullName.Contains("examples")) @@ -95,7 +95,7 @@ namespace Apache.Ignite.Core.Tests { var excluded = new[] { - "ProjectFilesTest.cs", + "ProjectFilesTest.cs", "CopyOnWriteConcurrentDictionary.cs", "IgniteArgumentCheck.cs", "DelegateConverter.cs", @@ -109,7 +109,7 @@ namespace Apache.Ignite.Core.Tests "HandleRegistry.cs", "BinaryObjectHeader.cs" }; - + var csFiles = TestUtils.GetDotNetSourceDir().GetFiles("*.cs", SearchOption.AllDirectories); foreach (var csFile in csFiles) @@ -142,10 +142,10 @@ namespace Apache.Ignite.Core.Tests public void TestAllCsharpFilesAreIncludedInProject() { var projFiles = TestUtils.GetDotNetSourceDir().GetFiles("*.csproj", SearchOption.AllDirectories) - .Where(x => + .Where(x => !x.Name.Contains("DotNetCore") && - !x.Name.Contains("Benchmark") && - !x.FullName.Contains("templates") && + !x.Name.Contains("Benchmark") && + !x.FullName.Contains("templates") && !x.FullName.Contains("examples")); var excludedFiles = new[] @@ -153,7 +153,9 @@ namespace Apache.Ignite.Core.Tests "IgnitionStartTest.cs", "Common\\TestFixtureSetUp.cs", "Common\\TestFixtureTearDown.cs", - "Client\\Cache\\CacheTestAsyncAwait.cs" + "Client\\Cache\\CacheTestAsyncAwait.cs", + "Cache\\CacheTestAsyncAwait.cs", + "Compute\\ComputeTestAsyncAwait.cs" }; Assert.Multiple(() => diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs index 72be97e..1b6303b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs @@ -395,6 +395,7 @@ namespace Apache.Ignite.Core.Tests.Services /** <inheritDoc /> */ public ServicesTest.PlatformComputeBinarizable[] testBinarizableArray(ServicesTest.PlatformComputeBinarizable[] x) { + // ReSharper disable once CoVariantArrayConversion return (ServicesTest.PlatformComputeBinarizable[])testBinarizableArrayOfObjects(x); } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs index ac379a0..d76c9e0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs @@ -95,6 +95,15 @@ namespace Apache.Ignite.Core.Tests CallVoidMethod(ClassPlatformProcessUtils, "destroyProcess", "()V"); } + /// <summary> + /// Gets the Java thread name. + /// </summary> + /// <returns></returns> + public static string GetJavaThreadName() + { + return CallStringMethod(ClassPlatformThreadUtils, "getThreadName", "()Ljava/lang/String;"); + } + /** */ private static unsafe void CallStringMethod(string className, string methodName, string methodSig, string arg) { @@ -122,5 +131,17 @@ namespace Apache.Ignite.Core.Tests env.CallStaticVoidMethod(cls, methodId); } } + + /** */ + private static unsafe string CallStringMethod(string className, string methodName, string methodSig) + { + var env = Jvm.Get().AttachCurrentThread(); + using (var cls = env.FindClass(className)) + { + var methodId = env.GetStaticMethodId(cls, methodName, methodSig); + var res = env.CallStaticObjectMethod(cls, methodId); + return env.JStringToString(res.Target); + } + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index a0eb1e4..b61aad3 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -81,6 +81,7 @@ <Compile Include="Cluster\IBaselineNode.cs" /> <Compile Include="Common\IgniteIllegalStateException.cs" /> <Compile Include="Common\IgniteProductVersion.cs" /> + <Compile Include="Configuration\AsyncContinuationExecutor.cs" /> <Compile Include="Configuration\CheckpointWriteOrder.cs" /> <Compile Include="Configuration\DataPageEvictionMode.cs" /> <Compile Include="Configuration\DiskPageCompression.cs" /> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs new file mode 100644 index 0000000..36953fb --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Configuration +{ + /// <summary> + /// Defines async continuations behavior. + /// </summary> + public enum AsyncContinuationExecutor + { + /// <summary> + /// Executes async continuations on the thread pool (default). + /// </summary> + ThreadPool = 0, + + /// <summary> + /// Executes async continuations synchronously on the same thread that completes the previous operation. + /// <para /> + /// WARNING: can cause deadlocks and performance issues when not used correctly. + /// <para /> + /// Ignite performs cache operations using a special "striped" thread pool + /// (see <see cref="IgniteConfiguration.StripedThreadPoolSize"/>). Using this synchronous mode means that + /// async continuations (any code coming after <c>await cache.DoAsync()</c>, or code in <c>ContinueWith()</c>) + /// will run on the striped pool: + /// <ul> + /// <li> + /// Cache operations can't execute while user code runs on the striped thread. + /// </li> + /// <li> + /// Attempting other cache operations on the striped thread can cause a deadlock. + /// </li> + /// </ul> + /// <para /> + /// This mode can improve performance, because continuations do not have to be scheduled on another thread. + /// However, special care is required to release striped threads as soon as possible. + /// </summary> + UnsafeSynchronous = 1, + + /// <summary> + /// Indicates that custom executor is configured on the Java side. + /// <para /> + /// This value should not be used explicitly. + /// </summary> + Custom = 2 + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs index 3ef2b7f..e6437d2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs @@ -221,6 +221,9 @@ namespace Apache.Ignite.Core /** */ private bool? _clientMode; + /** */ + private AsyncContinuationExecutor? _asyncContinuationExecutor; + /// <summary> /// Default network retry count. /// </summary> @@ -342,6 +345,7 @@ namespace Apache.Ignite.Core writer.WriteTimeSpanAsLongNullable(_sysWorkerBlockedTimeout); writer.WriteIntNullable(_sqlQueryHistorySize); writer.WriteBooleanNullable(_javaPeerClassLoadingEnabled); + writer.WriteIntNullable((int?) _asyncContinuationExecutor); if (SqlSchemas == null) writer.WriteInt(0); @@ -747,6 +751,7 @@ namespace Apache.Ignite.Core _sysWorkerBlockedTimeout = r.ReadTimeSpanNullable(); _sqlQueryHistorySize = r.ReadIntNullable(); _javaPeerClassLoadingEnabled = r.ReadBooleanNullable(); + _asyncContinuationExecutor = (AsyncContinuationExecutor?) r.ReadIntNullable(); int sqlSchemasCnt = r.ReadInt(); @@ -1729,10 +1734,20 @@ namespace Apache.Ignite.Core /// and peer class loading in Java are two distinct and independent features. /// <para /> /// </summary> - public bool JavaPeerClassLoadingEnabled + public bool JavaPeerClassLoadingEnabled { get { return _javaPeerClassLoadingEnabled ?? default(bool); } set { _javaPeerClassLoadingEnabled = value; } } + + /// <summary> + /// Gets or sets the async continuation behavior. + /// See <see cref="Apache.Ignite.Core.Configuration.AsyncContinuationExecutor"/> members for more details. + /// </summary> + public AsyncContinuationExecutor AsyncContinuationExecutor + { + get { return _asyncContinuationExecutor ?? default(AsyncContinuationExecutor); } + set { _asyncContinuationExecutor = value; } + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd index 0b35ca4..950b738 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd @@ -128,6 +128,13 @@ </xs:restriction> </xs:simpleType> + <xs:simpleType name="asyncContinuationExecutor" final="restriction"> + <xs:restriction base="xs:string"> + <xs:enumeration value="ThreadPool" /> + <xs:enumeration value="UnsafeSynchronous" /> + </xs:restriction> + </xs:simpleType> + <xs:element name="igniteConfiguration"> <xs:annotation> <xs:documentation>Ignite configuration root.</xs:documentation> @@ -2513,6 +2520,11 @@ <xs:documentation>Whether Java console output should be redirected to Console.Out and Console.Error.</xs:documentation> </xs:annotation> </xs:attribute> + <xs:attribute name="asyncContinuationExecutor" type="asyncContinuationExecutor"> + <xs:annotation> + <xs:documentation>Async continuation behavior.</xs:documentation> + </xs:annotation> + </xs:attribute> </xs:complexType> </xs:element> </xs:schema> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs index 489fe44..e2bed73 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs @@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Compute using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; + using System.Threading; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Compute; @@ -59,7 +60,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="stream">Stream.</param> /// <returns>Policy.</returns> int JobResultRemote(ComputeJobHolder jobId, PlatformMemoryStream stream); - + /// <summary> /// Perform task reduce. /// </summary> @@ -70,7 +71,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// </summary> /// <param name="taskHandle">Task handle.</param> void Complete(long taskHandle); - + /// <summary> /// Complete task with error. /// </summary> @@ -85,7 +86,7 @@ namespace Apache.Ignite.Core.Impl.Compute internal class ComputeTaskHolder<TA, T, TR> : IComputeTaskHolder { /** Empty results. */ - private static readonly IList<IComputeJobResult<T>> EmptyRes = + private static readonly IList<IComputeJobResult<T>> EmptyRes = new ReadOnlyCollection<IComputeJobResult<T>>(new List<IComputeJobResult<T>>()); /** Compute instance. */ @@ -102,7 +103,7 @@ namespace Apache.Ignite.Core.Impl.Compute /** Task future. */ private readonly Future<TR> _fut = new Future<TR>(); - + /** Jobs whose results are cached. */ private ISet<object> _resJobs; @@ -111,7 +112,7 @@ namespace Apache.Ignite.Core.Impl.Compute /** Handles for jobs which are not serialized right away. */ private volatile List<long> _jobHandles; - + /// <summary> /// Constructor. /// </summary> @@ -241,7 +242,7 @@ namespace Apache.Ignite.Core.Impl.Compute Finish(default(TR), e); stream.Reset(); - + writer.WriteBoolean(false); // Map failed. writer.WriteString(e.Message); // Write error message. } @@ -345,7 +346,7 @@ namespace Apache.Ignite.Core.Impl.Compute throw; } } - + /** <inheritDoc /> */ public void Reduce() { @@ -489,7 +490,8 @@ namespace Apache.Ignite.Core.Impl.Compute /// <param name="err">Error.</param> private void Finish(TR res, Exception err) { - _fut.OnDone(res, err); + // Always complete the future on a ThreadPool thread to avoid capturing Ignite "pub-" thread. + ThreadPool.QueueUserWorkItem(_ => _fut.OnDone(res, err)); } /// <summary> @@ -503,7 +505,7 @@ namespace Apache.Ignite.Core.Impl.Compute var handleRegistry = _compute.Marshaller.Ignite.HandleRegistry; if (handles != null) - foreach (var handle in handles) + foreach (var handle in handles) handleRegistry.Release(handle, true); handleRegistry.Release(taskHandle, true); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs index 4bada09..86515c2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs @@ -230,7 +230,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged.Jni /// <summary> /// Calls the static object method. /// </summary> - private GlobalRef CallStaticObjectMethod(GlobalRef cls, IntPtr methodId, long* argsPtr = null) + public GlobalRef CallStaticObjectMethod(GlobalRef cls, IntPtr methodId, long* argsPtr = null) { var res = _callStaticObjectMethod(_envPtr, cls.Target, methodId, argsPtr);