This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 69c5b51 IGNITE-13226 .NET: Fix ClientNotificationHandler leak in ClientSocket 69c5b51 is described below commit 69c5b516686924834876d3f52d349ac1880b20f3 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Wed Jul 8 12:21:44 2020 +0300 IGNITE-13226 .NET: Fix ClientNotificationHandler leak in ClientSocket Fix race condition in ClientSocket.AddNotificationHandler: SetHandler call was causing RemoveNotificationHandler call before ConcurrentDictionary.AddOrUpdate has finished, so AddOrUpdate added the handler back, causing the leak. --- .../Common/TestRunner.cs | 2 +- .../Client/Compute/ComputeClientTests.cs | 2 ++ .../Apache.Ignite.Core.Tests/TestUtils.Common.cs | 33 ++++++++++++++++++++-- .../Impl/Client/ClientNotificationHandler.cs | 25 ++++++++++------ .../Apache.Ignite.Core/Impl/Client/ClientSocket.cs | 25 +++++++++++----- .../Impl/Client/Compute/ComputeClient.cs | 4 +-- 6 files changed, 69 insertions(+), 22 deletions(-) diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Common/TestRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Common/TestRunner.cs index 1b0d45c..85bbda7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Common/TestRunner.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests.DotNetCore/Common/TestRunner.cs @@ -30,4 +30,4 @@ namespace Apache.Ignite.Core.Tests.DotNetCore.Common new IgnitionStartTest().TestIgniteStartsFromAppConfig(); } } -} \ No newline at end of file +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compute/ComputeClientTests.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compute/ComputeClientTests.cs index ceffb11..289468a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compute/ComputeClientTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Compute/ComputeClientTests.cs @@ -80,6 +80,8 @@ namespace Apache.Ignite.Core.Tests.Client.Compute Assert.Fail(entry.Message); } } + + Assert.IsEmpty(Client.GetActiveNotificationListeners()); } /// <summary> diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs index d7a31e7..f8b14d2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Core.Tests { using System; + using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; @@ -27,11 +28,13 @@ namespace Apache.Ignite.Core.Tests using System.Threading; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Client; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Discovery.Tcp; using Apache.Ignite.Core.Discovery.Tcp.Static; using Apache.Ignite.Core.Impl; using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Client; using NUnit.Framework; /// <summary> @@ -472,7 +475,7 @@ namespace Apache.Ignite.Core.Tests } } } - + /// <summary> /// Gets the dot net source dir. /// </summary> @@ -491,7 +494,7 @@ namespace Apache.Ignite.Core.Tests throw new InvalidOperationException("Could not resolve Ignite.NET source directory."); } - + /// <summary> /// Gets a value indicating whether specified partition is reserved. /// </summary> @@ -499,7 +502,7 @@ namespace Apache.Ignite.Core.Tests { Debug.Assert(ignite != null); Debug.Assert(cacheName != null); - + const string taskName = "org.apache.ignite.platform.PlatformIsPartitionReservedTask"; return ignite.GetCompute().ExecuteJavaTask<bool>(taskName, new object[] {cacheName, part}); @@ -517,5 +520,29 @@ namespace Apache.Ignite.Core.Tests return ex; } + + /// <summary> + /// Gets the private field value. + /// </summary> + public static T GetPrivateField<T>(object obj, string name) + { + Assert.IsNotNull(obj); + + var field = obj.GetType().GetField(name, BindingFlags.Instance | BindingFlags.NonPublic); + + Assert.IsNotNull(field); + + return (T) field.GetValue(obj); + } + + /// <summary> + /// Gets active notification listeners. + /// </summary> + public static ICollection GetActiveNotificationListeners(this IIgniteClient client) + { + var failoverSocket = GetPrivateField<ClientFailoverSocket>(client, "_socket"); + var socket = GetPrivateField<ClientSocket>(failoverSocket, "_socket"); + return GetPrivateField<ICollection>(socket, "_notificationListeners"); + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientNotificationHandler.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientNotificationHandler.cs index cc031e8..d5d5c0e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientNotificationHandler.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientNotificationHandler.cs @@ -35,12 +35,12 @@ namespace Apache.Ignite.Core.Impl.Client { /** Handler delegate. */ public delegate void Handler(IBinaryStream stream, Exception ex); - + /** Logger. */ private readonly ILogger _logger; /** Nested handler. */ - private Handler _handler; + private volatile Handler _handler; /** Queue. */ private List<KeyValuePair<IBinaryStream, Exception>> _queue; @@ -51,12 +51,20 @@ namespace Apache.Ignite.Core.Impl.Client public ClientNotificationHandler(ILogger logger, Handler handler = null) { Debug.Assert(logger != null); - + _logger = logger; _handler = handler; } /// <summary> + /// Gets a value indicating whether handler is set for this instance. + /// </summary> + public bool HasHandler + { + get { return _handler != null; } + } + + /// <summary> /// Handles the notification. /// </summary> /// <param name="stream">Notification data.</param> @@ -69,7 +77,7 @@ namespace Apache.Ignite.Core.Impl.Client { // NOTE: Back pressure control should be added here when needed (e.g. for Continuous Queries). var handler = _handler; - + if (handler != null) { ThreadPool.QueueUserWorkItem(_ => Handle(handler, stream, exception)); @@ -86,10 +94,11 @@ namespace Apache.Ignite.Core.Impl.Client /// Sets the handler. /// </summary> /// <param name="handler">Handler.</param> - public ClientNotificationHandler SetHandler(Handler handler) + public void SetHandler(Handler handler) { Debug.Assert(handler != null); - + Debug.Assert(_handler == null); + lock (this) { _handler = handler; @@ -102,8 +111,6 @@ namespace Apache.Ignite.Core.Impl.Client ThreadPool.QueueUserWorkItem(_ => Drain(handler, queue)); } } - - return this; } /// <summary> @@ -134,4 +141,4 @@ namespace Apache.Ignite.Core.Impl.Client } } } -} \ No newline at end of file +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs index ba13550..bec7c10 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs @@ -252,12 +252,18 @@ namespace Apache.Ignite.Core.Impl.Client /// Adds a notification handler. /// </summary> /// <param name="notificationId">Notification id.</param> - /// <param name="handler">Handler delegate.</param> - public void AddNotificationHandler(long notificationId, ClientNotificationHandler.Handler handler) + /// <param name="handlerDelegate">Handler delegate.</param> + public void AddNotificationHandler(long notificationId, ClientNotificationHandler.Handler handlerDelegate) { - _notificationListeners.AddOrUpdate(notificationId, - _ => new ClientNotificationHandler(_logger, handler), - (_, oldHandler) => oldHandler.SetHandler(handler)); + var handler = _notificationListeners.GetOrAdd(notificationId, + _ => new ClientNotificationHandler(_logger, handlerDelegate)); + + if (!handler.HasHandler) + { + // We could use AddOrUpdate, but SetHandler must be called outside of Update call, + // because it causes handler execution, which, in turn, may call RemoveNotificationHandler. + handler.SetHandler(handlerDelegate); + } _listenerEvent.Set(); } @@ -269,6 +275,11 @@ namespace Apache.Ignite.Core.Impl.Client /// <returns>True when removed, false otherwise.</returns> public void RemoveNotificationHandler(long notificationId) { + if (IsDisposed) + { + return; + } + ClientNotificationHandler unused; var removed = _notificationListeners.TryRemove(notificationId, out unused); Debug.Assert(removed); @@ -440,8 +451,8 @@ namespace Apache.Ignite.Core.Impl.Client throw new IgniteClientException("Unexpected thin client notification: " + requestId); } - _notificationListeners.GetOrAdd(requestId, _ => new ClientNotificationHandler(_logger)) - .Handle(stream, null); + var handler = _notificationListeners.GetOrAdd(requestId, _ => new ClientNotificationHandler(_logger)); + handler.Handle(stream, null); return true; } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Compute/ComputeClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Compute/ComputeClient.cs index a02608f..6e1fdca 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Compute/ComputeClient.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Compute/ComputeClient.cs @@ -191,14 +191,14 @@ namespace Apache.Ignite.Core.Impl.Client.Compute ctx.Socket.AddNotificationHandler(taskId, (stream, ex) => { + ctx.Socket.RemoveNotificationHandler(taskId); + if (ex != null) { tcs.TrySetException(ex); return; } - ctx.Socket.RemoveNotificationHandler(taskId); - var reader = ctx.Marshaller.StartUnmarshal(stream, keepBinary ? BinaryMode.ForceBinary : BinaryMode.Deserialize);