IGNITE-9249 Use async listener notification to avoid excessive thread stack usage with async operations - Fixes #4625.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad2f58ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad2f58ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad2f58ec

Branch: refs/heads/ignite-7251
Commit: ad2f58ec7b54b52f56a7a3785d33a75a98d04627
Parents: b13f373
Author: Ilya Lantukh <ilant...@gridgain.com>
Authored: Fri Sep 14 20:12:16 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Sep 14 20:14:00 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java | 38 ++++++++++++++------
 1 file changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad2f58ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index c763834..0570b4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4392,20 +4392,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         if (fut != null && !fut.isDone()) {
             IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
-                new IgniteOutClosure<IgniteInternalFuture>() {
-                    @Override public IgniteInternalFuture<T> apply() {
+                (IgniteOutClosure<IgniteInternalFuture>)() -> {
+                    GridFutureAdapter resFut = new GridFutureAdapter();
+
+                    ctx.kernalContext().closure().runLocalSafe(() -> {
+                        IgniteInternalFuture fut0;
+
                         if (ctx.kernalContext().isStopping())
-                            return new GridFinishedFuture<>(
+                            fut0 = new GridFinishedFuture<>(
                                 new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
-
-                        try {
-                            return op.op(tx0, opCtx).chain(clo);
-                        }
-                        finally {
-                            // It is necessary to clear tx context in this thread as well.
-                            ctx.shared().txContextReset();
+                        else {
+                            try {
+                                fut0 = op.op(tx0, opCtx).chain(clo);
+                            }
+                            finally {
+                                // It is necessary to clear tx context in this thread as well.
+                                ctx.shared().txContextReset();
+                            }
                         }
-                    }
+
+                        fut0.listen((IgniteInClosure<IgniteInternalFuture>)fut01 -> {
+                            try {
+                                resFut.onDone(fut01.get());
+                            }
+                            catch (Throwable ex) {
+                                resFut.onDone(ex);
+                            }
+                        });
+                    }, true);
+
+                    return resFut;
                 });
 
             saveFuture(holder, f, retry);