This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6c69aa31 IGNITE-12346 .NET: Fix query cursor thread safety 6c69aa31 is described below commit 6c69aa31edc8519fc3dfcda735afaf85e40dce49 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Thu Jul 9 12:43:33 2020 +0300 IGNITE-12346 .NET: Fix query cursor thread safety --- .../Cache/Query/CacheQueriesTest.cs | 42 +++++++++- .../Impl/Cache/Query/QueryCursorBase.cs | 96 +++++++++++++--------- .../Impl/Unmanaged/UnmanagedCallbacks.cs | 2 + 3 files changed, 101 insertions(+), 39 deletions(-) diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs index 61a9844..5418035 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs @@ -24,6 +24,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Text; + using System.Threading; + using System.Threading.Tasks; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Configuration; @@ -518,6 +520,41 @@ namespace Apache.Ignite.Core.Tests.Cache.Query } /// <summary> + /// Checks that scan query is thread-safe and throws correct exception when disposed from another thread. + /// </summary> + [Test] + public void TestScanQueryDisposedFromAnotherThreadThrowsObjectDisposedException() + { + var cache = GetIgnite().GetOrCreateCache<int, int>(TestUtils.TestName); + + const int totalCount = 10000; + cache.PutAll(Enumerable.Range(1, totalCount).ToDictionary(x => x, x => x)); + + var scanQuery = new ScanQuery<int, int> + { + Filter = new ScanQueryFilter<int> {AcceptAll = true} + }; + + var cursor = cache.Query(scanQuery); + + long count = 0; + Task.Factory.StartNew(() => + { + // ReSharper disable once AccessToModifiedClosure + while (Interlocked.Read(ref count) < totalCount / 10) { } + cursor.Dispose(); + }); + + Assert.Throws<ObjectDisposedException>(() => + { + foreach (var unused in cursor) + { + Interlocked.Increment(ref count); + } + }); + } + + /// <summary> /// Tests that query attempt on non-indexed cache causes an exception. /// </summary> [Test] @@ -1116,6 +1153,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Query // Error flag public bool ThrowErr { get; set; } + // Error flag + public bool AcceptAll { get; set; } + // Injection test [InstanceResource] public IIgnite Ignite { get; set; } @@ -1128,7 +1168,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query if (ThrowErr) throw new Exception(ErrMessage); - return entry.Key < 50; + return entry.Key < 50 || AcceptAll; } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs index 63004a9..b65ae11 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursorBase.cs @@ -41,6 +41,9 @@ namespace Apache.Ignite.Core.Impl.Cache.Query /** Read func. */ private readonly Func<BinaryReader, T> _readFunc; + + /** Lock object. */ + private readonly object _syncRoot = new object(); /** Whether "GetAll" was called. */ private bool _getAllCalled; @@ -93,14 +96,17 @@ namespace Apache.Ignite.Core.Impl.Cache.Query throw new InvalidOperationException("Failed to get all entries because GetEnumerator() " + "method has already been called."); - ThrowIfDisposed(); + lock (_syncRoot) + { + ThrowIfDisposed(); - var res = GetAllInternal(); + var res = GetAllInternal(); - _getAllCalled = true; - _hasNext = false; + _getAllCalled = true; + _hasNext = false; - return res; + return res; + } } #region Public IEnumerable methods @@ -148,13 +154,16 @@ namespace Apache.Ignite.Core.Impl.Cache.Query { ThrowIfDisposed(); - if (_batchPos == BatchPosBeforeHead) - throw new InvalidOperationException("MoveNext has not been called."); - - if (_batch == null) - throw new InvalidOperationException("Previous call to MoveNext returned false."); + lock (_syncRoot) + { + if (_batchPos == BatchPosBeforeHead) + throw new InvalidOperationException("MoveNext has not been called."); - return _batch[_batchPos]; + if (_batch == null) + throw new InvalidOperationException("Previous call to MoveNext returned false."); + + return _batch[_batchPos]; + } } } @@ -169,22 +178,25 @@ namespace Apache.Ignite.Core.Impl.Cache.Query { ThrowIfDisposed(); - if (_batch == null) + lock (_syncRoot) { - if (_batchPos == BatchPosBeforeHead) - // Standing before head, let's get batch and advance position. - RequestBatch(); - } - else - { - _batchPos++; + if (_batch == null) + { + if (_batchPos == BatchPosBeforeHead) + // Standing before head, let's get batch and advance position. + RequestBatch(); + } + else + { + _batchPos++; - if (_batch.Length == _batchPos) - // Reached batch end => request another. - RequestBatch(); - } + if (_batch.Length == _batchPos) + // Reached batch end => request another. + RequestBatch(); + } - return _batch != null; + return _batch != null; + } } /** <inheritdoc /> */ @@ -205,9 +217,14 @@ namespace Apache.Ignite.Core.Impl.Cache.Query /// </summary> private void RequestBatch() { - _batch = _hasNext ? GetBatch() : null; + lock (_syncRoot) + { + ThrowIfDisposed(); + + _batch = _hasNext ? GetBatch() : null; - _batchPos = 0; + _batchPos = 0; + } } /// <summary> @@ -245,28 +262,31 @@ namespace Apache.Ignite.Core.Impl.Cache.Query var size = reader.ReadInt(); - if (size == 0) + lock (_syncRoot) { - _hasNext = false; - return null; - } + if (size == 0) + { + _hasNext = false; + return null; + } - var res = new T[size]; + var res = new T[size]; - for (var i = 0; i < size; i++) - { - res[i] = _readFunc(reader); - } + for (var i = 0; i < size; i++) + { + res[i] = _readFunc(reader); + } - _hasNext = stream.ReadBool(); + _hasNext = stream.ReadBool(); - return res; + return res; + } } /** <inheritdoc /> */ public void Dispose() { - lock (this) + lock (_syncRoot) { if (_disposed) { diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index c941134..ea50b29 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -377,6 +377,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged { var t = _ignite.HandleRegistry.Get<CacheEntryFilterHolder>(stream.ReadLong(), true); + Debug.Assert(t != null); + return t.Invoke(stream); } }