This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 1bc9ab8 IGNITE-14033 .NET: Fix MessagingTest.TestRemoteListen flakiness 1bc9ab8 is described below commit 1bc9ab8e367df56c4febde9ac55a54c6c89b1ff1 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Fri Jan 22 17:53:27 2021 +0300 IGNITE-14033 .NET: Fix MessagingTest.TestRemoteListen flakiness * Improve assertions to understand the failures easier * Add `TestStopRemoteListenRemovesAllCallbacksUponExit` to demonstrate the problem, ignore with a ticket link * Add `AssertHandleRegistryHasItems` with a timeout to fix the flakiness --- .../Apache.Ignite.Core.Tests/MessagingTest.cs | 128 ++++++++++++++++++--- 1 file changed, 110 insertions(+), 18 deletions(-) diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs index 659afa7..ead9763 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs @@ -335,13 +335,34 @@ namespace Apache.Ignite.Core.Tests } /// <summary> + /// Tests that <see cref="IMessaging.StopRemoteListen"/> guarantees that all handlers are removed + /// upon method exit. + /// </summary> + [Test] + [Ignore("IGNITE-14032")] + public void TestStopRemoteListenRemovesAllCallbacksUponExit() + { + const string topic = "topic"; + + var messaging =_grid1.GetMessaging(); + var listenId = messaging.RemoteListen(MessagingTestHelper.GetListener("first"), topic); + + TestUtils.AssertHandleRegistryHasItems(-1, 1, _grid1, _grid2, _grid3); + + messaging.Send(1, topic); + messaging.StopRemoteListen(listenId); + + TestUtils.AssertHandleRegistryHasItems(-1, 0, _grid1, _grid2, _grid3); + } + + /// <summary> /// Tests RemoteListen. /// </summary> private void TestRemoteListen(object topic, bool async = false) { var messaging =_grid1.GetMessaging(); - var listener = MessagingTestHelper.GetListener(); + var listener = MessagingTestHelper.GetListener("first"); var listenId = async ? messaging.RemoteListenAsync(listener, topic).Result : messaging.RemoteListen(listener, topic); @@ -353,9 +374,10 @@ namespace Apache.Ignite.Core.Tests CheckNoMessage(NextId()); // Test multiple subscriptions for the same filter + var listener2 = MessagingTestHelper.GetListener("second"); var listenId2 = async - ? messaging.RemoteListenAsync(listener, topic).Result - : messaging.RemoteListen(listener, topic); + ? messaging.RemoteListenAsync(listener2, topic).Result + : messaging.RemoteListen(listener2, topic); CheckSend(topic, msg: messaging, remoteListen: true, repeatMultiplier: 2); // expect twice the messages @@ -364,6 +386,14 @@ namespace Apache.Ignite.Core.Tests else messaging.StopRemoteListen(listenId2); + // Wait for all to unsubscribe: StopRemoteListen (both sync and async) does not remove remote listeners + // upon exit. Remote listeners are removed with disco messages after some delay - + // see TestStopRemoteListenRemovesAllCallbacksUponExit. + TestUtils.AssertHandleRegistryHasItems( + (int)MessagingTestHelper.SleepTimeout.TotalMilliseconds, + 1, + _grid1, _grid2, _grid3); + CheckSend(topic, msg: messaging, remoteListen: true); // back to normal after unsubscription // Test message type mismatch @@ -464,7 +494,7 @@ namespace Apache.Ignite.Core.Tests if (sharedResult.Length != 0) { Assert.Fail("Unexpected messages ({0}): {1}; last sent message: {2}", sharedResult.Length, - string.Join(",", sharedResult), lastMsg); + string.Join(",", sharedResult.Select(x => x.ToString())), lastMsg); } } @@ -570,7 +600,7 @@ namespace Apache.Ignite.Core.Tests public static class MessagingTestHelper { /** */ - public static readonly ConcurrentStack<string> ReceivedMessages = new ConcurrentStack<string>(); + public static readonly ConcurrentStack<ReceivedMessage> ReceivedMessages = new ConcurrentStack<ReceivedMessage>(); /** */ private static readonly ConcurrentStack<string> Failures = new ConcurrentStack<string>(); @@ -579,9 +609,6 @@ namespace Apache.Ignite.Core.Tests private static readonly CountdownEvent ReceivedEvent = new CountdownEvent(0); /** */ - private static readonly ConcurrentStack<Guid> LastNodeIds = new ConcurrentStack<Guid>(); - - /** */ public static volatile bool ListenResult = true; /** */ @@ -598,7 +625,6 @@ namespace Apache.Ignite.Core.Tests { ReceivedMessages.Clear(); ReceivedEvent.Reset(expectedCount); - LastNodeIds.Clear(); } /// <summary> @@ -611,18 +637,28 @@ namespace Apache.Ignite.Core.Tests public static void VerifyReceive(IClusterGroup cluster, IEnumerable<string> expectedMessages, Func<IEnumerable<string>, IEnumerable<string>> resultFunc, int expectedRepeat) { + expectedMessages = expectedMessages.SelectMany(x => Enumerable.Repeat(x, expectedRepeat)).ToArray(); + var expectedMessagesStr = string.Join(", ", expectedMessages); + // check if expected message count has been received; Wait returns false if there were none. Assert.IsTrue(ReceivedEvent.Wait(MessageTimeout), string.Format("expectedMessages: {0}, expectedRepeat: {1}, remaining: {2}", - expectedMessages, expectedRepeat, ReceivedEvent.CurrentCount)); + expectedMessagesStr, expectedRepeat, ReceivedEvent.CurrentCount)); - expectedMessages = expectedMessages.SelectMany(x => Enumerable.Repeat(x, expectedRepeat)); + var receivedMessages = ReceivedMessages.ToArray(); + var actualMessages = resultFunc(receivedMessages.Select(m => m.Message)).ToArray(); - Assert.AreEqual(expectedMessages, resultFunc(ReceivedMessages)); + CollectionAssert.AreEqual( + expectedMessages, + actualMessages, + string.Format("Expected messages: '{0}', actual messages: '{1}', expectedRepeat: {2}", + expectedMessagesStr, + string.Join(", ", receivedMessages.Select(x => x.ToString())), + expectedRepeat)); // check that all messages came from local node. var localNodeId = cluster.Ignite.GetCluster().GetLocalNode().Id; - Assert.AreEqual(localNodeId, LastNodeIds.Distinct().Single()); + Assert.AreEqual(localNodeId, ReceivedMessages.Select(m => m.NodeId).Distinct().Single()); AssertFailures(); } @@ -631,9 +667,9 @@ namespace Apache.Ignite.Core.Tests /// Gets the message listener. /// </summary> /// <returns>New instance of message listener.</returns> - public static IMessageListener<string> GetListener() + public static RemoteListener GetListener(string name = null) { - return new RemoteListener(); + return new RemoteListener(name); } /// <summary> @@ -651,15 +687,25 @@ namespace Apache.Ignite.Core.Tests /// <summary> /// Remote listener. /// </summary> - private class RemoteListener : IMessageListener<string> + public class RemoteListener : IMessageListener<string> { + /** */ + private readonly string _name; + + /** */ + public RemoteListener(string name) + { + _name = name; + } + /** <inheritdoc /> */ public bool Invoke(Guid nodeId, string message) { + var receivedMessage = new ReceivedMessage(message, nodeId, GetHashCode(), _name); + try { - LastNodeIds.Push(nodeId); - ReceivedMessages.Push(message); + ReceivedMessages.Push(receivedMessage); ReceivedEvent.Signal(); @@ -674,6 +720,52 @@ namespace Apache.Ignite.Core.Tests } } } + + /// <summary> + /// Received message data. + /// </summary> + public class ReceivedMessage + { + /** */ + private readonly string _message; + + /** */ + private readonly Guid _nodeId; + + /** */ + private readonly int _listenerId; + + /** */ + private readonly string _listenerName; + + /** */ + public ReceivedMessage(string message, Guid nodeId, int listenerId, string listenerName) + { + _message = message; + _nodeId = nodeId; + _listenerId = listenerId; + _listenerName = listenerName; + } + + /** */ + public string Message + { + get { return _message; } + } + + /** */ + public Guid NodeId + { + get { return _nodeId; } + } + + /** <inheritdoc /> */ + public override string ToString() + { + return string.Format( + "ReceivedMessage [{0}, {1}, {2}, {3}]", _message, _nodeId, _listenerId, _listenerName); + } + } } /// <summary>