IGNITE-1912: Fixed a bug that prevented continuous queries from working correctly with value types.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b96886dd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b96886dd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b96886dd Branch: refs/heads/ignite-1753-1282 Commit: b96886ddb9fae2d77b07acd7303ec6548a65ed63 Parents: 1de6539 Author: Pavel Tupitsyn <ptupit...@gridgain.com> Authored: Tue Nov 17 16:43:14 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Nov 17 16:43:14 2015 +0300 ---------------------------------------------------------------------- .../Continuous/ContinuousQueryAbstractTest.cs | 77 +++++++++++++++++--- .../Cache/Event/ICacheEntryEvent.cs | 7 +- .../Impl/Binary/BinaryReader.cs | 43 ++++++++--- .../Impl/Cache/Event/CacheEntryCreateEvent.cs | 6 ++ .../Impl/Cache/Event/CacheEntryRemoveEvent.cs | 6 ++ .../Impl/Cache/Event/CacheEntryUpdateEvent.cs | 6 ++ .../Query/Continuous/ContinuousQueryUtils.cs | 37 +++------- 7 files changed, 134 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b96886dd/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs index 8005e83..b81405b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs @@ -32,9 +32,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Common; using 
Apache.Ignite.Core.Impl; + using Apache.Ignite.Core.Impl.Cache.Event; using Apache.Ignite.Core.Resource; using NUnit.Framework; - using CQU = Apache.Ignite.Core.Impl.Cache.Query.Continuous.ContinuousQueryUtils; /// <summary> /// Tests for continuous query. @@ -637,6 +637,56 @@ namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous (cbEvt.entries.First().Value as IBinaryObject).Deserialize<PortableEntry>()); } } + /// <summary> + /// Test value types (special handling is required for nulls). + /// </summary> + [Test] + public void TestValueTypes() + { + var cache = grid1.GetCache<int, int>(cacheName); + + var qry = new ContinuousQuery<int, int>(new Listener<int>()); + + var key = PrimaryKey(cache); + + using (cache.QueryContinuous(qry)) + { + // First update + cache.Put(key, 1); + + CallbackEvent cbEvt; + + Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500)); + var cbEntry = cbEvt.entries.Single(); + Assert.IsFalse(cbEntry.HasOldValue); + Assert.IsTrue(cbEntry.HasValue); + Assert.AreEqual(key, cbEntry.Key); + Assert.AreEqual(null, cbEntry.OldValue); + Assert.AreEqual(1, cbEntry.Value); + + // Second update + cache.Put(key, 2); + + Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500)); + cbEntry = cbEvt.entries.Single(); + Assert.IsTrue(cbEntry.HasOldValue); + Assert.IsTrue(cbEntry.HasValue); + Assert.AreEqual(key, cbEntry.Key); + Assert.AreEqual(1, cbEntry.OldValue); + Assert.AreEqual(2, cbEntry.Value); + + // Remove + cache.Remove(key); + + Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500)); + cbEntry = cbEvt.entries.Single(); + Assert.IsTrue(cbEntry.HasOldValue); + Assert.IsFalse(cbEntry.HasValue); + Assert.AreEqual(key, cbEntry.Key); + Assert.AreEqual(2, cbEntry.OldValue); + Assert.AreEqual(null, cbEntry.Value); + } + } /// <summary> /// Test whether buffer size works fine. @@ -946,6 +996,20 @@ namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous } /// <summary> + /// Creates object-typed event. 
+ /// </summary> + private static ICacheEntryEvent<object, object> CreateEvent<T, V>(ICacheEntryEvent<T,V> e) + { + if (!e.HasOldValue) + return new CacheEntryCreateEvent<object, object>(e.Key, e.Value); + + if (!e.HasValue) + return new CacheEntryRemoveEvent<object, object>(e.Key, e.OldValue); + + return new CacheEntryUpdateEvent<object, object>(e.Key, e.OldValue, e.Value); + } + + /// <summary> /// Portable entry. /// </summary> public class PortableEntry @@ -1003,8 +1067,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous if (err) throw new Exception("Filter error."); - FILTER_EVTS.Add(new FilterEvent(ignite, - CQU.CreateEvent<object, object>(evt.Key, evt.OldValue, evt.Value))); + FILTER_EVTS.Add(new FilterEvent(ignite, CreateEvent(evt))); return res; } @@ -1090,13 +1153,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous /** <inheritDoc /> */ public void OnEvent(IEnumerable<ICacheEntryEvent<int, V>> evts) { - ICollection<ICacheEntryEvent<object, object>> entries0 = - new List<ICacheEntryEvent<object, object>>(); - - foreach (ICacheEntryEvent<int, V> evt in evts) - entries0.Add(CQU.CreateEvent<object, object>(evt.Key, evt.OldValue, evt.Value)); - - CB_EVTS.Add(new CallbackEvent(entries0)); + CB_EVTS.Add(new CallbackEvent(evts.Select(CreateEvent).ToList())); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b96886dd/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Event/ICacheEntryEvent.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Event/ICacheEntryEvent.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Event/ICacheEntryEvent.cs index 9c2665e..9ecaee0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Event/ICacheEntryEvent.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Event/ICacheEntryEvent.cs @@ -20,7 +20,7 @@ namespace Apache.Ignite.Core.Cache.Event /// <summary> /// Cache entry event. 
/// </summary> - public interface ICacheEntryEvent<TK, TV> : ICacheEntry<TK, TV> + public interface ICacheEntryEvent<out TK, out TV> : ICacheEntry<TK, TV> { /// <summary> /// Event type. @@ -33,6 +33,11 @@ namespace Apache.Ignite.Core.Cache.Event TV OldValue { get; } /// <summary> + /// Whether value exists. + /// </summary> + bool HasValue { get; } + + /// <summary> /// Whether old value exists. /// </summary> bool HasOldValue { get; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b96886dd/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs index 1dec7ba..53f6f4a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs @@ -503,9 +503,11 @@ namespace Apache.Ignite.Core.Impl.Binary /// <summary> /// Enable detach mode for the next object read. /// </summary> - public void DetachNext() + public BinaryReader DetachNext() { _detach = true; + + return this; } /// <summary> @@ -514,6 +516,21 @@ namespace Apache.Ignite.Core.Impl.Binary /// <returns>Deserialized object.</returns> public T Deserialize<T>() { + T res; + + if (!TryDeserialize(out res) && default(T) != null) + throw new BinaryObjectException(string.Format("Invalid data on deserialization. " + + "Expected: '{0}' But was: null", typeof (T))); + + return res; + } + + /// <summary> + /// Deserialize object. 
+ /// </summary> + /// <returns>Deserialized object.</returns> + public bool TryDeserialize<T>(out T res) + { int pos = Stream.Position; byte hdr = Stream.ReadByte(); @@ -525,24 +542,32 @@ namespace Apache.Ignite.Core.Impl.Binary switch (hdr) { case BinaryUtils.HdrNull: - if (default(T) != null) - throw new BinaryObjectException(string.Format("Invalid data on deserialization. " + - "Expected: '{0}' But was: null", typeof (T))); + res = default(T); - return default(T); + return false; case BinaryUtils.HdrHnd: - return ReadHandleObject<T>(pos); + res = ReadHandleObject<T>(pos); + + return true; case BinaryUtils.HdrFull: - return ReadFullObject<T>(pos); + res = ReadFullObject<T>(pos); + + return true; case BinaryUtils.TypeBinary: - return ReadBinaryObject<T>(doDetach); + res = ReadBinaryObject<T>(doDetach); + + return true; } if (BinaryUtils.IsPredefinedType(hdr)) - return BinarySystemHandlers.ReadSystemType<T>(hdr, this); + { + res = BinarySystemHandlers.ReadSystemType<T>(hdr, this); + + return true; + } throw new BinaryObjectException("Invalid header on deserialization [pos=" + pos + ", hdr=" + hdr + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b96886dd/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs index 8d9dfef..fc49903 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryCreateEvent.cs @@ -60,6 +60,12 @@ namespace Apache.Ignite.Core.Impl.Cache.Event } /** <inheritdoc /> */ + public bool HasValue + { + get { return true; } + } + + /** <inheritdoc /> */ public bool HasOldValue { get { return false; } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/b96886dd/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs index a44a800..cce3d65 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryRemoveEvent.cs @@ -60,6 +60,12 @@ namespace Apache.Ignite.Core.Impl.Cache.Event } /** <inheritdoc /> */ + public bool HasValue + { + get { return false; } + } + + /** <inheritdoc /> */ public bool HasOldValue { get { return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b96886dd/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs index e6fb927..6d954e5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Event/CacheEntryUpdateEvent.cs @@ -65,6 +65,12 @@ namespace Apache.Ignite.Core.Impl.Cache.Event } /** <inheritdoc /> */ + public bool HasValue + { + get { return true; } + } + + /** <inheritdoc /> */ public bool HasOldValue { get { return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b96886dd/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs ---------------------------------------------------------------------- diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs index 96fd621..729b251 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryUtils.cs @@ -74,41 +74,22 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous /// <returns>Event.</returns> private static ICacheEntryEvent<TK, TV> ReadEvent0<TK, TV>(BinaryReader reader) { - reader.DetachNext(); - TK key = reader.ReadObject<TK>(); + var key = reader.DetachNext().ReadObject<TK>(); - reader.DetachNext(); - TV oldVal = reader.ReadObject<TV>(); + // Read as objects: TV may be value type + TV oldVal, val; - reader.DetachNext(); - TV val = reader.ReadObject<TV>(); + var hasOldVal = reader.DetachNext().TryDeserialize(out oldVal); + var hasVal = reader.DetachNext().TryDeserialize(out val); - return CreateEvent(key, oldVal, val); - } - - /// <summary> - /// Create event. - /// </summary> - /// <param name="key">Key.</param> - /// <param name="oldVal">Old value.</param> - /// <param name="val">Value.</param> - /// <returns>Event.</returns> - public static ICacheEntryEvent<TK, TV> CreateEvent<TK, TV>(TK key, TV oldVal, TV val) - { - if (oldVal == null) - { - Debug.Assert(val != null); + Debug.Assert(hasVal || hasOldVal); + if (!hasOldVal) return new CacheEntryCreateEvent<TK, TV>(key, val); - } - - if (val == null) - { - Debug.Assert(oldVal != null); + if (!hasVal) return new CacheEntryRemoveEvent<TK, TV>(key, oldVal); - } - + return new CacheEntryUpdateEvent<TK, TV>(key, oldVal, val); } }